𝕏 f B! L
案件・求人数 12,345
案件を探す(準備中) エージェントを探す(準備中) お役立ち情報 ログイン
案件・求人数 12,345
Google Cloud Dataproc × Spark × BigQuery連携ガイド|大規模データ処理パイプライン構築の実践手法

Google Cloud Dataproc × Spark × BigQuery連携ガイド|大規模データ処理パイプライン構築の実践手法

Google CloudDataprocApache SparkBigQueryデータエンジニアリング
目次

大規模データの処理・分析において、Apache Sparkは事実上の業界標準です。Google Cloud Dataproc は、Sparkクラスタのプロビジョニングを数分で完了し、BigQueryとのシームレスな連携を提供するマネージドサービスです。SES案件でもデータエンジニアリング領域では、Dataproc + Spark + BigQueryのスキルセットが高い需要を持ちます。

本記事では、Google Cloud Dataprocを使ったSparkベースの大規模データ処理パイプラインの構築方法を解説します。クラスタ設計、Spark SQLによるデータ変換、BigQuery連携、Dataproc Serverless、コスト最適化まで、実践で必要な知識を体系的にお伝えします。

Dataproc × Spark × BigQueryアーキテクチャ

Google Cloud Dataprocとは

Dataprocの基本

Google Cloud Dataprocは、Apache Spark、Apache Hadoop、Apache Flink等のオープンソースデータ処理フレームワークを実行するためのマネージドクラスタサービスです。

Dataprocの特徴:

  • 高速プロビジョニング: クラスタの起動が90秒以内
  • 自動スケーリング: ワークロードに応じてWorkerノードを自動増減
  • BigQuery統合: SparkからBigQueryテーブルを直接読み書き
  • GCSネイティブ: HDFS代わりにGoogle Cloud Storageを使用
  • Serverlessモード: クラスタ管理不要のサーバーレス実行
  • コスト最適化: 秒単位課金、プリエンプティブVMサポート

Dataproc vs 他サービス比較

項目DataprocDataflowBigQuery
エンジンSpark/HadoopApache BeamBigQuery Engine
ユースケースバッチ/MLストリーム/バッチ分析クエリ
スケーリング手動/自動完全自動完全自動
言語Python/Scala/Java/RJava/PythonSQL
学習コストSpark知識必要Beam API学習SQL知識で十分
コスト効率大規模バッチに最適ストリームに最適アドホッククエリに最適
カスタマイズ高い中程度低い

Dataprocクラスタの設計と構築

Terraformによるクラスタ構築

# Dataprocクラスタ
resource "google_dataproc_cluster" "spark_cluster" {
  name    = "spark-etl-cluster"
  region  = "asia-northeast1"
  project = var.project_id

  cluster_config {
    staging_bucket = google_storage_bucket.dataproc_staging.name

    # マスターノード
    master_config {
      num_instances = 1
      machine_type  = "n2-standard-4"
      
      disk_config {
        boot_disk_type    = "pd-ssd"
        boot_disk_size_gb = 100
      }
    }

    # ワーカーノード
    worker_config {
      num_instances = 4
      machine_type  = "n2-standard-8"
      
      disk_config {
        boot_disk_type    = "pd-ssd"
        boot_disk_size_gb = 200
        num_local_ssds    = 1
      }
    }

    # プリエンプティブワーカー(コスト削減)
    preemptible_worker_config {
      num_instances = 2
    }

    # 自動スケーリング
    autoscaling_config {
      policy_uri = google_dataproc_autoscaling_policy.spark.name
    }

    # ソフトウェア設定
    software_config {
      image_version = "2.2-debian12"
      
      override_properties = {
        "spark:spark.sql.adaptive.enabled"              = "true"
        "spark:spark.sql.adaptive.coalescePartitions.enabled" = "true"
        "spark:spark.dynamicAllocation.enabled"          = "true"
        "spark:spark.dynamicAllocation.minExecutors"     = "2"
        "spark:spark.dynamicAllocation.maxExecutors"     = "20"
        "spark:spark.serializer"                         = "org.apache.spark.serializer.KryoSerializer"
        "spark:spark.sql.sources.partitionOverwriteMode" = "dynamic"
      }
      
      optional_components = [
        "JUPYTER",
        "DOCKER",
      ]
    }

    # GCSコネクタ設定
    gce_cluster_config {
      service_account = google_service_account.dataproc.email
      
      service_account_scopes = [
        "https://www.googleapis.com/auth/cloud-platform",
      ]
      
      subnetwork = var.subnetwork
      
      internal_ip_only = true
      
      metadata = {
        "bigquery-connector-version" = "0.36.0"
        "spark-bigquery-connector-version" = "0.36.0"
      }
    }

    # 初期化スクリプト
    initialization_action {
      script      = "gs://${google_storage_bucket.dataproc_staging.name}/init/install-deps.sh"
      timeout_sec = 300
    }
  }

  labels = {
    environment = "production"
    team        = "data-engineering"
  }
}

