Amazon MWAA(Managed Workflows for Apache Airflow)

目次

初心者から実務者向けの包括的解説

Amazon Managed Workflows for Apache Airflow(MWAA) は、Apache Airflow をフルマネージドのサーバーレスサービスとして提供し、データパイプライン・ETL ジョブ・機械学習ワークフローを Python ベースの DAG で自動化・オーケストレーションできます。インフラ管理・スケーリング・パッチ適用を AWS が完全に担当し、データエンジニアリングチームは DAG 開発に専念できます。このページでは、MWAA の概念・アーキテクチャ・実装・運用・近年の動向を体系的に解説します。

このページの目的

このページでは以下を対象としています。

  • 初心者向け:Airflow とは何か、MWAA が解決する課題を学びたい方
  • データエンジニア向け:DAG 開発・S3 デプロイ・AWS サービス統合を実装したい方
  • DevOps/SRE 向け:Environment 構成・Auto Scaling・監視・ロギングを運用したい方
  • アーキテクト向け:Step Functions・Glue・EventBridge との使い分け・ベストプラクティス
  • 意思決定者向け:Astronomer・セルフホスト Airflow との投資判断

2025-2026 年の MWAA エコシステム

  • Apache Airflow 2.8 / 2.9 サポート:最新エンジンで Deferrable Task・Dynamic Task Mapping 対応
  • Private Webserver モード:VPC 内からのみアクセス可能な Private Webserver
  • IAM Authentication:IAM Roles for Service Accounts(IRSA)による細粒度認証
  • CloudWatch Logs 統合強化:スケジューラー・ワーカー・ウェブサーバーのログを一元管理
  • 環境クラス拡張:mw1.xlarge・mw1.2xlarge で大規模 DAG 対応
  • プラグイン・依存関係の自動検出:requirements.txt 変更時の自動再デプロイ
  • Cross-Account DAG デプロイ:AWS CodePipeline との統合で CI/CD 自動化

概要

初心者向けメモ:MWAA は「Apache Airflow をマネージドサービス化したもの」です。Airflow は複数のステップを持つワークフロー(DAG = Directed Acyclic Graph)を定義・スケジュール・監視するオープンソースツール。MWAA を使えば、Airflow の EC2 クラスター・データベース・スケーリング管理が不要になり、Python コードで DAG を書いて S3 に置くだけで実行できます。Glue ETL・Redshift・SageMaker などの AWS サービスと統合し、データパイプラインの自動化に特化しています。

MWAA の位置づけ:

graph LR
    DataSource["データソース<br/>S3/RDS/API"]
    MWAA["Amazon MWAA<br/>DAG Orchestration"]
    Tasks["並列タスク処理<br/>Glue/Redshift/ECS"]
    Output["出力・分析<br/>S3/Redshift/Athena"]

    DataSource -->|監視・トリガー| MWAA
    MWAA -->|実行| Tasks
    Tasks -->|結果格納| Output

    Monitor["CloudWatch Logs<br/>Metrics/Alarms"]
    MWAA -.->|ログ・メトリクス| Monitor

MWAA が解決する課題

課題 1: Airflow インフラ管理の複雑性

状況:セルフホスト Airflow でメタデータ DB(PostgreSQL)・スケジューラー・ワーカー・Web サーバーを EC2/Kubernetes で運用。スケーリング・パッチ適用・HA 設定が手動。

MWAA による解決

  • AWS が Environment・Instance Class・Worker Scaling を自動管理
  • Aurora PostgreSQL メタデータ DB が MWAA 専用(マネージド)
  • マルチ AZ HA が標準装備
  • 運用工数を 80% 削減

課題 2: Airflow DAG の依存関係・リトライ・監視

状況:複数の Glue ETL → Redshift COPY → SageMaker 学習 → データ検証というパイプラインで、任意のステップが失敗時に全体が停止。リトライ・スキップ・アラートを手動管理。

MWAA による解決

  • DAG で依存関係を宣言的に定義
  • 自動リトライ・タイムアウト・リトライ間隔を設定可能
  • CloudWatch Logs でスケジューラー・ワーカー・タスクログを一元管理
  • Slack・SNS・EventBridge 連携でアラート自動化

課題 3: 開発環境・本番環境の一貫性

状況:ローカル Airflow で DAG テスト後、本番に deploy すると scheduler timeout や plugin エラーが発生。バージョン・dependency が環境ごとに異なる。

