โ† All skills
Tencent SkillHub ยท Other

Data Engineering

Design and operate scalable data pipelines and architectures using best-fit patterns, tools, and modeling methodologies without external dependencies.

skill openclawclawhub Free
0 Downloads
0 Stars
0 Installs
0 Score
High Signal

Design and operate scalable data pipelines and architectures using best-fit patterns, tools, and modeling methodologies without external dependencies.

โฌ‡ 0 downloads โ˜… 0 stars Unverified but indexed

Install for OpenClaw

Quick setup
  1. Download the package from Yavira.
  2. Extract the archive and review SKILL.md first.
  3. Import or place the package into your OpenClaw setup.

Requirements

Target platform
OpenClaw
Install method
Manual import
Extraction
Extract archive
Prerequisites
OpenClaw
Primary doc
SKILL.md

Package facts

Download mode
Yavira redirect
Package format
ZIP package
Source platform
Tencent SkillHub
What's included
README.md, SKILL.md

Validation

  • Use the Yavira download entry.
  • Review SKILL.md after the package is downloaded.
  • Confirm the extracted package contains the expected setup assets.

Install with your agent

Agent handoff

Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.

  1. Download the package from Yavira.
  2. Extract it into a folder your agent can access.
  3. Paste one of the prompts below and point your agent at the extracted folder.
New install

I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.

Upgrade existing

I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.

Trust & source

Release facts

Source
Tencent SkillHub
Verification
Indexed source record
Version
1.0.0

Documentation

ClawHub primary doc Primary doc: SKILL.md 47 sections Open source page

Data Engineering Command Center

Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies โ€” pure agent skill.

Phase 1: Data Architecture Assessment

Before building anything, understand the landscape.

Architecture Brief

project_name: "" business_context: "" data_consumers: - team: "" use_case: "" # analytics | ML | operational | reporting | reverse-ETL latency_requirement: "" # real-time (<1s) | near-real-time (<5min) | batch (hourly+) query_pattern: "" # ad-hoc | scheduled | API | dashboard current_state: sources: [] # list every system producing data storage: [] # where data lives today pain_points: [] # what's broken, slow, unreliable data_volume: current_gb_per_day: 0 growth_rate_percent: 0 retention_months: 0 constraints: budget_monthly_usd: 0 team_size: 0 skill_level: "" # junior | mid | senior | mixed compliance: [] # GDPR, HIPAA, SOX, PCI, none cloud_provider: "" # AWS | GCP | Azure | multi | on-prem

Architecture Pattern Decision Matrix

SignalPatternWhen to UseAll consumers need data hourly+Batch ETLReporting, warehousing, most analyticsSome need <5 min latencyMicro-batchDashboard freshness, near-real-time analyticsEvents need <1s processingStreamingFraud detection, real-time pricing, alertsNeed both batch + streamingLambdaWhen batch accuracy + real-time speed both matterWant to simplify LambdaKappaWhen you can reprocess from stream replayData lake + warehouse combinedLakehouseWhen you need both cheap storage + fast SQLSources change independentlyData MeshLarge orgs, domain-owned data, >5 teamsML is primary consumerFeature StoreML-heavy orgs with feature reuse needs

Technology Selection Guide

