Amazon Timestream

目次

初心者から実務者向けの包括的解説

Amazon Timestream は、IoT・運用メトリクス・アプリケーションテレメトリ向けのフルマネージド時系列データベースです。毎日数兆の時系列データポイントを保存・分析でき、時系列データに特化したクエリエンジン、自動階層化ストレージ、ネイティブな時系列分析関数を提供します。このページでは、Timestream の概念・アーキテクチャ・設計パターン・エコシステム・近年の動向を体系的に整理します。

このページの目的

このページでは以下を対象としています。

  • 初心者向け: Timestream とは何か、RDS / DynamoDB との違いを学びたい方
  • 開発者向け: メモリストア・マグネティックストア・Scheduled Query の実装
  • データアーキテクト向け: Multi-measure record・時系列クエリの最適化
  • SRE / インフラ向け: スケーリング・VPC エンドポイント・ディザスタリカバリ
  • 意思決定者向け: InfluxDB / TimescaleDB / Prometheus との比較・投資判断

2026 年の Timestream エコシステム

  • Timestream for InfluxDB: マネージド InfluxDB v2 互換、既存 InfluxDB ワークロード移行対応
  • Timestream for LiveAnalytics: メモリストア + マグネティックストアの自動階層化
  • Schedule Query 拡張: より複雑な定期実行分析・自動集計
  • Vector Search(プレビュー): ベクトル埋め込みの直接保存・検索
  • PartiQL サポート: SQL ライク構文での時系列クエリ
  • Grafana Deep Integration: リアルタイムダッシュボード・Alert 統合
  • AWS Glue との統合: Zero-ETL で Redshift と時系列データを同期

定義

AWS 公式による定義:

“Amazon Timestream is a fast, scalable, fully managed time series database designed for IoT, industrial telemetry, analytics, and monitoring applications.”

特徴:

  • 時系列特化: 時系列データに最適化されたクエリエンジン
  • 自動階層化: メモリストア → マグネティックストア の自動移行
  • スケール: 毎秒数百万レコード、ペタバイト規模対応
  • マネージド: インフラ管理不要、99.99% SLA

概要

初心者向けメモ: Timestream は「時系列データに特化した高速・低コスト DB」です。RDS は「複雑な JOIN が必要なリレーショナルデータ」、DynamoDB は「汎用 NoSQL」に向いていますが、Timestream は「タイムスタンプが主軸のデータ」に最適化されています。

Timestream は以下を実現します:

  • 毎秒数百万レコードのスケール: IoT デバイスから大量の時系列データを高速に取り込み
  • 自動ストレージ階層化: メモリストア(最新データ)+ マグネティックストア(中期) + S3(長期)を自動管理してコスト最適化
  • 時系列特有の分析関数: 補間・ウィンドウ集計・季節性分解をネイティブサポート
  • SQL 互換クエリ: 学習コスト低く、既存 SQL スキルで即利用可能
  • Grafana 統合: リアルタイムダッシュボード構築が標準的

Timestream が解決する課題

1. 時系列データの大規模保存と高速クエリの両立が困難

課題: RDS で毎秒 10,000 レコードの時系列データを保存・クエリしようとすると、インデックス設計・テーブルパーティショニング・古いデータの削除・アーカイブ管理が複雑かつ手動

Timestream の解決:

毎秒数百万レコード → メモリストアに自動保存
  ↓ (ユーザー定義ポリシーで自動移行)
自動的にマグネティックストアに階層化
  → クエリはメモリ + マグネティックストアに透過的にアクセス
  → 古いデータは低コスト保存

2. 時系列クエリの実装が複雑

課題: 「過去 1 時間の 5 分ごとの平均値」「欠損値を線形補間で埋める」といった時系列特有の処理を RDS で実装すると複雑

Timestream の解決: 時系列関数をネイティブサポート

SELECT deviceId,
       BIN(time, 5m) AS window,
       AVG(temperature) AS avg_temp
FROM "iot"."sensor_data"
WHERE time BETWEEN ago(1h) AND now
GROUP BY deviceId, BIN(time, 5m)

3. インフラ管理の負担

課題: Prometheus・InfluxDB・TimescaleDB をオンプレミス・EC2 で運用すると、HA・バックアップ・スケーリング・パッチ管理が必要

Timestream の解決: フルマネージド・サーバーレス・99.99% SLA