MWAA による解決

  • Apache Airflow バージョンを 2.0 / 2.4 / 2.8 等で明示的に指定
  • requirements.txt を S3 に置いて pip install 自動管理
  • plugins.zip をアップロードして全ワーカーで共有
  • 本番前に mwaa test コマンドで DAG syntax check

主な特徴

特徴 説明
フルマネージド インフラ・OS パッチ・Airflow アップグレードを AWS 管理
サーバーレススケーリング mw1.micro~mw1.2xlarge の Instance Class で固定スペック
Auto Scaling min-workers / max-workers 設定で自動スケール
S3 統合 DAG・plugins・requirements を S3 bucket から自動同期
AWS サービス統合 Glue・Redshift・SageMaker・ECS 等を Operator で直接連携
Secrets Manager 統合 Airflow Connection を Secrets Manager で一元管理
CloudWatch Logs スケジューラー・ワーカー・Web サーバーログを自動収集
Private / Public Webserver VPC 内or インターネット経由でのアクセス制御
IAM Authentication IAM Roles for Service Accounts(IRSA)で細粒度認証

アーキテクチャ

┌─────────────────────────────────────────────────────────────┐
│ Amazon MWAA Environment                                     │
│                                                             │
│  ┌────────────────────────────────────────────────────┐   │
│  │ Apache Airflow Control Plane                       │   │
│  │  ├─ Web Server(Public/Private)                   │   │
│  │  ├─ Scheduler(マルチスケジューラー対応)           │   │
│  │  └─ Metadata DB(Aurora PostgreSQL)               │   │
│  └────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌────────────────────────────────────────────────────┐   │
│  │ Worker Fleet(Fargate)                            │   │
│  │  ├─ min_workers ~ max_workers 自動スケール        │   │
│  │  ├─ Task Execution                                 │   │
│  │  └─ CloudWatch Logs emit                           │   │
│  └────────────────────────────────────────────────────┘   │
│                                                             │
│  ストレージ層:                                             │
│  ├─ S3 Bucket(DAGs / plugins.zip / requirements.txt)    │
│  ├─ Aurora Metadata DB                                    │
│  └─ CloudWatch Logs / X-Ray Traces                        │
└─────────────────────────────────────────────────────────────┘
       ↕
   ┌─ AWS VPC (Private Subnets)
   │  ├─ NAT Gateway → Internet
   │  └─ VPC Endpoint (S3, Secrets Manager)
   │
   └─ IAM Role(Execution Role for Workers)
      ├─ S3 Read(DAGs / plugins)
      ├─ S3 Write(Logs)
      ├─ Secrets Manager Read(Connections)
      └─ AWS Service Permissions(Glue, Redshift, etc.)

コアコンポーネント

1. Environment(環境)

MWAA 環境は Airflow インスタンスの最小単位。1 つのアカウント・リージョンで複数の Environment を作成できます。

# Environment 作成
aws mwaa create-environment \
  --name prod-etl-pipeline \
  --airflow-version 2.8.1 \
  --environment-class mw1.large

2. Instance Class

Class vCPU Memory 用途
mw1.micro 0.5 1 GB 開発・テスト(非本番)
mw1.small 1 2 GB 小規模本番(<100 DAG)
mw1.medium 2 4 GB 標準本番(推奨)
mw1.large 4 8 GB 大規模(100-500 DAG)
mw1.xlarge 8 16 GB 超大規模・複雑 DAG
mw1.2xlarge 16 32 GB エンタープライズ

3. Scheduler

  • Airflow DAG のスケジュール・トリガー・task 依存関係を管理
  • Multi-Scheduler 構成(Airflow 2.x 以降)で高可用性
  • DAG Parse Interval:デフォルト 30 秒(DAG 変更を検知・更新)
  • Catchup:デフォルト False(バックフィル機能の有効化)

4. Workers

  • Fargate コンテナで実行される Task Executor
  • min_workers / max_workers で Auto Scaling
  • CloudWatch Logs に task stdout/stderr を自動出力

5. DAG(Directed Acyclic Graph)

Python で定義されたワークフロー。Task の依存関係・並列実行・条件分岐を記述。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id='simple_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule_interval='0 2 * * *',  # 毎日 2 AM
    catchup=False,
) as dag:

    def extract:
        print("Extracting data...")

    def load:
        print("Loading data...")

    t1 = PythonOperator(task_id='extract_task', python_callable=extract)
    t2 = PythonOperator(task_id='load_task', python_callable=load)

    t1 >> t2  # 依存関係