# 自動スケーリングポリシー
resource "google_dataproc_autoscaling_policy" "spark" {
  policy_id = "spark-autoscaling"
  location  = "asia-northeast1"

  basic_algorithm {
    yarn_config {
      scale_up_factor              = 1.0
      scale_down_factor            = 1.0
      scale_up_min_worker_fraction = 0.0
      graceful_decommission_timeout = "3600s"
    }

    cooldown_period = "120s"
  }

  worker_config {
    min_instances = 2
    max_instances = 20
    weight        = 1
  }

  secondary_worker_config {
    min_instances = 0
    max_instances = 10
    weight        = 1
  }
}

# サービスアカウント
resource "google_service_account" "dataproc" {
  account_id   = "dataproc-spark"
  display_name = "Dataproc Spark Service Account"
}

# BigQuery書き込み権限
resource "google_project_iam_member" "dataproc_bq" {
  project = var.project_id
  role    = "roles/bigquery.dataEditor"
  member  = "serviceAccount:${google_service_account.dataproc.email}"
}

# GCS読み書き権限
resource "google_project_iam_member" "dataproc_gcs" {
  project = var.project_id
  role    = "roles/storage.objectAdmin"
  member  = "serviceAccount:${google_service_account.dataproc.email}"
}

PySpark × BigQuery連携の実装

ETLジョブの実装