主な特徴

特徴 説明
時系列最適化 クエリエンジン・ストレージ・関数全てが時系列に最適化
自動階層化 メモリ → マグネティック → S3 を自動移行、コスト最適化
毎秒数百万レコード IoT・メトリクス規模のスケール対応
マルチメジャー 1 レコードに複数メトリクス(CPU, Memory, Disk)を格納
時系列関数 補間・ウィンドウ集計・季節分解をネイティブサポート
SQL 互換 既存 SQL スキルで利用可能
Scheduled Query 定期実行クエリで事前集計・コスト削減
Grafana / QuickSight ダッシュボード統合が標準
VPC エンドポイント プライベート接続対応
CloudTrail 監査ログ完全サポート

Timestream for LiveAnalytics vs Timestream for InfluxDB

観点 LiveAnalytics for InfluxDB
エンジン AWS 独自 SQL 互換 InfluxDB v2 互換
クエリ言語 SQL + 時系列関数 Flux / InfluxQL
メモリストア AWS 管理(MB~GB) InfluxDB メモリ
マグネティックストア S3 ベース(低コスト) オンプレミス or クラウド
既存InfluxDB移行 スキーマ変換必要 互換(接続するだけ)
学習曲線 SQL 標準(低) Flux/InfluxQL 習得必要
推奨用途 汎用時系列・IoT・メトリクス InfluxDB 既存ユーザー
コスト 書き込み量・クエリスキャン インスタンス時間

選択基準:

  • InfluxDB 既存ユーザー → Timestream for InfluxDB
  • 新規・SQL 習熟 → Timestream for LiveAnalytics

アーキテクチャ

graph TB
    IoT["IoT Core / Kinesis Data Streams / MSK"]
    TS["Amazon Timestream"]
    MS["メモリストア (InMemory)"]
    MGS["マグネティックストア (S3)"]

    Grafana["Grafana / QuickSight / Analytics"]
    Query["Query Engine"]

    IoT -->|Write API| TS
    TS -->|Auto Tiering| MS
    MS -->|Auto Migration Policy| MGS

    Query -->|Transparent Access| MS
    Query -->|Auto Redirect| MGS
    Query -->|Results| Grafana

    Lambda["Lambda Code Hook"]
    Scheduled["Scheduled Query"]

    TS -->|Trigger| Lambda
    TS -->|Pre-aggregation| Scheduled

    SNS["Amazon SNS / CloudWatch"]
    Scheduled -->|Alert| SNS

データモデル

テーブル・ディメンション・メジャー

Database: my_app
  └── Table: server_metrics
      ├── ディメンション(タグ・メタデータ)
      │   ├── region: us-east-1 (変化しない)
      │   ├── instanceId: i-abc123
      │   └── service: web-api
      │
      ├── メジャー(計測値)
      │   ├── cpu_utilization: 75.3
      │   ├── memory_used_gb: 12.8
      │   └── disk_read_iops: 350
      │
      └── タイムスタンプ: 2026-04-27T10:30:00Z

マルチメジャーレコード(推奨)

1 回の書き込みで複数メトリクスを格納:

{
  "DatabaseName": "my_app",
  "TableName": "server_metrics",
  "Records": [
    {
      "Time": "2026-04-27T10:30:00Z",
      "TimeUnit": "SECONDS",
      "Dimensions": [
        {"Name": "region", "Value": "us-east-1"},
        {"Name": "instanceId", "Value": "i-abc123"}
      ],
      "MeasureName": "server_stats",
      "MeasureValues": [
        {"Name": "cpu_utilization", "Value": "75.3", "Type": "DOUBLE"},
        {"Name": "memory_used_gb", "Value": "12.8", "Type": "DOUBLE"},
        {"Name": "disk_read_iops", "Value": "350", "Type": "BIGINT"}
      ]
    }
  ]
}

メリット:

  • 書き込み 1 回で複数メトリクス
  • ストレージ効率向上
  • 同一タイムスタンプの一貫性

ストレージ階層化

データ書き込み(Write API)
    ↓
メモリストア(InMemory)
    ← 最近 1 時間〜数時間
    ← 高速クエリ向け
    ← 高コスト(GB/時)

    ↓ (ユーザー定義ポリシー自動適用)

マグネティックストア(S3)
    ← 数日〜年単位
    ← 中コスト(GB/月)
    ← シーケンシャルアクセス最適化

    ↓ (ライフサイクルポリシー)

