イベント駆動設計

目次

概要

出来事を中心にシステムをつなぐ

イベント駆動設計は、システム内で起きた出来事をイベントとして発行し、それに関心のある処理が購読する設計です。サービス同士を直接呼び合うのではなく、出来事を介して疎結合にします。

要点

イベント駆動設計の中心は「何が起きたか」を明示し、後続処理を疎結合にすることです。ただし、非同期化は複雑さも増やすため、冪等性、順序、再試行、観測可能性を同時に設計します。

この章で重視すること


イベント駆動設計とは何か

イベントは、過去に起きた事実です。

OrderCreated
PaymentFailed
InventoryReserved
UserRegistered

イベント駆動では、発行者は受信者を直接知りません。

flowchart LR Order["注文サービス"] Bus["イベントバス"] Mail["メール送信"] Stock["在庫"] Analytics["分析"] Order -->|"OrderCreated"| Bus Bus --> Mail Bus --> Stock Bus --> Analytics

AsyncAPIのイベント駆動アーキテクチャ解説では、イベントは「何かが起きた」ことを伝えるためのメッセージであり、サービス間の通信をブローカーやチャネルで整理します。RESTの同期リクエストとは違い、受信者が後から処理できる点が特徴です。

イベント、コマンド、クエリ

種類 意味
コマンド 何かをしてほしい CreateOrder
イベント 何かが起きた OrderCreated
クエリ 情報がほしい GetOrder

イベントを命令のように使うと、責務が曖昧になります。イベントは「起きた事実」として設計します。

AsyncAPIの文献でも、messageは構造としては共通でも、eventとcommandは概念として異なると説明されています。eventは「起きた事実」を通知し、commandは「何かをしてほしい」という依頼です。名前も過去形と命令形で分けると読みやすくなります。

種類 名前の例 意味
event OrderPlaced 注文が確定した
command ShipOrder 注文を出荷してほしい
query GetOrder 注文を取得したい

ブローカー、チャネル、コンシューマ

イベント駆動では、producer、broker、channel、consumerを分けて考えます。

flowchart LR Producer["Producer / Publisher"] Broker["Message Broker"] Channel["Channel / Topic"] ConsumerA["Consumer A"] ConsumerB["Consumer B"] Producer --> Broker --> Channel Channel --> ConsumerA Channel --> ConsumerB

brokerはmessageを受け取り、関心を示したsubscriberへ届ける基盤です。channelはtopic、routing key、event typeなどサービスや製品によって呼び名が違いますが、特定種類のmessageを流す場所として設計します。

channelに複数の意味を混ぜると、consumer側が不要なmessageを大量に受け取り、filterや分岐が増えます。

避けたい:
  business-events

読みやすい:
  order.placed
  order.cancelled
  invoice.issued

channel名は実装都合より、業務上の事実や関心に合わせます。

同期通信との違い

同期通信は結果がすぐ分かる一方、呼び出し先の遅延や障害を受けやすいです。

非同期イベントは呼び出し元を軽くできますが、結果整合性になります。

方式 利点 注意点
同期API 分かりやすい、即時結果 密結合、障害伝播
非同期イベント 疎結合、スケールしやすい 遅延、重複、順序、追跡

イベントの設計

イベントには、最低限次の情報を入れます。

{
  "eventId": "evt_123",
  "eventType": "OrderCreated",
  "occurredAt": "2026-04-29T10:00:00Z",
  "version": 1,
  "data": {
    "orderId": "ord_123",
    "customerId": "cus_456",
    "totalAmount": 12000
  }
}

設計上の注意:

  • イベントIDを持つ
  • 発生時刻を持つ
  • スキーマバージョンを持つ
  • 個人情報を入れすぎない
  • 内部DB構造をそのまま出さない

AsyncAPIで契約を明文化する

イベントもAPIです。AsyncAPI Specificationは、非同期APIやイベント駆動アーキテクチャのメッセージ、チャネル、スキーマを記述するための仕様です。OpenAPIがHTTP APIの契約を文書化するように、AsyncAPIはイベントの契約を文書化します。

asyncapi: 3.0.0
info:
  title: Order Events
  version: 1.0.0
channels:
  order.created:
    address: order.created
    messages:
      OrderCreated:
        payload:
          type: object
          properties:
            orderId:
              type: string
            customerId:
              type: string