Environment・Instance Class・ワーカースケーリング

Environment の作成(詳細)

aws mwaa create-environment \
  --name production-etl \
  --airflow-version 2.8.1 \
  --environment-class mw1.medium \
  --max-workers 20 \
  --min-workers 2 \
  --dag-s3-path s3://my-mwaa-bucket/dags/ \
  --plugins-s3-path s3://my-mwaa-bucket/plugins.zip \
  --requirements-s3-path s3://my-mwaa-bucket/requirements.txt \
  --execution-role-arn arn:aws:iam::123456789012:role/MWAAExecutionRole \
  --network-configuration \
    SubnetIds=subnet-private-a,subnet-private-b \
    SecurityGroupIds=sg-mwaa \
  --source-bucket-arn arn:aws:s3:::my-mwaa-bucket \
  --webserver-access-mode PRIVATE_ONLY \
  --weekly-maintenance-window-start MON:03:00 \
  --airflow-configuration-options \
    core.default_task_retries=3 \
    core.default_view=tree \
    celery.worker_autoscale=10,1 \
    logging.logging_level=INFO

Auto Scaling 戦略

# DAG 内で resource requirements を指定
from airflow.models import Variable

# min_workers = 2(常時実行)
# max_workers = 20(ピーク時)
#
# スケーリング判断:
# - Active Task 数が閾値を超えたら workers 追加
# - 1 分以上 task がない状態が続いたら workers 削減

Instance Class の選択基準

# 環境クラス選択フロー

if num_dag < 50 and daily_tasks < 100:
    instance_class = "mw1.small"  # 開発・小規模本番
elif num_dag < 200 and daily_tasks < 500:
    instance_class = "mw1.medium"  # 標準本番(推奨)
elif num_dag < 500 and daily_tasks < 1500:
    instance_class = "mw1.large"   # 大規模
else:
    instance_class = "mw1.xlarge"  # エンタープライズ

DAG の開発・デプロイメント

DAG 開発フロー

# 1. ローカルで DAG を開発・テスト
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta

with DAG(
    dag_id='ecommerce_etl',
    start_date=datetime(2025, 1, 1),
    schedule_interval='0 1 * * *',  # 毎日 1 AM UTC
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2),
    },
    tags=['etl', 'production', 'ecommerce'],
) as dag:

    # タスク 1: S3 入力ファイルを監視
    wait_input = S3KeySensor(
        task_id='wait_for_input_file',
        bucket_name='ecommerce-data',
        bucket_key='raw/{{ ds }}/orders.csv',
        aws_conn_id='aws_default',
        timeout=3600,
        poke_interval=60,
    )

    # タスク 2: Glue ETL を実行
    run_glue_etl = GlueJobOperator(
        task_id='run_glue_etl_job',
        job_name='ecommerce-etl-process',
        script_args={
            '--input_path': 's3://ecommerce-data/raw/{{ ds }}/',
            '--output_path': 's3://ecommerce-data/processed/{{ ds }}/',
        },
        aws_conn_id='aws_default',
    )

    # タスク 3: Redshift にロード
    load_redshift = RedshiftSQLOperator(
        task_id='load_to_redshift',
        sql="""
            COPY analytics.orders
            FROM 's3://ecommerce-data/processed/{{ ds }}/orders/'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
            FORMAT AS PARQUET
            MANIFEST;
        """,
        redshift_conn_id='redshift_default',
    )

    # 依存関係を定義
    wait_input >> run_glue_etl >> load_redshift

mwaa test コマンド(DAG Validation)

# ローカルで DAG をテスト(syntax & import 確認)
python -m py_compile dags/ecommerce_etl.py

# または、Airflow CLI でテスト
airflow dags test ecommerce_etl 2025-01-01

# MWAA 環境と同じバージョンで テスト実行
docker run -it -v $(pwd):/mwaa \
  amazon/mwaa:2.8.1 \
  airflow dags test ecommerce_etl 2025-01-01

S3 統合によるコード管理

S3 バケット構造