S3 深いアーカイブ / Glacier
    ← 年単位の長期保存
    ← 最低コスト(GB/年)
ストア 用途 コスト レイテンシ
メモリストア 直近データの超高速クエリ < 1ms
マグネティックストア 中期・分析クエリ 数秒
S3 アーカイブ 長期コンプライアンス保存 分〜時

クエリ言語と時系列関数

基本クエリ

-- 直近 1 時間の平均 CPU 使用率(5 分ごと)
SELECT instanceId,
       BIN(time, 5m) AS window,
       AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now
  AND region = 'us-east-1'
GROUP BY instanceId, BIN(time, 5m)
ORDER BY window DESC;

-- 過去 7 日の日単位集計
SELECT DATE_TRUNC('day', time) AS day,
       region,
       MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
       AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
       MIN(CAST(cpu_utilization AS DOUBLE)) AS min_cpu
FROM "my_app"."server_metrics"
WHERE time > ago(7d)
GROUP BY DATE_TRUNC('day', time), region
ORDER BY day DESC;

時系列専用関数

関数 機能
BIN(time, interval) 時間をバケット化(5 分ごと等)
FILL(value) NULL 値を指定値で埋める
INTERPOLATE_LINEAR 線形補間で欠損値補完
INTERPOLATE_LOCF 直前値で補完(Last Observation Carried Forward)
ago(duration) 現在から過去の時間を計算
CREATE_TIME_SERIES 時系列オブジェクト作成
SMOOTH スムージング(移動平均等)
APPROX_PERCENTILE_CONT 近似パーセンタイル

補間・平滑化例

-- 線形補間で欠損値を埋める
SELECT instanceId,
       time,
       INTERPOLATE_LINEAR(
         CREATE_TIME_SERIES(time, cpu_utilization),
         SEQUENCE(min(time) OVER , max(time) OVER , 1m)
       ) AS interpolated_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(24h) AND now
GROUP BY instanceId;

-- 移動平均(5 分ウィンドウ)
SELECT instanceId,
       time,
       AVG(CAST(cpu_utilization AS DOUBLE)) OVER (
         PARTITION BY instanceId
         ORDER BY time
         ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
       ) AS moving_avg_cpu
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(12h) AND now
ORDER BY time DESC;

Scheduled Queries

定期実行クエリで事前集計・ダッシュボード用プリアグリゲーション:

-- 定義:毎 5 分ごとに 5 分単位の平均値を別テーブルに書き込み
CREATE SCHEDULED_QUERY 'compute_5min_agg' AS
SELECT
  BIN(time, 5m) AS binned_time,
  region,
  instanceId,
  AVG(CAST(cpu_utilization AS DOUBLE)) AS avg_cpu,
  MAX(CAST(cpu_utilization AS DOUBLE)) AS max_cpu,
  COUNT(*) AS sample_count
FROM "my_app"."server_metrics"
WHERE time BETWEEN ago(1h) AND now
GROUP BY BIN(time, 5m), region, instanceId
EXEC INTO "my_app"."server_metrics_5min_agg"
SCHEDULE 'rate(5 minutes)';

メリット:

  • ダッシュボード表示の高速化(事前計算済み)
  • クエリコスト削減(大規模テーブルをスキャンしない)
  • 自動アラート・通知(SNS 統合)

マルチメジャーレコード

シングルメジャー(非効率)

# 1 回の書き込みで 1 メトリクスのみ
for metric in ['cpu', 'memory', 'disk']:
    timestream.write_records(
        Records=[{
            'Time': str(int(time.time * 1000)),
            'TimeUnit': 'MILLISECONDS',
            'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
            'MeasureName': metric,
            'MeasureValue': '75.3',
            'MeasureValueType': 'DOUBLE'
        }]
    )  # 3 回の API コール

マルチメジャー(推奨)

# 1 回の書き込みで複数メトリクス
timestream.write_records(
    Records=[{
        'Time': str(int(time.time * 1000)),
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [{'Name': 'instance_id', 'Value': 'i-123'}],
        'MeasureName': 'server_metrics',
        'MeasureValues': [
            {'Name': 'cpu_utilization', 'Value': '75.3', 'Type': 'DOUBLE'},
            {'Name': 'memory_used_gb', 'Value': '12.8', 'Type': 'DOUBLE'},
            {'Name': 'disk_read_iops', 'Value': '350', 'Type': 'BIGINT'}
        ]
    }]
)  # 1 回の API コール、コスト削減