イベント契約を明文化すると、発行側と購読側の認識ずれを減らせます。

AsyncAPI Specificationは、receive側の定義をsend側へ機械的に反転することを推奨していません。受信に使うchannelと送信に使うchannelが同じとは限らず、operation名やsummaryも意味が変わるためです。producerとconsumerは、それぞれの責務としてdocumentを持つ方が安全です。

Receiver document:
  operation: onUserSignedUp
  action: receive

Sender document:
  operation: publishUserSignedUp
  action: send

「同じeventを扱う」ことと「同じAPI契約を持つ」ことは別です。broker設定、topic権限、DLQ、retry policyなど、AsyncAPIだけでは表しきれない運用設定も別途管理します。

配信方式

方式 向いている場面
キュー SQS, RabbitMQ 1つの処理者に渡す
Pub/Sub SNS, Google Pub/Sub 複数購読者に配る
ストリーム Kafka, Kinesis 順序、再処理、履歴
イベントバス EventBridge サービス間連携

キューとストリームは似ていますが、履歴を読む設計かどうかが大きく違います。

冪等性

イベント処理では、同じイベントが複数回届くことを前提にします。冪等性とは、同じ処理を複数回実行しても結果が変わらない性質です。

eventId = evt_123 を処理済みとして記録
同じ eventId が来たらスキップ

決済、在庫、通知では特に重要です。

順序と重複

分散システムでは、イベントが順番通りに届くとは限りません。

対策:

  • 集約単位で順序キーを使う
  • バージョン番号を持つ
  • 古いイベントを無視する
  • 順序が必要な範囲を小さくする

すべてのイベントに全体順序を求めると、スケーラビリティが落ちます。

エラーハンドリング

イベント処理では失敗が必ず起きます。

flowchart TD Event["イベント受信"] Process["処理"] Success["成功"] Retry["リトライ"] DLQ["デッドレターキュー"] Event --> Process Process -->|成功| Success Process -->|一時失敗| Retry --> Process Process -->|恒久失敗| DLQ

AWSのイベント駆動アーキテクチャ資料でも、疎結合化、スケール、非同期処理の利点と同時に、失敗時の再試行、デッドレター、監視を設計することが重要です。

イベントソーシング

イベントソーシングは、現在状態ではなくイベント列を正として保存する方式です。

OrderCreated
ItemAdded
PaymentAuthorized
OrderShipped

利点:

  • 状態の履歴が完全に残る
  • 監査に強い
  • 過去イベントから別のビューを再構築できる

注意点:

  • 実装が複雑
  • スキーマ進化が難しい
  • 読み取りモデルが別途必要

CQRS

CQRSは、更新モデルと参照モデルを分ける考え方です。

flowchart LR Command["Command Model"] Event["Events"] Read["Read Model"] UI["UI"] Command --> Event --> Read --> UI

読み取りが多いシステムや、画面ごとに最適化された参照モデルが必要な場合に有効です。

観測可能性

イベント駆動では、処理が非同期になるため追跡が難しくなります。

必要なもの:

  • correlationId
  • causationId
  • eventId
  • traceId
  • 処理遅延
  • キュー長
  • リトライ回数
  • デッドレター件数

オブザーバビリティを設計しないと、「どこで止まったか」が分からなくなります。

使うべき場面

向いている:

  • 後続処理が複数ある
  • 即時完了しなくてよい
  • 外部サービス障害を分離したい
  • 監査ログや履歴が重要
  • ストリーム処理が必要

向いていない:

  • 強い一貫性が必要
  • 即時結果が必要
  • 処理フローが単純
  • チームが非同期運用に慣れていない

