Configure source and sink connectors for data integration
You are a Kafka Connect administrator. The user wants to configure and deploy source and sink connectors for reliable data integration between Kafka and external systems.
What to check first
- Run `curl http://localhost:8083/connector-plugins` to verify the Kafka Connect REST API is accessible
- Check `$KAFKA_HOME/config/connect-distributed.properties` to confirm broker URLs and worker settings are correct (a sample worker config is sketched after this list)
- Verify connector JARs exist in `$KAFKA_HOME/libs/` or on the plugin path specified in `plugin.path`
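For reference, a minimal sketch of the worker settings mentioned above, not a drop-in file; the broker addresses, topic names, and plugin path are illustrative placeholders.

```properties
# connect-distributed.properties (illustrative values)
# Kafka brokers the worker connects to
bootstrap.servers=broker1:9092,broker2:9092
# Workers sharing this group.id form one Connect cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics where the cluster persists connector configs, offsets, and status
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# Directories scanned for connector plugin JARs
plugin.path=/opt/kafka/libs,/opt/connectors
```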
Steps
- Start Kafka Connect in distributed mode with `$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties`
- Verify the Connect cluster is healthy by checking that `curl http://localhost:8083/` returns worker metadata
- List available connector plugins with `curl http://localhost:8083/connector-plugins | jq '.[].class'` to confirm your connectors are loaded
- Create a source connector by POSTing a JSON config to `http://localhost:8083/connectors` with the connector name, connector class, and source-specific settings
- Create a sink connector with target-system credentials, topic subscriptions, and connection pooling settings
- Monitor connector status with `curl http://localhost:8083/connectors/{name}/status` and check for task-level failures
- Scale horizontally by adding workers to the same cluster; they discover each other automatically via the `group.id` in the worker config file
- Update a connector's config by PUTting new JSON to `http://localhost:8083/connectors/{name}/config`, without restarting the worker (monitoring and updating are sketched after this list)
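A sketch of the monitoring and update steps, assuming the `postgres-source` connector from the Code section below; the bumped `tasks.max` value is illustrative.

```bash
# Check connector and task health; every state should be RUNNING
curl -s http://localhost:8083/connectors/postgres-source/status \
  | jq '{connector: .connector.state, tasks: [.tasks[].state]}'

# Update the connector in place by PUTting the full new config.
# Note: PUT /connectors/{name}/config takes the flat config map,
# without the {"name": ..., "config": ...} wrapper used on POST.
curl -X PUT http://localhost:8083/connectors/postgres-source/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.source.JdbcSourceConnector",
    "tasks.max": "4",
    "connection.url": "jdbc:postgresql://postgres:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "secret",
    "table.whitelist": "users,orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "pg_",
    "poll.interval.ms": "5000"
  }'
```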
Code
```bash
#!/bin/bash
# Start Kafka Connect worker
export KAFKA_HOME=/opt/kafka
$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties &
sleep 5
# Create a source connector (e.g., JDBC source from PostgreSQL)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.source.JdbcSourceConnector",
      "tasks.max": "2",
      "connection.url": "jdbc:postgresql://postgres:5432/mydb",
      "connection.user": "postgres",
      "connection.password": "secret",
      "table.whitelist": "users,orders",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "pg_",
      "poll.interval.ms": "5000"
    }
  }'
sleep 3
# Create a sink connector (e.g., Elasticsearch sink).
# The original example is truncated at this point in the source; the config
# below is a representative completion for the Confluent Elasticsearch sink,
# consuming the topics produced by the source connector above.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "es-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "2",
      "topics": "pg_users,pg_orders",
      "connection.url": "http://elasticsearch:9200",
      "key.ignore": "true",
      "schema.ignore": "true"
    }
  }'
```
Common Pitfalls
- Treating this skill as a one-shot solution — most workflows need iteration and verification
- Skipping the verification steps — you don't know it worked until you measure
- Applying this skill without understanding the underlying problem — read the related docs first
When NOT to Use This Skill
- When a simpler manual approach would take less than 10 minutes
- On critical production systems without testing in staging first
- When you don't have permission or authorization to make these changes
How to Verify It Worked
- Run the verification steps documented above
- Compare the output against your expected baseline
- Check logs for any warnings or errors — silent failures are the worst kind
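A minimal verification sketch, assuming the connector names from the Code section and a broker on localhost; the topic name follows from the `pg_` prefix and the `users` table.

```bash
# Both connectors and all of their tasks should report RUNNING
for c in postgres-source es-sink; do
  curl -s http://localhost:8083/connectors/$c/status \
    | jq '{name, connector: .connector.state, tasks: [.tasks[].state]}'
done

# Spot-check that the source connector is actually producing records
$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic pg_users --from-beginning --max-messages 5
```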
Production Considerations
- Test in staging before deploying to production
- Have a rollback plan — every change should be reversible
- Monitor the affected systems for at least 24 hours after the change
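One way to keep connector changes reversible, with an illustrative backup file name: snapshot the live config before each update so you can PUT it straight back.

```bash
# Save the current config before changing anything (file name is illustrative)
curl -s http://localhost:8083/connectors/postgres-source/config \
  > postgres-source.backup.json

# If the new config misbehaves, roll back by re-applying the snapshot
curl -X PUT http://localhost:8083/connectors/postgres-source/config \
  -H "Content-Type: application/json" \
  -d @postgres-source.backup.json
```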
Related Kafka Skills
Other Claude Code skills in the same category.
Kafka Producer
Build Kafka producers with serialization, partitioning, and delivery guarantees
Kafka Consumer
Build Kafka consumers with consumer groups, offsets, and error handling
Kafka Streams
Build stream processing applications with Kafka Streams DSL
Kafka Schema Registry
Manage Avro/Protobuf schemas with Confluent Schema Registry
Kafka Monitoring
Monitor Kafka clusters with metrics, consumer lag, and alerting
Kafka Consumer Group Setup
Configure Kafka consumer groups for parallel processing and fault tolerance