my-mwaa-bucket/
├── dags/
│   ├── ecommerce_etl.py
│   ├── data_quality_check.py
│   ├── common/
│   │   ├── __init__.py
│   │   └── utils.py
│   └── config.py
│
├── plugins.zip
│   └── (custom_operators, hooks, sensors)
│
└── requirements.txt
    ├── apache-airflow-providers-amazon==6.2.0
    ├── pydantic>=2.0
    └── requests==2.31.0

S3 Auto-sync メカニズム

# Environment 作成時に DAG S3 path を指定
--dag-s3-path s3://my-mwaa-bucket/dags/

# MWAA Scheduler が定期的に polling(デフォルト 30 秒)
# → S3 内の DAG を自動検出・reload
# → plugins.zip を自動展開
# → requirements.txt を pip install

# デプロイフロー:
# git commit → CodePipeline → S3 PUT → MWAA 自動反映

DAG Deploy ベストプラクティス

# CI/CD パイプラインから S3 に sync

# 1. DAG を git にcommit
git add dags/new_pipeline.py
git commit -m "feat: add new pipeline"

# 2. CodePipeline で自動テスト
pytest tests/test_dags.py

# 3. S3 に sync(テスト成功時のみ)
aws s3 sync ./dags s3://my-mwaa-bucket/dags/ \
  --delete \
  --exclude "*.pyc" \
  --exclude "__pycache__/*"

# 4. requirements.txt アップロード(pip install 自動トリガー)
aws s3 cp requirements.txt s3://my-mwaa-bucket/requirements.txt

# 5. MWAA が自動反映(30 秒以内)
# → Scheduler logs で確認
aws logs tail /aws/mwaa/prod-etl-pipeline/scheduler --follow

AWS サービス統合

Glue ETL Operator

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

glue_task = GlueJobOperator(
    task_id='run_glue_etl',
    job_name='my-glue-job',
    script_args={
        '--input_bucket': 's3://raw-data',
        '--output_bucket': 's3://processed-data',
        '--date': '{{ ds }}',
    },
    aws_conn_id='aws_default',
    wait_for_completion=True,
)

Redshift SQL Operator

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

redshift_task = RedshiftSQLOperator(
    task_id='redshift_analysis',
    sql="""
        INSERT INTO analytics.daily_summary
        SELECT
            date,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue
        FROM staging.orders
        WHERE date = '{{ ds }}'
        GROUP BY date;
    """,
    redshift_conn_id='redshift_default',
)

SageMaker Training Operator

from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator

sagemaker_task = SageMakerTrainingOperator(
    task_id='train_ml_model',
    config={
        'TrainingJobName': 'ml-model-{{ ds }}',
        'RoleArn': 'arn:aws:iam::123456789012:role/SageMakerRole',
        'AlgorithmSpecification': {
            'TrainingImage': '382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
            'TrainingInputMode': 'File',
        },
        'InputDataConfig': [
            {
                'ChannelName': 'training',
                'DataSource': {
                    'S3DataSource': {
                        'S3Uri': 's3://ml-data/train/{{ ds }}/',
                        'S3DataType': 'S3Prefix',
                        'S3DataDistributionType': 'FullyReplicated',
                    }
                },
            }
        ],
        'OutputDataConfig': {
            'S3OutputPath': 's3://ml-models/output/',
        },
    },
    wait_for_completion=True,
)

Lambda Operator

from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

lambda_task = LambdaInvokeFunctionOperator(
    task_id='invoke_data_validation',
    function_name='data_quality_check',
    payload='{{"bucket": "processed-data", "key": "{{ ds }}/data.parquet"}}',
    aws_conn_id='aws_default',
)

ECS Task Operator

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

ecs_task = EcsRunTaskOperator(
    task_id='run_ecs_container',
    cluster='ecs-prod-cluster',
    task_definition='custom_data_processing',
    launch_type='FARGATE',
    network_configuration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-private-a', 'subnet-private-b'],
            'securityGroups': ['sg-ecs-tasks'],
            'assignPublicIp': 'DISABLED',
        }
    },
    overrides={
        'containerOverrides': [
            {
                'name': 'data_processor',
                'environment': [
                    {'name': 'DATE', 'value': '{{ ds }}'},
                ],
            }
        ]
    },
    awslogs_group='/ecs/data-processing',
    awslogs_stream_prefix='mwaa-',
)

Airflow Connections・Secrets Manager 統合

Secrets Manager での Connection 管理

# AWS Secrets Manager にシークレットを保存
import json
import boto3