zure Architecture: Event-driven architecture style](https://learn.microsoft.com/en-us/azure/architecture/guide/architecture-styles/event-driven)

イベントスキーマ

イベント駆動では、イベント名だけでなくschemaが契約になります。producerとconsumerが別々に進化するため、互換性を意識して設計します。

{
  "eventId": "evt_123",
  "eventType": "OrderConfirmed",
  "occurredAt": "2026-04-29T10:00:00Z",
  "version": 1,
  "payload": {
    "orderId": "ord_123",
    "customerId": "cus_456"
  }
}

入れるべき情報:

  • event id
  • event type
  • occurred at
  • schema version
  • correlation id
  • causation id
  • producer
  • payload

アウトボックスパターン

DB更新とイベント発行を別々に行うと、片方だけ成功する問題が起きます。Outbox patternでは、業務DBの同じtransaction内にoutbox tableへイベントを書き、別processがそれをbrokerへ送ります。

flowchart LR A["業務処理"] --> B["DB更新"] A --> C["outboxへ記録"] C --> D["publisher"] D --> E["message broker"]

これにより、DB更新とイベント記録の原子性を保ちやすくなります。ただし、consumer側では重複配信を前提に冪等に処理します。

コンシューマ設計

consumerは、イベントを受け取ったら必ず1回だけ処理できるとは限りません。

  • 同じeventが複数回来る
  • 順序が入れ替わる
  • 古いschemaが届く
  • 下流APIが失敗する
  • 処理途中でprocessが落ちる

そのため、idempotency key、処理済みevent table、retry policy、dead letter queue、schema互換性を設計します。

AsyncAPI仕様による非同期API設計

AsyncAPI (asyncapi.com) は、非同期メッセージングプロトコルのための OpenAPI等価物です。

AsyncAPI 3.0 スキーマ構造

asyncapi: '3.0.0'
info:
  title: 'E-Commerce Event Broker'
  version: '1.0.0'
  description: 'イベント駆動型ECシステムのメッセージング仕様'

defaultContentType: 'application/json'

servers:
  rabbitmq:
    host: 'rabbitmq.example.com:5672'
    protocol: amqp
    protocolVersion: '0.9.1'
  kafka:
    host: 'kafka.example.com:9092'
    protocol: kafka

channels:
  orders/created:
    description: '注文作成イベント'
    address: 'orders.created'
    publish:
      summary: '新規注文のイベント発行'
      message:
        $ref: '#/components/messages/OrderCreated'
        
  inventory/updated:
    description: '在庫更新イベント'
    address: 'inventory.updated'
    subscribe:
      message:
        $ref: '#/components/messages/InventoryUpdated'

components:
  messages:
    OrderCreated:
      payload:
        type: object
        properties:
          orderId:
            type: string
            format: uuid
          customerId:
            type: string
          totalAmount:
            type: number
            format: float
          items:
            type: array
            items:
              type: object
              properties:
                productId:
                  type: string
                quantity:
                  type: integer
                unitPrice:
                  type: number
          timestamp:
            type: string
            format: date-time

メッセージング仕様の重要性

  • プロデューサー・コンシューマー間の契約管理
  • スキーマ進化への対応
  • API第一設計による堅牢性

参考: AsyncAPI Community は、各プロトコル(RabbitMQ, Kafka, MQTT)の実装例を提供しています。

AWS イベント駆動アーキテクチャ

AWS (docs.aws.amazon.com, aws.amazon.com) は、フルマネージドなイベント駆動サービスを提供します。

Amazon EventBridge

EventBridgeは、アプリケーション間のイベント配信を自動化します。

import boto3
import json
from datetime import datetime

client = boto3.client('events', region_name='us-east-1')

# イベントルールの作成
response = client.put_rule(
    Name='order-processing-rule',
    EventPattern=json.dumps({
        'source': ['orders.service'],
        'detail-type': ['Order Placed'],
        'detail': {
            'status': ['pending']
        }
    }),
    State='ENABLED'
)

# ターゲット(SQS)の指定
client.put_targets(
    Rule='order-processing-rule',
    Targets=[
        {
            'Id': '1',
            'Arn': 'arn:aws:sqs:us-east-1:123456789012:order-queue',
            'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeRole'
        }
    ]
)

イベントパターンマッチング:

{
  "source": ["myapp"],
  "detail-type": ["User Action"],
  "detail": {
    "action": ["login", "logout"],
    "region": [{"prefix": "us-"}]
  }
}

Amazon SNS/SQS メッセージング

SNS(Simple Notification Service): パブリッシュ・サブスクライブモデル

sns = boto3.client('sns')
topic_arn = 'arn:aws:sns:us-east-1:123456789012:user-events'

# メッセージの発行
response = sns.publish(
    TopicArn=topic_arn,
    Message=json.dumps({
        'event_type': 'user.registered',
        'user_id': '12345',
        'email': 'user@example.com',
        'timestamp': datetime.utcnow().isoformat()
    }),
    MessageAttributes={
        'event_type': {'DataType': 'String', 'StringValue': 'user.registered'}
    }
)

SQS(Simple Queue Service): メッセージキュー(メッセージ保持)

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/order-queue'

# メッセージの送信
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({
        'order_id': 'ORD-001',
        'amount': 99.99
    }),
    MessageAttributes={
        'priority': {'StringValue': 'high', 'DataType': 'String'},
        'retry_count': {'StringValue': '0', 'DataType': 'Number'}
    }
)

