
Data Pipeline

Create data processing pipeline

Works with OpenClaude

You are a data engineer specializing in building robust ETL pipelines. The user wants to create a data processing pipeline that ingests, transforms, and loads data through multiple stages with error handling and monitoring.

What to check first

  • Verify Python 3.9 or newer is installed with python --version (the code below uses the built-in tuple[...] generic syntax, added in 3.9)
  • Check if required packages exist: pip list | grep -E "pandas|apache-airflow|pydantic"
  • Inspect your data source connection credentials and schema structure
  • Ensure you have write permissions to your target database or data warehouse

Steps

  1. Define your pipeline stages as separate functions with clear input/output contracts using Pydantic models for type validation
  2. Create an orchestration layer using Apache Airflow DAGs or a custom async task queue to manage dependencies and execution order
  3. Implement data validation at each stage using schema validators (Great Expectations or Pydantic) to catch malformed data before transformation
  4. Add idempotency checks using unique identifiers or timestamps to prevent duplicate processing and enable safe retries
  5. Build monitoring and logging with structured JSON logs capturing stage name, row counts, execution time, and error details
  6. Implement backpressure handling using batch processing with configurable chunk sizes to prevent memory overload on large datasets
  7. Create error recovery mechanisms with dead-letter queues for failed records and automatic retry logic with exponential backoff
  8. Add data quality metrics at the pipeline exit (null percentages, value distributions, row count deltas) and alert on anomalies
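Step 2's "custom async task queue" alternative can be sketched with the standard library's graphlib (Python 3.9+), which computes a dependency-respecting execution order. The stage names and functions below are illustrative placeholders, not part of the original skill:

```python
from graphlib import TopologicalSorter

# Hypothetical stage functions; each takes and returns a list of records.
def extract(records):
    return records + [{"id": 1}]

def transform(records):
    return [{**r, "ok": True} for r in records]

def load(records):
    return records

# Map each stage name to the set of stages it depends on.
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}
stages = {"extract": extract, "transform": transform, "load": load}

def run_pipeline(dag, stages):
    """Run stages in dependency order, passing records downstream."""
    records = []
    for name in TopologicalSorter(dag).static_order():
        records = stages[name](records)
    return records

result = run_pipeline(dag, stages)  # extract -> transform -> load
```

For production workloads Airflow gives you scheduling, retries, and a UI on top of the same DAG idea; the sketch above only demonstrates the ordering contract.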

Code

import logging
import json
from typing import List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, ValidationError
import pandas as pd
from functools import wraps
from pathlib import Path

# Configure structured logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RecordSchema(BaseModel):
    id: int
    name: str
    email: str
    amount: float
    timestamp: str

class PipelineStage:
    def __init__(self, stage_name: str, batch_size: int = 1000):
        self.stage_name = stage_name
        self.batch_size = batch_size
        self.processed_count = 0
        self.failed_count = 0
        self.dead_letter_queue = []

    def log_stage_metric(self, metric_name: str, value: Any):
        logger.info(json.dumps({
            "stage": self.stage_name,
            "metric": metric_name,
            "value": value,
            "timestamp": datetime.utcnow().isoformat()
        }))

    def process_batch(self, records: List[Dict]) -> List[Dict]:
        """Override in subclass"""
        return records

    def execute(self, records: List[Dict]) -> tuple[List[Dict], List[Dict]]:
        """Execute stage with error handling and batching"""
        processed = []
        failed = []
        
        for i in range(0, len(records), self.batch_size):
            batch = records[i:i + self.batch_size]
            try:
                processed.extend(self.process_batch(batch))
                self.processed_count += len(batch)
            except Exception as exc:
                logger.error(json.dumps({
                    "stage": self.stage_name,
                    "error": str(exc),
                    "batch_start": i
                }))
                self.failed_count += len(batch)
                self.dead_letter_queue.extend(batch)
                failed.extend(batch)

        self.log_stage_metric("processed_count", self.processed_count)
        self.log_stage_metric("failed_count", self.failed_count)
        return processed, failed

Note: the source example was truncated mid-loop; the loop body above is a minimal reconstruction of the batching and dead-letter behavior described in the steps. See the GitHub repo for the latest full version.
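Step 4's idempotency check can be sketched as a simple id checkpoint that survives across runs. The checkpoint file name and record shape here are assumptions for illustration:

```python
import json
from pathlib import Path

CHECKPOINT = Path("processed_ids.json")  # hypothetical checkpoint location

def load_seen_ids(path: Path = CHECKPOINT) -> set:
    """Load the set of record ids already processed in earlier runs."""
    if path.exists():
        return set(json.loads(path.read_text()))
    return set()

def filter_new_records(records, seen: set):
    """Drop already-processed records (and in-batch duplicates), making reruns safe."""
    fresh = []
    for r in records:
        if r["id"] in seen:
            continue
        seen.add(r["id"])
        fresh.append(r)
    return fresh

def save_seen_ids(seen: set, path: Path = CHECKPOINT) -> None:
    """Persist the checkpoint; call this after a successful run."""
    path.write_text(json.dumps(sorted(seen)))

seen = load_seen_ids()
batch = [{"id": 1}, {"id": 2}, {"id": 1}]
fresh = filter_new_records(batch, seen)  # the duplicate id=1 is dropped
```

A file-backed set is the simplest form; in a warehouse pipeline the same check is usually a unique key or MERGE/upsert on the target table.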
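Step 7's retry logic can be sketched as a small wrapper with exponential backoff and jitter; the flaky loader below is a stand-in for a real transient failure such as a dropped database connection:

```python
import time
import random

def retry_with_backoff(fn, max_attempts=4, base_delay=0.1):
    """Call fn, retrying on failure with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted: caller routes the record to the DLQ
            # 0.1s, 0.2s, 0.4s, ... plus a little jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.05))

# Hypothetical flaky loader that succeeds on the third call.
calls = {"n": 0}
def flaky_load():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "loaded"

result = retry_with_backoff(flaky_load)
```

Only retry errors that are plausibly transient; a schema validation failure will fail identically on every attempt and belongs in the dead-letter queue immediately.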

Common Pitfalls

  • Treating this skill as a one-shot solution — most workflows need iteration and verification
  • Skipping the verification steps — you don't know it worked until you measure
  • Applying this skill without understanding the underlying problem — read the related docs first

When NOT to Use This Skill

  • When a simpler manual approach would take less than 10 minutes
  • On critical production systems without testing in staging first
  • When you don't have permission or authorization to make these changes

How to Verify It Worked

  • Run the verification steps documented above
  • Compare the output against your expected baseline
  • Check logs for any warnings or errors — silent failures are the worst kind
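The exit-stage quality metrics from step 8 can double as a verification baseline. A minimal pure-Python sketch (the field names and expected count are illustrative):

```python
def quality_report(records, expected_count):
    """Compute exit-stage quality metrics: null rates and row-count delta."""
    total = len(records)
    fields = set().union(*(r.keys() for r in records)) if records else set()
    null_pct = {
        # missing keys count as null, same as explicit None values
        f: sum(1 for r in records if r.get(f) is None) / total
        for f in fields
    }
    return {
        "row_count": total,
        "row_count_delta": total - expected_count,
        "null_pct": null_pct,
    }

report = quality_report(
    [{"id": 1, "email": "a@x.com"}, {"id": 2, "email": None}],
    expected_count=2,
)
```

Alert when row_count_delta or a null percentage drifts beyond a threshold you set from a known-good run.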

Production Considerations

  • Test in staging before deploying to production
  • Have a rollback plan — every change should be reversible
  • Monitor the affected systems for at least 24 hours after the change

Quick Info

Difficulty: advanced
Version: 1.0.0
Author: Claude Skills Hub
Tags: data, pipeline, processing

Install command:

curl -o ~/.claude/skills/data-pipeline.md https://claude-skills-hub.vercel.app/skills/data/data-pipeline.md
