Requirements
- Target platform: OpenClaw
- Install method: Manual import
- Extraction: Extract archive
- Prerequisites: OpenClaw
- Primary doc: SKILL.md
Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.
Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.
I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.
I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.
Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.
npx clawhub@latest install dual-stream-architecture
- Event-driven systems needing both durability AND real-time
- WebSocket/SSE backends that push live updates
- Dashboards showing events as they happen
- Kafka consumers have lag but users expect instant updates
```go
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/redis/go-redis/v9"
	"github.com/segmentio/kafka-go"
)

type DualPublisher struct {
	kafka  *kafka.Writer
	redis  *redis.Client
	logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
	// 1. Kafka: Critical path - must succeed
	payload, _ := json.Marshal(event)
	err := p.kafka.WriteMessages(ctx, kafka.Message{
		Key:   []byte(event.SourceID),
		Value: payload,
	})
	if err != nil {
		return fmt.Errorf("kafka publish failed: %w", err)
	}

	// 2. Redis: Best-effort - don't fail the operation
	p.publishToRedis(ctx, event)
	return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
	// Lightweight payload (full event in Kafka)
	notification := map[string]interface{}{
		"id":        event.ID,
		"type":      event.Type,
		"source_id": event.SourceID,
	}
	payload, _ := json.Marshal(notification)
	channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

	// Fire and forget - log errors but don't propagate
	if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
		p.logger.Warn("redis publish failed", "error", err)
	}
}
```
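The `Publish` method above reads `ID`, `Type`, `SourceID`, and `SourceType` from an `Event` type whose definition the skill does not show. The sketch below is an assumed minimal shape, with the channel-naming convention (`events:<source_type>:<source_id>`) pulled into a helper so publishers and gateway subscribers derive the same name; `channelFor` is a hypothetical name, not part of the skill.

```go
package main

import "fmt"

// Event is an assumed minimal shape matching the fields the publisher
// uses; the real skill may carry more fields (timestamp, payload, etc.).
type Event struct {
	ID         string
	Type       string
	SourceID   string
	SourceType string
}

// channelFor centralizes the source-level channel convention used by
// publishToRedis, so subscribers can compute the same channel name.
func channelFor(e Event) string {
	return fmt.Sprintf("events:%s:%s", e.SourceType, e.SourceID)
}

func main() {
	e := Event{ID: "evt-1", Type: "order.created", SourceID: "42", SourceType: "order"}
	fmt.Println(channelFor(e)) // events:order:42
}
```

Keeping the convention in one function avoids the classic dual-stream bug where the publisher and the WebSocket gateway drift apart on channel names.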
```
┌──────────┐     ┌───────────────┐     ┌──────────────┐
│ Ingester │────▶│ DualPublisher │────▶│    Kafka     │──▶ Event Processor
└──────────┘     │               │     │  (durable)   │
                 │               │     └──────────────┘
                 │               │     ┌──────────────┐
                 │               │────▶│ Redis PubSub │──▶ WebSocket Gateway
                 └───────────────┘     │ (real-time)  │
                                       └──────────────┘
```
For high throughput:

```go
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
	// 1. Batch to Kafka
	messages := make([]kafka.Message, len(events))
	for i, event := range events {
		payload, _ := json.Marshal(event)
		messages[i] = kafka.Message{
			Key:   []byte(event.SourceID),
			Value: payload,
		}
	}
	if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
		return fmt.Errorf("kafka batch failed: %w", err)
	}

	// 2. Redis: Pipeline for efficiency
	pipe := p.redis.Pipeline()
	for _, event := range events {
		channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
		notification, _ := json.Marshal(map[string]interface{}{
			"id":   event.ID,
			"type": event.Type,
		})
		pipe.Publish(ctx, channel, notification)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		p.logger.Warn("redis batch failed", "error", err)
	}
	return nil
}
```
| Requirement | Stream | Why |
|---|---|---|
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Sub-ms delivery |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
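"Redis + local fan-out" means each gateway process holds a single Redis subscription per channel and re-broadcasts to its local WebSocket clients in memory, so Redis sees one connection per process instead of one per client. A minimal in-process hub might look like this; the `Hub` type and its method names are illustrative, not part of the skill.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans one upstream message (e.g. from a single Redis subscription)
// out to many local subscribers.
type Hub struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func NewHub() *Hub {
	return &Hub{subs: make(map[chan string]struct{})}
}

func (h *Hub) Subscribe() chan string {
	ch := make(chan string, 16) // buffered so slow clients don't block publish
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

func (h *Hub) Unsubscribe(ch chan string) {
	h.mu.Lock()
	delete(h.subs, ch)
	h.mu.Unlock()
	close(ch)
}

// Broadcast delivers msg to every local subscriber, dropping it for
// subscribers whose buffers are full (best-effort, like Redis Pub/Sub).
func (h *Hub) Broadcast(msg string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- msg:
		default: // drop rather than block the fan-out loop
		}
	}
}

func main() {
	hub := NewHub()
	a, b := hub.Subscribe(), hub.Subscribe()
	hub.Broadcast(`{"id":"evt-1","type":"order.created"}`)
	fmt.Println(<-a == <-b) // true
}
```

Dropping on a full buffer mirrors the pattern's stance that the real-time path is best-effort; a dropped client can always re-sync from the API.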
Related skills:
- Meta-skill: ai/skills/meta/realtime-dashboard/ — Complete realtime dashboard guide
- websocket-hub-patterns — WebSocket gateway
- backend/service-layer-architecture — Service integration
- NEVER fail on Redis errors — Redis is best-effort. Log and continue.
- NEVER send full payload to Redis — Send IDs only, clients fetch from API.
- NEVER create one Redis channel per event — Use source-level channels.
- NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
- NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.
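"Send IDs only" in practice means the Redis payload is a trimmed projection of the event: identity fields only, with clients fetching the full record from the API. A sketch of that projection, assuming the JSON field names used by the publisher code; `toNotification` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notification is the lightweight Redis payload: identity only, no body.
// The full event lives in Kafka and behind the API.
type notification struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	SourceID string `json:"source_id"`
}

func toNotification(id, typ, sourceID string) ([]byte, error) {
	return json.Marshal(notification{ID: id, Type: typ, SourceID: sourceID})
}

func main() {
	b, _ := toNotification("evt-1", "order.created", "42")
	fmt.Println(string(b)) // {"id":"evt-1","type":"order.created","source_id":"42"}
}
```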
| Case | Solution |
|---|---|
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Use wildcard patterns or aggregate channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |
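Handling a client that connects mid-stream means merging a recent-events API snapshot with whatever arrived on the live subscription while the snapshot loaded, and those two sources can overlap. A sketch of deduplicating by event ID while preserving order, using event IDs as strings for brevity; `mergeDedup` is an assumed helper name.

```go
package main

import "fmt"

// mergeDedup combines a recent-events snapshot (from the API) with events
// that arrived on the live subscription while the snapshot was loading.
// Events are deduplicated by ID; the snapshot copy wins for duplicates.
func mergeDedup(snapshot, live []string) []string {
	seen := make(map[string]bool, len(snapshot)+len(live))
	out := make([]string, 0, len(snapshot)+len(live))
	for _, id := range append(append([]string{}, snapshot...), live...) {
		if !seen[id] {
			seen[id] = true
			out = append(out, id)
		}
	}
	return out
}

func main() {
	snapshot := []string{"e1", "e2", "e3"}
	live := []string{"e3", "e4"} // e3 arrived on both paths
	fmt.Println(mergeDedup(snapshot, live)) // [e1 e2 e3 e4]
}
```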