# メッセージの受信
messages = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20  # ロングポーリング
)

AWS Lambda + EventBridge統合

def lambda_handler(event, context):
    '''EventBridgeイベント処理ハンドラ'''
    
    detail = event['detail']
    order_id = detail['orderId']
    
    # 在庫確認
    if check_inventory(order_id):
        # 決済処理
        process_payment(detail)
        
        # 完了イベント発行
        publish_event('order.confirmed', detail)
    else:
        # キャンセルイベント発行
        publish_event('order.cancelled', detail)
    
    return {'statusCode': 200}

Microsoft Azure イベント駆動設計

Azure (learn.microsoft.com) のイベント駆動パターン実装:

Azure Event Grid

Event Gridは、イベントソースからイベントハンドラへの配信を管理します。

from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import CloudEvent

endpoint = "https://{topic-name}.eventgrid.azure.net/api/events"
credential = AzureKeyCredential("{access-key}")
client = EventGridPublisherClient(endpoint, credential)

# カスタムイベントの発行
event = CloudEvent(
    type="user.registered",
    source="my-application",
    data={
        "user_id": "12345",
        "email": "user@example.com",
        "registration_date": "2026-05-02"
    }
)

client.send(event)

Azure Service Bus

メッセージング:トピック・サブスクリプション

from azure.servicebus import ServiceBusClient, ServiceBusMessage

connstr = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=..."
client = ServiceBusClient.from_connection_string(connstr)

# メッセージ送信
with client.get_topic_sender(topic_name="orders") as sender:
    message = ServiceBusMessage(
        body="Order data",
        subject="order.created",
        session_id="order-123",
        correlation_id="req-456"
    )
    sender.send_messages(message)

# メッセージ受信
with client.get_subscription_receiver(
    topic_name="orders", 
    subscription_name="inventory-service"
) as receiver:
    messages = receiver.receive_messages(max_wait_time=10)
    for msg in messages:
        print(msg.subject, msg.body)
        msg.complete()

Reactive Manifesto と宣言的設計

Reactive Manifesto (reactivemanifesto.org) は、分散システムの4つの特性を定義しています。

4つの特性

  1. Responsive(応答性): システムはリアルタイムレスポンスを提供
  2. Resilient(復元力): コンポーネント障害時にシステムは継続稼働
  3. Elastic(弾性): 負荷変動に応じてスケール
  4. Message Driven(メッセージ駆動): 非同期メッセージングによる疎結合

実装パターン

from asyncio import Queue
import asyncio

class ReactiveEventBus:
    '''イベント駆動型リアクティブシステム'''
    
    def __init__(self):
        self.subscribers = {}
        self.event_queue = Queue()
    
    async def subscribe(self, event_type, handler):
        '''イベント購読'''
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
    
    async def publish(self, event_type, data):
        '''イベント発行(非同期)'''
        await self.event_queue.put((event_type, data))
    
    async def start(self):
        '''イベント処理ループ'''
        while True:
            event_type, data = await self.event_queue.get()
            
            if event_type in self.subscribers:
                # 全購読者に並行実行で通知
                tasks = [
                    handler(data)
                    for handler in self.subscribers[event_type]
                ]
                await asyncio.gather(*tasks, return_exceptions=True)

# 使用例
bus = ReactiveEventBus()

async def handle_order(data):
    print(f"処理: {data}")
    await asyncio.sleep(1)  # 非同期処理

await bus.subscribe('order.created', handle_order)
await bus.publish('order.created', {'order_id': 123})

マイクロサービス通信パターン

Microservices.io (microservices.io) の通信パターン実装:

Saga パターン (分散トランザクション)

