Requirements
- Target platform: OpenClaw
- Install method: Manual import
- Extraction: Extract archive
- Prerequisites: OpenClaw
- Primary doc: SKILL.md
Generate secure SQL queries with validation, pagination helpers, risk analysis, and audit-focused safeguards.
Hand the extracted package to your coding agent with a concrete install brief instead of figuring it out manually.
I downloaded a skill package from Yavira. Read SKILL.md from the extracted folder and install it by following the included instructions. Then review README.md for any prerequisites, environment setup, or post-install checks. Tell me what you changed and call out any manual steps you could not complete.
I downloaded an updated skill package from Yavira. Read SKILL.md from the extracted folder, compare it with my current installation, and upgrade it while preserving any custom configuration unless the package docs explicitly say otherwise. Then review README.md for any prerequisites, environment setup, or post-install checks. Summarize what changed and any follow-up checks I should run.
This skill enables AI agents to generate accurate, optimized SQL queries from natural language descriptions. It supports multiple database systems and follows best practices for query construction, security, and performance.

    # Clone or download the repository
    git clone https://github.com/yourusername/sql-query-generator.git
    cd sql-query-generator

    # No external dependencies required for core functionality
    python sql_query_generator.py


    # Copy sql_query_generator.py to your project
    cp sql_query_generator.py /path/to/your/project/

    # Import in your code
    from sql_query_generator import SQLQueryGenerator, DatabaseType

For AI agents using this skill:
- Read this SKILL.md file completely before generating queries
- Follow all security guidelines strictly
- Always use parameterized queries
- Validate all inputs before query generation
- Include security warnings in responses
Install only the drivers you need:

    # PostgreSQL
    pip install psycopg2-binary

    # MySQL
    pip install mysql-connector-python

    # SQL Server
    pip install pyodbc

    # Oracle
    pip install cx_Oracle

    # For testing and development
    pip install pytest pytest-cov

- Python 3.7 or higher
- No external dependencies for core query generation
- Database drivers only needed for actual query execution

- PostgreSQL
- MySQL
- SQLite
- Microsoft SQL Server
- Oracle Database
- MariaDB

- SELECT Queries: Simple and complex data retrieval
- JOIN Operations: INNER, LEFT, RIGHT, FULL OUTER, CROSS
- Aggregations: GROUP BY, HAVING, aggregate functions
- Subqueries: Correlated and non-correlated
- CTEs: Common Table Expressions (WITH clause)
- Window Functions: OVER, PARTITION BY, ROW_NUMBER, RANK
- INSERT/UPDATE/DELETE: Data manipulation queries
- DDL: CREATE, ALTER, DROP statements

- Index usage recommendations
- Query execution plan analysis
- Performance optimization suggestions
- Avoiding N+1 query problems

- SQL injection prevention
- Parameterized query generation
- Input validation patterns
- Role-based access control patterns
When generating SQL queries, follow these steps:

1. Understand the Request
   - Parse natural language input
   - Identify required tables
   - Determine join conditions
   - Extract filter criteria
2. Generate Base Query

       -- Example structure
       SELECT
           column1,
           column2,
           aggregate_function(column3) AS alias
       FROM table1
       JOIN table2 ON table1.id = table2.foreign_id
       WHERE condition1 = value1
         AND condition2 > value2
       GROUP BY column1, column2
       HAVING aggregate_condition
       ORDER BY column1 DESC
       LIMIT 100;

3. Apply Security Measures
   - Use parameterized queries
   - Validate all inputs
   - Escape special characters
### Pattern 1: Simple SELECT

    -- Natural language: "Get all users who registered after January 1, 2024"
    SELECT
        id,
        username,
        email,
        registration_date
    FROM users
    WHERE registration_date > $1  -- Parameterized
    ORDER BY registration_date DESC;

### Pattern 2: JOIN with Aggregation

    -- Natural language: "Show total orders by customer in 2024"
    SELECT
        c.customer_name,
        c.email,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_spent
    FROM customers c
    INNER JOIN orders o ON c.customer_id = o.customer_id
    WHERE EXTRACT(YEAR FROM o.order_date) = $1
    GROUP BY c.customer_id, c.customer_name, c.email
    HAVING COUNT(o.order_id) > 5
    ORDER BY total_spent DESC;

### Pattern 3: Subquery

    -- Natural language: "Find products with above-average prices"
    SELECT
        product_name,
        price,
        category
    FROM products
    WHERE price > (
        SELECT AVG(price)
        FROM products
    )
    ORDER BY price DESC;

### Pattern 4: CTE (Common Table Expression)

    -- Natural language: "Get top 3 products per category by sales"
    WITH product_sales AS (
        SELECT
            p.product_id,
            p.product_name,
            p.category_id,
            c.category_name,
            SUM(oi.quantity * oi.unit_price) AS total_sales,
            ROW_NUMBER() OVER (
                PARTITION BY p.category_id
                ORDER BY SUM(oi.quantity * oi.unit_price) DESC
            ) AS rank_in_category
        FROM products p
        JOIN order_items oi ON p.product_id = oi.product_id
        JOIN categories c ON p.category_id = c.category_id
        GROUP BY p.product_id, p.product_name, p.category_id, c.category_name
    )
    SELECT
        category_name,
        product_name,
        total_sales,
        rank_in_category
    FROM product_sales
    WHERE rank_in_category <= 3
    ORDER BY category_name, rank_in_category;

### Pattern 5: Window Functions

    -- Natural language: "Show running total of sales per day"
    SELECT
        sale_date,
        daily_total,
        SUM(daily_total) OVER (
            ORDER BY sale_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        AVG(daily_total) OVER (
            ORDER BY sale_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS moving_average_7days
    FROM (
        SELECT
            DATE(order_date) AS sale_date,
            SUM(total_amount) AS daily_total
        FROM orders
        GROUP BY DATE(order_date)
    ) daily_sales
    ORDER BY sale_date;

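To see what the running-total pattern returns, the sketch below runs a trimmed version of Pattern 5 against an in-memory SQLite database. The table contents are made-up sample data, and the example assumes the bundled SQLite is version 3.25 or later, which is when window functions became available.

```python
import sqlite3

# In-memory database with hypothetical sample data (not from the skill itself)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_date TEXT, total_amount INTEGER)")
conn.executemany(
    "INSERT INTO orders (order_date, total_amount) VALUES (?, ?)",
    [
        ("2024-01-01", 4),
        ("2024-01-01", 6),
        ("2024-01-02", 20),
        ("2024-01-03", 30),
    ],
)

# Pattern 5 without the moving average: aggregate per day, then accumulate
RUNNING_TOTAL_SQL = """
SELECT
    sale_date,
    daily_total,
    SUM(daily_total) OVER (
        ORDER BY sale_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_total
FROM (
    SELECT DATE(order_date) AS sale_date, SUM(total_amount) AS daily_total
    FROM orders
    GROUP BY DATE(order_date)
) daily_sales
ORDER BY sale_date
"""

rows = conn.execute(RUNNING_TOTAL_SQL).fetchall()
for sale_date, daily_total, running_total in rows:
    print(sale_date, daily_total, running_total)
```

Each output row carries the day's own total plus the sum of all days up to and including it, which is the behavior the `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` frame asks for.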
- Always use explicit column names (avoid SELECT *)
- Use meaningful table aliases
- Indent for readability
- Comment complex logic

- Create appropriate indexes
- Avoid SELECT DISTINCT when possible (use GROUP BY instead)
- Use EXISTS instead of IN for large datasets
- Limit result sets when appropriate
- Use EXPLAIN to analyze query plans
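As a rough illustration of checking a plan before and after adding an index, the sketch below uses SQLite's `EXPLAIN QUERY PLAN`. The `query_plan` helper, the table, and the index name are hypothetical; other databases expose plans through their own `EXPLAIN` variants with different output.

```python
import sqlite3


def query_plan(conn, sql, params=()):
    """Return the human-readable plan steps for a query as one string.

    `sql` must be a trusted, application-defined statement: it is prefixed
    with EXPLAIN QUERY PLAN by string concatenation, never built from user input.
    """
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    # The last column of each row holds the plan detail text
    return " | ".join(row[-1] for row in rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT, email TEXT)"
)

sql = "SELECT id, email FROM users WHERE status = ?"

plan_before = query_plan(conn, sql, ("active",))
conn.execute("CREATE INDEX idx_users_status ON users (status)")
plan_after = query_plan(conn, sql, ("active",))

print("before:", plan_before)
print("after: ", plan_after)
```

Before the index exists the plan is a full table scan; afterwards the plan names `idx_users_status`, which is the kind of evidence worth attaching to an index recommendation.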
### 3.1 MANDATORY Security Rules

THESE RULES ARE NON-NEGOTIABLE AND MUST ALWAYS BE FOLLOWED:

NEVER CONCATENATE USER INPUT INTO SQL

    # WRONG - CRITICAL SECURITY VULNERABILITY
    query = f"SELECT * FROM users WHERE username = '{user_input}'"

    # CORRECT - Always use parameters
    query = "SELECT * FROM users WHERE username = %s"
    cursor.execute(query, (user_input,))

ALL VALUES MUST BE PARAMETERIZED
- Even seemingly "safe" values like numbers
- Even values from "trusted" sources
- Even internal application values
- NO EXCEPTIONS

VALIDATE AND SANITIZE ALL INPUTS

    # Whitelist validation
    VALID_STATUSES = ['active', 'inactive', 'pending']
    if status not in VALID_STATUSES:
        raise ValueError("Invalid status")

    # Type validation
    if not isinstance(user_id, int):
        raise TypeError("user_id must be integer")

    # Length validation
    if len(username) > 50:
        raise ValueError("username too long")

ESCAPE DYNAMIC IDENTIFIERS PROPERLY

    from psycopg2 import sql

    # For table/column names that must be dynamic
    query = sql.SQL("SELECT * FROM {} WHERE id = %s").format(
        sql.Identifier(table_name)
    )
    cursor.execute(query, (user_id,))

### 3.2 Input Validation Framework

    import re
    from datetime import datetime
    from typing import Any, List, Optional


    class SQLInputValidator:
        """Comprehensive input validation for SQL queries"""

        @staticmethod
        def validate_identifier(identifier: str, max_length: int = 63) -> str:
            """Validate table/column names"""
            # Check length
            if len(identifier) > max_length:
                raise ValueError(f"Identifier too long: {len(identifier)} > {max_length}")

            # Only alphanumeric and underscore; fullmatch also rejects a trailing newline
            if not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', identifier):
                raise ValueError(f"Invalid identifier: {identifier}")

            # Prevent SQL keywords as identifiers
            SQL_KEYWORDS = {
                'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
                'ALTER', 'TRUNCATE', 'UNION', 'JOIN', 'WHERE', 'FROM'
            }
            if identifier.upper() in SQL_KEYWORDS:
                raise ValueError(f"SQL keyword not allowed: {identifier}")

            return identifier

        @staticmethod
        def validate_integer(value: Any, min_val: Optional[int] = None,
                             max_val: Optional[int] = None) -> int:
            """Validate integer values"""
            try:
                int_value = int(value)
            except (ValueError, TypeError):
                raise ValueError(f"Invalid integer: {value}")

            if min_val is not None and int_value < min_val:
                raise ValueError(f"Value {int_value} below minimum {min_val}")
            if max_val is not None and int_value > max_val:
                raise ValueError(f"Value {int_value} above maximum {max_val}")

            return int_value

        @staticmethod
        def validate_string(value: str, max_length: int = 255,
                            allow_empty: bool = False) -> str:
            """Validate string values"""
            if not isinstance(value, str):
                raise TypeError("Value must be string")
            if not allow_empty and len(value) == 0:
                raise ValueError("Empty string not allowed")
            if len(value) > max_length:
                raise ValueError(f"String too long: {len(value)} > {max_length}")

            # Check for null bytes
            if '\x00' in value:
                raise ValueError("Null bytes not allowed in string")

            return value

        @staticmethod
        def validate_email(email: str) -> str:
            """Validate email format"""
            email = SQLInputValidator.validate_string(email, max_length=254)

            # Basic email validation
            if not re.fullmatch(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', email):
                raise ValueError(f"Invalid email format: {email}")

            return email

        @staticmethod
        def validate_date(date_str: str) -> str:
            """Validate date format (YYYY-MM-DD)"""
            if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', date_str):
                raise ValueError(f"Invalid date format: {date_str}")

            # The regex only checks the shape; strptime rejects dates such as 2024-13-45
            try:
                datetime.strptime(date_str, '%Y-%m-%d')
            except ValueError:
                raise ValueError(f"Invalid date: {date_str}")

            return date_str

        @staticmethod
        def validate_enum(value: str, allowed_values: List[str]) -> str:
            """Validate value against whitelist"""
            if value not in allowed_values:
                raise ValueError(f"Invalid value: {value}. Allowed: {allowed_values}")
            return value

### 3.3 SQL Injection Attack Patterns to Prevent

    import re

    # Detect common SQL injection patterns.
    # This is a heuristic for flagging suspicious input (it also matches harmless
    # values such as O'Brien); it does not replace parameterized queries.
    INJECTION_PATTERNS = [
        r"('|(\\')|(--)|(\#)|(%23)|(;))",  # Basic SQL injection
        r"((\%27)|(\'))",                  # Single quote variations
        r"(union.*select)",                # UNION-based injection
        r"(insert.*into)",                 # INSERT injection
        r"(update.*set)",                  # UPDATE injection
        r"(delete.*from)",                 # DELETE injection
        r"(drop.*table)",                  # DROP TABLE
        r"(exec(\s|\+)+(s|x)p\w+)",        # Stored procedure execution
        r"(script.*>)",                    # XSS attempts
    ]


    def detect_injection_attempt(value: str) -> bool:
        """Detect potential SQL injection attempts"""
        value_lower = value.lower()
        for pattern in INJECTION_PATTERNS:
            if re.search(pattern, value_lower):
                return True
        return False

### 3.4 Secure Query Builder

    from typing import Any, List

    from sql_query_generator import DatabaseType


    class SecureQueryBuilder:
        """Build SQL queries with mandatory security checks"""

        def __init__(self, db_type: DatabaseType):
            self.db_type = db_type
            self.validator = SQLInputValidator()

        def build_select(self, table: str, columns: List[str],
                         conditions: dict) -> tuple:
            """Build SELECT query with validation"""
            # Collect parameters per call so values never leak between queries
            params: List[Any] = []

            # Validate table name
            table = self.validator.validate_identifier(table)

            # Validate columns
            validated_columns = [
                self.validator.validate_identifier(col) for col in columns
            ]

            # Build query
            query = f"SELECT {', '.join(validated_columns)} FROM {table}"

            # Add WHERE clause with parameters (%s is the psycopg2/mysql-connector style)
            if conditions:
                where_parts = []
                for key, value in conditions.items():
                    key = self.validator.validate_identifier(key)
                    where_parts.append(f"{key} = %s")
                    params.append(value)
                query += " WHERE " + " AND ".join(where_parts)

            return query, tuple(params)

### 3.5 Database Connection Security

    from typing import Any


    class SecureConnection:
        """Secure database connection configuration"""

        @staticmethod
        def get_postgresql_ssl_config() -> dict:
            """PostgreSQL SSL configuration"""
            return {
                'sslmode': 'require',  # or 'verify-full' for production
                'sslrootcert': '/path/to/ca-cert.pem',
                'sslcert': '/path/to/client-cert.pem',
                'sslkey': '/path/to/client-key.pem'
            }

        @staticmethod
        def get_connection_timeout() -> dict:
            """Connection timeout and keepalive settings (libpq parameters)"""
            # The per-statement timeout is set with SET statement_timeout below;
            # libpq has no 'command_timeout' connection parameter.
            return {
                'connect_timeout': 10,
                'keepalives': 1,
                'keepalives_idle': 30,
                'keepalives_interval': 10,
                'keepalives_count': 5
            }

        @staticmethod
        def create_secure_connection(database_url: str) -> Any:
            """Create connection with security settings"""
            import psycopg2

            # Parse connection string securely
            # NEVER log the connection string (contains credentials)
            conn = psycopg2.connect(
                database_url,
                **SecureConnection.get_postgresql_ssl_config(),
                **SecureConnection.get_connection_timeout()
            )

            # Set session security parameters
            cursor = conn.cursor()
            cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            cursor.execute("SET statement_timeout = 30000")  # 30 seconds
            cursor.close()
            # Commit so a later rollback does not undo the SET statements
            conn.commit()

            return conn

### 3.6 Rate Limiting

    import time
    from collections import defaultdict
    from threading import Lock


    class RateLimiter:
        """Prevent query flooding attacks"""

        def __init__(self, max_requests: int = 100, window_seconds: int = 60):
            self.max_requests = max_requests
            self.window_seconds = window_seconds
            self.requests = defaultdict(list)
            self.lock = Lock()

        def is_allowed(self, identifier: str) -> bool:
            """Check if request is allowed"""
            with self.lock:
                now = time.time()
                window_start = now - self.window_seconds

                # Clean old requests
                self.requests[identifier] = [
                    req_time for req_time in self.requests[identifier]
                    if req_time > window_start
                ]

                # Check limit
                if len(self.requests[identifier]) >= self.max_requests:
                    return False

                # Add new request
                self.requests[identifier].append(now)
                return True

### 3.7 Audit Logging

    import json
    import logging
    from datetime import datetime, timezone
    from typing import Any, Dict, Optional


    class SecurityAuditLogger:
        """Log all database operations for security auditing"""

        def __init__(self, log_file: str = '/var/log/sql_audit.log'):
            self.logger = logging.getLogger('sql_audit')
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

        def log_query(self, query: str, params: tuple, user_id: str,
                      ip_address: str, result_count: Optional[int] = None):
            """Log query execution (parameter values are deliberately not logged)"""
            log_entry = {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'user_id': user_id,
                'ip_address': ip_address,
                'query': query,
                'param_count': len(params),
                'result_count': result_count
            }
            self.logger.info(json.dumps(log_entry))

        def log_security_event(self, event_type: str, details: Dict[str, Any],
                               severity: str = 'WARNING'):
            """Log security events"""
            log_entry = {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'event_type': event_type,
                'severity': severity,
                'details': details
            }

            if severity == 'CRITICAL':
                self.logger.critical(json.dumps(log_entry))
            elif severity == 'ERROR':
                self.logger.error(json.dumps(log_entry))
            else:
                self.logger.warning(json.dumps(log_entry))

### 3.8 Prepared Statement Pool

    import hashlib
    from typing import Dict


    class PreparedStatementPool:
        """Reuse prepared statements for better performance and security"""

        def __init__(self, connection):
            self.connection = connection
            self.statements: Dict[str, str] = {}

        def get_statement(self, query: str) -> str:
            """Get or create prepared statement

            The query must be trusted application SQL written with PostgreSQL
            positional placeholders ($1, $2, ...), because PREPARE is parsed by
            the server rather than by psycopg2.
            """
            # Create hash of query for lookup
            query_hash = hashlib.sha256(query.encode()).hexdigest()[:16]

            if query_hash not in self.statements:
                # Create new prepared statement
                statement_name = f"stmt_{query_hash}"
                cursor = self.connection.cursor()
                cursor.execute(f"PREPARE {statement_name} AS {query}")
                cursor.close()
                self.statements[query_hash] = statement_name

            return self.statements[query_hash]

        def execute(self, query: str, params: tuple):
            """Execute using prepared statement"""
            stmt_name = self.get_statement(query)
            cursor = self.connection.cursor()
            if params:
                # Values are still bound by the driver through %s placeholders
                param_list = ', '.join(['%s'] * len(params))
                cursor.execute(f"EXECUTE {stmt_name} ({param_list})", params)
            else:
                # EXECUTE takes no parentheses when there are no parameters
                cursor.execute(f"EXECUTE {stmt_name}")
            return cursor

PostgreSQL/Python (psycopg2)

    # CORRECT - Parameterized
    cursor.execute(
        "SELECT * FROM users WHERE email = %s AND status = %s",
        (user_email, status)
    )

    # WRONG - String concatenation (SQL injection risk)
    cursor.execute(
        f"SELECT * FROM users WHERE email = '{user_email}'"
    )

MySQL/Python (mysql-connector)

    # CORRECT
    cursor.execute(
        "SELECT * FROM products WHERE price > %s",
        (min_price,)
    )

SQLite/Python

    # CORRECT
    cursor.execute(
        "SELECT * FROM orders WHERE order_date > ?",
        (start_date,)
    )

Node.js (PostgreSQL)

    // CORRECT
    const result = await client.query(
      'SELECT * FROM users WHERE id = $1',
      [userId]
    );

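To make the difference between the two styles concrete, the sketch below runs the same injection payload through a concatenated query and a parameterized one. It uses Python's built-in `sqlite3` module and a made-up `users` table purely for illustration; the payload string is a textbook example, not something taken from the skill.

```python
import sqlite3

# Hypothetical sample data in an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)]
)

# Classic payload: closes the string literal and adds an always-true condition
payload = "nobody' OR '1'='1"

# WRONG: the payload becomes part of the SQL text and matches every row
unsafe_sql = f"SELECT username FROM users WHERE username = '{payload}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# CORRECT: the payload is bound as a plain value and matches nothing
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (payload,)
).fetchall()

print("concatenated:", unsafe_rows)
print("parameterized:", safe_rows)
```

The concatenated version returns both users even though no user has that name, while the parameterized version treats the whole payload as a literal username and returns nothing.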
PostgreSQL
- Use $1, $2, $3 for parameters (the psycopg2 driver uses %s instead)
- Supports advanced features: JSONB, arrays, full-text search
- Use RETURNING clause for INSERT/UPDATE/DELETE
- Case-insensitive text search with ILIKE

MySQL
- Use ? for parameters in prepared statements (mysql-connector in Python uses %s)
- LIMIT syntax: LIMIT offset, count
- Use backticks for identifiers with spaces
- Date functions: DATE_FORMAT, CURDATE()

SQL Server
- Use @param1, @param2 for parameters
- TOP instead of LIMIT
- Use square brackets for identifiers
- Date functions: GETDATE(), DATEADD()

SQLite
- Use ? for parameters
- Limited ALTER TABLE support
- RIGHT JOIN and FULL OUTER JOIN require SQLite 3.39 or later
- No dedicated date type; dates are usually stored as ISO-8601 strings
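The placeholder differences above can be captured in a small lookup, sketched below. `PLACEHOLDER_STYLES`, `placeholders`, and `where_clause` are hypothetical names introduced only for this example, and the styles are the native ones listed for each database; a specific driver may expect something else, as psycopg2 does with `%s`.

```python
from typing import Callable, Dict, List

# Native placeholder style per dialect, as listed in the notes above.
# Each entry maps a 1-based parameter position to its placeholder text.
PLACEHOLDER_STYLES: Dict[str, Callable[[int], str]] = {
    "postgresql": lambda position: f"${position}",
    "mysql": lambda position: "?",
    "sqlite": lambda position: "?",
    "sqlserver": lambda position: f"@param{position}",
}


def placeholders(dialect: str, count: int) -> List[str]:
    """Return `count` placeholders in the given dialect's style."""
    try:
        style = PLACEHOLDER_STYLES[dialect]
    except KeyError:
        raise ValueError(f"Unsupported dialect: {dialect}") from None
    return [style(position) for position in range(1, count + 1)]


def where_clause(columns: List[str], dialect: str) -> str:
    """Join equality conditions with AND.

    Column names are assumed to be validated identifiers already;
    only the values are represented by placeholders.
    """
    marks = placeholders(dialect, len(columns))
    return " AND ".join(f"{col} = {mark}" for col, mark in zip(columns, marks))


pg_where = where_clause(["status", "created_at"], "postgresql")
sqlite_where = where_clause(["status", "created_at"], "sqlite")
print(pg_where)
print(sqlite_where)
```

Keeping the style in one table means the query text changes per dialect while the parameter tuple passed to the driver stays the same.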
When generating queries, include error handling recommendations:

    import psycopg2
    from psycopg2 import sql

    try:
        cursor.execute(
            sql.SQL("SELECT * FROM {} WHERE id = %s").format(
                sql.Identifier('users')
            ),
            (user_id,)
        )
        results = cursor.fetchall()
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        # Log error, return appropriate response
    finally:
        cursor.close()

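The same try/except/finally shape can be wrapped in a function so that callers get a rollback, a log entry, and a closed cursor in every case. The sketch below uses the built-in `sqlite3` module so it runs without a database server; `fetch_user`, the logger name, and the `users` table are assumptions made for the example.

```python
import logging
import sqlite3

logger = logging.getLogger("sql_errors")


def fetch_user(conn, user_id):
    """Return one user row or None; roll back and re-raise on database errors."""
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT id, username FROM users WHERE id = ?", (user_id,)
        )
        return cursor.fetchone()
    except sqlite3.Error:
        # Undo any open transaction, record the failure, let the caller decide
        conn.rollback()
        logger.exception("Query failed")
        raise
    finally:
        # Runs on success and on failure
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))
conn.commit()

found = fetch_user(conn, 1)
missing = fetch_user(conn, 999)

# Force a database error by removing the table the query depends on
conn.execute("DROP TABLE users")
try:
    fetch_user(conn, 1)
    error_raised = False
except sqlite3.OperationalError:
    error_raised = True

print(found, missing, error_raised)
```

Re-raising after logging keeps the decision about the user-facing response with the caller, while the log message avoids echoing parameter values.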
Before providing a query, verify:
- All table and column names are valid
- JOIN conditions are correct
- WHERE clause logic is accurate
- Parameters are used (not string concatenation)
- Appropriate indexes exist or are recommended
- Query is optimized for the expected dataset size
- Results will be properly limited if needed
- Error handling is included in implementation code

When responding to a query request, provide:
- The SQL Query (properly formatted and commented)
- Explanation of what the query does
- Parameters that need to be passed
- Expected Result structure
- Performance Notes (if applicable)
- Security Warnings (if applicable)
- Implementation Example in the requested language
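A few of the checklist items can be checked mechanically before a query is returned. The sketch below is a hypothetical `review_query` helper based on simple regular expressions; it only catches surface-level issues and cannot confirm that names, joins, or indexes are correct.

```python
import re
from typing import List


def review_query(sql: str) -> List[str]:
    """Return warnings for a few mechanically detectable checklist items."""
    warnings: List[str] = []
    normalized = " ".join(sql.split())  # collapse whitespace and newlines

    if re.search(r"\bselect\s+\*", normalized, re.IGNORECASE):
        warnings.append("Avoid SELECT *; list columns explicitly")

    # A quoted literal often means a value was inlined instead of parameterized
    if re.search(r"'[^']*'", normalized):
        warnings.append("String literal found; pass values as parameters")

    is_read = normalized.lower().startswith(("select", "with"))
    has_limit = re.search(
        r"\b(limit|top|fetch\s+first)\b", normalized, re.IGNORECASE
    )
    if is_read and not has_limit:
        warnings.append("No row limit; consider LIMIT or pagination")

    is_write = re.match(r"(update|delete)\b", normalized, re.IGNORECASE)
    if is_write and not re.search(r"\bwhere\b", normalized, re.IGNORECASE):
        warnings.append("UPDATE or DELETE without a WHERE clause")

    return warnings


risky = review_query("SELECT * FROM users WHERE name = 'bob'")
clean = review_query("SELECT id FROM users WHERE id = $1 LIMIT 10")
unfiltered = review_query("DELETE FROM users")

for warning in risky:
    print("-", warning)
```

A check like this works best as a pre-flight filter; the remaining checklist items still need schema knowledge or a real query plan.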
### SQL Query
```sql
-- Get active users with their order counts
SELECT
    u.user_id,
    u.username,
    u.email,
    COUNT(o.order_id) AS order_count,
    COALESCE(SUM(o.total_amount), 0) AS lifetime_value
FROM users u
LEFT JOIN orders o ON u.user_id = o.user_id
WHERE u.status = $1
  AND u.created_at >= $2
GROUP BY u.user_id, u.username, u.email
HAVING COUNT(o.order_id) >= $3
ORDER BY lifetime_value DESC
LIMIT $4;
```
### Parameters
- $1: status (string, e.g., 'active')
- $2: created_at (date, e.g., '2024-01-01')
- $3: min_orders (integer, e.g., 5)
- $4: limit (integer, e.g., 100)

### Explanation
This query retrieves active users who joined on or after a specified date and have placed a minimum number of orders. It calculates their total order count and lifetime value, sorted with the highest-spending customers first.

### Expected Result
| user_id | username | email | order_count | lifetime_value |
|---------|----------|-------|-------------|----------------|
| 123 | john_doe | john@example.com | 15 | 2500.00 |

### Performance Notes
- Ensure index on users.status and users.created_at
- Ensure index on orders.user_id
- For large datasets, consider pagination

### Implementation Example

    const resolvers = {
      Query: {
        users: async (_, { status, limit }, { db }) => {
          const result = await db.query(
            'SELECT * FROM users WHERE status = $1 LIMIT $2',
            [status, limit]
          );
          return result.rows;
        }
      }
    };

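The performance notes in the example suggest pagination for large datasets. One common approach is keyset pagination, where each page starts after the last key seen instead of using a growing OFFSET. The sketch below assumes a hypothetical `users` table with an integer `id` key and uses the built-in `sqlite3` module; `fetch_page` is an illustrative helper, not part of the skill's API.

```python
import sqlite3


def fetch_page(conn, after_id=0, page_size=2):
    """Return (rows, next_cursor) for the page that follows `after_id`.

    next_cursor is the last id on a full page, or None when the page came
    back short, which signals that there are no further rows.
    """
    rows = conn.execute(
        "SELECT id, username FROM users WHERE id > ? ORDER BY id LIMIT ?",
        (after_id, page_size),
    ).fetchall()
    next_cursor = rows[-1][0] if len(rows) == page_size else None
    return rows, next_cursor


# Hypothetical sample data: five users with ids 1..5
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)",
    [(f"user{n}",) for n in range(1, 6)],
)

page1, cursor1 = fetch_page(conn)
page2, cursor2 = fetch_page(conn, after_id=cursor1)
page3, cursor3 = fetch_page(conn, after_id=cursor2)

print(page1, cursor1)
print(page2, cursor2)
print(page3, cursor3)
```

Because the filter is on an indexed key, each page costs roughly the same regardless of how deep the client has paged. When the row count is an exact multiple of the page size, the final full page still returns a cursor and the following call returns an empty page.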
This skill provides comprehensive SQL query generation capabilities with a focus on security, performance, and best practices. Always prioritize parameterized queries and provide clear documentation with generated SQL.