Tencent SkillHub · AI

Backend Event Stores

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.

skill openclawclawhub Free
0 Downloads
0 Stars
0 Installs
0 Score
High Signal



Install for OpenClaw

Quick setup
  1. Download the package from Yavira.
  2. Extract the archive and review SKILL.md first.
  3. Import or place the package into your OpenClaw setup.

Requirements

Target platform
OpenClaw
Install method
Manual import
Extraction
Extract archive
Prerequisites
OpenClaw
Primary doc
SKILL.md

Package facts

Download mode
Yavira redirect
Package format
ZIP package
Source platform
Tencent SkillHub
What's included
README.md, SKILL.md

Validation

  • Use the Yavira download entry.
  • Review SKILL.md after the package is downloaded.
  • Confirm the extracted package contains the expected setup assets.

Install with your agent

Agent handoff

Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.

  1. Download the package from Yavira.
  2. Extract it into a folder your agent can access.
  3. Paste one of the prompts below and point your agent at the extracted folder.
New install

I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.

Upgrade existing

I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.

Trust & source

Release facts

Source
Tencent SkillHub
Verification
Indexed source record
Version
1.0.0

Documentation

Primary doc: SKILL.md (24 sections)

Event Store

Guide to designing event stores for event-sourced applications, covering event schemas, projections, snapshotting, and CQRS integration.

When to Use This Skill

  • Designing event sourcing infrastructure
  • Choosing between event store technologies
  • Implementing custom event stores
  • Building projections from event streams
  • Adding snapshotting for aggregate performance
  • Integrating CQRS with event sourcing

Event Store Architecture

┌─────────────────────────────────────────────────────────┐
│                       Event Store                       │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────┐   ┌─────────────┐   ┌─────────────┐     │
│ │  Stream 1   │   │  Stream 2   │   │  Stream 3   │     │
│ │ (Aggregate) │   │ (Aggregate) │   │ (Aggregate) │     │
│ ├─────────────┤   ├─────────────┤   ├─────────────┤     │
│ │ Event 1     │   │ Event 1     │   │ Event 1     │     │
│ │ Event 2     │   │ Event 2     │   │ Event 2     │     │
│ │ Event 3     │   │ ...         │   │ Event 3     │     │
│ │ ...         │   │             │   │ Event 4     │     │
│ └─────────────┘   └─────────────┘   └─────────────┘     │
├─────────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...            │
└─────────────────────────────────────────────────────────┘

Event Store Requirements

Requirement    | Description
-------------- | -----------------------------------
Append-only    | Events are immutable, only appends
Ordered        | Per-stream and global ordering
Versioned      | Optimistic concurrency control
Subscriptions  | Real-time event notifications
Idempotent     | Handle duplicate writes safely
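The requirements above can be sketched in a few dozen lines. The following in-memory store is an illustration only (not this package's implementation): it is append-only, keeps per-stream and global order, enforces optimistic concurrency through an expected version, and deduplicates on event ID for idempotent writes.

```python
from dataclasses import dataclass


@dataclass
class StoredEvent:
    event_id: str
    stream_id: str
    event_type: str
    data: dict
    version: int           # per-stream ordering
    global_position: int   # total ordering across streams


class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    def __init__(self):
        self._log: list[StoredEvent] = []              # append-only global log
        self._streams: dict[str, list[StoredEvent]] = {}
        self._seen_ids: set[str] = set()               # idempotency guard

    def append(self, stream_id: str, events: list[dict],
               expected_version: int) -> list[StoredEvent]:
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:            # optimistic concurrency
            raise ConcurrencyError(
                f"expected {expected_version}, stream at {len(stream)}")
        stored = []
        for e in events:
            if e["event_id"] in self._seen_ids:        # drop duplicate writes
                continue
            self._seen_ids.add(e["event_id"])
            evt = StoredEvent(
                event_id=e["event_id"], stream_id=stream_id,
                event_type=e["type"], data=e["data"],
                version=len(stream) + 1,
                global_position=len(self._log) + 1,
            )
            stream.append(evt)
            self._log.append(evt)
            stored.append(evt)
        return stored

    def read_stream(self, stream_id: str) -> list[StoredEvent]:
        return list(self._streams.get(stream_id, []))
```

A production store would persist the log and publish subscriptions, but the invariants (append-only, ordered, versioned, idempotent) are exactly these.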

Technology Comparison

Technology    | Best For                 | Limitations
------------- | ------------------------ | --------------------------------
EventStoreDB  | Pure event sourcing      | Single-purpose
PostgreSQL    | Existing Postgres stack  | Manual implementation
Kafka         | High-throughput streams  | Not ideal for per-stream queries
DynamoDB      | Serverless, AWS-native   | Query limitations

Event Schema Design

Events are the source of truth. Well-designed schemas ensure long-term evolvability.

Event Envelope Structure

{
  "event_id": "uuid",
  "stream_id": "Order-abc123",
  "event_type": "OrderPlaced",
  "version": 1,
  "schema_version": 1,
  "data": {
    "customer_id": "cust-1",
    "total_cents": 5000
  },
  "metadata": {
    "correlation_id": "req-xyz",
    "causation_id": "evt-prev",
    "user_id": "user-1",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "global_position": 42
}

Schema Evolution Rules

  • Add fields freely: new optional fields are always safe
  • Never remove or rename fields: introduce a new event type instead
  • Version event types: OrderPlacedV2 when the schema changes materially
  • Upcast on read: transform old versions to the current shape in the deserializer
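The upcast-on-read rule can be sketched as a small registry of transforms applied in the deserializer. The event shapes and helper names below are hypothetical, assuming a v2 of OrderPlaced that replaced `total_cents` with a structured `total` field:

```python
def _order_placed_v1_to_v2(d: dict) -> dict:
    # Hypothetical migration: v2 split `total_cents` into amount + currency.
    d = dict(d)
    cents = d.pop("total_cents")
    d["schema_version"] = 2
    d["total"] = {"amount_cents": cents, "currency": "USD"}  # assume USD for v1
    return d


# (event_type, schema_version) -> function producing the next schema version
UPCASTERS = {
    ("OrderPlaced", 1): _order_placed_v1_to_v2,
}


def upcast(event_type: str, data: dict) -> dict:
    """Apply upcasters until the event reaches the current schema version."""
    while (event_type, data.get("schema_version", 1)) in UPCASTERS:
        data = UPCASTERS[(event_type, data.get("schema_version", 1))](data)
    return data
```

Because upcasting happens at read time, stored events are never rewritten; old and new versions coexist in the log indefinitely.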

PostgreSQL Event Store Schema

CREATE TABLE events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id       VARCHAR(255) NOT NULL,
    stream_type     VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    event_data      JSONB NOT NULL,
    metadata        JSONB DEFAULT '{}',
    version         BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type   ON events(event_type);

CREATE TABLE snapshots (
    stream_id     VARCHAR(255) PRIMARY KEY,
    stream_type   VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version       BIGINT NOT NULL,
    created_at    TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position   BIGINT NOT NULL DEFAULT 0,
    updated_at      TIMESTAMPTZ DEFAULT NOW()
);

Event Store Implementation

import json
from dataclasses import dataclass, field
from uuid import UUID, uuid4

import asyncpg


class ConcurrencyError(Exception):
    """Raised when the expected stream version does not match."""


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: int | None = None
    global_position: int | None = None


class EventStore:
    # backed by the PostgreSQL schema above

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, stream_id: str, stream_type: str,
                     events: list[Event],
                     expected_version: int | None = None) -> list[Event]:
        """Append events with optimistic concurrency control."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events "
                        "WHERE stream_id = $1", stream_id
                    ) or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected {expected_version}, got {current}"
                        )
                start = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 "
                    "FROM events WHERE stream_id = $1", stream_id
                )
                for i, evt in enumerate(events):
                    evt.version = start + i
                    row = await conn.fetchrow(
                        "INSERT INTO events (id, stream_id, stream_type, "
                        "event_type, event_data, metadata, version) "
                        "VALUES ($1,$2,$3,$4,$5,$6,$7) "
                        "RETURNING global_position",
                        evt.event_id, stream_id, stream_type,
                        evt.event_type, json.dumps(evt.data),
                        json.dumps(evt.metadata), evt.version,
                    )
                    evt.global_position = row["global_position"]
        return events

    async def read_stream(self, stream_id: str,
                          from_version: int = 0) -> list[Event]:
        """Read events for a single stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE stream_id = $1 "
                "AND version >= $2 ORDER BY version",
                stream_id, from_version,
            )
        return [self._to_event(r) for r in rows]

    async def read_all(self, from_position: int = 0,
                       limit: int = 1000) -> list[Event]:
        """Read the global event stream for projections / subscriptions."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE global_position > $1 "
                "ORDER BY global_position LIMIT $2",
                from_position, limit,
            )
        return [self._to_event(r) for r in rows]

    def _to_event(self, row) -> Event:
        return Event(
            stream_id=row["stream_id"],
            event_type=row["event_type"],
            data=json.loads(row["event_data"]),
            metadata=json.loads(row["metadata"]),
            event_id=row["id"],
            version=row["version"],
            global_position=row["global_position"],
        )

Projections

Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.

Projection Lifecycle

  1. Start from checkpoint: resume from the last processed global position
  2. Apply events: update the read model for each relevant event type
  3. Save checkpoint: persist the new position atomically with the read model

Projection Example

import asyncio


class OrderSummaryProjection:
    SUBSCRIPTION_ID = "order_summary"

    def __init__(self, db, event_store: EventStore):
        self.db = db
        self.store = event_store

    async def run(self, batch_size: int = 100):
        position = await self._load_checkpoint()
        while True:
            events = await self.store.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)
                continue
            for evt in events:
                await self._apply(evt)
                position = evt.global_position
            await self._save_checkpoint(position)

    async def _apply(self, event: Event):
        match event.event_type:
            case "OrderPlaced":
                await self.db.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES ($1,$2,$3,'placed')",
                    event.data["order_id"], event.data["customer_id"],
                    event.data["total_cents"],
                )
            case "OrderShipped":
                await self.db.execute(
                    "UPDATE order_summaries SET status='shipped' "
                    "WHERE id=$1",
                    event.data["order_id"],
                )

    # Checkpointing against the subscription_checkpoints table above
    async def _load_checkpoint(self) -> int:
        return await self.db.fetchval(
            "SELECT last_position FROM subscription_checkpoints "
            "WHERE subscription_id = $1", self.SUBSCRIPTION_ID,
        ) or 0

    async def _save_checkpoint(self, position: int):
        await self.db.execute(
            "INSERT INTO subscription_checkpoints (subscription_id, last_position) "
            "VALUES ($1, $2) "
            "ON CONFLICT (subscription_id) DO UPDATE "
            "SET last_position = EXCLUDED.last_position, updated_at = NOW()",
            self.SUBSCRIPTION_ID, position,
        )

Projection Design Rules

  • Idempotent handlers: replaying the same event twice must not corrupt state
  • One projection per read model: keep projections focused
  • Rebuild from scratch: projections should be deletable and fully replayable
  • Separate storage: projections can live in different databases (Postgres, Elasticsearch, Redis)
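The first and third rules can be demonstrated with a minimal in-memory sketch (illustrative class and event names, not part of the package): duplicate delivery of an already-processed position is a no-op, and the whole read model can be wiped and rebuilt from the event log.

```python
class OrderCountProjection:
    """Toy read model counting placed orders, derived purely from events."""

    def __init__(self):
        self.orders_placed = 0
        self.last_position = 0   # checkpoint stored alongside the read model

    def apply(self, event_type: str, global_position: int):
        if global_position <= self.last_position:
            return               # already applied: replay is idempotent
        if event_type == "OrderPlaced":
            self.orders_placed += 1
        self.last_position = global_position

    def rebuild(self, events):
        """Projections must be deletable: wipe state and replay from zero."""
        self.orders_placed, self.last_position = 0, 0
        for event_type, position in events:
            self.apply(event_type, position)
```

Tracking the checkpoint inside the projection is what makes at-least-once delivery safe; without it, a redelivered batch would double-count.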

Snapshotting

Snapshots accelerate aggregate rehydration by caching state at a known version. Use when streams exceed ~100 events, aggregates have expensive rehydration, or on a cadence (e.g., every 50 events).

Snapshot Flow

class SnapshottedRepository:
    def __init__(self, event_store: EventStore, pool):
        self.store = event_store
        self.pool = pool

    async def load(self, stream_id: str) -> Aggregate:
        # 1. Try loading a snapshot
        snap = await self._load_snapshot(stream_id)
        from_version = 0
        aggregate = Aggregate(stream_id)
        if snap:
            aggregate.restore(snap["data"])
            from_version = snap["version"] + 1

        # 2. Replay events after the snapshot
        events = await self.store.read_stream(stream_id, from_version)
        for evt in events:
            aggregate.apply(evt)

        # 3. Snapshot if too many events were replayed
        if len(events) > 50:
            await self._save_snapshot(
                stream_id, aggregate.snapshot(), aggregate.version
            )
        return aggregate

CQRS Integration

CQRS separates the write model (commands → events) from the read model (projections).

Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
 (write)      (domain)       (append)        (build)        (read)

Key Principles

  • Write side: validates commands, emits events, enforces invariants
  • Read side: subscribes to events, builds optimised query models
  • Eventual consistency: reads may lag behind writes by milliseconds to seconds
  • Independent scaling: scale reads and writes separately

Command Handler Pattern

class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    async def handle(self, cmd: PlaceOrderCommand):
        # Load the aggregate from its events
        events = await self.store.read_stream(f"Order-{cmd.order_id}")
        order = Order.reconstitute(events)

        # Execute the command: validates and produces new events
        new_events = order.place(cmd.customer_id, cmd.items)

        # Persist with a concurrency check
        await self.store.append(
            f"Order-{cmd.order_id}", "Order", new_events,
            expected_version=order.version,
        )

EventStoreDB Integration

import json

from esdbclient import EventStoreDBClient, NewEvent, StreamState

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")


def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode(),
        )
        for event in events
    ]
    state = (StreamState.ANY if expected_revision is None
             else StreamState.NO_STREAM if expected_revision == -1
             else expected_revision)
    return client.append_to_stream(stream_name, new_events,
                                   current_version=state)


def read_stream(stream_name: str, from_revision: int = 0):
    return [
        {'type': e.type, 'data': json.loads(e.data),
         'stream_position': e.stream_position}
        for e in client.get_stream(stream_name, stream_position=from_revision)
    ]


# Category projection: read all events for Order-* streams
def read_category(category: str):
    return read_stream(f"$ce-{category}")

DynamoDB Event Store

import json
import uuid
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Key


class DynamoEventStore:
    def __init__(self, table_name: str):
        self.table = boto3.resource('dynamodb').Table(table_name)

    def append(self, stream_id: str, events: list, expected_version: int = 0):
        with self.table.batch_writer() as batch:
            for i, event in enumerate(events):
                version = expected_version + i + 1
                batch.put_item(Item={
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': datetime.utcnow().isoformat(),
                    'event_id': str(uuid.uuid4()),
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                })

    def read_stream(self, stream_id: str, from_version: int = 0):
        resp = self.table.query(
            KeyConditionExpression=
                Key('PK').eq(f"STREAM#{stream_id}")
                & Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {'event_type': item['event_type'],
             'data': json.loads(item['event_data']),
             'version': item['version']}
            for item in resp['Items']
        ]

DynamoDB table design: PK=STREAM#{id}, SK=VERSION#{version}, GSI1 for global ordering.

Do

  • Name streams {Type}-{id}, e.g., Order-abc123
  • Include correlation / causation IDs in metadata for tracing
  • Version event schemas from day one; plan for evolution
  • Implement idempotent writes; use event IDs for deduplication
  • Index for your query patterns: stream, global position, event type

Don't

  • Mutate or delete events: they are immutable facts
  • Store large payloads: keep events small; reference blobs externally
  • Skip optimistic concurrency: version checks prevent data corruption
  • Ignore backpressure: handle slow consumers gracefully
  • Couple projections to the write model: projections should be independently deployable

NEVER Do

  • NEVER update or delete events: events are immutable historical facts; create compensating events instead
  • NEVER skip version checks on append: optimistic concurrency prevents lost updates and corruption
  • NEVER embed large blobs in events: store blobs externally and reference them by ID in the event
  • NEVER use random UUIDs for event IDs without idempotency checks: retries create duplicates
  • NEVER read projections for command validation: use the event stream as the source of truth
  • NEVER couple projections to the write transaction: projections must be rebuildable independently
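The compensating-event rule can be shown in a few lines (toy event types, assumed for illustration): an order is never "un-placed" by deleting OrderPlaced; instead an OrderCancelled event is appended and the current state falls out of folding over the stream.

```python
def order_state(events: list[dict]) -> str:
    """Derive current order state by folding left over the event stream."""
    state = "none"
    for e in events:
        if e["type"] == "OrderPlaced":
            state = "placed"
        elif e["type"] == "OrderCancelled":   # compensating event, not a delete
            state = "cancelled"
    return state
```

The history stays intact, so audits and projections can still see that the order existed before it was cancelled.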

Category context

Agent frameworks, memory systems, reasoning layers, and model-native orchestration.

Source: Tencent SkillHub

Largest current source with strong distribution and engagement signals.

Package contents

Included in package
2 Docs
  • SKILL.md Primary doc
  • README.md Docs