class OrderSaga:
    '''注文処理Saga(分散トランザクション)'''
    
    async def execute_order(self, order_data):
        try:
            # ステップ1: 在庫予約
            reservation_id = await self.inventory_service.reserve(
                order_data['items']
            )
            
            # ステップ2: 決済
            payment_id = await self.payment_service.charge(
                order_data['amount']
            )
            
            # ステップ3: 配送
            shipment_id = await self.shipping_service.create_shipment(
                order_data
            )
            
            # 完了
            await self.order_service.confirm(order_data['order_id'])
            
        except Exception as e:
            # 補償トランザクション実行
            await self.compensate(reservation_id, payment_id)
            raise
    
    async def compensate(self, reservation_id, payment_id):
        '''ロールバック処理'''
        await self.inventory_service.release(reservation_id)
        await self.payment_service.refund(payment_id)

Circuit Breaker パターン

import asyncio
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = 'closed'      # 正常
    OPEN = 'open'          # 故障
    HALF_OPEN = 'half_open'  # 復旧テスト中

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
    
    async def call(self, coro):
        if self.state == CircuitState.OPEN:
            # 復旧時間経過をチェック
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await coro
            
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise

# 使用例
breaker = CircuitBreaker(failure_threshold=3)

async def call_external_api():
    return await breaker.call(some_async_operation())

イベント駆動設計のベストプラクティス

  1. イベント粒度の最適化

    • 粗粒度: パフォーマンス向上、但し再利用性低い
    • 細粒度: 再利用性高い、但しイベント数増加
  2. 冪等性の確保

    # イベント再処理時に同じ結果を生じる
    def process_payment(event_id, amount):
        existing = db.query(Payment).filter(id=event_id).first()
        if existing:
            return existing
        return db.insert(Payment(id=event_id, amount=amount))
    
  3. イベント順序の保証

    • パーティショニングキー(customer_id等)で順序保証
    • Kafka: 同一パーティション内で順序保証
  4. デッドレター処理

    # 処理失敗イベントをDLQに移動
    async def handle_with_dlq(event, max_retries=3):
        for attempt in range(max_retries):
            try:
                await process_event(event)
                return
            except Exception as e:
                if attempt == max_retries - 1:
                    await dead_letter_queue.send(event, error=str(e))
                await asyncio.sleep(2 ** attempt)  # 指数バックオフ
    

実装基盤を選ぶときの整理

AsyncAPI、AWS EventBridge、Azure Service Bus、Kafkaのような技術は、どれもイベント駆動を支える道具ですが、役割は同じではありません。AsyncAPIは契約を記述する仕様、EventBridgeやService Busはマネージドな配送基盤、Kafkaはログ指向のストリーミング基盤です。

選択肢 向いていること 注意点
AsyncAPI イベント契約、チャネル、メッセージ形式の文書化 broker設定や権限までは別管理が必要
EventBridge SaaS連携、疎結合なイベントルーティング 厳密な順序や高スループット処理には別設計が必要
Azure Service Bus enterprise messaging、queue、topic、dead-letter メッセージ設計と再試行方針を明示する
Kafka 大量イベント、再処理、stream processing 運用、partition設計、schema進化が難しい

技術選定では、まず「イベントを契約として管理したいのか」「業務イベントを配送したいのか」「履歴を再処理したいのか」を分けます。ここを混ぜると、軽い通知のために重いstreaming基盤を持ったり、再処理が必要なデータを一過性のqueueだけに流したりしがちです。

イベント駆動設計のトレードオフ

スケーラビリティ vs 複雑性

特性 同期API 非同期イベント
実装の単純さ 高い 低い(分散追跡、補償トランザクション等)
結果の即時性 即座 遅延(最終一貫性)
スケーラビリティ 制限される(直結) 優れている(疎結合)
障害の隔離 困難(波及する) 容易(独立処理)
監視・デバッグ 直線的 複雑(分散イベント追跡)

使い分け原則

  1. 同期APIを選ぶべき場合

    • 即座の応答が必要
    • 結果が確定的でなければならない
    • オブジェクト数が少ない
  2. 非同期イベントを選ぶべき場合

    • 高スケーラビリティが必須
    • 結果整合性で許容される
    • サービス間の疎結合が重要
    • イベントソーシングで監査径跡が必要

まとめ

イベント駆動設計は、サービス間を疎結合にし、拡張しやすい構造を作る強力な方法です。一方で、重複、順序、再試行、観測可能性を設計しないと、問題の追跡が難しくなります。イベントは「起きた事実」として扱い、必要な複雑さだけを導入することが重要です。

参考文献

公式・標準