大規模データの処理・分析において、Apache Sparkは事実上の業界標準です。Google Cloud Dataproc は、Sparkクラスタのプロビジョニングを数分で完了し、BigQueryとのシームレスな連携を提供するマネージドサービスです。SES案件でもデータエンジニアリング領域では、Dataproc + Spark + BigQueryのスキルセットが高い需要を持ちます。
本記事では、Google Cloud Dataprocを使ったSparkベースの大規模データ処理パイプラインの構築方法を解説します。クラスタ設計、Spark SQLによるデータ変換、BigQuery連携、Dataproc Serverless、コスト最適化まで、実践で必要な知識を体系的にお伝えします。

Google Cloud Dataprocとは
Dataprocの基本
Google Cloud Dataprocは、Apache Spark、Apache Hadoop、Apache Flink等のオープンソースデータ処理フレームワークを実行するためのマネージドクラスタサービスです。
Dataprocの特徴:
- 高速プロビジョニング: クラスタの起動が90秒以内
- 自動スケーリング: ワークロードに応じてWorkerノードを自動増減
- BigQuery統合: SparkからBigQueryテーブルを直接読み書き
- GCSネイティブ: HDFS代わりにGoogle Cloud Storageを使用
- Serverlessモード: クラスタ管理不要のサーバーレス実行
- コスト最適化: 秒単位課金、プリエンプティブVMサポート
Dataproc vs 他サービス比較
| 項目 | Dataproc | Dataflow | BigQuery |
|---|---|---|---|
| エンジン | Spark/Hadoop | Apache Beam | BigQuery Engine |
| ユースケース | バッチ/ML | ストリーム/バッチ | 分析クエリ |
| スケーリング | 手動/自動 | 完全自動 | 完全自動 |
| 言語 | Python/Scala/Java/R | Java/Python | SQL |
| 学習コスト | Spark知識必要 | Beam API学習 | SQL知識で十分 |
| コスト効率 | 大規模バッチに最適 | ストリームに最適 | アドホッククエリに最適 |
| カスタマイズ | 高い | 中程度 | 低い |
Dataprocクラスタの設計と構築
Terraformによるクラスタ構築
# Dataprocクラスタ
resource "google_dataproc_cluster" "spark_cluster" {
name = "spark-etl-cluster"
region = "asia-northeast1"
project = var.project_id
cluster_config {
staging_bucket = google_storage_bucket.dataproc_staging.name
# マスターノード
master_config {
num_instances = 1
machine_type = "n2-standard-4"
disk_config {
boot_disk_type = "pd-ssd"
boot_disk_size_gb = 100
}
}
# ワーカーノード
worker_config {
num_instances = 4
machine_type = "n2-standard-8"
disk_config {
boot_disk_type = "pd-ssd"
boot_disk_size_gb = 200
num_local_ssds = 1
}
}
# プリエンプティブワーカー(コスト削減)
preemptible_worker_config {
num_instances = 2
}
# 自動スケーリング
autoscaling_config {
policy_uri = google_dataproc_autoscaling_policy.spark.name
}
# ソフトウェア設定
software_config {
image_version = "2.2-debian12"
override_properties = {
"spark:spark.sql.adaptive.enabled" = "true"
"spark:spark.sql.adaptive.coalescePartitions.enabled" = "true"
"spark:spark.dynamicAllocation.enabled" = "true"
"spark:spark.dynamicAllocation.minExecutors" = "2"
"spark:spark.dynamicAllocation.maxExecutors" = "20"
"spark:spark.serializer" = "org.apache.spark.serializer.KryoSerializer"
"spark:spark.sql.sources.partitionOverwriteMode" = "dynamic"
}
optional_components = [
"JUPYTER",
"DOCKER",
]
}
# GCSコネクタ設定
gce_cluster_config {
service_account = google_service_account.dataproc.email
service_account_scopes = [
"https://www.googleapis.com/auth/cloud-platform",
]
subnetwork = var.subnetwork
internal_ip_only = true
metadata = {
"bigquery-connector-version" = "0.36.0"
"spark-bigquery-connector-version" = "0.36.0"
}
}
# 初期化スクリプト
initialization_action {
script = "gs://${google_storage_bucket.dataproc_staging.name}/init/install-deps.sh"
timeout_sec = 300
}
}
labels = {
environment = "production"
team = "data-engineering"
}
}
# 自動スケーリングポリシー
resource "google_dataproc_autoscaling_policy" "spark" {
policy_id = "spark-autoscaling"
location = "asia-northeast1"
basic_algorithm {
yarn_config {
scale_up_factor = 1.0
scale_down_factor = 1.0
scale_up_min_worker_fraction = 0.0
graceful_decommission_timeout = "3600s"
}
cooldown_period = "120s"
}
worker_config {
min_instances = 2
max_instances = 20
weight = 1
}
secondary_worker_config {
min_instances = 0
max_instances = 10
weight = 1
}
}
# サービスアカウント
resource "google_service_account" "dataproc" {
account_id = "dataproc-spark"
display_name = "Dataproc Spark Service Account"
}
# BigQuery書き込み権限
resource "google_project_iam_member" "dataproc_bq" {
project = var.project_id
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.dataproc.email}"
}
# GCS読み書き権限
resource "google_project_iam_member" "dataproc_gcs" {
project = var.project_id
role = "roles/storage.objectAdmin"
member = "serviceAccount:${google_service_account.dataproc.email}"
}
PySpark × BigQuery連携の実装
ETLジョブの実装
"""
PySpark ETLジョブ: GCS → Spark変換 → BigQuery
売上データの日次集計パイプライン
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField, StringType,
DoubleType, TimestampType, IntegerType
)
import sys
from datetime import datetime
def create_spark_session():
"""BigQueryコネクタ付きSparkSession作成"""
return SparkSession.builder \
.appName("daily-sales-etl") \
.config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar") \
.config("spark.sql.adaptive.enabled", "true") \
.config("temporaryGcsBucket", "dataproc-temp-bucket") \
.getOrCreate()
def read_raw_data(spark, date_str):
"""GCSからCSVデータを読み込む"""
schema = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", StringType(), False),
StructField("product_id", StringType(), False),
StructField("product_name", StringType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), False),
StructField("unit_price", DoubleType(), False),
StructField("discount_rate", DoubleType(), True),
StructField("order_date", TimestampType(), False),
StructField("region", StringType(), True),
])
input_path = f"gs://data-lake-raw/sales/{date_str}/*.csv"
df = spark.read \
.option("header", "true") \
.option("encoding", "UTF-8") \
.schema(schema) \
.csv(input_path)
print(f"📥 読み込み完了: {df.count()} 行")
return df
def transform_data(df):
"""データ変換・集計"""
# 基本的なクレンジング
cleaned = df \
.filter(F.col("order_id").isNotNull()) \
.filter(F.col("quantity") > 0) \
.filter(F.col("unit_price") > 0) \
.withColumn("discount_rate",
F.coalesce(F.col("discount_rate"), F.lit(0.0))) \
.withColumn("region",
F.coalesce(F.col("region"), F.lit("unknown")))
# 売上金額の計算
with_amount = cleaned.withColumn(
"total_amount",
F.col("quantity") * F.col("unit_price") * (1 - F.col("discount_rate"))
).withColumn(
"discount_amount",
F.col("quantity") * F.col("unit_price") * F.col("discount_rate")
)
# 日次集計
daily_summary = with_amount.groupBy(
F.date_format("order_date", "yyyy-MM-dd").alias("date"),
"category",
"region"
).agg(
F.count("order_id").alias("order_count"),
F.countDistinct("customer_id").alias("unique_customers"),
F.sum("total_amount").alias("total_revenue"),
F.sum("discount_amount").alias("total_discount"),
F.avg("total_amount").alias("avg_order_value"),
F.sum("quantity").alias("total_quantity"),
).withColumn(
"processed_at", F.current_timestamp()
)
print(f"📊 集計完了: {daily_summary.count()} グループ")
return with_amount, daily_summary
def write_to_bigquery(df, table_name, write_mode="overwrite"):
"""BigQueryにデータを書き込む"""
df.write \
.format("bigquery") \
.option("table", f"analytics_dataset.{table_name}") \
.option("temporaryGcsBucket", "dataproc-temp-bucket") \
.option("partitionField", "date") \
.option("partitionType", "DAY") \
.option("clusteredFields", "category,region") \
.mode(write_mode) \
.save()
print(f"✅ BigQuery書き込み完了: {table_name}")
def write_to_gcs_parquet(df, output_path):
"""GCSにParquet形式で保存"""
df.write \
.mode("overwrite") \
.partitionBy("category") \
.parquet(output_path)
print(f"✅ GCS書き込み完了: {output_path}")
def data_quality_check(spark, date_str):
"""BigQuery上でのデータ品質チェック"""
result = spark.read \
.format("bigquery") \
.option("table", "analytics_dataset.daily_sales_summary") \
.option("filter", f"date = '{date_str}'") \
.load()
count = result.count()
total_revenue = result.agg(F.sum("total_revenue")).collect()[0][0]
print(f"🔍 品質チェック:")
print(f" レコード数: {count}")
print(f" 合計売上: ¥{total_revenue:,.0f}")
# 異常検知
if count == 0:
raise ValueError("❌ データが0件です")
if total_revenue and total_revenue < 0:
raise ValueError("❌ 合計売上が負の値です")
print("✅ 品質チェック通過")
def main():
"""メイン処理"""
date_str = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime("%Y-%m-%d")
print(f"=== 日次売上ETL: {date_str} ===")
spark = create_spark_session()
try:
# 1. データ読み込み
raw_df = read_raw_data(spark, date_str)
# 2. 変換・集計
detail_df, summary_df = transform_data(raw_df)
# 3. GCSに詳細データ保存
write_to_gcs_parquet(
detail_df,
f"gs://data-lake-processed/sales/{date_str}"
)
# 4. BigQueryに集計データ保存
write_to_bigquery(summary_df, "daily_sales_summary", "append")
# 5. データ品質チェック
data_quality_check(spark, date_str)
print(f"=== ETL完了: {date_str} ===")
except Exception as e:
print(f"❌ ETLエラー: {e}")
raise
finally:
spark.stop()
if __name__ == "__main__":
main()
ジョブの実行
# Dataprocクラスタでジョブを実行
gcloud dataproc jobs submit pyspark \
gs://dataproc-scripts/etl/daily_sales_etl.py \
--cluster=spark-etl-cluster \
--region=asia-northeast1 \
--properties="spark.dynamicAllocation.enabled=true,spark.sql.adaptive.enabled=true" \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar \
-- 2026-04-11
Dataproc Serverless:クラスタ管理不要のSparkジョブ実行
Serverlessバッチの実行
# Dataproc Serverlessでジョブ実行(クラスタ不要)
gcloud dataproc batches submit pyspark \
gs://dataproc-scripts/etl/daily_sales_etl.py \
--region=asia-northeast1 \
--deps-bucket=gs://dataproc-deps \
--properties="spark.sql.adaptive.enabled=true" \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar \
--subnet=projects/my-project/regions/asia-northeast1/subnetworks/default \
--service-account=dataproc-spark@my-project.iam.gserviceaccount.com \
--batch=daily-sales-$(date +%Y%m%d) \
-- 2026-04-11
Dataprocクラスタ vs Serverless比較
| 項目 | Dataprocクラスタ | Dataproc Serverless |
|---|---|---|
| クラスタ管理 | 必要 | 不要 |
| 起動時間 | 90秒〜 | 30秒〜 |
| 課金 | クラスタ稼働中は課金 | ジョブ実行時間のみ |
| カスタマイズ | 高い(初期化スクリプト) | 制限あり |
| 常時起動 | 可能 | 不可 |
| 適用シナリオ | 長時間ジョブ、インタラクティブ | バッチジョブ、アドホック |
パフォーマンス最適化
Sparkジョブの最適化テクニック
# 1. パーティション最適化
df = spark.read.parquet("gs://data-lake/large-dataset/")
# 適切なパーティション数に調整
df = df.repartition(200) # 大規模データ
# or
df = df.coalesce(50) # パーティション数を減らす(シャッフルなし)
# 2. ブロードキャストジョイン(小テーブルのジョイン最適化)
from pyspark.sql.functions import broadcast
small_df = spark.read.format("bigquery").option("table", "master.regions").load()
large_df = spark.read.parquet("gs://data-lake/sales/")
result = large_df.join(broadcast(small_df), "region_id")
# 3. キャッシュの活用
frequently_used = df.filter(F.col("status") == "active").cache()
frequently_used.count() # キャッシュをトリガー
# 4. データスキューの対応
from pyspark.sql.functions import when, rand
# スキューしたキーにソルトを追加
salted_df = df.withColumn(
"salted_key",
F.concat(F.col("skewed_key"), F.lit("_"), (F.rand() * 10).cast("int"))
)
# 5. 述語プッシュダウン(BigQuery)
df = spark.read \
.format("bigquery") \
.option("table", "analytics.sales_fact") \
.option("filter", "date >= '2026-01-01'") \
.load()
コスト最適化戦略
- プリエンプティブVM活用: Workerの30-50%をプリエンプティブVMにして、最大80%のコスト削減
- 自動スケーリング: ワークロードに応じたWorker数の動的調整
- Serverless活用: 不定期のバッチジョブはServerlessで実行してアイドルコストを削減
- データ局所性: GCSバケットとDataprocクラスタを同じリージョンに配置
- 圧縮フォーマット: ParquetやORC等の列指向フォーマットを使用してI/Oを削減
Cloud Composerとの統合:ワークフローオーケストレーション
"""
Cloud Composer (Airflow) からDataprocジョブを実行するDAG
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
DataprocSubmitPySparkJobOperator,
DataprocCreateClusterOperator,
DataprocDeleteClusterOperator,
)
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='dataproc_daily_etl',
default_args=default_args,
schedule_interval='0 3 * * *',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['dataproc', 'spark', 'etl'],
) as dag:
# 1. エフェメラルクラスタの作成
create_cluster = DataprocCreateClusterOperator(
task_id='create_cluster',
project_id='my-project',
cluster_name='ephemeral-etl-{{ ds_nodash }}',
region='asia-northeast1',
cluster_config={
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n2-standard-4',
},
'worker_config': {
'num_instances': 4,
'machine_type_uri': 'n2-standard-8',
},
'software_config': {
'image_version': '2.2',
},
},
)
# 2. Sparkジョブ実行
submit_spark_job = DataprocSubmitPySparkJobOperator(
task_id='submit_etl_job',
main='gs://dataproc-scripts/etl/daily_sales_etl.py',
cluster_name='ephemeral-etl-{{ ds_nodash }}',
region='asia-northeast1',
arguments=['{{ ds }}'],
dataproc_jars=[
'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.0.jar',
],
)
# 3. クラスタ削除
delete_cluster = DataprocDeleteClusterOperator(
task_id='delete_cluster',
project_id='my-project',
cluster_name='ephemeral-etl-{{ ds_nodash }}',
region='asia-northeast1',
trigger_rule='all_done', # 成功・失敗に関わらず削除
)
create_cluster >> submit_spark_job >> delete_cluster
SES案件でのDataproc活用
よくある案件パターン
- データレイク構築: GCS + Dataproc + BigQueryでエンタープライズデータレイクを構築
- ETL移行: オンプレミスのHadoopクラスタからDataprocへの移行
- ML前処理: 大規模データの特徴量エンジニアリングをSparkで実行
- ログ分析: アクセスログ・アプリケーションログの大規模バッチ分析
SES市場での単価傾向(2026年)
- データエンジニア(Spark + GCP): 月額75〜100万円
- データプラットフォーム(Dataproc + BigQuery + Composer): 月額85〜110万円
- Hadoop→GCP移行: 月額80〜105万円
- MLエンジニア(Spark ML + Vertex AI): 月額85〜115万円
まとめ:Dataproc × Spark × BigQueryでデータ処理を高速化する
Google Cloud Dataprocを使った大規模データ処理パイプラインのポイントをまとめます。
- マネージドSpark環境: 90秒でクラスタを起動し、インフラ管理から解放される
- BigQueryシームレス連携: SparkからBigQueryの読み書きが数行のコードで完了
- Serverless活用: 不定期のバッチジョブはServerlessで実行しコストを最適化
- パフォーマンス最適化: AQE・ブロードキャストジョイン・パーティション戦略で高速化
- Cloud Composer統合: Airflowでワークフローをオーケストレーションし、エフェメラルクラスタで効率運用
データエンジニアリングスキルはSES市場で最も成長率が高い分野です。Dataproc + Sparkをマスターして、高単価案件を獲得しましょう。