主要ユースケース

  1. IoT センサーモニタリング

    • 工場の 10,000 台のセンサーから毎秒 100 万データポイント取得
    • Grafana で異常検知ダッシュボード表示
    • Scheduled Query で 1 時間ごとの集計・SNS 通知
  2. アプリケーション メトリクス

    • EC2・Lambda のレイテンシー・エラー率・スループット
    • 13 ヶ月保持してキャパシティプランニング
    • QuickSight で BI ダッシュボード
  3. 金融市場データ

    • 株価・為替レート・先物の時系列保存
    • 1 分・5 分・1 時間の OHLC(始値・高値・安値・終値)を自動計算
    • Scheduled Query で価格アラート
  4. スマートメーター

    • 電力会社が数百万世帯の電力消費データを 7 年間保持
    • 機械学習で需要予測・コスト最適化
    • SageMaker と連携
  5. ネットワーク・セキュリティ

    • VPC Flow Logs・WAF ログ・GuardDuty アラート
    • 時系列で侵入検知・トレンド分析
  6. 医療・IoT デバイス

    • ウェアラブルから心拍・酸素飽和度を毎秒取得
    • HIPAA 準拠で長期保存・研究利用
  7. SaaS メトリクス

    • API レスポンスタイム・アクティブユーザー数・課金メトリクス
    • オペレーション・ダッシュボード
  8. 気象・環境モニタリング

    • 気象ステーション・大気品質・水質データ
    • 数十年の歴史データで気候変動分析
  9. ゲーム分析

    • プレイヤーの時系列アクティビティ
    • リアルタイムランキング・ミッション進捗
  10. 製造業 - 予防保全

    • 機械の振動・温度・圧力データ
    • 異常検知で故障予測・ダウンタイム削減

設定・操作の具体例

AWS Management Console での作成

  1. Timestream コンソール → Create Database

    Database name: my_iot_app
    KMS Key: (AWS managed or Customer managed)
    
  2. Create Table

    Table name: sensor_data
    Memory store retention: 1 hour
    Magnetic store retention: 365 days (1 year)
    Magnetic store S3: s3://my-bucket/timestream/
    
  3. Table Schema

    Dimensions: region, instance_id, sensor_type
    Time attribute: time (TIMESTAMP)
    Retention: Memory 1h, Magnetic 365d
    

CLI 操作

データベース・テーブル作成

# Database 作成
aws timestream-write create-database \
  --database-name my_iot_app \
  --region us-east-1

# Table 作成
aws timestream-write create-table \
  --database-name my_iot_app \
  --table-name sensor_data \
  --retention-properties \
    MemoryStoreRetentionPeriodInHours=24,\
    MagneticStoreRetentionPeriodInDays=365

データ書き込み

# JSON ファイルから書き込み
aws timestream-write write-records \
  --database-name my_iot_app \
  --table-name sensor_data \
  --records '[
    {
      "Time": "1667303200000",
      "TimeUnit": "MILLISECONDS",
      "Dimensions": [
        {"Name": "region", "Value": "us-east-1"},
        {"Name": "instance_id", "Value": "i-abc123"}
      ],
      "MeasureName": "cpu_utilization",
      "MeasureValue": "75.3",
      "MeasureValueType": "DOUBLE"
    }
  ]'

クエリ実行

# クエリ実行
aws timestream-query query \
  --query-string "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10"

# 結果取得
aws timestream-query get-query-execution-status \
  --query-id <query-id>

Scheduled Query 作成

aws timestream-write create-scheduled-query \
  --name daily-aggregation \
  --schedule-expression "rate(1 day)" \
  --query-string "SELECT \
    DATE_TRUNC('day', time) as day, \
    region, \
    AVG(CAST(cpu_utilization AS DOUBLE)) as avg_cpu \
    FROM \"my_iot_app\".\"sensor_data\" \
    WHERE time > ago(1d) \
    GROUP BY DATE_TRUNC('day', time), region" \
  --scheduled-query-execution-role-arn arn:aws:iam::123456789012:role/TimestreamRole \
  --target-configuration '{"TimestreamConfiguration": {"DatabaseName": "my_iot_app", "TableName": "sensor_data_daily"}}'