Orchestration ToolBest ForAvoid WhenAirflowComplex DAGs, Python-native teams, mature ecosystemSimple pipelines (<5 tasks)DagsterSoftware-defined assets, strong typing, dev experienceLegacy team resistant to new paradigmsPrefectDynamic workflows, cloud-native, Python-firstNeed on-prem with no cloud dependencydbtSQL transformations, ELT, analytics engineeringNon-SQL transforms, streamingTemporalLong-running workflows, retry-heavy, microservicesSimple ETL, small teamsCron + scripts<3 pipelines, solo engineer, simple schedulesAnything with dependencies or retries Processing ToolBest ForAvoid WhenSpark>100GB, complex transforms, ML pipelines<10GB (overkill), real-time streamingDuckDBLocal analytics, <100GB, SQL on filesDistributed processing, production streamingPolarsSingle-node, Rust-speed, <50GB, DataFramesDistributed, need Spark ecosystemPandas<1GB, quick analysis, prototypingProduction pipelines, anything >5GBFlinkTrue streaming, event-time processingBatch-only, small team (steep learning curve)SQL (warehouse)ELT in Snowflake/BigQuery/RedshiftComplex ML transforms, binary data Storage ToolBest ForAvoid WhenSnowflakeAnalytics, separation of compute/storage, multi-cloudTight budget, real-time OLTPBigQueryGCP-native, serverless, large-scale analyticsMulti-cloud, need fine-grained cost controlRedshiftAWS-native, existing AWS ecosystemElastic scaling needs, multi-cloudDatabricksML + analytics unified, Spark-native, lakehousePure SQL analytics, small dataPostgreSQLOLTP + light analytics, <500GB, budget-conscious>1TB analytics, real-time dashboards on large dataS3/GCS/ADLSRaw data lake, cheap storage, any formatDirect SQL queries (need compute layer)Delta Lake/IcebergTable format on data lake, ACID on filesSimple file storage, no lakehouse need

Modeling Methodology Decision

ApproachBest ForKey ConceptKimball (Dimensional)BI/reporting, star schemasFacts + Dimensions, business-process-centricInmon (3NF)Enterprise data warehouse, single source of truthNormalized, subject-area-centricData Vault 2.0Agile warehousing, auditability, multiple sourcesHubs + Links + Satellites, insert-onlyOne Big Table (OBT)Simple analytics, few joins, dashboard performancePre-joined, denormalized, fast queriesActivity SchemaEvent analytics, product analyticsEntity + Activity + Feature columns

Dimensional Model Template

fact_table: name: "fact_[business_process]" grain: "" # one row = one [what]? grain_statement: "One row per [transaction/event/snapshot] at [time grain]" measures: - name: "" type: "" # additive | semi-additive | non-additive aggregation: "" # SUM | AVG | COUNT | MIN | MAX | COUNT DISTINCT business_definition: "" degenerate_dimensions: [] # IDs stored in fact (order_number, invoice_id) foreign_keys: [] # links to dimension tables dimensions: - name: "dim_[entity]" type: "" # Type 1 (overwrite) | Type 2 (history) | Type 3 (previous value) natural_key: "" # business key from source surrogate_key: "" # warehouse-generated key attributes: - name: "" source: "" scd_type: "" # 1 | 2 | 3 hierarchy: [] # e.g., [country, region, city, store]

SCD Type Decision Guide

ScenarioSCD TypeImplementationDon't care about historyType 1UPDATE in placeNeed full historyType 2New row + valid_from/valid_to + is_current flagOnly need previous valueType 3Add previous_[column]Track changes with timestampsType 4Mini-dimension (history table)Hybrid: some attrs Type 1, some Type 2Type 6Combine 1+2+3 in one table Default recommendation: Type 2 for anything business-critical (customer status, product price, employee department). Type 1 for everything else.

Naming Conventions

ObjectConventionExampleRaw/staging tablesraw_[source]_[table]raw_stripe_paymentsStaging modelsstg_[source]__[entity]stg_stripe__paymentsIntermediate modelsint_[entity]_[verb]int_orders_pivotedMart/fact tablesfct_[business_process]fct_ordersDimension tablesdim_[entity]dim_customersMetrics/aggregatesmrt_[domain]_[metric]mrt_sales_dailySnapshotssnp_[entity]_[grain]snp_inventory_dailyColumns: booleanis_[state] or has_[thing]is_active, has_subscriptionColumns: timestamp[event]_atcreated_at, shipped_atColumns: date[event]_dateorder_dateColumns: ID[entity]_idcustomer_idColumns: amount[thing]_amountorder_amountColumns: count[thing]_countline_item_count

Universal Pipeline Template

