𝕏 f B! L
案件・求人数 12,345
案件を探す(準備中) エージェントを探す(準備中) お役立ち情報 ログイン
案件・求人数 12,345
AWS MWAA(Amazon Managed Apache Airflow)でデータパイプラインを構築する方法|SESエンジニア実践ガイド

AWS MWAA(Amazon Managed Apache Airflow)でデータパイプラインを構築する方法|SESエンジニア実践ガイド

AWSMWAAApache AirflowデータパイプラインETL
目次

データエンジニアリングの需要が急増する中、Apache Airflowを使ったデータパイプラインの構築・運用スキルは、SES市場で最も価値の高いスキルの一つとなっています。AWS MWAA(Amazon Managed Workflows for Apache Airflow)は、Airflowのインフラ管理を不要にし、エンジニアがパイプラインのロジックに集中できる環境を提供します。

本記事では、AWS MWAAを使ったデータパイプライン構築の基本から実践的な運用まで、SES案件で即活用できるスキルを体系的に解説します。DAG設計のベストプラクティス、S3・Redshift・Glue連携、監視・トラブルシューティングまで、現場で必要なノウハウをお伝えします。

AWS MWAAデータパイプラインアーキテクチャ

AWS MWAAとは:Apache Airflowのマネージドサービス

Apache Airflowの基本概念

Apache Airflowは、データパイプラインのオーケストレーションツールです。Pythonでワークフロー(DAG: Directed Acyclic Graph)を定義し、タスクの依存関係を管理しながら実行します。

Airflowの主要コンポーネント:

  • DAG(有向非巡回グラフ): ワークフロー全体を定義するPythonファイル
  • Task: DAG内の個々の処理ステップ
  • Operator: タスクの実行内容を定義する部品(BashOperator、PythonOperator等)
  • Scheduler: DAGのスケジュール管理と実行制御
  • Executor: タスクの実行環境(CeleryExecutor、LocalExecutor等)
  • Metastore: DAGの実行状態を管理するデータベース

AWS MWAA vs 自前構築

項目AWS MWAA自前構築(EC2/EKS)
インフラ管理不要(フルマネージド)必要(サーバー管理)
スケーリング自動手動設定
アップグレードAWSが管理自前で対応
セキュリティVPC統合・IAM連携自前で設定
コスト環境サイズに応じた固定費EC2/EKSのコスト
カスタマイズ性一部制限あり完全にカスタマイズ可能
可用性マルチAZ対応自前で構成
初期構築時間約30分数日〜1週間

MWAA環境のサイズと料金(2026年時点)

環境クラスWorker数月額概算適用シナリオ
mw1.small1-5約$350開発・検証環境
mw1.medium1-10約$700中規模パイプライン
mw1.large1-20約$1,200大規模本番環境
mw1.xlarge1-25約$2,000ハイパフォーマンス
mw1.2xlarge1-25約$3,500超大規模処理

MWAA環境のセットアップ

Terraformによる環境構築

# MWAA環境
resource "aws_mwaa_environment" "main" {
  name               = "data-pipeline-prod"
  airflow_version    = "2.10.4"
  environment_class  = "mw1.medium"
  
  source_bucket_arn          = aws_s3_bucket.mwaa.arn
  dag_s3_path                = "dags/"
  requirements_s3_path       = "requirements.txt"
  plugins_s3_path            = "plugins.zip"
  
  execution_role_arn = aws_iam_role.mwaa.arn
  
  network_configuration {
    security_group_ids = [aws_security_group.mwaa.id]
    subnet_ids         = var.private_subnet_ids
  }
  
  logging_configuration {
    dag_processing_logs {
      enabled   = true
      log_level = "INFO"
    }
    scheduler_logs {
      enabled   = true
      log_level = "WARNING"
    }
    task_logs {
      enabled   = true
      log_level = "INFO"
    }
    webserver_logs {
      enabled   = true
      log_level = "WARNING"
    }
    worker_logs {
      enabled   = true
      log_level = "WARNING"
    }
  }
  
  airflow_configuration_options = {
    "core.default_timezone"           = "Asia/Tokyo"
    "core.parallelism"                = "32"
    "core.max_active_runs_per_dag"    = "3"
    "celery.worker_concurrency"       = "8"
    "scheduler.catchup_by_default"    = "false"
    "webserver.default_ui_timezone"   = "Asia/Tokyo"
  }
  
  max_workers = 10
  min_workers = 1
  
  tags = {
    Environment = "production"
    Project     = "data-pipeline"
  }
}