SDK 実装例

Python (Boto3)

import boto3
import json
from datetime import datetime
import time

client = boto3.client('timestream-write', region_name='us-east-1')
query_client = boto3.client('timestream-query', region_name='us-east-1')

# データ書き込み(マルチメジャー)
def write_sensor_data(device_id, temperature, humidity, pressure):
    current_time = str(int(time.time * 1000))

    records = [{
        'Time': current_time,
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [
            {'Name': 'region', 'Value': 'us-east-1'},
            {'Name': 'device_id', 'Value': device_id},
            {'Name': 'sensor_type', 'Value': 'environmental'}
        ],
        'MeasureName': 'environmental_metrics',
        'MeasureValues': [
            {'Name': 'temperature_celsius', 'Value': str(temperature), 'Type': 'DOUBLE'},
            {'Name': 'humidity_percent', 'Value': str(humidity), 'Type': 'DOUBLE'},
            {'Name': 'pressure_mbar', 'Value': str(pressure), 'Type': 'DOUBLE'}
        ]
    }]

    response = client.write_records(
        DatabaseName='my_iot_app',
        TableName='sensor_data',
        Records=records
    )
    print(f"Wrote {response['RecordsIngested']['Total']} records")
    return response

# クエリ実行
def query_sensor_stats(hours=1):
    query_string = f"""
    SELECT
        device_id,
        BIN(time, 5m) as window,
        AVG(CAST(temperature_celsius AS DOUBLE)) as avg_temp,
        MAX(CAST(temperature_celsius AS DOUBLE)) as max_temp,
        MIN(CAST(temperature_celsius AS DOUBLE)) as min_temp,
        COUNT(*) as sample_count
    FROM "my_iot_app"."sensor_data"
    WHERE time > ago({hours}h)
    GROUP BY device_id, BIN(time, 5m)
    ORDER BY window DESC
    """

    response = query_client.query(QueryString=query_string)

    columns = [col['Name'] for col in response['ColumnInfo']]
    print(f"Columns: {columns}")

    for row in response['Rows']:
        print([cell.get('ScalarValue') for cell in row['Data']])

    return response

# 使用例
if __name__ == '__main__':
    # データ書き込み
    write_sensor_data('sensor-001', temperature=23.5, humidity=45.2, pressure=1013.25)
    write_sensor_data('sensor-002', temperature=22.8, humidity=48.1, pressure=1013.30)

    # クエリ実行
    query_sensor_stats(hours=1)

Java

import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;
import software.amazon.awssdk.services.timestreamwrite.model.*;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamquery.model.*;

public class TimestreamExample {
    public static void writeRecords {
        TimestreamWriteClient client = TimestreamWriteClient.builder.build;

        long time = System.currentTimeMillis;
        String timeStr = Long.toString(time);

        Record record = Record.builder
            .time(timeStr)
            .timeUnit(TimeUnit.MILLISECONDS)
            .dimensions(
                Dimension.builder.name("region").value("us-east-1").build,
                Dimension.builder.name("instance_id").value("i-abc123").build
            )
            .measureName("server_metrics")
            .measureValues(
                MeasureValue.builder.name("cpu_utilization").value("75.3").type(MeasureValueType.DOUBLE).build,
                MeasureValue.builder.name("memory_used_gb").value("12.8").type(MeasureValueType.DOUBLE).build
            )
            .build;

        WriteRecordsRequest writeRequest = WriteRecordsRequest.builder
            .databaseName("my_iot_app")
            .tableName("sensor_data")
            .records(record)
            .build;

        WriteRecordsResponse response = client.writeRecords(writeRequest);
        System.out.println("Records written: " + response.recordsIngested.total);

        client.close;
    }

    public static void queryRecords {
        TimestreamQueryClient queryClient = TimestreamQueryClient.builder.build;

        String queryString = "SELECT * FROM \"my_iot_app\".\"sensor_data\" WHERE time > ago(1h) LIMIT 10";

        QueryRequest queryRequest = QueryRequest.builder
            .queryString(queryString)
            .build;

        QueryResponse response = queryClient.query(queryRequest);

        response.rows.forEach(row -> {
            row.data.forEach(datum -> System.out.print(datum.scalarValue + " "));
            System.out.println;
        });

        queryClient.close;
    }
}

Node.js