pipeline: name: "" owner: "" schedule: "" # cron expression sla_minutes: 0 # max acceptable runtime tier: "" # 1 (critical) | 2 (important) | 3 (nice-to-have) extract: source_system: "" connection: "" strategy: "" # full | incremental | CDC | log-based incremental_key: "" # column for incremental (e.g., updated_at) watermark_storage: "" # where to persist last-extracted position transform: engine: "" # SQL | Spark | Python | dbt stages: - name: "clean" operations: [] # dedupe, null handling, type casting, trimming - name: "conform" operations: [] # standardize codes, currencies, timezones - name: "enrich" operations: [] # lookups, calculations, derived fields - name: "aggregate" operations: [] # rollups, pivots, window functions load: target_system: "" strategy: "" # append | upsert | merge | truncate-reload | partition-swap merge_keys: [] partition_key: "" clustering_keys: [] quality_gates: pre_load: [] # checks before writing post_load: [] # checks after writing error_handling: strategy: "" # fail-fast | dead-letter | retry | skip-and-alert max_retries: 3 retry_delay_seconds: 300 alert_channels: []

Extraction Strategy Decision Tree

Is the source database? โ”œโ”€โ”€ Yes โ†’ Does it support CDC? โ”‚ โ”œโ”€โ”€ Yes โ†’ Use CDC (Debezium, AWS DMS, Fivetran) โ”‚ โ”‚ Best for: high-volume, low-latency, minimal source impact โ”‚ โ””โ”€โ”€ No โ†’ Does it have a reliable updated_at column? โ”‚ โ”œโ”€โ”€ Yes โ†’ Incremental extraction on updated_at โ”‚ โ”‚ โš ๏ธ Won't catch hard deletes โ€” add periodic full reconciliation โ”‚ โ””โ”€โ”€ No โ†’ Full extraction โ”‚ Only viable for small tables (<1M rows) โ”œโ”€โ”€ Is it an API? โ”‚ โ”œโ”€โ”€ Supports webhooks? โ†’ Event-driven ingestion โ”‚ โ”œโ”€โ”€ Has cursor/pagination? โ†’ Incremental with cursor bookmark โ”‚ โ””โ”€โ”€ No pagination? โ†’ Full pull with rate-limit handling โ”œโ”€โ”€ Is it files (S3, SFTP, email)? โ”‚ โ””โ”€โ”€ Event-triggered (S3 notification, file watcher) โ”‚ Validate: schema, completeness, filename pattern โ””โ”€โ”€ Is it streaming (Kafka, Kinesis, Pub/Sub)? โ””โ”€โ”€ Consumer group with offset management Key decisions: at-least-once vs exactly-once, consumer lag alerting

Load Strategy Decision

StrategyWhenTrade-offAppendEvent/log data, immutable factsSimple but grows forever โ€” partition + retainUpsert/MergeDimension updates, SCD Type 1Handles updates but slower on large tablesTruncate-ReloadSmall tables (<1M), reference dataSimple but window of missing dataPartition SwapLarge fact tables, daily loadsAtomic, fast, but needs partition alignmentSoft DeleteNeed audit trail of deletionsAdds complexity to every downstream query

Idempotency Rules (NON-NEGOTIABLE)

Every pipeline MUST be re-runnable without side effects: Use MERGE/UPSERT, never blind INSERT for mutable data Partition-swap for immutable data โ€” drop partition + reload Store watermarks externally โ€” not in the pipeline code Dedup at ingestion โ€” use source natural keys Test by running twice โ€” output must be identical both times

Quality Dimensions

DimensionDefinitionExample CheckCompletenessNo missing values where requiredNOT NULL on required fields, row count within rangeUniquenessNo unexpected duplicatesPrimary key uniqueness, natural key uniquenessValidityValues within expected domainEnum checks, range checks, regex patternsAccuracyData matches real-world truthCross-system reconciliation, manual spot checksFreshnessData arrives on timeMAX(loaded_at) > NOW() - INTERVAL '2 hours'ConsistencySame data agrees across systemsSum reconciliation between source and target

Quality Check Templates