# S3バケット(DAG・プラグイン格納)
resource "aws_s3_bucket" "mwaa" {
  bucket = "mwaa-data-pipeline-prod"
}

resource "aws_s3_bucket_versioning" "mwaa" {
  bucket = aws_s3_bucket.mwaa.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_public_access_block" "mwaa" {
  bucket                  = aws_s3_bucket.mwaa.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# IAMロール
resource "aws_iam_role" "mwaa" {
  name = "mwaa-execution-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = ["airflow.amazonaws.com", "airflow-env.amazonaws.com"]
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

resource "aws_iam_role_policy" "mwaa" {
  name = "mwaa-execution-policy"
  role = aws_iam_role.mwaa.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject*",
          "s3:PutObject*",
          "s3:ListBucket",
          "s3:DeleteObject"
        ]
        Resource = [
          aws_s3_bucket.mwaa.arn,
          "${aws_s3_bucket.mwaa.arn}/*",
          "arn:aws:s3:::data-lake-*",
          "arn:aws:s3:::data-lake-*/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogStream",
          "logs:CreateLogGroup",
          "logs:PutLogEvents",
          "logs:GetLogEvents",
          "logs:GetLogRecord"
        ]
        Resource = "arn:aws:logs:*:*:log-group:airflow-*"
      },
      {
        Effect = "Allow"
        Action = [
          "glue:*Job*",
          "glue:*Crawler*",
          "glue:GetDatabase*",
          "glue:GetTable*",
          "glue:GetPartition*"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "redshift-data:ExecuteStatement",
          "redshift-data:GetStatementResult",
          "redshift-data:DescribeStatement",
          "redshift-data:ListStatements"
        ]
        Resource = "*"
      }
    ]
  })
}

DAG設計のベストプラクティス

基本的なETL DAG

