Amazon EMR
Elastic MapReduce: ビッグデータ分析の統合プラットフォーム
Amazon EMR(Elastic MapReduce) は、Apache Spark・Hadoop・Hive・Presto・Flink・HBase などのオープンソース分析フレームワークを AWS 上で実行するフルマネージドクラスタープラットフォーム です。ペタバイト規模のデータ処理・機械学習・ストリーミング分析をスケーラブルに実行し、EC2・EKS・Serverless の 3 つのデプロイオプションで柔軟に対応します。このページでは EMR の概念・アーキテクチャ・実装・近年の動向を網羅的に解説します。
このページの目的
このページでは以下を対象としています。
- 初心者向け:EMR とは何か、Apache Spark とは何かを学びたい方
- データエンジニア向け:EMR on EC2 / EKS / Serverless を使い分けたい方
- SRE/DevOps 向け:EMR クラスターのコスト最適化・運用・セキュリティ
- アーキテクト向け:Databricks・Cloudera・Synapse Spark Pool との比較判断
- 意思決定者向け:EMR vs Glue vs Athena の投資判断
概要と本質
Amazon EMR は、自前で Spark/Hadoop クラスターを管理するのではなく、マネージドな分散処理基盤を提供するサービス です。
初心者向けメモ:
- EMR = マネージド Spark・Hadoop の実行環境
- インフラ管理(スケーリング・パッチ・アップグレード)を AWS が引き受ける
- データエンジニアはアプリケーション開発に集中できる
- ペタバイト規模のデータ処理・複雑な ETL・ML パイプラインに最適
EMR の位置づけ
graph LR
Data["データソース<br/>S3/RDS/Kinesis"]
EMR["Amazon EMR<br/>Spark/Hadoop/Hive/Presto"]
Results["クエリ結果<br/>S3/Redshift"]
ML["機械学習<br/>SageMaker"]
Viz["可視化<br/>QuickSight"]
Data -->|ETL| EMR
EMR -->|変換済みデータ| Results
EMR -->|特徴量| ML
Results -->|SSOT| Viz
EMR が解決する課題
| 課題 |
従来のアプローチ |
EMR による解決 |
| Spark クラスター管理 |
自前でマシン・OS・Java・Spark を構築・運用 |
マネージド EMR クラスター:セットアップ時間を数時間から数分に短縮 |
| スケーリング |
手動でノード追加・YARN 設定・リバランス |
自動スケーリング・Instance Fleets で複数インスタンスタイプに対応 |
| コスト最適化 |
オンデマンド EC2 インスタンスで運用 |
スポットインスタンス活用で最大 90% コスト削減 |
| パッチ管理 |
手動で OS・Spark・ライブラリをアップグレード |
AWS が定期的にセキュリティ更新・主要バージョン対応 |
| 高可用性 |
Hadoop HA・多数のノード管理が必要 |
マネージドで HA 構成・フェイルオーバー自動対応 |
| ペタバイト規模処理 |
大量メモリ・ストレージ管理が複雑 |
数千ノードまでスケール・EMRFS で S3 を シームレス利用 |
| ML パイプライン |
別途の ML インフラが必要 |
MLlib・SageMaker 統合で ML 対応 |
主な特徴
1. 3 つのデプロイオプション
| オプション |
説明 |
利点 |
欠点 |
| EMR on EC2 |
EC2 インスタンス上で実行 |
最高の柔軟性・細かい制御 |
インフラ管理負担 |
| EMR on EKS |
Kubernetes Pod で実行 |
K8s エコシステム統合・マルチテナント対応 |
学習コスト・EKS 管理 |
| EMR Serverless |
サーバーレス・自動スケール |
アイドル時間ゼロ・完全マネージド |
コスト予測困難・カスタマイズ制限 |
2. 複数フレームワーク対応
Apache Spark(最も一般的)
├── Spark SQL:ANSI SQL 準拠クエリ
├── Spark Streaming:リアルタイム処理
├── PySpark / Scala / Java / R
├── MLlib:分散機械学習
└── Spark on Ray(新):Python ネイティブ並列実行
Apache Hadoop & YARN
├── MapReduce:レガシーバッチ処理
├── YARN:リソースマネージャー
└── HDFS:分散ストレージ
Apache Hive
├── HiveQL:SQL ライク DSL
└── Metastore:スキーマ管理
Apache Presto / Trino
├── インタラクティブ SQL
├── 複数データソース連携
└── 低レイテンシ分析
その他
├── Apache Flink:ストリーミング処理
├── Apache HBase:NoSQL
├── Apache Hudi:ACID トランザクション・デルタ機能
├── Apache Iceberg v3:ACID・タイムトラベル・スナップショット
└── Delta Lake:Unity Catalog 連携
3. スポットインスタンスによるコスト最適化
EMR は Task ノードにスポットインスタンスを活用できます:
EC2 料金:$0.0928/時間(m5.xlarge オンデマンド)
スポット料金:$0.0278/時間(同じスペック)
→ 70% コスト削減
スポット中断時:Task ノードのみ終了(Core ノード・計算継続)
→ ジョブは再試行・自動リスケジュール
4. EMRFS:S3 をネイティブストレージとして利用
クラスター終了後もデータ永続
→ S3 は AWS マネージドストレージ・長期保存可能
HDFS より高速・柔軟
→ S3 Select:効率的なフィルタリング
→ 複数クラスター間でのデータ共有
階層化ストレージ対応
→ S3 Intelligent-Tiering でコスト最適化
アーキテクチャ
EMR on EC2 アーキテクチャ図
graph TB
subgraph Cluster["EMR Cluster(EC2)"]
Master["Master Node(1台)<br/>YARN ResourceManager<br/>HDFS NameNode<br/>Hadoop JobTracker"]
Core["Core ノード(1台以上)<br/>HDFS DataNode<br/>YARN NodeManager<br/>Spark Executor"]
Task["Task ノード(オプション)<br/>スポットインスタンス推奨<br/>Spark Executor のみ"]
end
subgraph Storage["ストレージ"]
EMRFS["EMRFS<br/>S3 を HDFS のように利用"]
S3["Amazon S3<br/>永続ストレージ"]
end
subgraph Framework["フレームワーク"]
Spark["Apache Spark"]
Hive["Apache Hive"]
Presto["Presto/Trino"]
end
Master --> |YARN リソース管理| Core
Master --> |YARN リソース管理| Task
Core --> EMRFS
Task --> EMRFS
EMRFS --> S3
Spark -.-> Framework
Hive -.-> Framework
Presto -.-> Framework
クラスター構成パターン
パターン 1:バッチ処理用クラスター
Master(1台・m5.xlarge)
↓ YARN ResourceManager
Core(3台・r5.2xlarge)
↓ HDFS DataNode / Spark Executor
Task(5-20台・スポット・m5.4xlarge)
↓ Spark Executor のみ・中断時は再起動
用途:夜間バッチ・日次 ETL・アドホッククエリ
特徴:処理終了後にクラスター削除(使い捨て)
パターン 2:長期実行クラスター
Master(1台・m5.xlarge)
Core(5台・r5.4xlarge)
↓ 常時起動・HDFS 永続
Task(2台オンデマンド+スケーリング用スポット)
用途:ストリーミング・24/7 ワークロード・複数ジョブの並行実行
特徴:クラスターを保持・複数チームで共有
リスク:未使用時も課金(スケーリングポリシーで最小化)
パターン 3:EMR Serverless
アプリケーション作成
→ Spark Job 送信
→ Worker 自動起動
→ ジョブ実行
→ 自動終了(数分後)
特徴:クラスター管理不要・アイドル時間ゼロ
課金:vCPU-時間 + メモリ-時間(ジョブ実行時間のみ)
デプロイオプション
1. EMR on EC2:最高の制御と柔軟性
特徴
タイプ: 完全管理 EC2 クラスター
初期化: ノード選択・ブートストラップスクリプト・カスタム設定可能
スケーリング: 自動スケーリング・Manual Resizing
コスト: EC2 料金 + EMR 上乗せ料金($0.048/時間・m5.xlarge)
管理: Master ノード への SSH 接続で細かいチューニング可能
ユースケース
- 大規模バッチ処理(1-10K ノード)
- カスタム設定が必要(特定 Spark バージョン・ローカルライブラリ)
- 他の EC2 サービスとの密結合(VPC・IAM ロール・セキュリティグループ)
- Hadoop エコシステム(Hive Metastore・HBase・複数フレームワーク)
実装例
aws emr create-cluster \
--name "spark-etl-prod" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--applications Name=Spark Name=Hive Name=Presto \
--bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
--log-uri s3://my-bucket/emr-logs/ \
--use-glue-catalog \
--enable-glue-data-catalog
aws emr add-steps \
--cluster-id j-xxxxxx \
--steps Type=Spark,Name="ETL Job",\
ActionOnFailure=TERMINATE_CLUSTER,\
Args=[s3://my-bucket/etl.py,--input,s3://data/,--output,s3://output/]
2. EMR on EKS:Kubernetes 統合
特徴
基盤: Amazon EKS(マネージド Kubernetes)
Pod 実行: 一時的な Spark Pod・自動スケーリング
リソース: EKS ノードプール上で実行・他のワークロードと共存
スケーリング: Pod オートスケーラー・Karpenter 対応
コスト: EKS 管理料 + EC2 ノード料金(クラスターレベル)
ユースケース
- Kubernetes 統合環境への Spark 統合
- マルチテナント・複数チームが同じ EKS クラスター利用
- Pod ライフサイクル管理が必要
- Istio・Prometheus などの K8s 監視スタック統合
実装例
aws emr-containers create-virtual-cluster \
--name spark-on-eks \
--container-provider '{
"type": "EKS",
"id": "my-eks-cluster",
"info": {
"eksConnectorConfig": {
"enabled": true
}
}
}'
aws emr-containers start-job-run \
--virtual-cluster-id 000000000000-00000 \
--name "etl-job" \
--execution-role-arn arn:aws:iam::123456789:role/EMRJobRole \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://my-bucket/etl.py",
"sparkSubmitParameters": "--conf spark.executor.memory=4g --conf spark.executor.cores=2"
}
}'
3. EMR Serverless:インフラレス実行
特徴
構造: 完全サーバーレス・自動スケーリング
初期化時間: 数十秒(クラスター不要)
スケーリング: Worker の自動プロビジョニング・アイドル時自動終了
課金: vCPU-時間 ($0.052624) + メモリ-時間 ($0.0057785/GB)
管理: CLI/API/SDK のみ・コンソール SSH なし
ユースケース
- 断続的・バースト処理(定期バッチ・イベント駆動)
- コスト管理が重要(未使用時間ゼロ)
- インフラ管理を最小化したい
- 数分以下の短時間ジョブ
実装例
aws emr-serverless create-application \
--name spark-etl-app \
--release-label emr-7.11.0 \
--type SPARK
aws emr-serverless start-job-run \
--application-id 00000000000000000 \
--execution-role-arn arn:aws:iam::123456789:role/EMRServerlessRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://my-bucket/etl.py",
"sparkSubmitParameters": "--conf spark.executor.memory=4g"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://my-bucket/logs/"
}
}
}'
aws emr-serverless create-application \
--name notebook-app \
--release-label emr-spark-8.0-preview \
--type SPARK
コアコンポーネント
1. クラスターマネージャー
YARN(Yet Another Resource Negotiator)
├── ResourceManager:リソース割り当て・スケジューリング
├── NodeManager:各ノードのリソース監視
├── ApplicationMaster:ジョブのライフサイクル管理
└── 出力:HDFS / S3 に結果保存
Spark StandaloneManager(EMR on EKS)
├── Kubernetes API:Pod 管理
├── Driver Pod:ジョブの親プロセス
└── Executor Pod:並列タスク実行
2. ストレージレイヤー
EMRFS(EMR File System)
├── S3 ネイティブ統合:s3a:// URI スキーム
├── 一貫性強化:DynamoDB でメタデータ管理
├── 暗号化:S3 KMS / SSE-S3
└── マルチパートアップロード最適化
HDFS(分散ストレージ)
├── NameNode(Master):ファイルシステム管理
├── DataNode(Core/Task):実データ保存
└── 用途:高速中間ストレージ・キャッシング
ローカルディスク
├── SSD / HDD インスタンス
├── スピンアップ ストレージ(コスト効率)
└── スピルオーバー用(メモリ不足時)
3. 主要フレームワーク
Apache Spark 4.0(2026 最新)
spark.sql("""
SELECT
data:customer.name AS name,
data:order.total::DECIMAL(10, 2) AS total
FROM json_table
WHERE data:region = 'Tokyo'
""")
df = spark.read.format("iceberg").load("s3://lake/products")
df.filter("price > 1000").write.format("iceberg")\
.mode("overwrite").save("s3://lake/products_v3")
spark.conf.set("spark.sql.iceberg.row_lineage.enabled", "true")
Apache Hive 4.0(SQL ウェアハウス)
CREATE EXTERNAL TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
created_at TIMESTAMP
)
PARTITIONED BY (region STRING)
STORED AS PARQUET
LOCATION 's3://lake/orders/';
SET hive.execution.mode=llap;
SELECT region, SUM(amount) as total
FROM orders
WHERE YEAR(created_at) = 2026
GROUP BY region;
Apache Presto / Trino(インタラクティブ SQL)
SELECT
o.order_id,
c.name,
o.amount
FROM hive.default.orders o
JOIN postgres.public.customers c
ON o.customer_id = c.id
WHERE o.created_at > CURRENT_DATE - INTERVAL '7' DAY;
Apache Flink(ストリーミング処理)
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment;
DataStream<String> stream = env.addSource(
new FlinkKinesisConsumer<>("order-stream",
new SimpleStringSchema,
consumerConfig));
stream.map(new MapFunction<String, Order> {
public Order map(String value) {
return parseOrder(value);
}
})
.filter(o -> o.amount > 1000)
.sinkTo(new S3CommitSink("s3://lake/filtered-orders/"));
env.execute("Streaming ETL");
Apache Hudi・Iceberg・Delta Lake(ACID テーブル)
df.write.format("hudi") \
.option("hoodie.table.name", "orders") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "order_id") \
.option("hoodie.datasource.write.partitionpath.field", "region") \
.save("s3://lake/orders-hudi")
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("iceberg").getOrCreate
df = spark.read.option("as-of-timestamp", "2026-04-26 10:00:00") \
.format("iceberg").load("s3://lake/products")
EMR on EC2
クラスター構成の詳細
Master ノード設定:
インスタンスタイプ: m5.xlarge(4 vCPU, 16 GB)以上推奨
用途:
- YARN ResourceManager
- HDFS NameNode
- Hive Metastore
- Spark Driver(スタンドアロン実行時)
特性: 単一障害点・高可用性には複数マスター(本番環境)
ストレージ: 100 GB EBS(ログ・メタデータ)
Core ノード設定:
インスタンスタイプ: r5.2xlarge(8 vCPU, 64 GB)~ r6.4xlarge(16 vCPU, 128 GB)
用途:
- HDFS DataNode
- YARN NodeManager・Executor ホスト
- HDFS レプリケーション保持
最小台数: 3 台(HDFS レプリケーション = 3)
特性: クラスター削除時にデータ손실(S3 EMRFS 使用で回避)
ストレージ: 大容量 EBS + インスタンスストア
Task ノード設定:
インスタンスタイプ: m5.4xlarge(16 vCPU, 64 GB)~ c6.9xlarge(36 vCPU)
用途: Spark Executor コンピュート・HDFS レプリケーション不要
スポット推奨: Task ノード = スポットインスタンス活用で コスト 70-90% 削減
スケーリング: 自動スケーリングポリシー(CPU・メモリに基づく)
フェイルオーバー: Task 노드 중단 → 다른 노드로 재시작
Instance Fleet(インスタンスタイプの組み合わせ)
import boto3
emr = boto3.client('emr')
response = emr.create_cluster(
Name='instance-fleet-cluster',
ReleaseLabel='emr-7.11.0',
Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}],
Instances={
'InstanceFleets': [
{
'Name': 'Master Fleet',
'InstanceFleetType': 'MASTER',
'TargetOnDemandCapacity': 1,
'InstanceTypeConfigs': [
{'InstanceType': 'm5.xlarge'},
{'InstanceType': 'm5.2xlarge'}
]
},
{
'Name': 'Core Fleet',
'InstanceFleetType': 'CORE',
'TargetOnDemandCapacity': 2,
'TargetSpotCapacity': 8,
'InstanceTypeConfigs': [
{'InstanceType': 'r5.2xlarge', 'WeightedCapacity': '2'},
{'InstanceType': 'r5.4xlarge', 'WeightedCapacity': '4'},
{'InstanceType': 'r6.2xlarge', 'WeightedCapacity': '2'}
]
},
{
'Name': 'Task Fleet',
'InstanceFleetType': 'TASK',
'TargetOnDemandCapacity': 0,
'TargetSpotCapacity': 20,
'InstanceTypeConfigs': [
{'InstanceType': 'm5.4xlarge', 'BidPrice': '0.50'},
{'InstanceType': 'm6.4xlarge', 'BidPrice': '0.60'},
{'InstanceType': 'c5.4xlarge', 'BidPrice': '0.45'}
]
}
]
}
)
ブートストラップアクション(初期化スクリプト)
#!/bin/bash
pip install pandas numpy scikit-learn xgboost
cat >> /etc/spark/conf/spark-defaults.conf <<EOF
spark.driver.maxResultSize=4g
spark.sql.shuffle.partitions=200
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.kryoserializer.buffer.max=512m
EOF
echo "log4j.rootCategory=WARN, console" > /etc/spark/conf/log4j.properties
systemctl start cloudwatch-agent
ステップ(Job)投入
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Spark,Name="Data ETL",\
Args=[--class,com.example.DataETL,\
s3://my-bucket/etl-job.jar,\
--input,s3://raw-data/,\
--output,s3://processed-data/]
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Hive,Name="Aggregation",\
HiveScript=s3://my-bucket/query.sql,\
Args=[-d,INPUT_PATH=s3://data/,-d,OUTPUT_PATH=s3://output/]
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=Pig,Name="Data Transformation",\
PigScript=s3://my-bucket/transform.pig
aws emr add-steps --cluster-id j-xxxxxx \
--steps Type=ShellCommand,Name="Cleanup",\
Args=[s3://my-bucket/cleanup.sh,s3://output/]
EMR on EKS
仮想クラスター(Virtual Cluster)管理
aws emr-containers create-virtual-cluster \
--name "data-science-vpc" \
--container-provider \
type=EKS,\
id=my-eks-cluster,\
info="{eksConnectorConfig: {enabled: true}}"
kubectl create namespace spark-jobs
kubectl label namespace spark-jobs spark-enabled=true
aws iam create-role --role-name EMRJobExecutionRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "emr-containers.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}'
aws iam put-role-policy --role-name EMRJobExecutionRole \
--policy-name S3Access --policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": "*"
}]
}'
Spark ジョブ投入(EMR on EKS)
import boto3
emr_containers = boto3.client('emr-containers')
response = emr_containers.start_job_run(
virtualClusterId='000000000000-00000',
name='ml-feature-engineering',
releaseLabel='emr-7.11.0-latest',
executionRoleArn='arn:aws:iam::123456789:role/EMRJobExecutionRole',
jobDriver={
'sparkSubmitJobDriver': {
'entryPoint': 's3://my-bucket/feature-engineering.py',
'sparkSubmitParameters': (
'--conf spark.executor.memory=8g '
'--conf spark.executor.cores=4 '
'--conf spark.driver.memory=2g '
'--conf spark.sql.adaptive.enabled=true'
),
'entryPointArguments': ['--input', 's3://raw-data/', '--output', 's3://features/']
}
},
configurationOverrides={
'monitoringConfiguration': {
'persistentAppUI': 'ENABLED',
's3MonitoringConfiguration': {
'logUri': 's3://my-bucket/emr-eks-logs/'
},
'cloudWatchMonitoringConfiguration': {
'logGroupName': '/emr-on-eks/jobs',
'logStreamNamePrefix': 'feature-eng'
}
}
},
tags={
'environment': 'production',
'team': 'data-science'
}
)
print(f"Job ID: {response['id']}")
EMR Serverless
アプリケーション管理
import boto3
emr_serverless = boto3.client('emr-serverless')
app_response = emr_serverless.create_application(
name='batch-etl-serverless',
releaseLabel='emr-7.11.0',
type='SPARK',
clientToken='unique-request-id-12345',
tags={
'environment': 'production',
'cost-center': 'data-eng'
}
)
app_id = app_response['applicationId']
print(f"Created application: {app_id}")
app_details = emr_serverless.get_application(applicationId=app_id)
print(f"Status: {app_details['application']['status']}")
バッチジョブ投入(EMR Serverless)
job_response = emr_serverless.start_job_run(
applicationId=app_id,
clientToken='unique-job-token-67890',
executionRoleArn='arn:aws:iam::123456789:role/EMRServerlessRole',
jobDriver={
'sparkSubmit': {
'entryPoint': 's3://my-bucket/etl-pipeline.py',
'entryPointArguments': [
'--date', '2026-04-26',
'--input-bucket', 's3://raw-data/',
'--output-bucket', 's3://processed-data/'
],
'sparkSubmitParameters': (
'--conf spark.executor.memory=4g '
'--conf spark.executor.cores=2 '
'--conf spark.driver.memory=2g '
'--conf spark.dynamicAllocation.enabled=true'
)
}
},
configurationOverrides={
'monitoringConfiguration': {
's3MonitoringConfiguration': {
'logUri': 's3://my-bucket/serverless-logs/',
'encryptionKeyArn': 'arn:aws:kms:ap-northeast-1:123456789:key/12345678-1234-1234-1234-123456789012'
},
'cloudWatchMonitoringConfiguration': {
'logGroupName': '/emr-serverless/jobs',
'logStreamNamePrefix': 'etl-batch'
}
},
'workerTypeSpecifications': {
'DRIVER': {
'diskSize': 30
},
'EXECUTOR': {
'diskSize': 100
}
}
},
tags={
'pipeline': 'daily-etl',
'env': 'prod'
}
)
print(f"Job ID: {job_response['jobRunId']}")
Pre-initialized Capacity(キャッシュ)
emr_serverless.create_application(
name='low-latency-app',
releaseLabel='emr-7.11.0',
type='SPARK',
initialCapacity={
'DRIVER': {
'workerCount': 1,
'workerConfiguration': {
'cpu': '2',
'memory': '8GB',
'disk': '30GB'
}
},
'EXECUTOR': {
'workerCount': 10,
'workerConfiguration': {
'cpu': '4',
'memory': '16GB',
'disk': '100GB'
}
}
}
)
EMR Studio:統合開発環境
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
spark = SparkSession.builder.appName("clustering").getOrCreate
df = spark.read.parquet("s3://lake/customers/")
assembler = VectorAssembler(
inputCols=["age", "salary", "purchases"],
outputCol="features"
)
features = assembler.transform(df)
kmeans = KMeans(k=5, seed=42)
model = kmeans.fit(features)
model.transform(features).write.parquet("s3://lake/customer-segments/")
セキュリティ
暗号化
転送中の暗号化:
TLS 1.2+: ノード間・Spark 通信
Kerberos: クラスター内認証(オプション)
保存時の暗号化:
S3: KMS CMK / SSE-S3
EBS: AWS KMS キー
HDFS: Hadoop 暗号化ゾーン
アクセス制御:
IAM Role: EC2 インスタンス・EMR ジョブに付与
Security Group: ノード間・外部通信制限
Lake Formation: 列・行レベルのアクセス制御
VPC Endpoint: プライベート接続(S3・DynamoDB)
Kerberos 有効化
aws emr create-cluster \
--release-label emr-7.11.0 \
--security-configuration kerberos-config \
--ec2-key-name my-keypair
aws emr create-security-configuration \
--name kerberos-config \
--security-configuration '{
"KerberosAttributes": {
"KdcAdminPassword": "MyKdcPassword",
"Realm": "AWS.EXAMPLE.COM"
}
}'
spark.sql("""
GRANT SELECT (order_id, customer_id, amount)
ON TABLE orders
TO ROLE analyst_role;
""")
spark.sql("""
GRANT SELECT (id, name, region)
ON TABLE customers
TO ROLE analyst_role;
""")
ネットワーク統合
VPC 内配置
推奨構成:
Master・Core ノード: プライベートサブネット
NAT Gateway: アウトバウンド通信(インターネット)
VPC Gateway Endpoint: S3・DynamoDB への低レイテンシアクセス
Security Group: YARN(8088)・Spark UI(4040)制限
VPC Endpoint 設定:
- S3: ゲートウェイエンドポイント(無料)
- DynamoDB: ゲートウェイエンドポイント(無料)
- CloudWatch Logs: インターフェースエンドポイント
- ECR: Docker イメージ取得時
セキュリティグループ
Master ノード受信ルール:
- 22/tcp(SSH): 管理者のみ
- 8088/tcp(YARN Web): VPC 内のみ
- 9870/tcp(NameNode): VPC 内のみ
- 4040/tcp(Spark UI): VPC 内のみ
Core/Task ノード受信ルール:
- 受信:Master / Core ノードからの全トラフィック
- 受信:VPC 内のクライアント(ポート 1024-65535)
- 送信:すべて許可(S3・ログ出力)
コスト管理
コスト構成
EMR on EC2:
EC2 インスタンス料金
Master: m5.xlarge × $0.192/時間
Core: r5.2xlarge × 3 × $0.504/時間 = $1.512/時間
Task(スポット): m5.4xlarge × 10 × $0.0928/時間 = $0.928/時間
小計:$2.632/時間 × 720 時間(月間) = $1,895/月
EMR 上乗せ料金:$0.048/時間 × 13 インスタンス = $0.624/時間
月間:$0.624/時間 × 720 = $449/月
S3 ストレージ:$0.023/GB-月
CloudWatch Logs:$0.50/GB
月間合計(常時起動):$2,900~$3,200
最適化戦略:
1. 使い捨てクラスター:処理終了後に削除
2. スポットインスタンス活用:Task ノード 70-90% 削減
3. EMR Serverless:短時間ジョブ・バースト処理に限定
4. Spot Instance Interruption Handling:Task ノード損失時の自動再試行
5. CloudWatch カスタムメトリクス:効率的なスケーリングルール
コスト削減のベストプラクティス
| 対策 |
削減率 |
効果 |
実装難度 |
| スポットインスタンス活用 |
70-90% |
Task ノードの大部分を置換可 |
中 |
| 使い捨てクラスター |
60% |
長期間クラスター保有排除 |
低 |
| EMR Serverless |
50-80% |
アイドル時間ゼロ |
低 |
| オンデマンド容量予約 |
30% |
月次予測可能 |
中 |
| ダウンタイムスケーリング |
40-60% |
夜間・休日自動縮小 |
高 |
| ローカルキャッシング |
20% |
中間データ HDFS 優先 |
中 |
| Glue ETL への移行 |
40-50% |
サーバーレスで管理削減 |
高 |
比較
EMR vs Glue vs Athena
| 項目 |
EMR |
Glue |
Athena |
| 用途 |
大規模 ETL・複雑な Spark 処理 |
中規模 ETL・自動化 |
Ad hoc SQL 分析 |
| エンジン |
Spark・Hadoop・Hive・Presto |
Spark(サーバーレス) |
Presto/Trino |
| 管理 |
クラスター構築・運用 |
サーバーレス・自動スケール |
サーバーレス・即実行 |
| 柔軟性 |
最高(細かい制御可) |
中(カスタム Spark コード) |
低(SQL のみ) |
| スケール |
ペタバイト対応 |
テラバイト~ペタバイト |
テラバイト対応 |
| 学習コスト |
高(Spark・Hadoop 知識必要) |
中 |
低(SQL のみ) |
| コスト |
安定運用で安い / 管理負担 |
中程度 |
データ量に応じた従量制 |
| データ発見 |
なし |
Glue Catalog |
Glue Catalog 統合 |
| 採用判断 |
複雑な ML パイプライン・カスタマイズ |
一般的な ETL・自動化 |
探索的分析・ダッシュボード |
EMR vs Databricks
| 項目 |
EMR |
Databricks |
| プロバイダー |
AWS マネージド |
SaaS・マルチクラウド |
| 基盤 |
Spark on EC2/EKS |
最適化 Spark + SQLとML |
| Lakehouse 機能 |
Delta Lake / Iceberg / Hudi |
Unity Catalog(統一メタストア) |
| AI/ML 統合 |
SageMaker 連携 |
搭載・AutoML 提供 |
| Notebooks |
EMR Studio |
Databricks Notebooks(優位) |
| コスト |
インフラ管理が必要 |
SaaS 従量制・高マークアップ |
| Lock-in |
AWS エコシステム |
マルチクラウド対応・脱出容易 |
| 採用 |
AWS 専一企業・コスト重視 |
マルチクラウド・エンタープライズ |
| 項目 |
EMR |
Cloudera |
| 運用 |
AWS マネージド |
オンプレ・ハイブリッド |
| Hadoop/Spark |
主要バージョン対応 |
エンタープライズ版 |
| セキュリティ |
IAM・KMS 統合 |
Ranger・LDAP 統合 |
| Data Governance |
Lake Formation |
Hue・Ranger RBAC |
| Cloud Native |
クラウド中心 |
オンプレ中心・クラウド対応 |
| コスト |
変動費・スケール有利 |
ライセンス費用 |
| 採用 |
AWS 環境 |
オンプレ・規制業界 |
主要ユースケース
1. リアルタイムストリーミング分析
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("streaming-etl").getOrCreate
kinesis_df = spark \
.readStream \
.format("kinesis") \
.option("streamName", "order-stream") \
.option("region", "ap-northeast-1") \
.option("initialPosition", "TRIM_HORIZON") \
.load
result = kinesis_df \
.select(
from_json(col("data").cast("string"),
"order_id INT, customer_id INT, amount DOUBLE, timestamp TIMESTAMP"
).alias("event")
) \
.select("event.*") \
.filter(col("amount") > 1000) \
.groupBy(window("timestamp", "1 minute"), "customer_id") \
.agg(sum("amount").alias("total_amount"))
result.writeStream \
.format("parquet") \
.option("path", "s3://lake/high-value-orders/") \
.option("checkpointLocation", "s3://lake/checkpoints/kinesis-streaming") \
.start \
.awaitTermination
2. 機械学習特徴量エンジニアリング
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col, when, to_date, datediff
spark = SparkSession.builder.appName("feature-engineering").getOrCreate
customers = spark.read.parquet("s3://lake/customers/")
orders = spark.read.parquet("s3://lake/orders/")
products = spark.read.parquet("s3://lake/products/")
customer_features = orders \
.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("lifetime_value"),
avg("amount").alias("avg_order_value"),
max("created_at").alias("last_purchase_date"),
datediff(current_timestamp, max("created_at")).alias("days_since_purchase")
)
product_preferences = orders \
.join(products, "product_id") \
.groupBy("customer_id") \
.agg(
collect_set("category").alias("categories_purchased"),
count_distinct("product_id").alias("unique_products")
)
final_features = customer_features \
.join(product_preferences, "customer_id") \
.join(customers, "customer_id")
assembler = VectorAssembler(
inputCols=["total_orders", "lifetime_value", "avg_order_value", "days_since_purchase"],
outputCol="features"
)
features = assembler.transform(final_features)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features"
)
model = scaler.fit(features)
scaled_features = model.transform(features)
scaled_features.write \
.format("iceberg") \
.mode("overwrite") \
.save("s3://feature-store/customer-features/")
3. 多言語データ処理(ETL パイプライン)
spark.sql("""
-- Hive で大規模テーブル初期化
CREATE TABLE IF NOT EXISTS order_summary
USING PARQUET
PARTITIONED BY (region)
AS
SELECT
order_id,
customer_id,
SUM(amount) as total,
COUNT(*) as item_count,
YEAR(order_date) as year,
MONTH(order_date) as month,
QUARTER(order_date) as quarter,
region
FROM raw_orders
GROUP BY order_id, customer_id, region, YEAR(order_date), MONTH(order_date), QUARTER(order_date)
""")
from pyspark.sql.window import Window
df = spark.read.table("order_summary")
window_spec = Window \
.partitionBy("region") \
.orderBy("year", "month") \
.rangeBetween(-3, 0)
result = df.withColumn(
"moving_avg_3m",
avg("total").over(window_spec)
)
spark.sql("""
SELECT
order_id,
json_data:customer.name AS customer_name,
CAST(json_data:metadata.created_at AS TIMESTAMP) AS created_date,
CAST(json_data:pricing.total::DECIMAL(10,2) AS DECIMAL(10,2)) AS total_price
FROM json_orders
WHERE YEAR(CAST(json_data:metadata.created_at AS TIMESTAMP)) = 2026
""")
CLI 実装例
クラスター作成・管理
CLUSTER_ID=$(aws emr create-cluster \
--name "data-pipeline-prod" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--applications Name=Spark Name=Hive Name=Presto Name=Hadoop \
--bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
--log-uri s3://my-bucket/emr-logs/ \
--use-glue-catalog \
--enable-glue-data-catalog \
--tags Key=Environment,Value=production Key=Team,Value=data-eng \
--query 'ClusterId' \
--output text)
echo "Cluster created: $CLUSTER_ID"
aws emr describe-cluster --cluster-id $CLUSTER_ID \
--query 'Cluster.[Id,Name,Status.State,MasterPublicDNSName]' \
--output table
aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps Type=Spark,Name="ETL Job",ActionOnFailure=TERMINATE_CLUSTER,\
Args=[--master,yarn,\
--deploy-mode,cluster,\
--executor-memory,4g,\
--executor-cores,2,\
--num-executors,8,\
s3://my-bucket/etl.py,\
--input,s3://raw-data/,\
--output,s3://processed/]
aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps Type=Hive,Name="Data Aggregation",ActionOnFailure=CONTINUE,\
HiveScript=s3://my-bucket/queries/aggregation.sql,\
Args=[-d,INPUT_DATE=2026-04-26]
aws emr terminate-clusters --cluster-ids $CLUSTER_ID
自動スケーリング設定
aws emr create-cluster \
--name "autoscaling-cluster" \
--release-label emr-7.11.0 \
--instance-type m5.xlarge \
--instance-count 10 \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--configurations '[
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.job.reduces": "0"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true"
}
}
]'
aws emr put-auto-scaling-policy \
--cluster-id j-xxxxxx \
--instance-group-type TASK \
--auto-scaling-policy '{
"Constraints": {
"MinCapacity": 2,
"MaxCapacity": 50
},
"Rules": [
{
"Name": "Scale Out",
"Description": "CPU 使用率 80% 以上でスケールアウト",
"Action": {
"Market": "SPOT",
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "ADD_CAPACITY",
"ScalingAdjustment": 5,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"MetricName": "CPUUtilization",
"Namespace": "AWS/EC2",
"Statistic": "AVERAGE",
"Unit": "PERCENT",
"Dimensions": [{"Name": "InstanceGroup", "Value": "TASK"}],
"Period": 300,
"EvaluationPeriods": 2,
"Threshold": 80,
"ComparisonOperator": "GREATER_THAN_OR_EQUAL"
}
}
},
{
"Name": "Scale In",
"Description": "CPU 使用率 20% 以下でスケールイン",
"Action": {
"Market": "SPOT",
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "REMOVE_CAPACITY",
"ScalingAdjustment": 5,
"CoolDown": 600
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"MetricName": "CPUUtilization",
"Namespace": "AWS/EC2",
"Statistic": "AVERAGE",
"Unit": "PERCENT",
"Period": 300,
"EvaluationPeriods": 3,
"Threshold": 20,
"ComparisonOperator": "LESS_THAN"
}
}
}
]
}'
SDK 実装例
Python Boto3
import boto3
from datetime import datetime
emr = boto3.client('emr', region_name='ap-northeast-1')
def create_emr_cluster(cluster_name, num_instances=10):
"""EMR クラスター作成"""
response = emr.create_cluster(
Name=cluster_name,
ReleaseLabel='emr-7.11.0',
Applications=[
{'Name': 'Spark'},
{'Name': 'Hive'},
{'Name': 'Presto'},
{'Name': 'Hadoop'},
{'Name': 'Hudi'},
{'Name': 'Iceberg'}
],
Instances={
'InstanceGroups': [
{
'Name': 'Master',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},
{
'Name': 'Core',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'r5.2xlarge',
'InstanceCount': 3,
},
{
'Name': 'Task',
'Market': 'SPOT',
'InstanceRole': 'TASK',
'InstanceType': 'm5.4xlarge',
'InstanceCount': num_instances - 4,
'BidPrice': '0.50'
}
],
'Ec2KeyName': 'my-key-pair',
'KeepJobFlowAliveWhenNoSteps': False
},
LogUri='s3://my-bucket/emr-logs/',
Tags=[
{'Key': 'Environment', 'Value': 'production'},
{'Key': 'CreatedBy', 'Value': 'DataPipeline'},
{'Key': 'CreatedAt', 'Value': datetime.now.isoformat}
],
BootstrapActions=[
{
'Name': 'Install Custom Libraries',
'ScriptBootstrapAction': {
'Path': 's3://my-bucket/bootstrap.sh'
}
}
],
Configurations=[
{
'Classification': 'spark-defaults',
'Properties': {
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.sql.shuffle.partitions': '200',
'spark.sql.adaptive.enabled': 'true',
'spark.dynamicAllocation.enabled': 'true'
}
},
{
'Classification': 'yarn-site',
'Properties': {
'yarn.scheduler.capacity.maximum-am-resource-percent': '0.5'
}
}
]
)
return response['JobFlowId']
def submit_spark_job(cluster_id, script_path, args):
"""Spark ジョブ投入"""
response = emr.add_steps(
ClusterId=cluster_id,
Steps=[
{
'Name': 'Spark ETL Job',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--class', 'com.example.ETLJob',
'--deploy-mode', 'cluster',
'--executor-memory', '4g',
'--executor-cores', '2',
'--num-executors', '8',
script_path
] + args
}
}
]
)
return response['StepIds'][0]
def monitor_cluster(cluster_id):
"""クラスター状態監視"""
response = emr.describe_cluster(ClusterId=cluster_id)
cluster = response['Cluster']
print(f"Cluster: {cluster['Name']}")
print(f"Status: {cluster['Status']['State']}")
print(f"Master: {cluster.get('MasterPublicDNSName', 'N/A')}")
print(f"Nodes: {cluster['RequestedInstanceCount']}")
print(f"Created: {cluster['Status']['Timeline']['CreationDateTime']}")
return cluster['Status']['State']
if __name__ == '__main__':
cluster_id = create_emr_cluster('production-etl', num_instances=15)
print(f"Created cluster: {cluster_id}")
step_id = submit_spark_job(
cluster_id,
's3://my-bucket/etl-pipeline.jar',
['--input', 's3://raw-data/', '--output', 's3://processed/']
)
print(f"Submitted step: {step_id}")
status = monitor_cluster(cluster_id)
print(f"Current status: {status}")
Infrastructure as Code
# variables.tf
variable "cluster_name" {
default = "emr-production"
}
variable "instance_count" {
default = 10
}
variable "spot_price" {
default = "0.50"
}
# main.tf
resource "aws_emr_cluster" "main" {
name = var.cluster_name
release_label = "emr-7.11.0"
log_uri = "s3://${aws_s3_bucket.logs.id}/emr-logs/"
instance_group {
instance_role = "MASTER"
instance_type = "m5.xlarge"
instance_count = 1
}
instance_group {
instance_role = "CORE"
instance_type = "r5.2xlarge"
instance_count = 3
}
instance_group {
instance_role = "TASK"
instance_type = "m5.4xlarge"
instance_count = var.instance_count - 4
bid_price = var.spot_price
}
bootstrap_action {
path = "s3://${aws_s3_bucket.scripts.id}/bootstrap.sh"
name = "Install Libraries"
}
applications {
name = "Spark"
}
applications {
name = "Hive"
}
applications {
name = "Presto"
}
applications {
name = "Hudi"
}
applications {
name = "Iceberg"
}
service_role = aws_iam_role.emr_service_role.arn
instance_profile = aws_iam_instance_profile.ec2_instance_profile.arn
configurations = jsonencode([
{
Classification = "spark-defaults"
Properties = {
"spark.executor.memory" = "4g"
"spark.executor.cores" = "2"
"spark.sql.adaptive.enabled" = "true"
"spark.sql.shuffle.partitions" = "200"
}
}
])
tags = {
Environment = "production"
Team = "data-eng"
}
}
# IAM Role
resource "aws_iam_role" "emr_service_role" {
name = "emr-service-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "elasticmapreduce.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy_attachment" "emr_service_policy" {
role = aws_iam_role.emr_service_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
}
AWSTemplateFormatVersion: '2010-09-09'
Description: 'EMR Production Cluster with Spark, Hive, Presto, Hudi, Iceberg'
Parameters:
InstanceCount:
Type: Number
Default: 10
SpotPrice:
Type: String
Default: '0.50'
Resources:
EMRCluster:
Type: AWS::EMR::Cluster
Properties:
Name: emr-production
ReleaseLabel: emr-7.11.0
Applications:
- Name: Spark
- Name: Hive
- Name: Presto
- Name: Hudi
- Name: Iceberg
Instances:
InstanceGroups:
- Name: Master
Market: ON_DEMAND
InstanceRole: MASTER
InstanceType: m5.xlarge
InstanceCount: 1
- Name: Core
Market: ON_DEMAND
InstanceRole: CORE
InstanceType: r5.2xlarge
InstanceCount: 3
- Name: Task
Market: SPOT
InstanceRole: TASK
InstanceType: m5.4xlarge
InstanceCount: !Sub '${InstanceCount - 4}'
BidPrice: !Ref SpotPrice
LogUri: s3://my-bucket/emr-logs/
ServiceRole: !GetAtt EMRServiceRole.Arn
JobFlowRole: !GetAtt EC2InstanceProfile.Arn
Tags:
- Key: Environment
Value: production
- Key: Team
Value: data-eng
EMRServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: elasticmapreduce.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
EC2InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
Roles:
- !Ref EC2Role
EC2Role:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ec2.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
Outputs:
ClusterId:
Value: !Ref EMRCluster
Description: EMR Cluster ID
ClusterArn:
Value: !GetAtt EMRCluster.Arn
Description: EMR Cluster ARN
ベストプラクティス
✅ 推奨事項
| 項目 |
実装 |
効果 |
| スポットインスタンス活用 |
Task ノード 100% スポット |
70-90% コスト削減 |
| 使い捨てクラスター |
KeepJobFlowAliveWhenNoSteps=false |
月額コスト 60% 削減 |
| EMRFS(S3)利用 |
HDFS より S3 優先 |
永続性・他クラスター間共有 |
| Glue Catalog 統合 |
UseGlueCatalog=true |
メタデータ一元化 |
| パーティション設計 |
日付・リージョンで分割 |
クエリ高速化 20-100 倍 |
| Spark 最適化設定 |
spark.sql.adaptive.enabled=true |
自動チューニング・効率化 |
| CloudWatch 監視 |
カスタムメトリクス設定 |
異常早期検知・自動スケーリング |
| Security Group 制限 |
Master SSH は VPN 経由のみ |
セキュリティ向上 |
| Lake Formation 統合 |
列・行レベルアクセス制御 |
ガバナンス統一 |
| Infrastructure as Code |
Terraform / CloudFormation |
バージョン管理・再現性 |
❌ 反パターン
| 反パターン |
理由 |
改善策 |
| 常時起動クラスター |
未使用時も課金(数万円/月) |
使い捨てか Serverless に変更 |
| オンデマンドインスタンスのみ |
高コスト |
Task ノードをスポット化 |
| HDFS をプライマリストレージ |
クラスター削除でデータ喪失 |
EMRFS(S3)優先 |
| クラスターサイジング過大 |
無駄なリソース・高コスト |
自動スケーリング導入 |
| ログ設定なし |
トラブル発生時デバッグ困難 |
CloudWatch/S3 ログ設定必須 |
| IAM ロール権限過剰 |
セキュリティリスク |
最小権限の原則(特定 S3 バケット・KMS キー) |
| Spark デフォルト設定 |
非効率(パーティション少ない・メモリ不足) |
環境に合わせたチューニング |
| エラーハンドリング不足 |
タイムアウト・再試行ループ |
Step ActionOnFailure 設定・Circuit Breaker |
トラブルシューティング
| 症状 |
原因 |
解決策 |
| Out of Memory(OOM) |
Executor メモリ不足 |
spark.executor.memory 増加 / パーティション数増加 |
| Job が遅い |
リソース不足 |
spark.sql.adaptive.enabled=true / パーティション最適化 |
| スポット中断 |
インスタンス返却 |
複数インスタンスタイプ指定 / Instance Fleet 使用 |
| YARN リソースリーク |
Executor の正常終了失敗 |
spark.dynamicAllocation.enabled=true |
| Glue Catalog 接続失敗 |
VPC Endpoint 不足 |
S3 Gateway Endpoint 確認 |
| 暗号化キー拒否 |
IAM KMS 権限不足 |
クラスターロールに KMS DecryptDecrypt 権限付与 |
| ノード損失中の処理停止 |
スポット中断時に再配置不能 |
Instance Fleet で複数タイプ / Core ノード活用 |
| Spark UI アクセス不可 |
セキュリティグループ制限 |
Master ノード VPC 内アクセスのみ / SSH トンネル |
| データ整合性エラー |
部分的な書き込み失敗 |
ACID テーブル(Iceberg/Hudi)導入 |
近年の動向
1. Apache Spark 4.0 リリース
Spark 4.0.1 の主要機能:
✅ ANSI SQL 完全準拠
✅ VARIANT データ型(JSON ネイティブ処理)
✅ Apache Iceberg v3 サポート(削除ベクトル・行レベルリネージ)
✅ Streaming 強化(バッファ管理・遅延メトリクス)
✅ Python API 改善(type hints・dataclass サポート)
2. Iceberg v3 の ACID トランザクション
2025 年の大きな進化:
✅ Row-level lineage 追跡
✅ Deletion Vectors(削除マーク)による効率的な DELETE 操作
✅ Time Travel クエリ(スナップショット参照)
✅ AWS Lake Formation との統合(列・行レベルアクセス制御)
✅ EMR 7.11.0 より標準対応
3. EMR Serverless の進化
PySpark Notebook 対応(新機能):
✅ emr-spark-8.0-preview で Spark 4.0 + Notebook 対応
✅ Jupyter ライク環境・Git 統合
✅ 段階的な追加課金なし(vCPU-時間のみ)
Pre-initialized Capacity 拡充:
✅ ウォームスタンバイ Worker(1-10 分コールドスタート回避)
✅ 予測可能なコスト(月額固定費用)
4. EMR on EKS の実運用拡大
- Karpenter 統合(2026):
- ✅ Pod 自動スケーリング(ASG より効率的)
- ✅ コスト最適化(混合インスタンスタイプ自動選択)
- ✅ マルチテナント対応強化
5. AI/ML 統合の深化
SageMaker との統合:
✅ Feature Store への直接書き込み
✅ Spark ML → SageMaker Autopilot への自動流入
✅ EMR ジョブの SageMaker Pipelines 統合
Ray on EMR(新):
✅ Python ネイティブ並列実行
✅ Reinforcement Learning 対応
✅ HyperParameter Tuning 効率化
学習リソース
公式ドキュメント
オンラインコース
- AWS EMR Deep Dive(Linux Academy)
- Apache Spark
- EMR + Databricks 比較コース(DataCamp)
技術ブログ・記事
OSS・ベンダー資料
実装チェックリスト
フェーズ 1:導入準備
- [ ] 既存データウェアハウス vs EMR のコスト比較完了
- [ ] 年間データ処理量・クエリ複雑度を測定
- [ ] Spark・Hadoop スキルを持つ人材確認
- [ ] AWS 環境(VPC・IAM・S3)準備完了
フェーズ 2:パイロット実装
- [ ] テスト用 EMR on EC2 クラスター作成
- [ ] 既存パイプラインを Spark で実装・性能測定
- [ ] スポットインスタンス活用でコスト削減確認
- [ ] Lake Formation・Glue Catalog 統合テスト
フェーズ 3:本番運用
- [ ] Infrastructure as Code(Terraform/CloudFormation)構築
- [ ] CloudWatch アラーム・ダッシュボード構築
- [ ] 自動スケーリングルール定義・テスト
- [ ] セキュリティ(KMS・VPC・IAM Role)完全設定
- [ ] バックアップ・ディザスタリカバリー計画
フェーズ 4:最適化・高度な運用
- [ ] Hudi / Iceberg による ACID トランザクション導入
- [ ] EMR Serverless へ短時間ジョブ移行
- [ ] EMR Studio Notebook 運用化
- [ ] SageMaker との ML パイプライン統合
- [ ] マルチアカウント環境での統一ガバナンス
まとめ
Amazon EMR は 「Apache Spark/Hadoop のフルマネージドクラスター実行基盤」。EC2(細かい制御)・EKS(Kubernetes 統合)・Serverless(ゼロ管理)の 3 オプションで、ペタバイト規模のデータ処理・複雑な ML パイプイン・リアルタイムストリーミングを実現します。
成功の鍵:
- スポットインスタンスで 70-90% コスト削減
- 使い捨てクラスターで月額コスト 60% 削減
- **EMRFS(S3)**で永続性・柔軟性確保
- Lake Formation + Glue Catalog でデータガバナンス統一
- Infrastructure as Code で再現性・バージョン管理
- 自動スケーリング + CloudWatch 監視で運用効率化
Spark 4.0・Iceberg v3・EMR Serverless などの最新機能を活用し、クラウドネイティブなデータ分析基盤を構築してください。