"""
PySpark ETLジョブ: GCS → Spark変換 → BigQuery
売上データの日次集計パイプライン
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, 
    DoubleType, TimestampType, IntegerType
)
import sys
from datetime import datetime

def create_spark_session():
    """BigQueryコネクタ付きSparkSession作成"""
    return SparkSession.builder \
        .appName("daily-sales-etl") \
        .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("temporaryGcsBucket", "dataproc-temp-bucket") \
        .getOrCreate()

def read_raw_data(spark, date_str):
    """GCSからCSVデータを読み込む"""
    schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("product_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("quantity", IntegerType(), False),
        StructField("unit_price", DoubleType(), False),
        StructField("discount_rate", DoubleType(), True),
        StructField("order_date", TimestampType(), False),
        StructField("region", StringType(), True),
    ])
    
    input_path = f"gs://data-lake-raw/sales/{date_str}/*.csv"
    
    df = spark.read \
        .option("header", "true") \
        .option("encoding", "UTF-8") \
        .schema(schema) \
        .csv(input_path)
    
    print(f"📥 読み込み完了: {df.count()} 行")
    return df

def transform_data(df):
    """データ変換・集計"""
    # 基本的なクレンジング
    cleaned = df \
        .filter(F.col("order_id").isNotNull()) \
        .filter(F.col("quantity") > 0) \
        .filter(F.col("unit_price") > 0) \
        .withColumn("discount_rate", 
            F.coalesce(F.col("discount_rate"), F.lit(0.0))) \
        .withColumn("region", 
            F.coalesce(F.col("region"), F.lit("unknown")))
    
    # 売上金額の計算
    with_amount = cleaned.withColumn(
        "total_amount",
        F.col("quantity") * F.col("unit_price") * (1 - F.col("discount_rate"))
    ).withColumn(
        "discount_amount",
        F.col("quantity") * F.col("unit_price") * F.col("discount_rate")
    )
    
    # 日次集計
    daily_summary = with_amount.groupBy(
        F.date_format("order_date", "yyyy-MM-dd").alias("date"),
        "category",
        "region"
    ).agg(
        F.count("order_id").alias("order_count"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.sum("total_amount").alias("total_revenue"),
        F.sum("discount_amount").alias("total_discount"),
        F.avg("total_amount").alias("avg_order_value"),
        F.sum("quantity").alias("total_quantity"),
    ).withColumn(
        "processed_at", F.current_timestamp()
    )
    
    print(f"📊 集計完了: {daily_summary.count()} グループ")
    return with_amount, daily_summary

def write_to_bigquery(df, table_name, write_mode="overwrite"):
    """BigQueryにデータを書き込む"""
    df.write \
        .format("bigquery") \
        .option("table", f"analytics_dataset.{table_name}") \
        .option("temporaryGcsBucket", "dataproc-temp-bucket") \
        .option("partitionField", "date") \
        .option("partitionType", "DAY") \
        .option("clusteredFields", "category,region") \
        .mode(write_mode) \
        .save()
    
    print(f"✅ BigQuery書き込み完了: {table_name}")

def write_to_gcs_parquet(df, output_path):
    """GCSにParquet形式で保存"""
    df.write \
        .mode("overwrite") \
        .partitionBy("category") \
        .parquet(output_path)
    
    print(f"✅ GCS書き込み完了: {output_path}")

def data_quality_check(spark, date_str):
    """BigQuery上でのデータ品質チェック"""
    result = spark.read \
        .format("bigquery") \
        .option("table", "analytics_dataset.daily_sales_summary") \
        .option("filter", f"date = '{date_str}'") \
        .load()
    
    count = result.count()
    total_revenue = result.agg(F.sum("total_revenue")).collect()[0][0]
    
    print(f"🔍 品質チェック:")
    print(f"   レコード数: {count}")
    print(f"   合計売上: ¥{total_revenue:,.0f}")
    
    # 異常検知
    if count == 0:
        raise ValueError("❌ データが0件です")
    if total_revenue and total_revenue < 0:
        raise ValueError("❌ 合計売上が負の値です")
    
    print("✅ 品質チェック通過")

def main():
    """メイン処理"""
    date_str = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime("%Y-%m-%d")
    
    print(f"=== 日次売上ETL: {date_str} ===")
    
    spark = create_spark_session()
    
    try:
        # 1. データ読み込み
        raw_df = read_raw_data(spark, date_str)
        
        # 2. 変換・集計
        detail_df, summary_df = transform_data(raw_df)
        
        # 3. GCSに詳細データ保存
        write_to_gcs_parquet(
            detail_df,
            f"gs://data-lake-processed/sales/{date_str}"
        )
        
        # 4. BigQueryに集計データ保存
        write_to_bigquery(summary_df, "daily_sales_summary", "append")
        
        # 5. データ品質チェック
        data_quality_check(spark, date_str)
        
        print(f"=== ETL完了: {date_str} ===")
        
    except Exception as e:
        print(f"❌ ETLエラー: {e}")
        raise
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

ジョブの実行

# Dataprocクラスタでジョブを実行
gcloud dataproc jobs submit pyspark \
    gs://dataproc-scripts/etl/daily_sales_etl.py \
    --cluster=spark-etl-cluster \
    --region=asia-northeast1 \
    --properties="spark.dynamicAllocation.enabled=true,spark.sql.adaptive.enabled=true" \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar \
    -- 2026-04-11

Dataproc Serverless:クラスタ管理不要のSparkジョブ実行

Serverlessバッチの実行

# Dataproc Serverlessでジョブ実行(クラスタ不要)
gcloud dataproc batches submit pyspark \
    gs://dataproc-scripts/etl/daily_sales_etl.py \
    --region=asia-northeast1 \
    --deps-bucket=gs://dataproc-deps \
    --properties="spark.sql.adaptive.enabled=true" \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar \
    --subnet=projects/my-project/regions/asia-northeast1/subnetworks/default \
    --service-account=dataproc-spark@my-project.iam.gserviceaccount.com \
    --batch=daily-sales-$(date +%Y%m%d) \
    -- 2026-04-11

Dataprocクラスタ vs Serverless比較

項目DataprocクラスタDataproc Serverless
クラスタ管理必要不要
起動時間90秒〜30秒〜
課金クラスタ稼働中は課金ジョブ実行時間のみ
カスタマイズ高い(初期化スクリプト)制限あり
常時起動可能不可
適用シナリオ長時間ジョブ、インタラクティブバッチジョブ、アドホック

パフォーマンス最適化

Sparkジョブの最適化テクニック

# 1. パーティション最適化
df = spark.read.parquet("gs://data-lake/large-dataset/")

# 適切なパーティション数に調整
df = df.repartition(200)  # 大規模データ
# or
df = df.coalesce(50)  # パーティション数を減らす(シャッフルなし)

# 2. ブロードキャストジョイン(小テーブルのジョイン最適化)
from pyspark.sql.functions import broadcast

small_df = spark.read.format("bigquery").option("table", "master.regions").load()
large_df = spark.read.parquet("gs://data-lake/sales/")

result = large_df.join(broadcast(small_df), "region_id")

# 3. キャッシュの活用
frequently_used = df.filter(F.col("status") == "active").cache()
frequently_used.count()  # キャッシュをトリガー

# 4. データスキューの対応
from pyspark.sql.functions import when, rand

# スキューしたキーにソルトを追加
salted_df = df.withColumn(
    "salted_key",
    F.concat(F.col("skewed_key"), F.lit("_"), (F.rand() * 10).cast("int"))
)

# 5. 述語プッシュダウン(BigQuery)
df = spark.read \
    .format("bigquery") \
    .option("table", "analytics.sales_fact") \
    .option("filter", "date >= '2026-01-01'") \
    .load()

コスト最適化戦略

  1. プリエンプティブVM活用: Workerの30-50%をプリエンプティブVMにして、最大80%のコスト削減
  2. 自動スケーリング: ワークロードに応じたWorker数の動的調整
  3. Serverless活用: 不定期のバッチジョブはServerlessで実行してアイドルコストを削減
  4. データ局所性: GCSバケットとDataprocクラスタを同じリージョンに配置
  5. 圧縮フォーマット: ParquetやORC等の列指向フォーマットを使用してI/Oを削減

Cloud Composerとの統合:ワークフローオーケストレーション

"""
Cloud Composer (Airflow) からDataprocジョブを実行するDAG
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocSubmitPySparkJobOperator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
)
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataproc_daily_etl',
    default_args=default_args,
    schedule_interval='0 3 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['dataproc', 'spark', 'etl'],
) as dag:

    # 1. エフェメラルクラスタの作成
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        project_id='my-project',
        cluster_name='ephemeral-etl-{{ ds_nodash }}',
        region='asia-northeast1',
        cluster_config={
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n2-standard-4',
            },
            'worker_config': {
                'num_instances': 4,
                'machine_type_uri': 'n2-standard-8',
            },
            'software_config': {
                'image_version': '2.2',
            },
        },
    )

    # 2. Sparkジョブ実行
    submit_spark_job = DataprocSubmitPySparkJobOperator(
        task_id='submit_etl_job',
        main='gs://dataproc-scripts/etl/daily_sales_etl.py',
        cluster_name='ephemeral-etl-{{ ds_nodash }}',
        region='asia-northeast1',
        arguments=['{{ ds }}'],
        dataproc_jars=[
            'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar',
        ],
    )

    # 3. クラスタ削除
    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id='my-project',
        cluster_name='ephemeral-etl-{{ ds_nodash }}',
        region='asia-northeast1',
        trigger_rule='all_done',  # 成功・失敗に関わらず削除
    )

    create_cluster >> submit_spark_job >> delete_cluster