const AWS = require('aws-sdk');

const writeClient = new AWS.TimestreamWrite({ region: 'us-east-1' });
const queryClient = new AWS.TimestreamQuery({ region: 'us-east-1' });

// データ書き込み
async function writeSensorData(deviceId, temperature, humidity) {
    const time = Date.now.toString;

    const params = {
        DatabaseName: 'my_iot_app',
        TableName: 'sensor_data',
        Records: [{
            Time: time,
            TimeUnit: 'MILLISECONDS',
            Dimensions: [
                { Name: 'region', Value: 'us-east-1' },
                { Name: 'device_id', Value: deviceId }
            ],
            MeasureName: 'environment_metrics',
            MeasureValues: [
                { Name: 'temperature', Value: temperature.toString, Type: 'DOUBLE' },
                { Name: 'humidity', Value: humidity.toString, Type: 'DOUBLE' }
            ]
        }]
    };

    try {
        const response = await writeClient.writeRecords(params).promise;
        console.log('Records written:', response.RecordsIngested.Total);
    } catch (error) {
        console.error('Error writing records:', error);
    }
}

// クエリ実行
async function querySensorData {
    const params = {
        QueryString: `
            SELECT device_id,
                   BIN(time, 5m) as window,
                   AVG(CAST(temperature AS DOUBLE)) as avg_temp
            FROM "my_iot_app"."sensor_data"
            WHERE time > ago(1h)
            GROUP BY device_id, BIN(time, 5m)
        `
    };

    try {
        const response = await queryClient.query(params).promise;
        console.log('Query result:', response.Rows);
    } catch (error) {
        console.error('Error querying:', error);
    }
}

// 実行
(async  => {
    await writeSensorData('sensor-001', 23.5, 45.2);
    await querySensorData;
});

IaC (CloudFormation/Terraform)

CloudFormation

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Timestream Database and Table'

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: my_iot_app
      Tags:
        - Key: Environment
          Value: Production

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: sensor_data
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 24
        MagneticStoreRetentionPeriodInDays: 365
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true
      Tags:
        - Key: Environment
          Value: Production

  TimestreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: TimestreamAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - timestream:WriteRecords
                  - timestream:SelectValues
                Resource: !Sub 'arn:aws:timestream:${AWS::Region}:${AWS::AccountId}:database/${TimestreamDatabase}/table/${TimestreamTable}'

Outputs:
  DatabaseName:
    Value: !Ref TimestreamDatabase
  TableName:
    Value: !Ref TimestreamTable
  RoleArn:
    Value: !GetAtt TimestreamRole.Arn

Terraform

provider "aws" {
  region = "us-east-1"
}

resource "aws_timestreamwrite_database" "my_iot_app" {
  database_name = "my_iot_app"

  tags = {
    Environment = "Production"
  }
}

resource "aws_timestreamwrite_table" "sensor_data" {
  database_name = aws_timestreamwrite_database.my_iot_app.database_name
  table_name    = "sensor_data"

  retention_properties {
    memory_store_retention_period_in_hours    = 24
    magnetic_store_retention_period_in_days   = 365
  }

  magnetic_store_write_properties {
    enable_magnetic_store_writes = true
  }

  tags = {
    Environment = "Production"
  }
}

resource "aws_iam_role" "timestream_role" {
  name = "timestream-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "timestream_policy" {
  name = "timestream-policy"
  role = aws_iam_role.timestream_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "timestream:WriteRecords",
        "timestream:SelectValues"
      ]
      Resource = "arn:aws:timestream:us-east-1:*:database/${aws_timestreamwrite_database.my_iot_app.database_name}/table/${aws_timestreamwrite_table.sensor_data.table_name}"
    }]
  })
}

output "database_name" {
  value = aws_timestreamwrite_database.my_iot_app.database_name
}

output "table_name" {
  value = aws_timestreamwrite_table.sensor_data.table_name
}

類似サービス比較表