"""
ETL DAG: S3 → Glue → Redshift のデータパイプライン
日次実行で売上データを処理する
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.empty import EmptyOperator

# デフォルト引数
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='daily_sales_etl',
    default_args=default_args,
    description='日次売上データETLパイプライン',
    schedule_interval='0 2 * * *',  # 毎日AM2:00 JST
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'sales', 'daily'],
) as dag:

    # DAG開始マーカー
    start = EmptyOperator(task_id='start')

    # 1. S3にソースデータが到着するのを待機
    wait_for_source = S3KeySensor(
        task_id='wait_for_source_data',
        bucket_name='data-lake-raw',
        bucket_key='sales/{{ ds }}/sales_data.csv',
        aws_conn_id='aws_default',
        timeout=3600,  # 1時間待機
        poke_interval=60,  # 1分ごとにチェック
        mode='poke',
    )

    # 2. データ品質チェック
    def validate_source_data(**context):
        """ソースデータの品質チェック"""
        import boto3
        import csv
        from io import StringIO
        
        s3 = boto3.client('s3')
        ds = context['ds']
        
        response = s3.get_object(
            Bucket='data-lake-raw',
            Key=f'sales/{ds}/sales_data.csv'
        )
        content = response['Body'].read().decode('utf-8')
        reader = csv.DictReader(StringIO(content))
        rows = list(reader)
        
        # 品質チェック
        assert len(rows) > 0, "データが空です"
        
        required_columns = ['order_id', 'amount', 'customer_id', 'order_date']
        actual_columns = set(rows[0].keys())
        missing = set(required_columns) - actual_columns
        assert not missing, f"必須カラムが不足: {missing}"
        
        # NULLチェック
        null_count = sum(1 for row in rows if not row.get('order_id'))
        assert null_count == 0, f"order_idにNULLが{null_count}件あります"
        
        # 金額の妥当性チェック
        invalid_amounts = [
            row for row in rows 
            if float(row.get('amount', 0)) < 0
        ]
        assert len(invalid_amounts) == 0, f"負の金額が{len(invalid_amounts)}件あります"
        
        context['ti'].xcom_push(key='row_count', value=len(rows))
        print(f"✅ データ品質チェック通過: {len(rows)}行")

    validate_data = PythonOperator(
        task_id='validate_source_data',
        python_callable=validate_source_data,
    )

    # 3. Glue ETLジョブ実行
    run_glue_job = GlueJobOperator(
        task_id='run_glue_etl',
        job_name='sales-data-etl',
        script_location='s3://glue-scripts/sales_etl.py',
        aws_conn_id='aws_default',
        script_args={
            '--source_path': 's3://data-lake-raw/sales/{{ ds }}/',
            '--target_path': 's3://data-lake-processed/sales/{{ ds }}/',
            '--execution_date': '{{ ds }}',
        },
        num_of_dpus=5,
        region_name='ap-northeast-1',
    )

    # 4. 処理済みデータのバリデーション
    def validate_processed_data(**context):
        """処理済みデータの検証"""
        import boto3
        
        s3 = boto3.client('s3')
        ds = context['ds']
        
        # Parquetファイルの存在確認
        response = s3.list_objects_v2(
            Bucket='data-lake-processed',
            Prefix=f'sales/{ds}/',
        )
        
        files = response.get('Contents', [])
        assert len(files) > 0, "処理済みファイルが見つかりません"
        
        total_size = sum(f['Size'] for f in files)
        source_rows = context['ti'].xcom_pull(
            task_ids='validate_source_data', key='row_count'
        )
        
        print(f"✅ 処理済みファイル: {len(files)}個, 合計サイズ: {total_size/1024:.1f}KB")
        print(f"   ソース行数: {source_rows}")

    validate_processed = PythonOperator(
        task_id='validate_processed_data',
        python_callable=validate_processed_data,
    )

    # 5. Redshiftへのロード
    load_to_redshift = RedshiftDataOperator(
        task_id='load_to_redshift',
        cluster_identifier='analytics-cluster',
        database='analytics',
        sql="""
            COPY sales_staging
            FROM 's3://data-lake-processed/sales/{{ ds }}/'
            IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
            FORMAT AS PARQUET;
            
            -- ステージング→本テーブルへのマージ
            BEGIN TRANSACTION;
            
            DELETE FROM sales_fact
            WHERE order_date = '{{ ds }}'::date;
            
            INSERT INTO sales_fact
            SELECT * FROM sales_staging;
            
            TRUNCATE TABLE sales_staging;
            
            END TRANSACTION;
        """,
        await_result=True,
        poll_interval=10,
    )

    # 6. データ品質チェック(Redshift上)
    quality_check_redshift = RedshiftDataOperator(
        task_id='quality_check_redshift',
        cluster_identifier='analytics-cluster',
        database='analytics',
        sql="""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT order_id) as unique_orders,
                SUM(amount) as total_amount,
                MIN(order_date) as min_date,
                MAX(order_date) as max_date
            FROM sales_fact
            WHERE order_date = '{{ ds }}'::date;
        """,
        await_result=True,
    )

    # DAG完了マーカー
    end = EmptyOperator(task_id='end')

    # タスク依存関係
    start >> wait_for_source >> validate_data >> run_glue_job >> \
    validate_processed >> load_to_redshift >> quality_check_redshift >> end

データ品質モニタリングDAG

"""
データ品質モニタリングDAG
各テーブルのデータ品質を定期チェックし、異常を検出する
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.hooks.sns import SnsHook