-- Completeness: Required fields not null SELECT COUNT(*) AS null_violations FROM {table} WHERE {required_column} IS NULL; -- Threshold: 0 -- Uniqueness: No duplicate primary keys SELECT {pk_column}, COUNT(*) AS dupe_count FROM {table} GROUP BY {pk_column} HAVING COUNT(*) > 1; -- Threshold: 0 -- Freshness: Data arrived within SLA SELECT CASE WHEN MAX({timestamp_col}) > CURRENT_TIMESTAMP - INTERVAL '{sla_hours} hours' THEN 'PASS' ELSE 'FAIL' END AS freshness_check FROM {table}; -- Volume: Row count within expected range SELECT CASE WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected} THEN 'PASS' ELSE 'FAIL' END AS volume_check FROM {table} WHERE {partition_col} = '{run_date}'; -- Referential: FK integrity SELECT COUNT(*) AS orphan_count FROM {fact_table} f LEFT JOIN {dim_table} d ON f.{fk} = d.{pk} WHERE d.{pk} IS NULL; -- Threshold: 0 -- Distribution: No unexpected skew SELECT {column}, COUNT(*) AS cnt, ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct FROM {table} GROUP BY {column} ORDER BY cnt DESC; -- Alert if any single value > {max_pct}% -- Cross-system reconciliation SELECT (SELECT SUM(amount) FROM source_system.orders WHERE date = '{date}') AS source_total, (SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}') AS target_total, ABS(source_total - target_total) AS variance; -- Threshold: variance < 0.01 * source_total (1%)

Data Contract Template

contract: name: "" version: "" owner: "" # team responsible for producing this data consumers: [] # teams consuming this data sla: freshness_hours: 0 availability_percent: 99.9 support_hours: "" # business-hours | 24x7 schema: - column: "" type: "" nullable: false description: "" business_definition: "" pii: false checks: - type: "" # not_null | unique | range | enum | regex | custom params: {} breaking_change_policy: "" # notify-30-days | version-bump | never-break notification_channel: ""

Quality Severity Levels

LevelDefinitionResponseP0 โ€” CriticalData corruption, wrong numbers in production dashboards, compliance data wrongStop pipeline, alert immediately, rollback if possibleP1 โ€” HighMissing data for key reports, SLA breach, >5% of records affectedAlert team, fix within 4 hours, post-mortem requiredP2 โ€” MediumNon-critical field quality, <1% records affected, no downstream impactFix in next sprint, add monitoring to prevent recurrenceP3 โ€” LowCosmetic issues, edge cases, non-critical dataBacklog, fix when convenient

SQL Optimization Checklist

ProblemFixImpactFull table scanAdd/use partition pruning10-100x fasterLarge joinsPre-aggregate before joining5-50x fasterSELECT *Select only needed columns2-10x faster (columnar stores)Correlated subqueryRewrite as JOIN or window function10-100x fasterDISTINCT on large resultFix upstream duplication instead2-5x fasterORDER BY without LIMITAdd LIMIT or remove if not neededPrevents memory spillsString operations in WHEREPre-compute, use lookup tableEnables index usageMultiple passes over same dataCombine with CASE WHEN + GROUP BY2-5x fasterNOT IN with NULLsUse NOT EXISTS or LEFT JOIN IS NULLCorrectness + performance

Spark Optimization Guide

ProblemSolutionShuffle-heavy joinsBroadcast small table (broadcast(df)) if <100MBData skewSalt the skewed key: add random prefix, join on salted key, aggregateSmall filesCoalesce output: .coalesce(target_files) or use adaptive query executionToo many partitionsspark.sql.shuffle.partitions = 2-3x cluster coresOOM errorsIncrease spark.executor.memory, reduce partition size, spill to diskSlow writesUse Parquet with snappy, partition by date, avoid small writesRepeated computation.cache() or .persist() DataFrames used >1 timeComplex transformationsPush down predicates, filter early, select early

Partitioning Strategy

Data TypePartition KeyWhyTransactional/eventDate (daily or monthly)Most queries filter by time rangeMulti-tenantTenant ID + dateIsolate tenant queries, time-range pruningGeospatialRegion + dateRegional queries are commonLog dataDate + hourHigh volume needs finer partitionsReference/dimensionDon't partitionToo small, full scan is fine Rules: Target 100MB-1GB per partition (compressed) <10,000 total partitions per table Never partition on high-cardinality columns (user_id) Always include partition key in WHERE clauses