比較軸 Timestream InfluxDB(OSS) TimescaleDB Prometheus ClickHouse
マネージド時系列 DB OSS 時系列 DB PostgreSQL 拡張 メトリクス収集 OLAP 列指向
インフラ管理 AWS マネージド オンプレ / 自己管理 オンプレ / 自己管理 オンプレ / 自己管理 オンプレ / 自己管理
スケール 毎秒百万レコード 毎秒万レコード 毎秒十万レコード 数千〜万レコード 毎秒数百万イベント
クエリ言語 SQL 互換 InfluxQL / Flux SQL (PostgreSQL) PromQL SQL
ストレージ階層化 自動(メモリ→磁気) 手動 手動 メモリのみ S3 / HDFS 統合
学習曲線 低(SQL標準) 中(InfluxQL習得) 低(SQL拡張) 中(PromQL) 低(SQL標準)
関数 時系列専用(補間等) InfluxQL 固有 時系列 / ウィンドウ 集計関数 SQL 標準
推奨用途 IoT・クラウドネイティブ IoT・オンプレ クラウド関連データ Prometheus エコシステム 大規模分析
コスト 従量制(書き込み・クエリ) 初期 + 運用 初期 + 運用 初期 + 運用 初期 + 運用
HA/DR 99.99% SLA 構築必要 構築必要 構築必要 構築必要
サーバーレス × × × ×
Grafana 統合 ネイティブ ネイティブ 可能 ネイティブ 可能

ベストプラクティス

✅ 推奨パターン

  1. マルチメジャーレコード利用

    • 1 回の書き込みで複数メトリクス
    • 書き込みコスト削減・ストレージ効率向上
  2. 適切なディメンション設計

    ✅ region, instance_id, service, environment
    ❌ timestamp, user_id(高カーディナリティ)
    

    カーディナリティが高すぎるとメモリストア効率低下

  3. Scheduled Query でプリアグリゲーション

    • ダッシュボード用の事前集計
    • クエリスキャン量削減 → コスト削減
  4. VPC エンドポイント利用

    • プライベート接続で安全性向上
    • CloudTrail ログ有効化
  5. パーティション戦略

    -- ディメンションを活用したパーティショニング
    WHERE region = 'us-east-1' AND time > ago(7d)
    
  6. メモリストア / マグネティックストア の適切な分離

    • ホット(直近 1 時間)→ メモリ
    • ウォーム(最近 1 週間)→ マグネティック
    • コールド(過去 1 年以上)→ S3

❌ アンチパターン

  1. シングルメジャーレコード の乱用

    • 毎回 API コール
    • 書き込みコスト増加
  2. 高カーディナリティディメンション

    ❌ user_id, request_id(数百万種類)
    
  3. スケジュール化されたアグリゲーションなし

    • ダッシュボードが毎回フルスキャン
    • コスト増・レイテンシ増
  4. IAM ポリシーが過剰に広い

    "Action": "timestream:*""Action": ["timestream:WriteRecords", "timestream:SelectValues"]
    
  5. レテンション期間の不適切な設定

    • マグネティック無しで容量超過
    • S3 にアーカイブしない

トラブルシューティング

症状 原因 対策
ValidationException スキーマ・データ型不一致 MeasureValue が DOUBLE なのに STRING 送信 → 型確認
ResourceNotFoundException テーブル・DB が存在しない arn:aws:timestream: 確認、リージョン確認
ThrottlingException 書き込み / クエリレート超過 Scheduled Query で集計、バッチ処理活用
AccessDeniedException IAM ロール権限不足 Policy に WriteRecords / SelectValues 追加
Query timeout クエリが大きすぎる / スキャンが多い WHERE で時間範囲・ディメンション フィルター追加
Memory store full 書き込み量が多く、ポリシー遅延 マグネティックストア移行時間確認
レスポンスが遅い メモリストア外のデータをスキャン メモリストア保持期間を延長

セキュリティ・コンプライアンス

内容
IAM リソース単位のきめ細かい権限制御
KMS 保存時暗号化(AWS Managed Key / CMK 選択可)
TLS / HTTPS 転送中の暗号化(デフォルト)
VPC Endpoint PrivateLink 対応、インターネット露出なし
CloudTrail API 操作の監査ログ
Retention コンプライアンス保持期間設定・自動削除可能
監査 CloudWatch Logs に詳細ログ出力

パフォーマンスチューニング

書き込みの最適化

# バッチ書き込み(複数レコード)
def batch_write_records(records, batch_size=100):
    for i in range(0, len(records), batch_size):
        batch = records[i:i+batch_size]
        client.write_records(
            DatabaseName='my_app',
            TableName='sensor_data',
            Records=batch
        )

クエリの最適化

-- ❌ 非効率:全テーブルスキャン
SELECT * FROM "my_app"."sensor_data"

