Requirements
- Target platform
- OpenClaw
- Install method
- Manual import
- Extraction
- Extract archive
- Prerequisites
- OpenClaw
- Primary doc
- SKILL.md
Profile construction data to understand characteristics, distributions, quality metrics, and patterns. Essential for data quality assessment and ETL planning.
Profile construction data to understand characteristics, distributions, quality metrics, and patterns. Essential for data quality assessment and ETL planning.
Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.
I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Tell me what you changed and call out any manual steps you could not complete.
I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Summarize what changed and any follow-up checks I should run.
Analyze construction data to understand its characteristics, distributions, quality, and patterns. Essential for data quality assessment, ETL planning, and identifying data issues before they impact projects.
Before using any construction data, you need to understand: What data types are present Distribution of values Missing data patterns Anomalies and outliers Referential integrity issues This skill profiles data to answer these questions and provides actionable insights.
from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Tuple import pandas as pd import numpy as np from datetime import datetime import json @dataclass class ColumnProfile: name: str data_type: str inferred_type: str # More specific: project_id, cost, date, csi_code, etc. total_count: int null_count: int null_percentage: float unique_count: int uniqueness_ratio: float # For numeric columns min_value: Optional[float] = None max_value: Optional[float] = None mean_value: Optional[float] = None median_value: Optional[float] = None std_dev: Optional[float] = None # For string columns min_length: Optional[int] = None max_length: Optional[int] = None avg_length: Optional[float] = None # Top values top_values: List[Tuple[Any, int]] = field(default_factory=list) # Patterns common_patterns: List[str] = field(default_factory=list) # Quality flags quality_issues: List[str] = field(default_factory=list) @dataclass class DataProfile: source_name: str row_count: int column_count: int columns: List[ColumnProfile] duplicate_rows: int memory_usage: str profiled_at: datetime quality_score: float recommendations: List[str] class ConstructionDataProfiler: """Profile construction data for quality and characteristics.""" # Known construction data patterns CONSTRUCTION_PATTERNS = { 'csi_code': r'^\d{2}\s?\d{2}\s?\d{2}$', 'project_id': r'^[A-Z]{2,4}[-_]?\d{3,6}$', 'cost_code': r'^\d{2}[-.]?\d{2,4}$', 'wbs': r'^[\d.]+$', 'phone': r'^\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$', 'email': r'^[\w.-]+@[\w.-]+\.\w+$', 'date_iso': r'^\d{4}-\d{2}-\d{2}', 'date_us': r'^\d{1,2}/\d{1,2}/\d{2,4}$', 'currency': r'^\$?[\d,]+\.?\d{0,2}$', 'percentage': r'^\d+\.?\d*%?$', } # Construction-specific column name patterns COLUMN_TYPE_HINTS = { 'project': ['project_id', 'project_name', 'proj', 'job'], 'cost': ['cost', 'amount', 'price', 'total', 'budget', 'actual'], 'date': ['date', 'start', 'finish', 'end', 'created', 'modified'], 'quantity': ['qty', 'quantity', 'count', 'units'], 'csi': ['csi', 'division', 'masterformat', 'spec'], 'location': ['location', 'area', 'zone', 'floor', 'level'], 'person': ['owner', 'manager', 'superintendent', 'foreman', 'contact'], } def __init__(self): self.profiles: Dict[str, DataProfile] = {} def profile_dataframe(self, df: pd.DataFrame, source_name: str) -> DataProfile: """Profile a pandas DataFrame.""" columns = [] for col in df.columns: col_profile = self._profile_column(df[col], col) columns.append(col_profile) # Calculate duplicates duplicate_rows = len(df) - len(df.drop_duplicates()) # Calculate memory usage memory_bytes = df.memory_usage(deep=True).sum() if memory_bytes < 1024: memory_usage = f"{memory_bytes} B" elif memory_bytes < 1024**2: memory_usage = f"{memory_bytes/1024:.1f} KB" else: memory_usage = f"{memory_bytes/1024**2:.1f} MB" # Calculate overall quality score quality_score = self._calculate_quality_score(columns) # Generate recommendations recommendations = self._generate_recommendations(columns, df) profile = DataProfile( source_name=source_name, row_count=len(df), column_count=len(df.columns), columns=columns, duplicate_rows=duplicate_rows, memory_usage=memory_usage, profiled_at=datetime.now(), quality_score=quality_score, recommendations=recommendations ) self.profiles[source_name] = profile return profile def _profile_column(self, series: pd.Series, name: str) -> ColumnProfile: """Profile a single column.""" total_count = len(series) null_count = series.isnull().sum() null_percentage = (null_count / total_count * 100) if total_count > 0 else 0 # Get non-null values for analysis non_null = series.dropna() unique_count = non_null.nunique() uniqueness_ratio = unique_count / len(non_null) if len(non_null) > 0 else 0 profile = ColumnProfile( name=name, data_type=str(series.dtype), inferred_type=self._infer_construction_type(series, name), total_count=total_count, null_count=null_count, null_percentage=round(null_percentage, 2), unique_count=unique_count, uniqueness_ratio=round(uniqueness_ratio, 4) ) # Numeric analysis if pd.api.types.is_numeric_dtype(series): profile.min_value = float(non_null.min()) if len(non_null) > 0 else None profile.max_value = float(non_null.max()) if len(non_null) > 0 else None profile.mean_value = float(non_null.mean()) if len(non_null) > 0 else None profile.median_value = float(non_null.median()) if len(non_null) > 0 else None profile.std_dev = float(non_null.std()) if len(non_null) > 1 else None # Check for outliers if len(non_null) > 10 and profile.std_dev: outliers = non_null[abs(non_null - profile.mean_value) > 3 * profile.std_dev] if len(outliers) > 0: profile.quality_issues.append(f"{len(outliers)} potential outliers detected") # Check for negative costs if any(hint in name.lower() for hint in ['cost', 'amount', 'price', 'total']): negatives = (non_null < 0).sum() if negatives > 0: profile.quality_issues.append(f"{negatives} negative values in cost column") # String analysis elif pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series): str_series = non_null.astype(str) lengths = str_series.str.len() profile.min_length = int(lengths.min()) if len(lengths) > 0 else None profile.max_length = int(lengths.max()) if len(lengths) > 0 else None profile.avg_length = float(lengths.mean()) if len(lengths) > 0 else None # Detect patterns profile.common_patterns = self._detect_patterns(str_series) # Top values if len(non_null) > 0: value_counts = non_null.value_counts().head(5) profile.top_values = list(zip(value_counts.index.tolist(), value_counts.values.tolist())) # Quality checks if null_percentage > 50: profile.quality_issues.append("High null rate (>50%)") if uniqueness_ratio == 1.0 and total_count > 100: profile.quality_issues.append("All unique values - possible ID column") if uniqueness_ratio < 0.01 and unique_count > 1: profile.quality_issues.append("Low cardinality - possible category") return profile def _infer_construction_type(self, series: pd.Series, name: str) -> str: """Infer construction-specific data type.""" name_lower = name.lower() # Check column name hints for type_name, hints in self.COLUMN_TYPE_HINTS.items(): if any(hint in name_lower for hint in hints): return type_name # Check data patterns non_null = series.dropna().astype(str) if len(non_null) == 0: return "unknown" sample = non_null.head(100) for pattern_name, pattern in self.CONSTRUCTION_PATTERNS.items(): matches = sample.str.match(pattern, na=False).sum() if matches / len(sample) > 0.8: return pattern_name # Default to pandas dtype if pd.api.types.is_numeric_dtype(series): return "numeric" elif pd.api.types.is_datetime64_any_dtype(series): return "datetime" else: return "text" def _detect_patterns(self, str_series: pd.Series) -> List[str]: """Detect common patterns in string data.""" patterns_found = [] sample = str_series.head(1000) for pattern_name, pattern in self.CONSTRUCTION_PATTERNS.items(): matches = sample.str.match(pattern, na=False).sum() if matches / len(sample) > 0.1: patterns_found.append(f"{pattern_name} ({matches/len(sample):.0%})") return patterns_found[:3] def _calculate_quality_score(self, columns: List[ColumnProfile]) -> float: """Calculate overall data quality score (0-100).""" if not columns: return 0.0 scores = [] for col in columns: col_score = 100 # Penalize for nulls col_score -= min(col.null_percentage, 50) # Penalize for quality issues col_score -= len(col.quality_issues) * 10 scores.append(max(col_score, 0)) return round(sum(scores) / len(scores), 1) def _generate_recommendations(self, columns: List[ColumnProfile], df: pd.DataFrame) -> List[str]: """Generate recommendations based on profile.""" recommendations = [] # High null columns high_null = [c for c in columns if c.null_percentage > 30] if high_null: recommendations.append( f"Review {len(high_null)} columns with >30% null values: " f"{', '.join(c.name for c in high_null[:3])}" ) # Potential ID columns without uniqueness for col in columns: if 'id' in col.name.lower() and col.uniqueness_ratio < 1.0: recommendations.append( f"Column '{col.name}' appears to be an ID but has duplicate values" ) # Date columns that should be datetime for col in columns: if col.inferred_type in ['date_iso', 'date_us'] and col.data_type == 'object': recommendations.append( f"Convert '{col.name}' to datetime type for better analysis" ) # Cost columns that are strings for col in columns: if col.inferred_type == 'currency' and col.data_type == 'object': recommendations.append( f"Convert '{col.name}' to numeric type (remove $ and commas)" ) return recommendations def profile_to_dict(self, profile: DataProfile) -> Dict: """Convert profile to dictionary for JSON export.""" return { 'source_name': profile.source_name, 'row_count': profile.row_count, 'column_count': profile.column_count, 'duplicate_rows': profile.duplicate_rows, 'memory_usage': profile.memory_usage, 'profiled_at': profile.profiled_at.isoformat(), 'quality_score': profile.quality_score, 'recommendations': profile.recommendations, 'columns': [ { 'name': c.name, 'data_type': c.data_type, 'inferred_type': c.inferred_type, 'null_percentage': c.null_percentage, 'unique_count': c.unique_count, 'quality_issues': c.quality_issues, 'top_values': c.top_values[:3] } for c in profile.columns ] } def generate_profile_report(self, profile: DataProfile) -> str: """Generate markdown profile report.""" report = [f"# Data Profile: {profile.source_name}", ""] report.append(f"**Profiled At:** {profile.profiled_at.strftime('%Y-%m-%d %H:%M')}") report.append(f"**Quality Score:** {profile.quality_score}/100") report.append("") # Summary report.append("## Summary") report.append(f"- **Rows:** {profile.row_count:,}") report.append(f"- **Columns:** {profile.column_count}") report.append(f"- **Duplicate Rows:** {profile.duplicate_rows:,}") report.append(f"- **Memory Usage:** {profile.memory_usage}") report.append("") # Recommendations if profile.recommendations: report.append("## Recommendations") for rec in profile.recommendations: report.append(f"- {rec}") report.append("") # Column Details report.append("## Column Details") report.append("") report.append("| Column | Type | Inferred | Nulls | Unique | Issues |") report.append("|--------|------|----------|-------|--------|--------|") for col in profile.columns: issues = len(col.quality_issues) report.append( f"| {col.name} | {col.data_type} | {col.inferred_type} | " f"{col.null_percentage:.1f}% | {col.unique_count:,} | {issues} |" ) # Detailed column profiles report.append("") report.append("## Detailed Column Profiles") for col in profile.columns: report.append(f"\n### {col.name}") report.append(f"- **Type:** {col.data_type} (inferred: {col.inferred_type})") report.append(f"- **Nulls:** {col.null_count:,} ({col.null_percentage:.1f}%)") report.append(f"- **Unique Values:** {col.unique_count:,} ({col.uniqueness_ratio:.1%})") if col.min_value is not None: report.append(f"- **Range:** {col.min_value:,.2f} to {col.max_value:,.2f}") report.append(f"- **Mean:** {col.mean_value:,.2f}, Median: {col.median_value:,.2f}") if col.min_length is not None: report.append(f"- **Length:** {col.min_length} to {col.max_length} (avg: {col.avg_length:.1f})") if col.top_values: report.append(f"- **Top Values:** {col.top_values[:3]}") if col.common_patterns: report.append(f"- **Patterns:** {', '.join(col.common_patterns)}") if col.quality_issues: report.append(f"- **Issues:** {', '.join(col.quality_issues)}") return "\n".join(report) def compare_profiles(self, profile1: DataProfile, profile2: DataProfile) -> Dict: """Compare two profiles to detect schema changes or data drift.""" comparison = { 'profiles': [profile1.source_name, profile2.source_name], 'row_count_change': profile2.row_count - profile1.row_count, 'quality_change': profile2.quality_score - profile1.quality_score, 'new_columns': [], 'removed_columns': [], 'type_changes': [], 'null_rate_changes': [] } cols1 = {c.name: c for c in profile1.columns} cols2 = {c.name: c for c in profile2.columns} # Find new/removed columns comparison['new_columns'] = [n for n in cols2 if n not in cols1] comparison['removed_columns'] = [n for n in cols1 if n not in cols2] # Compare common columns for name in cols1: if name in cols2: c1, c2 = cols1[name], cols2[name] if c1.data_type != c2.data_type: comparison['type_changes'].append({ 'column': name, 'from': c1.data_type, 'to': c2.data_type }) null_change = c2.null_percentage - c1.null_percentage if abs(null_change) > 10: comparison['null_rate_changes'].append({ 'column': name, 'change': null_change }) return comparison
import pandas as pd # Load construction data df = pd.read_excel("project_costs.xlsx") # Profile the data profiler = ConstructionDataProfiler() profile = profiler.profile_dataframe(df, "Project Costs 2025") # Generate report report = profiler.generate_profile_report(profile) print(report) # Export to JSON profile_dict = profiler.profile_to_dict(profile) with open("profile.json", "w") as f: json.dump(profile_dict, f, indent=2) # Compare with previous profile old_profile = profiler.profile_dataframe(old_df, "Project Costs 2024") comparison = profiler.compare_profiles(old_profile, profile) print(f"Quality changed by: {comparison['quality_change']}")
Pre-ETL Analysis: Profile source data before building pipelines Quality Monitoring: Track data quality over time Schema Validation: Detect unexpected changes in data structure Anomaly Detection: Find outliers and data quality issues
pip install pandas numpy
Data Profiling Best Practices: DAMA DMBOK Construction Data Standards: CSI MasterFormat, UniFormat
Data access, storage, extraction, analysis, reporting, and insight generation.
Largest current source with strong distribution and engagement signals.