Data Classification

LevelExamplesControlsPublicProduct catalog, published statsNo restrictionsInternalAggregated metrics, non-PII analyticsAuth required, audit loggingConfidentialCustomer PII, financial records, HR dataEncryption, column-level access, maskingRestrictedSSN, payment cards, health records, passwordsEncryption at rest + transit, tokenization, audit every access, retention limits

PII Handling Rules

Identify: Scan all sources for PII columns (name, email, phone, SSN, DOB, address, IP) Classify: Tag each with sensitivity level Minimize: Only ingest PII you actually need Protect: Hash or tokenize in staging (SHA-256 with salt for pseudonymization) Dynamic masking for non-privileged users Column-level encryption for restricted data Retain: Auto-delete after retention period Audit: Log every query touching PII columns Right to delete: Build a deletion pipeline that propagates across all derived tables

Data Catalog Entry Template

dataset: name: "" description: "" owner_team: "" steward: "" # person responsible for quality domain: "" # sales | marketing | finance | product | engineering tier: "" # gold (trusted) | silver (cleaned) | bronze (raw) lineage: sources: [] # upstream datasets/systems transformations: "" # brief description of key transforms downstream: [] # who consumes this refresh: schedule: "" sla_hours: 0 last_successful_run: "" quality: tests: [] # list of quality checks last_score: 0 # 0-100 known_issues: [] access: classification: "" # public | internal | confidential | restricted pii_columns: [] access_request_process: "" # how to get access usage: avg_daily_queries: 0 top_consumers: [] cost_monthly_usd: 0

Pipeline Health Dashboard

dashboard: pipeline_metrics: - metric: "Pipeline Success Rate" formula: "successful_runs / total_runs * 100" target: ">99%" alert_threshold: "<95%" - metric: "Average Runtime" formula: "avg(end_time - start_time) over 7 days" target: "<SLA" alert_threshold: ">80% of SLA" - metric: "Data Freshness" formula: "NOW() - MAX(loaded_at)" target: "<SLA hours" alert_threshold: ">SLA" - metric: "Data Volume Variance" formula: "abs(today_rows - avg_7d_rows) / avg_7d_rows * 100" target: "<20%" alert_threshold: ">50%" - metric: "Quality Check Pass Rate" formula: "passed_checks / total_checks * 100" target: "100%" alert_threshold: "<95%" - metric: "Failed Pipeline Count" formula: "count where status = failed in last 24h" target: "0" alert_threshold: ">0" - metric: "Backfill Queue" formula: "count of pending backfill requests" target: "0" alert_threshold: ">5" - metric: "Infrastructure Cost" formula: "compute + storage + egress" target: "<budget" alert_threshold: ">110% budget"

Alert Severity

SeverityConditionResponse TimeExampleP0Revenue/compliance impacting15 minPayment pipeline down, regulatory report delayedP1Business-critical dashboard stale1 hourExecutive dashboard >4h staleP2Non-critical pipeline failed4 hoursMarketing attribution delayedP3Warning/degradationNext business dayPipeline 80% of SLA, minor quality drift

Structured Logging Standard

Every pipeline run MUST log: { "pipeline_name": "", "run_id": "", "started_at": "", "completed_at": "", "status": "success|failed|partial", "stage": "", "rows_extracted": 0, "rows_transformed": 0, "rows_loaded": 0, "rows_rejected": 0, "quality_checks_passed": 0, "quality_checks_failed": 0, "duration_seconds": 0, "error_message": "", "watermark_before": "", "watermark_after": "" }

Pipeline Test Pyramid

LayerWhat to TestHowWhenUnitIndividual transforms, business logicpytest with fixtures, dbt unit testsEvery PRIntegrationSource connectivity, schema compatibilityTest against staging/dev environmentDaily + PRContractSchema hasn't changed, data types stableSchema registry, contract testsEvery pipeline runData QualityCompleteness, uniqueness, freshness, validityQuality framework checksEvery pipeline runE2EFull pipeline produces correct outputGolden dataset comparisonWeekly + releasePerformanceRuntime within SLA, no regressionBenchmark against historical runsWeekly