secret_dict = {
    "conn_type": "postgres",
    "host": "prod-redshift.xxx.redshift.amazonaws.com",
    "login": "redshift_user",
    "password": "SecurePassword123!",
    "port": 5439,
    "schema": "analytics",
}

boto3.client('secretsmanager').create_secret(
    Name='airflow/connections/redshift_default',
    SecretString=json.dumps(secret_dict)
)

Secrets Manager バックエンド有効化

# MWAA Environment を更新(Secrets Manager バックエンドを有効化)
aws mwaa update-environment \
  --name prod-etl-pipeline \
  --airflow-configuration-options \
    'secrets.backend=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' \
    'secrets.backend_kwargs={"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'

# これ以降、Airflow は Connections を Secrets Manager から自動取得

Variable の Secrets Manager 管理

from airflow.models import Variable

# Secrets Manager に保存されたシークレットを取得
database_host = Variable.get('db_host')  # airflow/variables/db_host から取得
api_key = Variable.get('api_key')        # airflow/variables/api_key から取得

主要ユースケース

1. ETL パイプラインの自動化

S3 Data Lake
    ↓
Glue Data Catalog(テーブル登録)
    ↓
MWAA DAG
    ├→ Glue ETL(Spark 変換)
    ├→ Redshift COPY(DW ロード)
    └→ Athena Query(検証)
    ↓
CloudWatch Logs(監視・アラート)

2. 機械学習パイプラインのオーケストレーション

Data Preparation (Glue)
    ↓
Feature Engineering (SageMaker Processing)
    ↓
Model Training (SageMaker Training)
    ↓
Model Evaluation (Lambda)
    ↓
Model Registry (SageMaker Model Registry)
    ↓
Batch Inference (SageMaker Batch Transform)

3. データクオリティーチェック

Glue ETL → Output
    ↓
MWAA Data Quality DAG
    ├→ Great Expectations
    ├→ Custom Validation Rules
    └→ SNS Notification(失敗時)

4. マルチテナント SaaS データパイプライン

Customer Data Ingestion
    ↓
MWAA (Tenant-specific DAGs)
    ├→ Tenant A: Extract → Transform → Load
    ├→ Tenant B: Extract → Transform → Load
    └→ Tenant C: Extract → Transform → Load
    ↓
Isolated DynamoDB Instances

5. ストリーミングデータ + Batch 処理

Kinesis → Lambda (buffering)
    ↓
S3 (parquetized)
    ↓
MWAA DAG (hourly batch)
    ├→ Athena query
    └→ Redshift update

6. リアルタイムレコメンデーション更新

User Behavior Events
    ↓
Kinesis Firehose → S3
    ↓
MWAA (nightly DAG)
    ├→ Glue ETL (features preparation)
    ├→ SageMaker Training
    └→ ElastiCache (recommendations cache)

7. ログ集約・監視

Application Logs (CloudWatch Logs)
    ↓
MWAA Log Processing DAG
    ├→ Athena (ad-hoc queries)
    ├→ OpenSearch (indexing)
    └→ S3 Data Lake (archive)

8. 依存関係管理された複雑 DAG

External API Call
    ↓ (success → proceed / fail → retry)
    ↓
Parallel Processing (Map)
    ├→ Process A
    ├→ Process B
    └→ Process C
    ↓
Aggregation
    ↓
Report Generation (SES)

9. 条件分岐・動的 DAG

Data Check Task
    ↓
BranchOperator
    ├→ データ品質 OK → Analysis
    └→ データ品質 NG → Alert + Rollback

10. クロスアカウント DAG 実行

Account A (MWAA Environment)
    ↓ AssumeRole
Account B (Cross-Account Resource)
    ├→ Redshift Cluster
    └→ S3 Bucket

11. スケジュール変動への対応

MWAA Backfill (catchup=True)
    ↓
月末決算処理(特殊スケジュール)
    ↓
年度末監査(1 回限りの大規模処理)

12. ガバナンス・監査ログ

MWAA DAG Execution
    ↓
CloudTrail(IAM 操作)
    ↓
CloudWatch Logs(Scheduler/Worker logs)
    ↓
S3(ログアーカイブ)
    ↓
QuickSight(ダッシュボード)

CLI・SDK・IaC による操作例

AWS CLI での操作

