Build Kafka consumers with consumer groups, offsets, and error handling
You are a Kafka infrastructure engineer. The user wants to build production-ready Kafka consumers with proper consumer group management, offset handling, and error recovery.
What to check first
- Verify the Kafka broker is running: kafka-broker-api-versions.sh --bootstrap-server localhost:9092
- Confirm the topic exists: kafka-topics.sh --list --bootstrap-server localhost:9092
- Check the installed kafka-python or confluent-kafka version: pip list | grep kafka
Steps
- Import the KafkaConsumer class and configure bootstrap_servers with your broker addresses
- Set the group_id parameter to assign the consumer to a consumer group for load balancing
- Set auto_offset_reset to earliest or latest to handle missing-offset scenarios
- Set enable_auto_commit to False to implement manual offset management for critical applications
- Subscribe to one or more topics using the subscribe() method with a list of topic names
- Implement a poll loop with poll(timeout_ms=1000) to fetch message batches
- Add try/except blocks to handle deserialization errors, network timeouts, and offset commit failures
- Call commit() explicitly after successful message processing to persist offset positions
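One detail the steps above gloss over: when committing explicit offsets, Kafka expects the offset of the next message to read, not the offset of the last message processed. The bookkeeping can be sketched with a small helper (the function name is illustrative, not part of any Kafka API):

```python
def offsets_to_commit(last_processed):
    """Map {(topic, partition): last_processed_offset} to the values
    Kafka expects on commit: the next offset to read from each partition."""
    return {tp: offset + 1 for tp, offset in last_processed.items()}

# After processing offset 41 on partition 0 and offset 99 on partition 1:
print(offsets_to_commit({("orders", 0): 41, ("orders", 1): 99}))
# → {('orders', 0): 42, ('orders', 1): 100}
```

Committing the last processed offset instead of the next one causes each partition's final message to be redelivered after every rebalance or restart.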
Code
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class KafkaConsumerApp:
    def __init__(self, bootstrap_servers, group_id, topics):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # commit manually after processing
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            max_poll_records=100
        )
        self.consumer.subscribe(topics)
        self.topics = topics

    def process_message(self, message):
        """Override this method to implement custom processing logic"""
        logger.info(f"Processing: {message.value}")
        return True

    def run(self):
        """Main consumer loop with error handling"""
        try:
            for message in self.consumer:
                try:
                    if self.process_message(message):
                        self.consumer.commit()
                        logger.info(
                            f"Committed offset {message.offset} "
                            f"for partition {message.partition}"
                        )
                    else:
                        logger.warning(
                            f"Failed to process message at offset {message.offset}"
                        )
                except KafkaError as e:
                    logger.error(f"Kafka error at offset {message.offset}: {e}")
                except Exception as e:
                    logger.error(f"Processing error at offset {message.offset}: {e}")
        except KeyboardInterrupt:
            logger.info("Shutting down consumer")
        finally:
            self.consumer.close()
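Deserialization failures deserve separate treatment: a malformed message ("poison pill") raises inside the value_deserializer before the try/except in the poll loop ever sees it, and can stall the consumer on the same offset. One common approach, sketched here with a hypothetical helper, is a deserializer that never raises and returns None so the processing code can skip bad records:

```python
import json
import logging

logger = logging.getLogger(__name__)


def safe_json_deserializer(raw):
    """Deserializer that never raises: returns None for malformed payloads
    so the poll loop can skip them instead of crashing on a poison pill."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        logger.warning("Skipping undecodable message: %s", exc)
        return None


print(safe_json_deserializer(b'{"order_id": 7}'))  # → {'order_id': 7}
print(safe_json_deserializer(b"\xff not json"))    # → None
```

Pass it as value_deserializer=safe_json_deserializer in the KafkaConsumer constructor, and have process_message treat a None value as a skip (optionally routing the raw bytes to a dead-letter topic instead of discarding them).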
Common Pitfalls
- Treating this skill as a one-shot solution — most workflows need iteration and verification
- Skipping the verification steps — you don't know it worked until you measure
- Applying this skill without understanding the underlying problem — read the related docs first
When NOT to Use This Skill
- When a simpler manual approach would take less than 10 minutes
- On critical production systems without testing in staging first
- When you don't have permission or authorization to make these changes
How to Verify It Worked
- Run the verification steps documented above
- Compare the output against your expected baseline
- Check logs for any warnings or errors — silent failures are the worst kind
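For a consumer specifically, the most direct health signal is consumer lag: how far the group's committed offsets trail the partition end offsets (kafka-consumer-groups.sh --describe reports this per partition). The arithmetic is simply end offset minus committed offset, as in this sketch (the function name is illustrative):

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: messages produced but not yet committed.
    Both arguments map (topic, partition) -> offset; a partition with
    no committed offset is treated as fully behind."""
    return {
        tp: end - committed_offsets.get(tp, 0)
        for tp, end in end_offsets.items()
    }


print(consumer_lag({("orders", 0): 120}, {("orders", 0): 100}))
# → {('orders', 0): 20}
```

Lag that grows without bound means the consumer is not keeping up with producers; lag pinned at zero after a test produce confirms the commit path works.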
Production Considerations
- Test in staging before deploying to production
- Have a rollback plan — every change should be reversible
- Monitor the affected systems for at least 24 hours after the change
Related Kafka Skills
Other Claude Code skills in the same category.
Kafka Producer
Build Kafka producers with serialization, partitioning, and delivery guarantees
Kafka Streams
Build stream processing applications with Kafka Streams DSL
Kafka Connect
Configure source and sink connectors for data integration
Kafka Schema Registry
Manage Avro/Protobuf schemas with Confluent Schema Registry
Kafka Monitoring
Monitor Kafka clusters with metrics, consumer lag, and alerting
Kafka Consumer Group Setup
Configure Kafka consumer groups for parallel processing and fault tolerance