dbt Testing Checklist

# For every model, define at minimum: models: - name: fct_orders columns: - name: order_id tests: - unique - not_null - name: customer_id tests: - not_null - relationships: to: ref('dim_customers') field: customer_id - name: order_amount tests: - not_null - dbt_utils.accepted_range: min_value: 0 max_value: 1000000 - name: order_status tests: - accepted_values: values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled'] - name: ordered_at tests: - not_null - dbt_utils.recency: datepart: day field: ordered_at interval: 2

Backfill Protocol

When you need to reprocess historical data: Scope: Define exact date range and affected tables Impact assessment: What downstream models/dashboards will be affected? Communication: Notify consumers of temporary data inconsistency Isolation: Run backfill in separate compute to avoid impacting current pipelines Validation: Compare row counts and key metrics pre/post backfill Execution: Process in reverse-chronological order (most recent first) Monitoring: Watch for resource spikes, duplicate creation Verification: Reconcile against source after completion Documentation: Log what was backfilled, why, and any anomalies found

Cloud Cost Reduction Strategies

StrategySavingsEffortRight-size compute (auto-scaling)20-40%LowUse spot/preemptible instances for batch60-80%MediumCompress data (Parquet + Snappy/Zstd)50-80% storageLowLifecycle policies (hot โ†’ warm โ†’ cold โ†’ archive)40-70% storageLowEliminate unused tables/pipelines10-30%LowOptimize query patterns (partition pruning)30-60% computeMediumReserved capacity for steady-state30-50%MediumCache expensive queries20-50% computeMedium

Cost Allocation Template

cost_tracking: by_pipeline: - pipeline: "" compute_monthly_usd: 0 storage_monthly_usd: 0 egress_monthly_usd: 0 total: 0 cost_per_row: 0 # total / rows_processed business_value: "" # what revenue/decision does this enable? roi_justified: true # is the cost worth it? optimization_opportunities: - description: "" estimated_savings_usd: 0 effort: "" # low | medium | high priority: 0 # 1 = do now

Cost Red Flags

Single pipeline >30% of total spend Cost per row increasing month-over-month Tables with 0 queries in 30 days Dev/staging environments running 24/7 Full table scans on >1TB tables Uncompressed data in cloud storage Cross-region data transfer

Pipeline Failure Triage

Pipeline failed โ†’ 1. Check error message in logs โ”œโ”€โ”€ Connection timeout โ†’ Check source availability, network, credentials โ”œโ”€โ”€ Schema mismatch โ†’ Source schema changed โ†’ update extract + notify โ”œโ”€โ”€ Data quality check failed โ†’ Investigate source data, check for anomalies โ”œโ”€โ”€ Out of memory โ†’ Increase resources or optimize query โ”œโ”€โ”€ Permission denied โ†’ Check IAM roles, token expiry โ”œโ”€โ”€ Duplicate key violation โ†’ Check idempotency, investigate source dupes โ””โ”€โ”€ Timeout (SLA breach) โ†’ Check data volume spike, query plan, cluster health 2. Determine impact โ”œโ”€โ”€ What dashboards/reports are affected? โ”œโ”€โ”€ What's the data freshness SLA? โ””โ”€โ”€ Who needs to be notified? 3. Fix โ”œโ”€โ”€ Transient (network, timeout) โ†’ Retry โ”œโ”€โ”€ Data issue โ†’ Fix source data, re-run with quality gate override if safe โ”œโ”€โ”€ Schema change โ†’ Update pipeline, backfill if needed โ””โ”€โ”€ Infrastructure โ†’ Scale up, file ticket with cloud provider 4. Post-fix โ”œโ”€โ”€ Verify data correctness โ”œโ”€โ”€ Update runbook with new failure mode โ””โ”€โ”€ Add monitoring/alerting to catch earlier next time

Schema Change Management

