# Send Backend Event Stores to your agent
Hand the extracted package to your coding agent along with a concrete install brief, rather than working out the setup manually.
## Fast path
- Download the package from Yavira.
- Extract it into a folder your agent can access.
- Paste one of the prompts below and point your agent at the extracted folder.
## Suggested prompts
### New install

```text
I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.
```
### Upgrade existing

```text
I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.
```
## Machine-readable fields
```json
{
  "schemaVersion": "1.0",
  "item": {
    "slug": "backend-event-stores",
    "name": "Backend Event Stores",
    "source": "tencent",
    "type": "skill",
    "category": "AI 智能",
    "sourceUrl": "https://clawhub.ai/wpank/backend-event-stores",
    "canonicalUrl": "https://clawhub.ai/wpank/backend-event-stores",
    "targetPlatform": "OpenClaw"
  },
  "install": {
    "downloadUrl": "/downloads/backend-event-stores",
    "sourceDownloadUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=backend-event-stores",
    "sourcePlatform": "tencent",
    "targetPlatform": "OpenClaw",
    "packageFormat": "ZIP package",
    "primaryDoc": "SKILL.md",
    "includedAssets": [
      "README.md",
      "SKILL.md"
    ],
    "downloadMode": "redirect",
    "sourceHealth": {
      "source": "tencent",
      "status": "healthy",
      "reason": "direct_download_ok",
      "recommendedAction": "download",
      "checkedAt": "2026-04-23T16:43:11.935Z",
      "expiresAt": "2026-04-30T16:43:11.935Z",
      "httpStatus": 200,
      "finalUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=4claw-imageboard",
      "contentType": "application/zip",
      "probeMethod": "head",
      "details": {
        "probeUrl": "https://wry-manatee-359.convex.site/api/v1/download?slug=4claw-imageboard",
        "contentDisposition": "attachment; filename=\"4claw-imageboard-1.0.1.zip\"",
        "redirectLocation": null,
        "bodySnippet": null
      },
      "scope": "source",
      "summary": "Source download looks usable.",
      "detail": "Yavira can redirect you to the upstream package for this source.",
      "primaryActionLabel": "Download for OpenClaw",
      "primaryActionHref": "/downloads/backend-event-stores"
    },
    "validation": {
      "installChecklist": [
        "Use the Yavira download entry.",
        "Review SKILL.md after the package is downloaded.",
        "Confirm the extracted package contains the expected setup assets."
      ],
      "postInstallChecks": [
        "Confirm the extracted package includes the expected docs or setup files.",
        "Validate the skill or prompts are available in your target agent workspace.",
        "Capture any manual follow-up steps the agent could not complete."
      ]
    }
  },
  "links": {
    "detailUrl": "https://openagent3.xyz/skills/backend-event-stores",
    "downloadUrl": "https://openagent3.xyz/downloads/backend-event-stores",
    "agentUrl": "https://openagent3.xyz/skills/backend-event-stores/agent",
    "manifestUrl": "https://openagent3.xyz/skills/backend-event-stores/agent.json",
    "briefUrl": "https://openagent3.xyz/skills/backend-event-stores/agent.md"
  }
}
```
## Documentation

### Event Store

Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration.

### When to Use This Skill

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Building projections from event streams
- Adding snapshotting for aggregate performance
- Integrating CQRS with event sourcing

### Event Store Architecture

```text
┌─────────────────────────────────────────────────────┐
│                     Event Store                      │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Stream 1   │  │  Stream 2   │  │  Stream 3   │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘
```

### Event Store Requirements

| Requirement | Description |
|---|---|
| Append-only | Events are immutable; only appends |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |

### Technology Comparison

| Technology | Best For | Limitations |
|---|---|---|
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stack | Manual implementation |
| Kafka | High-throughput streams | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |

### Event Schema Design

Events are the source of truth. Well-designed schemas ensure long-term evolvability.

### Event Envelope Structure

```json
{
  "event_id": "uuid",
  "stream_id": "Order-abc123",
  "event_type": "OrderPlaced",
  "version": 1,
  "schema_version": 1,
  "data": {
    "customer_id": "cust-1",
    "total_cents": 5000
  },
  "metadata": {
    "correlation_id": "req-xyz",
    "causation_id": "evt-prev",
    "user_id": "user-1",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "global_position": 42
}
```

### Schema Evolution Rules

- Add fields freely — new optional fields are always safe
- Never remove or rename fields — introduce a new event type instead
- Version event types — `OrderPlacedV2` when the schema changes materially
- Upcast on read — transform old versions to the current shape in the deserializer (see the sketch below)
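
As a concrete illustration of the last rule, here is a minimal upcaster sketch. It assumes a hypothetical v1 `OrderPlaced` payload that stored a dollar amount under `total` and was migrated to integer `total_cents` in v2; the field names are illustrative, not part of the package.

```python
def upcast_order_placed(raw: dict) -> dict:
    """Bring older OrderPlaced envelopes up to the current (v2) shape on read."""
    data = dict(raw["data"])
    version = raw.get("schema_version", 1)
    if version == 1:
        # v1 stored a float dollar amount under "total"; v2 uses integer cents.
        data["total_cents"] = int(round(data.pop("total") * 100))
        version = 2
    return {**raw, "data": data, "schema_version": version}
```

The write side keeps emitting only the current shape; upcasting happens in the deserializer, so aggregates and projections never see old versions.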

### PostgreSQL Event Store Schema

```sql
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);

CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Event Store Implementation

```python
import json
from dataclasses import dataclass, field
from uuid import UUID, uuid4

import asyncpg


class ConcurrencyError(Exception):
    """Raised when the expected stream version does not match the stored version."""


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: int | None = None
    global_position: int | None = None


class EventStore:  # backed by the PostgreSQL schema above
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, stream_id: str, stream_type: str,
                     events: list[Event],
                     expected_version: int | None = None) -> list[Event]:
        """Append events with optimistic concurrency control."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events "
                        "WHERE stream_id = $1", stream_id
                    ) or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected {expected_version}, got {current}"
                        )

                start = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 "
                    "FROM events WHERE stream_id = $1", stream_id
                )
                for i, evt in enumerate(events):
                    evt.version = start + i
                    row = await conn.fetchrow(
                        "INSERT INTO events (id, stream_id, stream_type, "
                        "event_type, event_data, metadata, version) "
                        "VALUES ($1,$2,$3,$4,$5,$6,$7) "
                        "RETURNING global_position",
                        evt.event_id, stream_id, stream_type,
                        evt.event_type, json.dumps(evt.data),
                        json.dumps(evt.metadata), evt.version,
                    )
                    evt.global_position = row["global_position"]
                return events

    async def read_stream(self, stream_id: str,
                          from_version: int = 0) -> list[Event]:
        """Read events for a single stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE stream_id = $1 "
                "AND version >= $2 ORDER BY version",
                stream_id, from_version,
            )
            return [self._to_event(r) for r in rows]

    async def read_all(self, from_position: int = 0,
                       limit: int = 1000) -> list[Event]:
        """Read the global event stream for projections / subscriptions."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE global_position > $1 "
                "ORDER BY global_position LIMIT $2",
                from_position, limit,
            )
            return [self._to_event(r) for r in rows]

    def _to_event(self, row: asyncpg.Record) -> Event:
        """Map a database row back onto the Event dataclass."""
        return Event(
            stream_id=row["stream_id"],
            event_type=row["event_type"],
            data=json.loads(row["event_data"]),
            metadata=json.loads(row["metadata"]),
            event_id=row["id"],
            version=row["version"],
            global_position=row["global_position"],
        )
```
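
For orientation, a short usage sketch of the store above. The connection string, stream name, and payload are placeholders, and error handling (for example retrying on `ConcurrencyError`) is left out.

```python
import asyncio

import asyncpg


async def main() -> None:
    pool = await asyncpg.create_pool("postgresql://localhost/eventstore")  # placeholder DSN
    store = EventStore(pool)

    placed = Event(
        stream_id="Order-abc123",
        event_type="OrderPlaced",
        data={"order_id": "abc123", "customer_id": "cust-1", "total_cents": 5000},
        metadata={"correlation_id": "req-xyz"},
    )
    # expected_version=0 asserts the stream does not exist yet; a concurrent
    # writer that got there first raises ConcurrencyError instead of forking the stream.
    await store.append("Order-abc123", "Order", [placed], expected_version=0)

    history = await store.read_stream("Order-abc123")
    print([(e.version, e.event_type) for e in history])


asyncio.run(main())
```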

### Projections

Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.

### Projection Lifecycle

1. Start from checkpoint — resume from the last processed global position
2. Apply events — update the read model for each relevant event type
3. Save checkpoint — persist the new position atomically with the read model

### Projection Example

```python
import asyncio


class OrderSummaryProjection:
    def __init__(self, db, event_store: EventStore):
        self.db = db
        self.store = event_store

    async def run(self, batch_size: int = 100):
        position = await self._load_checkpoint()
        while True:
            events = await self.store.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)
                continue
            for evt in events:
                await self._apply(evt)
                position = evt.global_position
            await self._save_checkpoint(position)

    async def _apply(self, event: Event):
        match event.event_type:
            case "OrderPlaced":
                await self.db.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES ($1,$2,$3,'placed')",
                    event.data["order_id"], event.data["customer_id"],
                    event.data["total_cents"],
                )
            case "OrderShipped":
                await self.db.execute(
                    "UPDATE order_summaries SET status='shipped' "
                    "WHERE id=$1", event.data["order_id"],
                )
```

### Projection Design Rules

- Idempotent handlers — replaying the same event twice must not corrupt state (see the upsert sketch below)
- One projection per read model — keep projections focused
- Rebuild from scratch — projections should be deletable and fully replayable
- Separate storage — projections can live in different databases (Postgres, Elasticsearch, Redis)
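
As one way to satisfy the first rule, the `OrderPlaced` insert from the projection example can be written as an upsert, so replaying the event rewrites the same row instead of failing on the primary key. The `order_summaries` columns are assumed from the earlier example.

```python
async def apply_order_placed(db, event: Event) -> None:
    # Re-applying the same OrderPlaced event writes identical values,
    # so replays and at-least-once delivery are safe.
    await db.execute(
        "INSERT INTO order_summaries (id, customer, total, status) "
        "VALUES ($1, $2, $3, 'placed') "
        "ON CONFLICT (id) DO UPDATE "
        "SET customer = EXCLUDED.customer, total = EXCLUDED.total, "
        "    status = EXCLUDED.status",
        event.data["order_id"], event.data["customer_id"],
        event.data["total_cents"],
    )
```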

### Snapshotting

Snapshots accelerate aggregate rehydration by caching state at a known version.

Use snapshots when streams exceed roughly 100 events, when aggregates are expensive to rehydrate, or on a fixed cadence (e.g., every 50 events).

### Snapshot Flow

```python
class SnapshottedRepository:
    def __init__(self, event_store: EventStore, pool):
        self.store = event_store
        self.pool = pool

    async def load(self, stream_id: str) -> Aggregate:
        # 1. Try loading snapshot
        snap = await self._load_snapshot(stream_id)
        from_version = 0
        aggregate = Aggregate(stream_id)

        if snap:
            aggregate.restore(snap["data"])
            from_version = snap["version"] + 1

        # 2. Replay events after snapshot
        events = await self.store.read_stream(stream_id, from_version)
        for evt in events:
            aggregate.apply(evt)

        # 3. Snapshot if too many events replayed
        if len(events) > 50:
            await self._save_snapshot(
                stream_id, aggregate.snapshot(), aggregate.version
            )

        return aggregate
```
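
`_load_snapshot` and `_save_snapshot` are left open above. A minimal sketch against the `snapshots` table from the PostgreSQL schema, written as standalone functions over an asyncpg pool; the function names and return shape are assumptions.

```python
import json

import asyncpg


async def load_snapshot(pool: asyncpg.Pool, stream_id: str) -> dict | None:
    """Fetch the most recent snapshot for a stream, if any."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT snapshot_data, version FROM snapshots WHERE stream_id = $1",
            stream_id,
        )
    if row is None:
        return None
    return {"data": json.loads(row["snapshot_data"]), "version": row["version"]}


async def save_snapshot(pool: asyncpg.Pool, stream_id: str, stream_type: str,
                        data: dict, version: int) -> None:
    """Upsert so only the newest snapshot per stream is retained."""
    async with pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO snapshots (stream_id, stream_type, snapshot_data, version) "
            "VALUES ($1, $2, $3, $4) "
            "ON CONFLICT (stream_id) DO UPDATE "
            "SET snapshot_data = EXCLUDED.snapshot_data, "
            "    version = EXCLUDED.version, created_at = NOW()",
            stream_id, stream_type, json.dumps(data), version,
        )
```

Snapshots stay an optimization, never the source of truth: if the aggregate shape changes, delete the snapshot rows and rebuild from events.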

### CQRS Integration

CQRS separates the write model (commands → events) from the read model (projections).

```text
Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
 (write)     (domain)      (append)        (build)        (read)
```

### Key Principles

- Write side validates commands, emits events, enforces invariants
- Read side subscribes to events, builds optimised query models
- Eventual consistency — reads may lag behind writes by milliseconds to seconds
- Independent scaling — scale reads and writes separately

### Command Handler Pattern

```python
class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    async def handle(self, cmd: PlaceOrderCommand):
        # Load aggregate from events
        events = await self.store.read_stream(f"Order-{cmd.order_id}")
        order = Order.reconstitute(events)

        # Execute command — validates and produces new events
        new_events = order.place(cmd.customer_id, cmd.items)

        # Persist with concurrency check
        await self.store.append(
            f"Order-{cmd.order_id}", "Order", new_events,
            expected_version=order.version,
        )
```
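
The `Order` aggregate and `PlaceOrderCommand` are not included in the package. A minimal sketch of the shape the handler above expects; the command fields, item keys, and invariants are assumptions for illustration only.

```python
from dataclasses import dataclass


@dataclass
class PlaceOrderCommand:
    order_id: str
    customer_id: str
    items: list[dict]  # e.g. {"sku": "A1", "qty": 2, "price_cents": 2500}


class Order:
    def __init__(self) -> None:
        self.version = 0      # last applied stream version; 0 means a new stream
        self.placed = False

    @classmethod
    def reconstitute(cls, events: list[Event]) -> "Order":
        """Rebuild current state by replaying the stream."""
        order = cls()
        for evt in events:
            order._apply(evt)
        return order

    def _apply(self, evt: Event) -> None:
        if evt.event_type == "OrderPlaced":
            self.placed = True
        if evt.version is not None:
            self.version = evt.version

    def place(self, customer_id: str, items: list[dict]) -> list[Event]:
        """Validate the command and return new, not-yet-persisted events."""
        if self.placed:
            raise ValueError("Order has already been placed")
        total = sum(item["price_cents"] * item["qty"] for item in items)
        # EventStore.append persists the stream_id argument passed to it,
        # so the value set here is informational only.
        return [Event(stream_id="", event_type="OrderPlaced",
                      data={"customer_id": customer_id, "total_cents": total})]
```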

### EventStoreDB Integration

```python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]
    state = (StreamState.ANY if expected_revision is None
             else StreamState.NO_STREAM if expected_revision == -1
             else expected_revision)
    return client.append_to_stream(stream_name, new_events, current_version=state)

def read_stream(stream_name: str, from_revision: int = 0):
    return [
        {'type': e.type, 'data': json.loads(e.data),
         'stream_position': e.stream_position}
        for e in client.get_stream(stream_name, stream_position=from_revision)
    ]

# Category projection: read all events for Order-* streams
def read_category(category: str):
    return read_stream(f"$ce-{category}")
```
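
A short usage sketch of the helpers above; the stream name and payloads are placeholders, and the category read assumes EventStoreDB's `$by_category` system projection is enabled.

```python
# Append to a fresh stream (expected_revision=-1 maps to StreamState.NO_STREAM above).
append_events(
    "Order-abc123",
    [
        {"type": "OrderPlaced",
         "data": {"customer_id": "cust-1", "total_cents": 5000},
         "metadata": {"correlation_id": "req-xyz"}},
        {"type": "OrderShipped", "data": {"carrier": "ups"}},
    ],
    expected_revision=-1,
)

for event in read_stream("Order-abc123"):
    print(event["stream_position"], event["type"])

order_events = read_category("Order")  # events from every Order-* stream
```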

### DynamoDB Event Store

```python
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json, uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.table = boto3.resource('dynamodb').Table(table_name)

    def append(self, stream_id: str, events: list, expected_version: int = 0):
        with self.table.batch_writer() as batch:
            for i, event in enumerate(events):
                version = expected_version + i + 1
                batch.put_item(Item={
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': datetime.utcnow().isoformat(),
                    'event_id': str(uuid.uuid4()),
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                })

    def read_stream(self, stream_id: str, from_version: int = 0):
        resp = self.table.query(
            KeyConditionExpression=
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {'event_type': item['event_type'],
             'data': json.loads(item['event_data']),
             'version': item['version']}
            for item in resp['Items']
        ]
```

DynamoDB table design: `PK=STREAM#{id}`, `SK=VERSION#{version}`, with `GSI1` providing global ordering.
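
One caveat on the sketch above: `batch_writer` cannot attach condition expressions, so concurrent writers could overwrite the same version. A hedged sketch of a single-event append that enforces `(PK, SK)` uniqueness with a conditional `put_item`; the helper name is an assumption, and retry policy is left to the caller.

```python
import json

from botocore.exceptions import ClientError


def append_one(table, stream_id: str, event: dict, version: int) -> None:
    """Append one event, failing if this stream version was already written."""
    try:
        table.put_item(
            Item={
                "PK": f"STREAM#{stream_id}",
                "SK": f"VERSION#{version:020d}",
                "event_type": event["type"],
                "event_data": json.dumps(event["data"]),
                "version": version,
            },
            # Reject the write if another writer already claimed this version.
            ConditionExpression="attribute_not_exists(SK)",
        )
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            raise RuntimeError(f"Version {version} already exists for {stream_id}") from err
        raise
```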

### Do

- Name streams `{Type}-{id}` — e.g., `Order-abc123`
- Include correlation / causation IDs in metadata for tracing
- Version event schemas from day one — plan for evolution
- Implement idempotent writes — use event IDs for deduplication
- Index for your query patterns — stream, global position, event type

### Don't

- Mutate or delete events — they are immutable facts
- Store large payloads — keep events small; reference blobs externally
- Skip optimistic concurrency — version checks are what prevent data corruption
- Ignore backpressure — handle slow consumers gracefully
- Couple projections to the write model — projections should be independently deployable

### NEVER Do

- NEVER update or delete events — events are immutable historical facts; create compensating events instead (see the sketch below)
- NEVER skip version checks on append — optimistic concurrency prevents lost updates and corruption
- NEVER embed large blobs in events — store blobs externally and reference them by ID in the event
- NEVER use random UUIDs for event IDs without idempotency checks — retries create duplicates
- NEVER read projections for command validation — use the event stream as the source of truth
- NEVER couple projections to the write transaction — projections must be rebuildable independently
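
To make the first rule concrete, here is a hedged sketch of correcting a mistaken order total with a compensating event instead of an update; the event type, amounts, and expected version are illustrative, not part of the package.

```python
async def correct_order_total(store: EventStore) -> None:
    # Append a new fact that corrects the old one; history stays intact.
    correction = Event(
        stream_id="Order-abc123",
        event_type="OrderAmountCorrected",
        data={"order_id": "abc123",
              "previous_total_cents": 5000,
              "corrected_total_cents": 4500,
              "reason": "pricing error"},
        metadata={"causation_id": "evt-prev"},
    )
    # Pinning expected_version means a concurrent correction is rejected
    # rather than silently interleaved.
    await store.append("Order-abc123", "Order", [correction], expected_version=1)
```

Projections then interpret the correction event, so every read model converges on the corrected total without anyone touching stored history.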
## Trust
- Source: tencent
- Verification: Indexed source record
- Publisher: wpank
- Version: 1.0.0
## Source health
- Status: healthy
- Source download looks usable.
- Yavira can redirect you to the upstream package for this source.
- Health scope: source
- Reason: direct_download_ok
- Checked at: 2026-04-23T16:43:11.935Z
- Expires at: 2026-04-30T16:43:11.935Z
- Recommended action: Download for OpenClaw
## Links
- [Detail page](https://openagent3.xyz/skills/backend-event-stores)
- [Send to Agent page](https://openagent3.xyz/skills/backend-event-stores/agent)
- [JSON manifest](https://openagent3.xyz/skills/backend-event-stores/agent.json)
- [Markdown brief](https://openagent3.xyz/skills/backend-event-stores/agent.md)
- [Download page](https://openagent3.xyz/downloads/backend-event-stores)