Tencent SkillHub · Data Analysis

Data Anomaly Detector

Detect anomalies and outliers in construction data: unusual costs, schedule variances, productivity spikes. Statistical and ML-based detection methods.



Install for OpenClaw

Quick setup
  1. Download the package from Yavira.
  2. Extract the archive and review SKILL.md first.
  3. Import or place the package into your OpenClaw setup.

Requirements

Target platform
OpenClaw
Install method
Manual import
Extraction
Extract archive
Prerequisites
OpenClaw
Primary doc
SKILL.md

Package facts

Download mode
Yavira redirect
Package format
ZIP package
Source platform
Tencent SkillHub
What's included
claw.json, instructions.md, SKILL.md

Validation

  • Use the Yavira download entry.
  • Review SKILL.md after the package is downloaded.
  • Confirm the extracted package contains the expected setup assets.

Install with your agent

Agent handoff

Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.

  1. Download the package from Yavira.
  2. Extract it into a folder your agent can access.
  3. Paste one of the prompts below and point your agent at the extracted folder.
New install

I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Tell me what you changed and call out any manual steps you could not complete.

Upgrade existing

I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Summarize what changed and any follow-up checks I should run.

Trust & source

Release facts

Source
Tencent SkillHub
Verification
Indexed source record
Version
2.1.0

Documentation

Primary doc: SKILL.md (6 sections)

Overview

Detect unusual patterns, outliers, and anomalies in construction data. Identify cost overruns, schedule delays, productivity issues, and data quality problems before they impact projects.

Business Case

Construction data often contains anomalies that indicate:
  • Cost estimate errors or fraud
  • Schedule logic issues
  • Productivity problems
  • Data entry mistakes
  • Equipment or material issues

Early detection prevents costly corrections and project delays.

Technical Implementation

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats


class AnomalyType(Enum):
    OUTLIER = "outlier"
    PATTERN_BREAK = "pattern_break"
    MISSING_SEQUENCE = "missing_sequence"
    DUPLICATE = "duplicate"
    IMPOSSIBLE_VALUE = "impossible_value"
    TREND_DEVIATION = "trend_deviation"


class AnomalySeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


@dataclass
class Anomaly:
    id: str
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    field: str
    value: Any
    expected_range: Optional[Tuple[float, float]] = None
    description: str = ""
    row_index: Optional[int] = None
    detection_method: str = ""
    confidence: float = 0.0
    suggested_action: str = ""


@dataclass
class AnomalyReport:
    source: str
    detected_at: datetime
    total_records: int
    anomalies: List[Anomaly]
    summary: Dict[str, int]


class ConstructionAnomalyDetector:
    """Detect anomalies in construction data."""

    # Construction-specific thresholds
    COST_THRESHOLDS = {
        'concrete_per_cy': (200, 800),
        'steel_per_ton': (1500, 4000),
        'labor_per_hour': (25, 150),
        'overhead_percentage': (5, 25),
        'contingency_percentage': (3, 20),
    }

    SCHEDULE_THRESHOLDS = {
        'max_activity_duration': 365,  # days
        'max_lag': 30,  # days
        'min_productivity': 0.1,
        'max_productivity': 10.0,
    }

    def __init__(self):
        self.anomalies: List[Anomaly] = []
        self.detection_history: List[AnomalyReport] = []

    def detect_cost_anomalies(self, df: pd.DataFrame, cost_column: str,
                              group_by: Optional[str] = None) -> List[Anomaly]:
        """Detect anomalies in cost data."""
        anomalies = []

        # Statistical outlier detection (IQR method)
        Q1 = df[cost_column].quantile(0.25)
        Q3 = df[cost_column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers = df[(df[cost_column] < lower_bound) | (df[cost_column] > upper_bound)]
        for idx, row in outliers.iterrows():
            value = row[cost_column]
            severity = (AnomalySeverity.HIGH
                        if abs(value - df[cost_column].median()) > 3 * IQR
                        else AnomalySeverity.MEDIUM)
            anomalies.append(Anomaly(
                id=f"COST-{idx}",
                anomaly_type=AnomalyType.OUTLIER,
                severity=severity,
                field=cost_column,
                value=value,
                expected_range=(lower_bound, upper_bound),
                description=f"Cost value {value:,.2f} outside expected range",
                row_index=idx,
                detection_method="IQR",
                confidence=0.95,
                suggested_action="Review cost estimate for errors"
            ))

        # Negative cost check
        negatives = df[df[cost_column] < 0]
        for idx, row in negatives.iterrows():
            anomalies.append(Anomaly(
                id=f"COST-NEG-{idx}",
                anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                severity=AnomalySeverity.CRITICAL,
                field=cost_column,
                value=row[cost_column],
                expected_range=(0.0, float("inf")),
                description="Negative cost value detected",
                row_index=idx,
                detection_method="Business Rule",
                confidence=1.0,
                suggested_action="Correct data entry error or investigate credit"
            ))

        # Group-based anomalies (if grouped)
        if group_by and group_by in df.columns:
            group_stats = df.groupby(group_by)[cost_column].agg(['mean', 'std'])
            # Loop variable named `grp` (not `stats`) to avoid shadowing scipy.stats
            for group_name, grp in group_stats.iterrows():
                group_data = df[df[group_by] == group_name]
                z_scores = np.abs((group_data[cost_column] - grp['mean']) / grp['std'])
                for idx, z in z_scores.items():
                    if z > 3:
                        anomalies.append(Anomaly(
                            id=f"COST-GROUP-{idx}",
                            anomaly_type=AnomalyType.OUTLIER,
                            severity=AnomalySeverity.MEDIUM,
                            field=cost_column,
                            value=df.loc[idx, cost_column],
                            description=f"Unusual cost for group {group_name} (z-score: {z:.2f})",
                            row_index=idx,
                            detection_method="Z-Score by Group",
                            confidence=min(z / 5, 1.0)
                        ))

        return anomalies

    def detect_schedule_anomalies(self, df: pd.DataFrame) -> List[Anomaly]:
        """Detect anomalies in schedule data."""
        anomalies = []

        # Check for required columns
        required = ['start_date', 'end_date']
        if not all(col in df.columns for col in required):
            return anomalies

        # Work on a copy so the caller's frame is not mutated
        df = df.copy()

        # Convert dates and calculate duration
        df['start_date'] = pd.to_datetime(df['start_date'])
        df['end_date'] = pd.to_datetime(df['end_date'])
        df['duration'] = (df['end_date'] - df['start_date']).dt.days

        # Negative duration (end before start)
        negative_duration = df[df['duration'] < 0]
        for idx, row in negative_duration.iterrows():
            anomalies.append(Anomaly(
                id=f"SCHED-NEG-{idx}",
                anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                severity=AnomalySeverity.CRITICAL,
                field="duration",
                value=row['duration'],
                description="End date before start date",
                row_index=idx,
                detection_method="Business Rule",
                confidence=1.0,
                suggested_action="Correct dates"
            ))

        # Extremely long durations
        long_tasks = df[df['duration'] > self.SCHEDULE_THRESHOLDS['max_activity_duration']]
        for idx, row in long_tasks.iterrows():
            anomalies.append(Anomaly(
                id=f"SCHED-LONG-{idx}",
                anomaly_type=AnomalyType.OUTLIER,
                severity=AnomalySeverity.MEDIUM,
                field="duration",
                value=row['duration'],
                expected_range=(0, self.SCHEDULE_THRESHOLDS['max_activity_duration']),
                description=f"Task duration {row['duration']} days exceeds threshold",
                row_index=idx,
                detection_method="Threshold",
                confidence=0.9,
                suggested_action="Review if task should be broken down"
            ))

        # Zero duration non-milestones
        if 'is_milestone' in df.columns:
            zero_duration = df[(df['duration'] == 0) & (~df['is_milestone'])]
            for idx, row in zero_duration.iterrows():
                anomalies.append(Anomaly(
                    id=f"SCHED-ZERO-{idx}",
                    anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                    severity=AnomalySeverity.HIGH,
                    field="duration",
                    value=0,
                    description="Zero duration task that is not a milestone",
                    row_index=idx,
                    detection_method="Business Rule",
                    confidence=1.0,
                    suggested_action="Add duration or mark as milestone"
                ))

        return anomalies

    def detect_productivity_anomalies(self, df: pd.DataFrame, quantity_col: str,
                                      hours_col: str) -> List[Anomaly]:
        """Detect productivity anomalies."""
        anomalies = []

        # Work on a copy; zero hours become NaN to avoid division by zero
        df = df.copy()
        df['productivity'] = df[quantity_col] / df[hours_col].replace(0, np.nan)

        # Use Modified Z-Score (more robust for skewed data)
        median = df['productivity'].median()
        mad = np.abs(df['productivity'] - median).median()
        if mad == 0:
            return anomalies  # no spread to measure against
        modified_z = 0.6745 * (df['productivity'] - median) / mad

        outliers = df[np.abs(modified_z) > 3.5]
        for idx, row in outliers.iterrows():
            prod = row['productivity']
            z = modified_z.loc[idx]
            severity = AnomalySeverity.HIGH if abs(z) > 5 else AnomalySeverity.MEDIUM
            direction = "high" if z > 0 else "low"
            anomalies.append(Anomaly(
                id=f"PROD-{idx}",
                anomaly_type=AnomalyType.OUTLIER,
                severity=severity,
                field="productivity",
                value=prod,
                description=f"Unusually {direction} productivity: {prod:.2f} units/hour",
                row_index=idx,
                detection_method="Modified Z-Score",
                confidence=min(abs(z) / 7, 1.0),
                suggested_action=f"Investigate {direction} productivity cause"
            ))

        return anomalies

    def detect_time_series_anomalies(self, df: pd.DataFrame, date_col: str,
                                     value_col: str, window: int = 7) -> List[Anomaly]:
        """Detect anomalies in time series data (e.g., daily costs, progress)."""
        anomalies = []
        df = df.sort_values(date_col).copy()
        df['rolling_mean'] = df[value_col].rolling(window=window, center=True).mean()
        df['rolling_std'] = df[value_col].rolling(window=window, center=True).std()

        # Points outside 2 standard deviations from rolling mean
        df['z_score'] = (df[value_col] - df['rolling_mean']) / df['rolling_std']
        outliers = df[np.abs(df['z_score']) > 2].dropna()
        for idx, row in outliers.iterrows():
            anomalies.append(Anomaly(
                id=f"TS-{idx}",
                anomaly_type=AnomalyType.TREND_DEVIATION,
                severity=(AnomalySeverity.MEDIUM if abs(row['z_score']) < 3
                          else AnomalySeverity.HIGH),
                field=value_col,
                value=row[value_col],
                expected_range=(
                    row['rolling_mean'] - 2 * row['rolling_std'],
                    row['rolling_mean'] + 2 * row['rolling_std']
                ),
                description=f"Value deviates from {window}-day trend",
                row_index=idx,
                detection_method="Rolling Z-Score",
                confidence=min(abs(row['z_score']) / 4, 1.0)
            ))

        return anomalies

    def detect_duplicate_anomalies(self, df: pd.DataFrame,
                                   key_columns: List[str]) -> List[Anomaly]:
        """Detect duplicate records."""
        anomalies = []
        duplicates = df[df.duplicated(subset=key_columns, keep=False)]
        if len(duplicates) > 0:
            dup_groups = duplicates.groupby(key_columns).size()
            for keys, count in dup_groups.items():
                anomalies.append(Anomaly(
                    id=f"DUP-{hash(str(keys)) % 10000}",
                    anomaly_type=AnomalyType.DUPLICATE,
                    severity=AnomalySeverity.HIGH,
                    field=str(key_columns),
                    value=keys,
                    description=f"Found {count} duplicate records for {keys}",
                    detection_method="Exact Match",
                    confidence=1.0,
                    suggested_action="Review and remove duplicates"
                ))
        return anomalies

    def detect_sequence_gaps(self, df: pd.DataFrame, sequence_col: str) -> List[Anomaly]:
        """Detect gaps in sequential data (invoice numbers, PO numbers, etc.)."""
        anomalies = []

        # Work on a copy; extract numeric part if mixed format
        df = df.copy()
        df['seq_num'] = pd.to_numeric(
            df[sequence_col].astype(str).str.extract(r'(\d+)')[0],
            errors='coerce'
        )

        sorted_seq = df['seq_num'].dropna().sort_values()
        if sorted_seq.empty:
            return anomalies
        expected = range(int(sorted_seq.min()), int(sorted_seq.max()) + 1)
        actual = set(sorted_seq.astype(int))
        missing = set(expected) - actual

        if missing:
            # Group consecutive missing numbers into ranges
            missing_ranges = []
            sorted_missing = sorted(missing)
            start = sorted_missing[0]
            end = start
            for num in sorted_missing[1:]:
                if num == end + 1:
                    end = num
                else:
                    missing_ranges.append((start, end))
                    start = num
                    end = num
            missing_ranges.append((start, end))

            for start, end in missing_ranges:
                range_str = str(start) if start == end else f"{start}-{end}"
                anomalies.append(Anomaly(
                    id=f"SEQ-{start}",
                    anomaly_type=AnomalyType.MISSING_SEQUENCE,
                    severity=AnomalySeverity.MEDIUM,
                    field=sequence_col,
                    value=range_str,
                    description=f"Missing sequence number(s): {range_str}",
                    detection_method="Sequence Analysis",
                    confidence=1.0,
                    suggested_action="Investigate missing numbers"
                ))

        return anomalies

    def run_full_detection(self, df: pd.DataFrame, config: Dict) -> AnomalyReport:
        """Run all applicable anomaly detection methods."""
        all_anomalies = []

        # Cost anomalies
        if 'cost_columns' in config:
            for col in config['cost_columns']:
                if col in df.columns:
                    all_anomalies.extend(
                        self.detect_cost_anomalies(df, col, config.get('group_by'))
                    )

        # Schedule anomalies
        if 'start_date' in df.columns and 'end_date' in df.columns:
            all_anomalies.extend(self.detect_schedule_anomalies(df))

        # Productivity
        if 'quantity_col' in config and 'hours_col' in config:
            all_anomalies.extend(
                self.detect_productivity_anomalies(
                    df, config['quantity_col'], config['hours_col']
                )
            )

        # Duplicates
        if 'key_columns' in config:
            all_anomalies.extend(
                self.detect_duplicate_anomalies(df, config['key_columns'])
            )

        # Sequence gaps
        if 'sequence_column' in config:
            all_anomalies.extend(
                self.detect_sequence_gaps(df, config['sequence_column'])
            )

        # Create summary
        summary = {}
        for a in all_anomalies:
            key = f"{a.anomaly_type.value}_{a.severity.value}"
            summary[key] = summary.get(key, 0) + 1

        report = AnomalyReport(
            source=config.get('source_name', 'Unknown'),
            detected_at=datetime.now(),
            total_records=len(df),
            anomalies=all_anomalies,
            summary=summary
        )
        self.detection_history.append(report)
        return report

    def generate_report(self, report: AnomalyReport) -> str:
        """Generate markdown anomaly report."""
        lines = ["# Anomaly Detection Report", ""]
        lines.append(f"**Source:** {report.source}")
        lines.append(f"**Detected At:** {report.detected_at.strftime('%Y-%m-%d %H:%M')}")
        lines.append(f"**Total Records:** {report.total_records:,}")
        lines.append(f"**Anomalies Found:** {len(report.anomalies)}")
        lines.append("")

        # Summary by severity
        lines.append("## Summary by Severity")
        for severity in AnomalySeverity:
            count = sum(1 for a in report.anomalies if a.severity == severity)
            if count > 0:
                lines.append(f"- **{severity.value.upper()}:** {count}")
        lines.append("")

        # Critical anomalies first
        critical = [a for a in report.anomalies if a.severity == AnomalySeverity.CRITICAL]
        if critical:
            lines.append("## Critical Anomalies")
            for a in critical:
                lines.append(f"\n### {a.id}")
                lines.append(f"- **Type:** {a.anomaly_type.value}")
                lines.append(f"- **Field:** {a.field}")
                lines.append(f"- **Value:** {a.value}")
                lines.append(f"- **Description:** {a.description}")
                lines.append(f"- **Action:** {a.suggested_action}")

        # All anomalies table (first 50)
        lines.append("\n## All Anomalies")
        lines.append("| ID | Type | Severity | Field | Description |")
        lines.append("|-----|------|----------|-------|-------------|")
        for a in report.anomalies[:50]:
            lines.append(f"| {a.id} | {a.anomaly_type.value} | {a.severity.value} "
                         f"| {a.field} | {a.description[:50]} |")
        if len(report.anomalies) > 50:
            lines.append(f"\n*... and {len(report.anomalies) - 50} more anomalies*")

        return "\n".join(lines)
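As a sanity check, the IQR fence that detect_cost_anomalies relies on can be reproduced in isolation. A minimal sketch with invented figures (not from any real project data):

```python
import pandas as pd

# Toy cost sample: mostly routine line items plus one obvious spike.
costs = pd.Series([410, 395, 420, 405, 430, 415, 2600])

# Same fence as detect_cost_anomalies: 1.5 * IQR beyond the quartiles.
q1, q3 = costs.quantile(0.25), costs.quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr

# Only values outside [lower, upper] are flagged.
outliers = costs[(costs < lower) | (costs > upper)]
print(outliers.tolist())  # the 2600 spike is the only value outside the fence
```

Because the quartiles ignore extreme values, the fence stays tight around the routine items and the spike is flagged even though it would inflate a mean-based threshold.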

Quick Start

import pandas as pd

# Load data
df = pd.read_excel("project_costs.xlsx")

# Initialize detector
detector = ConstructionAnomalyDetector()

# Run detection
config = {
    'source_name': 'Project Costs Q1 2026',
    'cost_columns': ['total_cost', 'labor_cost', 'material_cost'],
    'group_by': 'cost_code',
    'key_columns': ['project_id', 'cost_code', 'date'],
    'sequence_column': 'invoice_number'
}
report = detector.run_full_detection(df, config)

# Generate report
print(detector.generate_report(report))

# Get critical anomalies for immediate action
critical = [a for a in report.anomalies if a.severity == AnomalySeverity.CRITICAL]
print(f"\n{len(critical)} critical anomalies require immediate attention")

Dependencies

pip install pandas numpy scipy

Resources

  • Statistical Methods: IQR, Z-Score, Modified Z-Score
  • Construction Benchmarks: RSMeans, ENR indices
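For skewed quantities such as productivity rates, the Modified Z-Score used in detect_productivity_anomalies is more robust than the plain z-score because it is built from the median and MAD rather than the mean and standard deviation. A small illustration with invented numbers:

```python
import pandas as pd

# Invented productivity sample (units/hour); one crew reports a spike.
prod = pd.Series([1.1, 1.0, 1.2, 0.9, 1.1, 1.0, 6.0])

median = prod.median()
mad = (prod - median).abs().median()          # median absolute deviation
modified_z = 0.6745 * (prod - median) / mad   # 0.6745 scales MAD to be comparable to sigma

# 3.5 is a commonly recommended cutoff for the modified z-score.
flagged = prod[modified_z.abs() > 3.5]
print(flagged.tolist())
```

Here only the 6.0 reading is flagged; a mean-based z-score would be dragged upward by the spike itself and could miss it in small samples.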

Category context

Data access, storage, extraction, analysis, reporting, and insight generation.

Source: Tencent SkillHub

Largest current source with strong distribution and engagement signals.

Package contents

Included in package
2 docs · 1 config
  • SKILL.md Primary doc
  • instructions.md Docs
  • claw.json Config