โ† All skills
Tencent SkillHub ยท Developer Tools

Microservice Patterns

Provides guidance and patterns for decomposing monoliths, inter-service communication, data management, and resilience in microservice architectures.

skill openclawclawhub Free
0 Downloads
0 Stars
0 Installs
0 Score
High Signal

Provides guidance and patterns for decomposing monoliths, inter-service communication, data management, and resilience in microservice architectures.

โฌ‡ 0 downloads โ˜… 0 stars Unverified but indexed

Install for OpenClaw

Quick setup
  1. Download the package from Yavira.
  2. Extract the archive and review SKILL.md first.
  3. Import or place the package into your OpenClaw setup.

Requirements

Target platform
OpenClaw
Install method
Manual import
Extraction
Extract archive
Prerequisites
OpenClaw
Primary doc
SKILL.md

Package facts

Download mode
Yavira redirect
Package format
ZIP package
Source platform
Tencent SkillHub
What's included
README.md, SKILL.md

Validation

  • Use the Yavira download entry.
  • Review SKILL.md after the package is downloaded.
  • Confirm the extracted package contains the expected setup assets.

Install with your agent

Agent handoff

Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.

  1. Download the package from Yavira.
  2. Extract it into a folder your agent can access.
  3. Paste one of the prompts below and point your agent at the extracted folder.
New install

I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.

Upgrade existing

I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.

Trust & source

Release facts

Source
Tencent SkillHub
Verification
Indexed source record
Version
1.0.0

Documentation

ClawHub primary doc Primary doc: SKILL.md 17 sections Open source page

WHAT

Patterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.

WHEN

Decomposing a monolith into microservices Designing service boundaries and contracts Implementing inter-service communication Managing distributed transactions Building resilient distributed systems

KEYWORDS

microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS

Decision Framework: When to Use Microservices

If you have...Then...Small team (<5 devs), simple domainStart with monolithNeed independent deployment/scalingConsider microservicesMultiple teams, clear domain boundariesMicroservices work wellTight deadlines, unknown requirementsMonolith first, extract later Rule of thumb: If you can't define clear service boundaries, you're not ready for microservices.

Pattern 1: By Business Capability

Organize services around business functions, not technical layers. E-commerce Example: โ”œโ”€โ”€ order-service # Order lifecycle โ”œโ”€โ”€ payment-service # Payment processing โ”œโ”€โ”€ inventory-service # Stock management โ”œโ”€โ”€ shipping-service # Fulfillment โ””โ”€โ”€ notification-service # Emails, SMS

Pattern 2: Strangler Fig (Monolith Migration)

Gradually extract from monolith without big-bang rewrites. 1. Identify bounded context to extract 2. Create new microservice 3. Route new traffic to microservice 4. Gradually migrate existing functionality 5. Remove from monolith when complete # API Gateway routing during migration async def route_orders(request): if request.path.startswith("/api/orders/v2"): return await new_order_service.forward(request) else: return await legacy_monolith.forward(request)

Pattern 1: Synchronous (REST/gRPC)

Use for: Queries, when you need immediate response. import httpx from tenacity import retry, stop_after_attempt, wait_exponential class ServiceClient: def __init__(self, base_url: str): self.base_url = base_url self.client = httpx.AsyncClient(timeout=5.0) @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10)) async def get(self, path: str): """GET with automatic retries.""" response = await self.client.get(f"{self.base_url}{path}") response.raise_for_status() return response.json() # Usage payment_client = ServiceClient("http://payment-service:8001") result = await payment_client.get(f"/payments/{payment_id}")

Pattern 2: Asynchronous (Events)

Use for: Commands, when eventual consistency is acceptable. from aiokafka import AIOKafkaProducer import json @dataclass class DomainEvent: event_id: str event_type: str aggregate_id: str occurred_at: datetime data: dict class EventBus: def __init__(self, bootstrap_servers: List[str]): self.producer = AIOKafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode() ) async def publish(self, event: DomainEvent): await self.producer.send_and_wait( event.event_type, # Topic = event type value=asdict(event), key=event.aggregate_id.encode() ) # Order service publishes await event_bus.publish(DomainEvent( event_id=str(uuid.uuid4()), event_type="OrderCreated", aggregate_id=order.id, occurred_at=datetime.now(), data={"order_id": order.id, "customer_id": order.customer_id} )) # Inventory service subscribes and reacts async def handle_order_created(event_data: dict): order_id = event_data["data"]["order_id"] items = event_data["data"]["items"] await reserve_inventory(order_id, items)

When to Use Each

SynchronousAsynchronousNeed immediate responseFire-and-forgetSimple query/responseLong-running operationsLow latency requiredDecoupling is priorityTight coupling acceptableEventual consistency OK

Database Per Service

Each service owns its data. No shared databases. order-service โ†’ orders_db (PostgreSQL) payment-service โ†’ payments_db (PostgreSQL) product-service โ†’ products_db (MongoDB) analytics-service โ†’ analytics_db (ClickHouse)

Saga Pattern (Distributed Transactions)

For operations spanning multiple services that need rollback capability. class SagaStep: def __init__(self, name: str, action: Callable, compensation: Callable): self.name = name self.action = action self.compensation = compensation class OrderFulfillmentSaga: def __init__(self): self.steps = [ SagaStep("create_order", self.create_order, self.cancel_order), SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory), SagaStep("process_payment", self.process_payment, self.refund_payment), SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation), ] async def execute(self, order_data: dict) -> SagaResult: completed_steps = [] context = {"order_data": order_data} for step in self.steps: try: result = await step.action(context) if not result.success: await self.compensate(completed_steps, context) return SagaResult(status="failed", error=result.error) completed_steps.append(step) context.update(result.data) except Exception as e: await self.compensate(completed_steps, context) return SagaResult(status="failed", error=str(e)) return SagaResult(status="completed", data=context) async def compensate(self, completed_steps: List[SagaStep], context: dict): """Execute compensating actions in reverse order.""" for step in reversed(completed_steps): try: await step.compensation(context) except Exception as e: # Log but continue compensating logger.error(f"Compensation failed for {step.name}: {e}")