# 1. Environment 作成
aws mwaa create-environment \
  --name my-airflow-env \
  --airflow-version 2.8.1 \
  --environment-class mw1.medium \
  --max-workers 10 \
  --min-workers 1 \
  --dag-s3-path s3://bucket/dags/ \
  --execution-role-arn arn:aws:iam::123456789012:role/MWAARole \
  --network-configuration SubnetIds=subnet-xxx,subnet-yyy \
  --source-bucket-arn arn:aws:s3:::bucket

# 2. DAG 実行をトリガー
aws mwaa create-cli-token --name my-airflow-env
# → token 取得 → curl で Airflow API 呼び出し

# 3. Environment 情報確認
aws mwaa get-environment --name my-airflow-env

# 4. Logs を確認
aws logs tail /aws/mwaa/my-airflow-env/scheduler --follow

# 5. Environment 削除
aws mwaa delete-environment --name my-airflow-env

boto3 SDK での操作

import boto3
import json

mwaa = boto3.client('mwaa', region_name='us-east-1')

# Environment 作成
response = mwaa.create_environment(
    Name='prod-etl',
    AirflowVersion='2.8.1',
    EnvironmentClass='mw1.medium',
    MaxWorkers=20,
    MinWorkers=2,
    DagS3Path='s3://my-bucket/dags/',
    ExecutionRoleArn='arn:aws:iam::123456789012:role/MWAARole',
    NetworkConfiguration={
        'SubnetIds': ['subnet-a', 'subnet-b'],
        'SecurityGroupIds': ['sg-mwaa'],
    },
    AirflowConfigurationOptions={
        'core.default_task_retries': '3',
        'core.max_active_runs_per_dag': '1',
        'logging.logging_level': 'INFO',
    }
)

print(json.dumps(response, indent=2, default=str))

Terraform / CDK による IaC

# Terraform
resource "aws_mwaa_environment" "main" {
  name              = "prod-etl"
  airflow_version   = "2.8.1"
  environment_class = "mw1.medium"
  max_workers       = 20
  min_workers       = 2

  dag_s3_path              = "s3://${aws_s3_bucket.mwaa.id}/dags/"
  plugins_s3_path          = "s3://${aws_s3_bucket.mwaa.id}/plugins.zip"
  requirements_s3_path     = "s3://${aws_s3_bucket.mwaa.id}/requirements.txt"
  execution_role_arn       = aws_iam_role.mwaa_role.arn

  vpc_config {
    subnet_ids             = aws_subnet.private[*].id
    security_group_ids     = [aws_security_group.mwaa.id]
  }

  airflow_configuration_options = {
    "core.default_task_retries" = "3"
    "logging.logging_level"      = "INFO"
  }
}
# CDK
from aws_cdk import (
    aws_mwaa as mwaa,
    aws_iam as iam,
    aws_s3 as s3,
    core,
)

class MWAAStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs):
        super.__init__(scope, id, **kwargs)

        # S3 Bucket
        bucket = s3.Bucket(
            self, "MWAABucket",
            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
        )

        # IAM Role
        role = iam.Role(
            self, "MWAARole",
            assumed_by=iam.ServicePrincipal("mwaa.amazonaws.com"),
        )
        role.add_to_policy(iam.PolicyStatement(
            actions=["s3:*"],
            resources=[bucket.arn_for_objects("*")],
        ))

        # MWAA Environment
        env = mwaa.CfnEnvironment(
            self, "MWAAEnvironment",
            name="prod-etl",
            airflow_version="2.8.1",
            environment_class="mw1.medium",
            max_workers=20,
            min_workers=2,
            dag_s3_path=f"s3://{bucket.bucket_name}/dags/",
            execution_role_arn=role.role_arn,
            network_configuration=mwaa.CfnEnvironment.NetworkConfigurationProperty(
                subnet_ids=["subnet-a", "subnet-b"],
                security_group_ids=["sg-mwaa"],
            ),
        )

CloudWatch Logs・X-Ray・メトリクス

CloudWatch Logs 構成

/aws/mwaa/{EnvironmentName}/
├── scheduler/
│   └── 2025-01-01T10:00:00Z_*.log  (DAG parse / task schedule)
├── worker/
│   └── 2025-01-01T10:00:00Z_*.log  (Task execution)
└── webserver/
    └── 2025-01-01T10:00:00Z_*.log  (Web UI access)

ログクエリ例

# Scheduler エラーを確認
aws logs filter-log-events \
  --log-group-name /aws/mwaa/prod-etl/scheduler \
  --filter-pattern "[ERROR]" \
  --start-time $(date -d '1 hour ago' +%s)000