-- ✅ 効率的:時間範囲フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE time > ago(1h)

-- ✅ さらに効率的:ディメンション + 時間フィルター
SELECT * FROM "my_app"."sensor_data"
WHERE region = 'us-east-1'
  AND time > ago(1h)

-- ✅ 最高効率:Scheduled Query で事前集計
SELECT device_id, BIN(time, 5m), AVG(temperature)
FROM "my_app"."sensor_data_5min_agg"  -- 既に集計済み
WHERE time > ago(1h)

コスト最適化

1. マルチメジャーレコード利用

  • シングル × 3 メトリクス → マルチ × 1 レコード
  • 書き込みコスト 1/3

2. Scheduled Query でプリアグリゲーション

  • ダッシュボード毎回フルスキャン → 事前集計
  • クエリコスト 50-90% 削減

3. 適切なレテンション設定

  • メモリストア: 24 時間(ホットデータ)
  • マグネティック: 365 日(ウォームデータ)
  • → 古いデータは S3 ライフサイクルで移行

4. フィルターの活用

WHERE region = 'us-east-1' AND time > ago(7d)

不要なデータはスキャン前に除外


近年の動向

Timestream for InfluxDB の進化

  • InfluxDB v3 互試対応(2026 年上半期予定)
  • パフォーマンス向上・マルチテナント対応

Vector Search サポート

  • 埋め込みベクトルの直接保存・検索
  • 時系列 + ベクトル検索の統合

Zero-ETL 統合

  • Redshift との自動同期
  • Glue なしでのデータレプリケーション

AI/ML 統合の深化

  • SageMaker Forecast との直接統合
  • Bedrock で異常検知・予測

マルチリージョン対応

  • クロスリージョン レプリケーション
  • グローバルアプリケーション対応

学習リソース・参考文献

公式リソース

ベンダー・OSS リソース

コミュニティ・ブログ

  • AWS Database Blog
  • Time Series Data Management
  • IoT on AWS Blog

実装例・チェックリスト

チェックリスト

  • [ ] データレート(毎秒レコード数)を測定
  • [ ] ディメンション・カーディナリティを設計
  • [ ] マルチメジャーレコード利用か確認
  • [ ] メモリストア / マグネティックストア 保持期間設定
  • [ ] Scheduled Query でプリアグリゲーション計画
  • [ ] VPC エンドポイント設定
  • [ ] IAM ロール・最小権限確認
  • [ ] CloudTrail 監査ログ有効化
  • [ ] Grafana / QuickSight 接続確認
  • [ ] コスト見積もり・予算アラート設定

実装チェック

# Timestream 接続確認
import boto3

client = boto3.client('timestream-write')

# DB リスト確認
response = client.list_databases
print("Databases:", [db['DatabaseName'] for db in response['Databases']])

# テーブル確認
response = client.list_tables(DatabaseName='my_app')
print("Tables:", [t['TableName'] for t in response['Tables']])

# 書き込みテスト
import time
response = client.write_records(
    DatabaseName='my_app',
    TableName='sensor_data',
    Records=[{
        'Time': str(int(time.time * 1000)),
        'TimeUnit': 'MILLISECONDS',
        'Dimensions': [{'Name': 'test', 'Value': 'true'}],
        'MeasureName': 'test_metric',
        'MeasureValue': '1.0',
        'MeasureValueType': 'DOUBLE'
    }]
)
print("Write success:", response['RecordsIngested']['Total'] > 0)

まとめ

Amazon Timestream は 「時系列データ専用の高速・低コスト・マネージド DB」 です。IoT センサーデータ・サーバーメトリクス・アプリテレメトリなど、タイムスタンプが主軸のデータに特化しており、以下の特徴を提供します:

  • 自動ストレージ階層化 でコスト最適化
  • SQL 互換クエリ で学習コスト低減
  • 時系列専用関数 で分析を高速化
  • Scheduled Query でダッシュボード最適化
  • Grafana 統合 でリアルタイムダッシュボード
  • 99.99% SLA でエンタープライズ品質

RDS / DynamoDB では対応困難な 毎秒百万レコード規模 のデータを、低コスト・低運用負荷で実現します。Timestream for LiveAnalytics(AWS ネイティブ)か Timestream for InfluxDB(InfluxDB 既存ユーザー向け)から選択できます。