Circuit Breaker

Fail fast when a service is down. Prevents cascade failures. from enum import Enum from datetime import datetime, timedelta class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing, reject requests HALF_OPEN = "half_open" # Testing recovery class CircuitBreaker: def __init__( self, failure_threshold: int = 5, recovery_timeout: int = 30, success_threshold: int = 2 ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.success_threshold = success_threshold self.failure_count = 0 self.success_count = 0 self.state = CircuitState.CLOSED self.opened_at = None async def call(self, func: Callable, *args, **kwargs): if self.state == CircuitState.OPEN: if self._should_attempt_reset(): self.state = CircuitState.HALF_OPEN else: raise CircuitBreakerOpen("Service unavailable") try: result = await func(*args, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): self.failure_count = 0 if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.success_count = 0 def _on_failure(self): self.failure_count += 1 if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN self.opened_at = datetime.now() def _should_attempt_reset(self) -> bool: return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout) # Usage breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) async def call_payment_service(data: dict): return await breaker.call(payment_client.post, "/payments", json=data)

Retry with Exponential Backoff

For transient failures. from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError)) ) async def fetch_user(user_id: str): response = await client.get(f"/users/{user_id}") response.raise_for_status() return response.json()

Bulkhead

Isolate resources to limit impact of failures. import asyncio class Bulkhead: def __init__(self, max_concurrent: int): self.semaphore = asyncio.Semaphore(max_concurrent) async def call(self, func: Callable, *args, **kwargs): async with self.semaphore: return await func(*args, **kwargs) # Limit concurrent calls to each service payment_bulkhead = Bulkhead(max_concurrent=10) inventory_bulkhead = Bulkhead(max_concurrent=20) result = await payment_bulkhead.call(payment_service.charge, amount)

API Gateway Pattern

Single entry point for all clients. from fastapi import FastAPI, Depends, HTTPException from circuitbreaker import circuit app = FastAPI() class APIGateway: def __init__(self): self.clients = { "orders": httpx.AsyncClient(base_url="http://order-service:8000"), "payments": httpx.AsyncClient(base_url="http://payment-service:8001"), "inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"), } @circuit(failure_threshold=5, recovery_timeout=30) async def forward(self, service: str, path: str, **kwargs): client = self.clients[service] response = await client.request(**kwargs, url=path) response.raise_for_status() return response.json() async def aggregate(self, order_id: str) -> dict: """Aggregate data from multiple services.""" results = await asyncio.gather( self.forward("orders", f"/orders/{order_id}", method="GET"), self.forward("payments", f"/payments/order/{order_id}", method="GET"), self.forward("inventory", f"/reservations/order/{order_id}", method="GET"), return_exceptions=True ) return { "order": results[0] if not isinstance(results[0], Exception) else None, "payment": results[1] if not isinstance(results[1], Exception) else None, "inventory": results[2] if not isinstance(results[2], Exception) else None, } gateway = APIGateway() @app.get("/api/orders/{order_id}") async def get_order_aggregate(order_id: str): return await gateway.aggregate(order_id)

Health Checks

Every service needs liveness and readiness probes. @app.get("/health/live") async def liveness(): """Is the process running?""" return {"status": "alive"} @app.get("/health/ready") async def readiness(): """Can we serve traffic?""" checks = { "database": await check_database(), "cache": await check_redis(), } all_healthy = all(checks.values()) status = "ready" if all_healthy else "not_ready" return {"status": status, "checks": checks}

NEVER

Shared Databases: Creates tight coupling, defeats the purpose Synchronous Chains: A โ†’ B โ†’ C โ†’ D = fragile, slow No Circuit Breakers: One service down takes everything down Distributed Monolith: Services that must deploy together Ignoring Network Failures: Assume the network WILL fail No Compensation Logic: Can't undo failed distributed transactions Starting with Microservices: Always start with a well-structured monolith Chatty Services: Too many inter-service calls = latency death

Category context

Code helpers, APIs, CLIs, browser automation, testing, and developer operations.

Source: Tencent SkillHub

Largest current source with strong distribution and engagement signals.

Package contents

Included in package
2 Docs
  • SKILL.md Primary doc
  • README.md Docs