# Worker 内で失敗したタスクを検索
aws logs filter-log-events \
  --log-group-name /aws/mwaa/prod-etl/worker \
  --filter-pattern "\"[FAILED]\"" \
  --start-time $(date -d '1 hour ago' +%s)000

CloudWatch メトリクス

メトリクス 説明
NumDagProcessFailures DAG Parse エラー数
NumTasksFailed 失敗したタスク数
NumTasksSuccess 成功したタスク数
DagDuration DAG 実行時間
TaskDuration タスク実行時間
NumSchedulerHeartbeats Scheduler ハートビート

X-Ray トレーシング

# MWAA Environment で X-Ray トレーシングを有効化
aws mwaa update-environment \
  --name prod-etl \
  --airflow-configuration-options \
    'logging.remote_logging=True' \
    'logging.remote_base_log_folder=s3://bucket/logs/'

Private Webserver・IAM 認証・セキュリティ

Private Webserver モード

# Private Webserver 設定(VPC 内からのみアクセス可能)
aws mwaa create-environment \
  --name prod-etl \
  --webserver-access-mode PRIVATE_ONLY \
  --network-configuration \
    SubnetIds=subnet-private-a,subnet-private-b

IAM 認証によるアクセス制御

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:user/data-engineer"
      },
      "Action": [
        "mwaa:GetEnvironment",
        "airflow:DescribeDAG",
        "airflow:ListDAGs"
      ],
      "Resource": "arn:aws:mwaa:us-east-1:123456789012:environment/prod-etl"
    }
  ]
}

セキュリティベストプラクティス

対策 説明
Private Webserver VPC 内からのみアクセス可能に設定
IAM Authentication IAM Roles for Service Accounts で credentials 不要
Secrets Manager Connection passwords を暗号化管理
VPC Endpoints S3・Secrets Manager への通信を VPC 内に閉じ込める
Encryption at Rest KMS で DAG・Log を暗号化
Execution Role 最小権限の IAM Role を割り当てる

VPC 構成・Network Security

VPC エンドポイント構成

# S3 Gateway Endpoint(NAT Gateway 不要)
aws ec2 create-vpc-endpoint \
  --vpc-id vpc-xxx \
  --service-name com.amazonaws.us-east-1.s3 \
  --route-table-ids rtb-xxx

# Secrets Manager Interface Endpoint
aws ec2 create-vpc-endpoint \
  --vpc-id vpc-xxx \
  --vpc-endpoint-type Interface \
  --service-name com.amazonaws.us-east-1.secretsmanager \
  --subnet-ids subnet-private-a subnet-private-b \
  --security-group-ids sg-vpc-endpoint

Security Group 設定

# MWAA Security Group
aws ec2 create-security-group \
  --group-name mwaa-sg \
  --description "MWAA Workers & Scheduler" \
  --vpc-id vpc-xxx

# 許可ルール(内部通信)
aws ec2 authorize-security-group-ingress \
  --group-id sg-mwaa \
  --protocol tcp \
  --port 5432 \
  --source-group sg-mwaa  # PostgreSQL metadata DB

# RDS への通信
aws ec2 authorize-security-group-ingress \
  --group-id sg-rds \
  --protocol tcp \
  --port 5432 \
  --source-group sg-mwaa

類似サービス比較

観点 MWAA Step Functions Astronomer Google Cloud Composer Prefect Dagster
インフラ管理 フルマネージド フルマネージド 手動(VM/K8s) フルマネージド 手動 / Cloud 手動 / Cloud
言語 Python DAG JSON/YAML ASL Python DAG Python DAG Python Python
スケーリング Auto(Worker) 自動(State Machine) 手動 Auto 手動 手動
AWS統合 ネイティブ ネイティブ Provider Limited Hook Resource
学習曲線 中(Airflow知識) 低(JSON宣言的)
コスト 中(Environment料金) 低(実行従量課金) 高(SaaS+管理) 低(オンプレ) 低(オンプレ)
複雑DAG対応 優秀 不向き 優秀 優秀 優秀 優秀
動的Task DynamicTaskMap Map State Limited Limited 優秀 優秀

ベストプラクティス