When a source system changes schema: Detect: Schema comparison check in extraction pipeline (hash schema, compare to registered) Classify: Additive (new column): Usually safe โ€” add to pipeline, backfill if needed Rename: Map old โ†’ new in transform, update downstream Type change: Assess compatibility, may need cast or historical rebuild Column removed: Critical โ€” breaks queries, need immediate attention Test: Run pipeline in dry-run mode with new schema Deploy: Update transforms, quality checks, documentation Communicate: Notify downstream consumers via data contract channel

Disaster Recovery

ScenarioRPORTORecovery StepsPipeline code lost0 (git)1hRedeploy from git, restore orchestrator stateWarehouse data corruptedVaries4hRestore from Time Travel/snapshot, re-run affected pipelinesSource system downN/AWaitQueue extractions, catch up with incremental once restoredCloud region outage24h8hFailover to DR region if configured, else waitCredential compromise02hRotate all credentials, audit access logs, re-run affected pipelines

Slowly Changing Dimension Type 2 (SQL Template)

-- Merge pattern for SCD Type 2 MERGE INTO dim_customer AS target USING ( SELECT * FROM stg_customers WHERE updated_at > (SELECT MAX(valid_from) FROM dim_customer) ) AS source ON target.customer_natural_key = source.customer_id AND target.is_current = TRUE -- Update: close old record WHEN MATCHED AND ( target.customer_name != source.name OR target.customer_status != source.status -- list all Type 2 tracked columns ) THEN UPDATE SET is_current = FALSE, valid_to = CURRENT_TIMESTAMP -- Insert: new record (both new customers and changed ones) WHEN NOT MATCHED THEN INSERT ( customer_natural_key, customer_name, customer_status, valid_from, valid_to, is_current ) VALUES ( source.customer_id, source.name, source.status, CURRENT_TIMESTAMP, '9999-12-31', TRUE ); -- Then insert new versions of changed records INSERT INTO dim_customer ( customer_natural_key, customer_name, customer_status, valid_from, valid_to, is_current ) SELECT customer_id, name, status, CURRENT_TIMESTAMP, '9999-12-31', TRUE FROM stg_customers s WHERE EXISTS ( SELECT 1 FROM dim_customer d WHERE d.customer_natural_key = s.customer_id AND d.is_current = FALSE AND d.valid_to = CURRENT_TIMESTAMP );

CDC with Debezium (Architecture Pattern)

Source DB โ†’ Debezium Connector โ†’ Kafka Topic โ†’ โ”œโ”€โ”€ Stream processor (Flink/Spark Streaming) โ†’ Target DB โ”œโ”€โ”€ S3 sink connector โ†’ Data Lake (raw) โ””โ”€โ”€ Elasticsearch sink โ†’ Search index Key decisions: Topic per table or single topic: Per table (easier routing, independent scaling) Schema registry: Always use (Confluent Schema Registry or AWS Glue) Serialization: Avro (compact + schema evolution) or Protobuf (strict + fast) Offset management: Connector manages; monitor consumer lag

Feature Store Pattern

feature_store: entity: "customer" entity_key: "customer_id" features: - name: "total_orders_30d" description: "Total orders in last 30 days" type: "INT" source: "fct_orders" computation: "batch" # batch | streaming | on-demand freshness: "daily" ttl_hours: 48 - name: "avg_order_value_90d" description: "Average order value last 90 days" type: "FLOAT" source: "fct_orders" computation: "batch" freshness: "daily" ttl_hours: 48 - name: "last_login_minutes_ago" description: "Minutes since last login event" type: "INT" source: "events_stream" computation: "streaming" freshness: "real-time" ttl_hours: 1 serving: online: true # low-latency feature serving (Redis/DynamoDB) offline: true # batch feature retrieval for training point_in_time_correct: true # prevent feature leakage in ML training

Data Mesh Principles

If operating at scale (>5 data teams): Domain ownership: Each business domain owns its data products (not central data team) Data as a product: Treat datasets like products โ€” SLAs, documentation, discoverability Self-serve platform: Central team builds the platform, domains build on top Federated governance: Standards and interoperability maintained centrally, implementation decentralized When NOT to use Data Mesh: <5 data producers/consumers Small team (<20 engineers total) Single business domain Early-stage company (over-engineering)