default_args = {
    'owner': 'data-quality',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

def check_data_freshness(**context):
    """データの鮮度チェック"""
    from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
    
    hook = RedshiftDataHook(aws_conn_id='aws_default')
    
    tables_to_check = [
        ('sales_fact', 'order_date', 1),       # 1日以内
        ('user_events', 'event_timestamp', 1),  # 1日以内
        ('inventory', 'updated_at', 7),         # 7日以内
    ]
    
    alerts = []
    for table, date_col, max_age_days in tables_to_check:
        result = hook.execute_query(
            cluster_identifier='analytics-cluster',
            database='analytics',
            sql=f"""
                SELECT 
                    DATEDIFF(day, MAX({date_col}), GETDATE()) as age_days
                FROM {table}
            """,
        )
        
        age = result[0][0] if result else None
        if age and age > max_age_days:
            alerts.append(f"⚠️ {table}: 最新データが{age}日前(閾値: {max_age_days}日)")
    
    if alerts:
        context['ti'].xcom_push(key='alerts', value=alerts)
        return 'send_alert'
    return 'no_alert'

def send_quality_alert(**context):
    """品質アラートをSNSで送信"""
    alerts = context['ti'].xcom_pull(
        task_ids='check_freshness', key='alerts'
    )
    
    hook = SnsHook(aws_conn_id='aws_default')
    hook.publish_to_target(
        target_arn='arn:aws:sns:ap-northeast-1:123456789012:data-quality-alerts',
        message='\n'.join(alerts),
        subject='[データ品質] 異常検出',
    )

with DAG(
    dag_id='data_quality_monitoring',
    default_args=default_args,
    schedule_interval='0 */6 * * *',  # 6時間ごと
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['monitoring', 'data-quality'],
) as dag:

    check_freshness = BranchPythonOperator(
        task_id='check_freshness',
        python_callable=check_data_freshness,
    )

    alert = PythonOperator(
        task_id='send_alert',
        python_callable=send_quality_alert,
    )

    no_alert = PythonOperator(
        task_id='no_alert',
        python_callable=lambda: print("✅ データ品質正常"),
    )

    check_freshness >> [alert, no_alert]

運用・監視のベストプラクティス

CloudWatchによるMWAA監視

# CloudWatchダッシュボード設定(AWS CDK)
from aws_cdk import (
    aws_cloudwatch as cw,
    Duration,
)

dashboard = cw.Dashboard(self, "MWAADashboard",
    dashboard_name="mwaa-monitoring"
)

# DAG実行成功率
dag_success_widget = cw.GraphWidget(
    title="DAG実行結果",
    left=[
        cw.Metric(
            namespace="AmazonMWAA",
            metric_name="DAGProcessingSuccess",
            dimensions_map={"Environment": "data-pipeline-prod"},
            statistic="Sum",
            period=Duration.minutes(5),
        ),
        cw.Metric(
            namespace="AmazonMWAA",
            metric_name="DAGProcessingFailure",
            dimensions_map={"Environment": "data-pipeline-prod"},
            statistic="Sum",
            period=Duration.minutes(5),
        ),
    ],
    width=12,
)

# Worker使用率
worker_widget = cw.GraphWidget(
    title="Worker使用率",
    left=[
        cw.Metric(
            namespace="AmazonMWAA",
            metric_name="RunningTasks",
            dimensions_map={"Environment": "data-pipeline-prod"},
            statistic="Average",
            period=Duration.minutes(5),
        ),
        cw.Metric(
            namespace="AmazonMWAA",
            metric_name="QueuedTasks",
            dimensions_map={"Environment": "data-pipeline-prod"},
            statistic="Average",
            period=Duration.minutes(5),
        ),
    ],
    width=12,
)

dashboard.add_widgets(dag_success_widget, worker_widget)

トラブルシューティングガイド

症状考えられる原因対応方法
DAGが表示されない構文エラー、import失敗CloudWatch Logsのdag-processing-logsを確認
タスクがキューで滞留Worker不足、メモリ不足Worker数を増加、環境クラスをアップグレード
タスクがタイムアウト処理時間超過、外部サービス遅延execution_timeoutを調整、リトライ戦略を見直し
OOM(メモリ不足)大量データのインメモリ処理チャンク処理に変更、Glueに処理を移譲
S3アクセス拒否IAMロール権限不足実行ロールのポリシーを確認・更新

SES案件でのデータエンジニアリングスキル

需要の高いスキルセット

  1. Apache Airflow / MWAA: DAG設計、運用、トラブルシューティング
  2. AWS Glue: ETLジョブ、Crawlerの設計
  3. Amazon Redshift: データウェアハウス設計、クエリ最適化
  4. dbt: データ変換、テスト、ドキュメンテーション
  5. Spark / EMR: 大規模データ処理

SES市場での単価傾向(2026年)

  • データエンジニア(Airflow + AWS): 月額75〜100万円
  • データプラットフォームエンジニア: 月額85〜110万円
  • MLOps / データパイプライン: 月額80〜105万円
  • dbt + Airflow: 月額70〜90万円

まとめ:AWS MWAAでデータパイプラインを構築するポイント

AWS MWAAを使ったデータパイプライン構築の要点をまとめます。

  1. マネージド環境の活用: インフラ管理をAWSに任せ、DAG開発に集中
  2. DAG設計のベストプラクティス: 冪等性、データ品質チェック、エラーハンドリングを組み込む
  3. AWS統合: S3、Glue、Redshift、SNSとの連携でフルマネージドなパイプラインを実現
  4. 監視・アラート: CloudWatch + SNSで問題を早期検出
  5. コスト最適化: Worker数の動的調整、環境クラスの適切な選択

データエンジニアリングスキルは、SES市場で最も成長率が高い分野の一つです。AWS MWAAをマスターして、高単価案件を獲得しましょう。

関連記事

SES案件をお探しですか?

SES記事をもっと読む →
🏗️

SES BASE 編集長

SES業界歴10年以上のメンバーが在籍する編集チーム。SES企業での営業・エンジニア経験、フリーランス独立経験を持つメンバーが、業界のリアルな情報をお届けします。

📊 業界データに基づく記事制作 🔍 IPA・経済産業省データ参照 💼 SES実務経験者が執筆・監修