推奨事項(✅)

  • ✅ DAG ファイルをバージョン管理(git)で管理
  • ✅ requirements.txt で 明示的に依存パッケージを管理
  • ✅ S3 ベースのコード deploy で CI/CD 自動化
  • ✅ Secrets Manager で credentials 一元管理
  • ✅ CloudWatch Logs で スケジューラー・ワーカー・アプリログを監視
  • ✅ Task Retries・Timeout を適切に設定
  • ✅ Monitoring alerts(SNS・Slack)で障害通知
  • ✅ Private Webserver + IAM 認証で セキュリティ確保
  • ✅ Instance Class を 負荷パターンに応じて選定
  • ✅ max_active_runs_per_dag で リソース枯渇を防止

アンチパターン(❌)

  • ❌ DAG をコンソール直接編集(バージョン管理外)
  • ❌ Hard-coded credentials を DAG に埋め込む
  • ❌ Public Webserver を本番運用(認証なし)
  • ❌ Task リトライなし(デフォルト)で本番運用
  • ❌ 監視アラートなし(障害検知遅延)
  • ❌ mw1.micro で本番運用(リソース不足)
  • ❌ catchup=True で全期間バックフィル(リソース枯渇)
  • ❌ 複雑な依存関係を task 内ロジックで実装(保守困難)
  • ❌ Logging level=DEBUG で本番運用(ログ出力過多)

トラブルシューティング表

症状 原因 解決方法
DAG が表示されない S3 パスが違う / DAG syntax エラー aws s3 ls s3://bucket/dags/ で確認 / airflow dags test で syntax チェック
Task が stuck(実行されない) Worker スケーリング遅延 / Resource 枯渇 max_workers 上限確認 / Task duration 短縮
Task timeout Timeout 設定が短すぎる / 実行時間増加 execution_timeout 延長 / Task 分割
Redshift COPY 失敗 IAM Role 権限不足 / S3 path 誤り aws s3 ls s3://bucket/path/ で S3 確認 / IAM Role policy 確認
Scheduler restart ループ DAG Parse エラー / Memory 枯渇 CloudWatch Logs で scheduler error 確認 / Instance Class 上げる
Connection 取得失敗 Secrets Manager path 誤り / 暗号化キー権限なし aws secretsmanager get-secret-value でシークレット確認
Worker ログが出力されない Logging level 設定 / CloudWatch Logs 権限なし logging.logging_level=INFO で明示的に設定

近年の動向

  • Apache Airflow 3.0 対応予定:Deferrable task・Dynamic Task Mapping の標準化
  • Kubernetes Executor 強化:EKS との統合強化で大規模 DAG サポート
  • Bedrock 統合:生成 AI をワークフローに組み込む能力
  • DataOps 統合:dbt・Great Expectations との Native 統合
  • Multi-Region MWAA:クロスリージョンフェイルオーバー対応予定
  • Cost Optimization:Spot Instances for Worker Fleet でコスト 40% 削減

学習リソース

公式リソース

OSS / コミュニティ

AWS ベンダーリソース


実装例・チェックリスト

MWAA 導入チェックリスト

  • [ ] Airflow DAG 開発経験がある
  • [ ] S3 バケットを作成・IAM Policy を設定
  • [ ] Execution Role に必要な IAM permissions を追加
  • [ ] VPC・Subnet・Security Group を構成
  • [ ] requirements.txt で依存パッケージを指定
  • [ ] DAG を airflow dags test でローカル検証
  • [ ] S3 に DAG / plugins.zip / requirements.txt をアップロード
  • [ ] MWAA Environment を CLI / CDK で作成
  • [ ] Airflow Web UI にアクセス(Public / Private Webserver 確認)
  • [ ] CloudWatch Logs で Scheduler ログを確認
  • [ ] DAG をトリガーして Task 実行確認
  • [ ] Slack / SNS アラートを設定
  • [ ] 本番 Environment に昇格

まとめ

Amazon MWAA は 「Apache Airflow のフルマネージドサービス」 で、複雑なデータパイプライン・ETL ワークフロー・機械学習オーケストレーションを Python DAG で自動化できます。S3 ベースのコード管理・AWS サービス統合・CloudWatch Logs 監視を提供し、Glue・Redshift・SageMaker・Lambda との連携に優れています。Instance Class を負荷に応じて選定し、Auto Scaling・Secrets Manager 統合・IAM 認証でセキュリティを確保することで、エンタープライズグレードのデータエンジニアリングプラットフォームを実現できます。