Quality Scoring Rubric (0-100)

DimensionWeightScoringPipeline Reliability200=frequent failures, 10=some failures with manual recovery, 20=99.5%+ success rate with auto-retryData Quality200=no checks, 10=basic null/unique checks, 20=comprehensive quality framework with contractsPerformance150=regularly breaches SLA, 8=meets SLA, 15=well under SLA with optimizationDocumentation100=none, 5=basic README, 10=full catalog entries with lineage and business definitionsMonitoring150=no alerts, 8=failure alerts only, 15=proactive monitoring with dashboards and anomaly detectionTesting100=no tests, 5=basic smoke tests, 10=full test pyramid (unit+integration+contract+E2E)Cost Efficiency100=no cost tracking, 5=tracked, 10=optimized with ROI justification per pipeline Scoring guide: 0-40: Critical gaps โ€” prioritize pipeline reliability and data quality 41-60: Functional but fragile โ€” add monitoring, testing, documentation 61-80: Solid โ€” optimize performance, cost, governance 81-100: Excellent โ€” maintain, innovate, mentor

Timezone Traps

Store everything in UTC. Convert only at presentation layer Event timestamps: use event time, not processing time Daylight saving: TIMESTAMP WITH TIME ZONE, never WITHOUT Late-arriving data: watermark strategy + allowed lateness window

Late-Arriving Data

Define maximum acceptable lateness per source Reprocess affected partitions when late data arrives Track late arrival rate as a quality metric Consider separate "late data" pipeline that patches in

Exactly-Once Processing

True exactly-once is expensive. Most systems need at-least-once + idempotent writes Use transaction IDs or natural keys for deduplication Kafka: use idempotent producer + transactional consumer Database: MERGE/UPSERT on natural key

Schema Evolution

Forward compatible: New code reads old data (safe to deploy new readers first) Backward compatible: Old code reads new data (safe to deploy new writers first) Full compatible: Both directions (safest, most restrictive) Use Avro or Protobuf with schema registry for streaming data

Multi-Tenant Data

Tenant ID in every table, every query, every log Row-level security in warehouse Separate compute per tenant (or at least isolation) Never join across tenants without explicit business reason Tenant-aware backfill (don't rebuild all tenants for one tenant's issue)

Data Lake Anti-Patterns

"Data Swamp": ingesting everything with no organization or catalog โ†’ only ingest what has a known consumer Small files: thousands of <1MB files โ†’ compact regularly (target 100MB-1GB) No table format: raw Parquet/CSV without Delta/Iceberg โ†’ loses ACID, schema evolution, time travel No access controls: single bucket, everyone admin โ†’ implement IAM per domain/team

Natural Language Commands

Say any of these to activate specific workflows: "Design a data pipeline for [source] to [target]" โ†’ Full pipeline template with extraction strategy, transforms, load pattern, quality checks "Model [entity/domain] for analytics" โ†’ Dimensional model with fact/dimension tables, grain, measures, SCD types "Optimize this query/pipeline" โ†’ Performance analysis with specific recommendations "Set up data quality for [table/pipeline]" โ†’ Quality framework with checks, contracts, monitoring "Audit our data infrastructure" โ†’ Full assessment using scoring rubric "Help with [Spark/Airflow/dbt/Kafka] issue" โ†’ Troubleshooting with technology-specific guidance "Design a data catalog for our org" โ†’ Catalog template with governance, classification, lineage "Plan a data migration from [old] to [new]" โ†’ Migration plan with validation, rollback, parallel-run "Set up monitoring for our pipelines" โ†’ Dashboard template with alerts, logging standards, runbooks "Review our data costs" โ†’ Cost analysis with optimization strategies and ROI framework "Handle schema change in [source]" โ†’ Change management protocol with impact assessment "Backfill [table] for [date range]" โ†’ Backfill protocol with validation and communication plan

Category context

Long-tail utilities that do not fit the current primary taxonomy cleanly.

Source: Tencent SkillHub

Largest current source with strong distribution and engagement signals.

Package contents

Included in package
2 Docs
  • SKILL.md Primary doc
  • README.md Docs