Create data processing pipeline
You are a data engineer specializing in building robust ETL pipelines. The user wants to create a data processing pipeline that ingests, transforms, and loads data through multiple stages with error handling and monitoring.
What to check first
- Verify Python 3.8+ is installed with `python --version`
- Check that required packages are present: `pip list | grep -E "pandas|apache-airflow|pydantic"`
- Inspect your data source connection credentials and schema structure
- Ensure you have write permissions to your target database or data warehouse
Steps
- Define your pipeline stages as separate functions with clear input/output contracts using Pydantic models for type validation
- Create an orchestration layer using Apache Airflow DAGs or a custom async task queue to manage dependencies and execution order
- Implement data validation at each stage using schema validators (Great Expectations or Pydantic) to catch malformed data before transformation
- Add idempotency checks using unique identifiers or timestamps to prevent duplicate processing and enable safe retries
- Build monitoring and logging with structured JSON logs capturing stage name, row counts, execution time, and error details
- Implement backpressure handling using batch processing with configurable chunk sizes to prevent memory overload on large datasets
- Create error recovery mechanisms with dead-letter queues for failed records and automatic retry logic with exponential backoff
- Add data quality metrics at the pipeline exit (null percentages, value distributions, row count deltas) and alert on anomalies
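The retry-with-exponential-backoff step above can be sketched as a decorator. This is a minimal sketch; the attempt count, delays, and the `flaky_load` function are illustrative, not part of the skill's own code:

```python
import time
import functools

def retry(max_attempts: int = 3, base_delay: float = 0.5):
    """Retry a flaky stage function, doubling the delay after each failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of retries: let the caller dead-letter the batch
                    time.sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=0.01)
def flaky_load(batch):
    # Stand-in for a load step that may hit transient network/DB errors
    return len(batch)
```

Pairing a decorator like this with a dead-letter queue keeps transient failures (timeouts, connection resets) from poisoning the whole run, while permanent failures still surface after the final attempt.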
Code

```python
import logging
import json
from typing import Any, Dict, List, Tuple
from datetime import datetime
from pydantic import BaseModel, ValidationError
import pandas as pd
from functools import wraps
from pathlib import Path

# Configure structured logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RecordSchema(BaseModel):
    id: int
    name: str
    email: str
    amount: float
    timestamp: str


class PipelineStage:
    def __init__(self, stage_name: str, batch_size: int = 1000):
        self.stage_name = stage_name
        self.batch_size = batch_size
        self.processed_count = 0
        self.failed_count = 0
        self.dead_letter_queue: List[Dict] = []

    def log_stage_metric(self, metric_name: str, value: Any):
        logger.info(json.dumps({
            "stage": self.stage_name,
            "metric": metric_name,
            "value": value,
            "timestamp": datetime.utcnow().isoformat()
        }))

    def process_batch(self, records: List[Dict]) -> List[Dict]:
        """Override in subclass"""
        return records

    def execute(self, records: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
        """Execute stage with error handling and batching"""
        processed = []
        failed = []
        for i in range(0, len(records), self.batch_size):
            batch = records[i:i + self.batch_size]
            try:
                processed.extend(self.process_batch(batch))
                self.processed_count += len(batch)
            except Exception as exc:
                # Route the failed batch to the dead-letter queue for later replay
                failed.extend(batch)
                self.failed_count += len(batch)
                self.dead_letter_queue.extend(batch)
                self.log_stage_metric("batch_error", str(exc))
        self.log_stage_metric("processed_count", self.processed_count)
        self.log_stage_metric("failed_count", self.failed_count)
        return processed, failed
```

Note: this example was truncated in the source. See the GitHub repo for the latest full version.
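The exit-stage quality metrics described in the steps (null percentages, value distributions, row count deltas) can be sketched with pandas. This is a standalone illustration; the `quality_metrics` helper and the `amount` column name are assumptions drawn from the example schema, not from the source:

```python
import pandas as pd

def quality_metrics(df: pd.DataFrame, previous_row_count: int) -> dict:
    """Compute simple exit-stage data quality metrics for anomaly alerting."""
    return {
        "row_count": len(df),
        # Positive/negative delta vs. the previous stage flags dropped rows
        "row_count_delta": len(df) - previous_row_count,
        # Per-column null fraction (0.0 to 1.0)
        "null_pct": df.isna().mean().to_dict(),
        # Basic value distribution for a numeric column
        "amount_stats": df["amount"].describe().to_dict(),
    }

df = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, None, 30.0]})
metrics = quality_metrics(df, previous_row_count=3)
```

In practice you would compare these metrics against a baseline (or thresholds in config) and emit them through the same structured logger as the stage metrics, so one alerting path covers both errors and quality drift.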
Common Pitfalls
- Treating this skill as a one-shot solution — most workflows need iteration and verification
- Skipping the verification steps — you don't know it worked until you measure
- Applying this skill without understanding the underlying problem — read the related docs first
When NOT to Use This Skill
- When a simpler manual approach would take less than 10 minutes
- On critical production systems without testing in staging first
- When you don't have permission or authorization to make these changes
How to Verify It Worked
- Run the verification steps documented above
- Compare the output against your expected baseline
- Check logs for any warnings or errors — silent failures are the worst kind
Production Considerations
- Test in staging before deploying to production
- Have a rollback plan — every change should be reversible
- Monitor the affected systems for at least 24 hours after the change