SES案件でのDataproc活用

よくある案件パターン

  1. データレイク構築: GCS + Dataproc + BigQueryでエンタープライズデータレイクを構築
  2. ETL移行: オンプレミスのHadoopクラスタからDataprocへの移行
  3. ML前処理: 大規模データの特徴量エンジニアリングをSparkで実行
  4. ログ分析: アクセスログ・アプリケーションログの大規模バッチ分析

SES市場での単価傾向(2026年)

  • データエンジニア(Spark + GCP): 月額75〜100万円
  • データプラットフォーム(Dataproc + BigQuery + Composer): 月額85〜110万円
  • Hadoop→GCP移行: 月額80〜105万円
  • MLエンジニア(Spark ML + Vertex AI): 月額85〜115万円

まとめ:Dataproc × Spark × BigQueryでデータ処理を高速化する

Google Cloud Dataprocを使った大規模データ処理パイプラインのポイントをまとめます。

  1. マネージドSpark環境: 90秒でクラスタを起動し、インフラ管理から解放される
  2. BigQueryシームレス連携: SparkからBigQueryの読み書きが数行のコードで完了
  3. Serverless活用: 不定期のバッチジョブはServerlessで実行しコストを最適化
  4. パフォーマンス最適化: AQE・ブロードキャストジョイン・パーティション戦略で高速化
  5. Cloud Composer統合: Airflowでワークフローをオーケストレーションし、エフェメラルクラスタで効率運用

データエンジニアリングスキルはSES市場で最も成長率が高い分野です。Dataproc + Sparkをマスターして、高単価案件を獲得しましょう。

関連記事

SES案件をお探しですか?

SES記事をもっと読む →
🏗️

SES BASE 編集長

SES業界歴10年以上のメンバーが在籍する編集チーム。SES企業での営業・エンジニア経験、フリーランス独立経験を持つメンバーが、業界のリアルな情報をお届けします。

📊 業界データに基づく記事制作 🔍 IPA・経済産業省データ参照 💼 SES実務経験者が執筆・監修