Module 7: Kafka with Python
60 minutes · 2 examples · Intermediate
Hands-on Examples
Interactive examples to reinforce your learning
Complete Python Producer-Consumer Example
Full working example with producer and consumer
Code Example
# Complete Python Kafka Example
## producer.py
from kafka import KafkaProducer
import json
import time
import random

def create_producer():
    # Values are JSON-encoded and keys UTF-8-encoded before sending.
    # acks='all' waits for all in-sync replicas to acknowledge each write.
    return KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        key_serializer=lambda x: x.encode('utf-8'),
        acks='all',
        retries=3
    )

def send_user_events(producer):
    users = ['alice', 'bob', 'charlie', 'diana']
    actions = ['login', 'logout', 'view_product', 'add_to_cart', 'purchase']
    products = ['laptop', 'mouse', 'keyboard', 'monitor']
    for i in range(20):
        event = {
            'user_id': random.choice(users),
            'action': random.choice(actions),
            'product_id': random.choice(products),
            'timestamp': time.time(),
            'session_id': f'session_{i}'
        }
        # With the default partitioner, messages with the same key go to the same partition.
        producer.send('user-events', key=event['user_id'], value=event)
        print(f'Sent: {event}')
        time.sleep(1)
    # Block until every buffered message has been sent.
    producer.flush()

if __name__ == '__main__':
    producer = create_producer()
    send_user_events(producer)
    producer.close()
## consumer.py
from kafka import KafkaConsumer
import json
from collections import defaultdict

def create_consumer():
    # auto_offset_reset='earliest' starts from the oldest available message
    # when the group has no committed offset yet.
    return KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='analytics-group',
        auto_offset_reset='earliest'
    )

def process_events(consumer):
    stats = defaultdict(int)
    # Iterating over the consumer blocks and yields messages as they arrive.
    for message in consumer:
        event = message.value
        stats['total_events'] += 1
        stats[f'action_{event["action"]}'] += 1
        stats[f'user_{event["user_id"]}'] += 1
        print(f'Processed: {event["user_id"]} - {event["action"]}')
        # Print the running totals after every fifth event.
        if stats['total_events'] % 5 == 0:
            print(f'\nStats: {dict(stats)}\n')

if __name__ == '__main__':
    consumer = create_consumer()
    try:
        process_events(consumer)
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the otherwise endless loop
    finally:
        consumer.close()
## Run the example:
# Terminal 1: Start consumer
python consumer.py
# Terminal 2: Start producer
python producer.py
Expected Output:
The producer sends 20 user events to the user-events topic, one per second. The consumer prints each event as it processes it and prints the cumulative statistics after every fifth event.
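To check the statistics logic without a running broker, the counting done in process_events can be tried on a plain list of dictionaries. The following is a minimal sketch: the function name tally_events and the sample events are hypothetical and not part of the course code, but the counter keys match those used in consumer.py.

```python
from collections import defaultdict


def tally_events(events):
    """Count events overall, per action, and per user (same keys as consumer.py)."""
    stats = defaultdict(int)
    for event in events:
        stats['total_events'] += 1
        stats[f'action_{event["action"]}'] += 1
        stats[f'user_{event["user_id"]}'] += 1
    return dict(stats)


# Hypothetical sample events, shaped like the ones producer.py sends.
sample_events = [
    {'user_id': 'alice', 'action': 'login'},
    {'user_id': 'bob', 'action': 'login'},
    {'user_id': 'alice', 'action': 'purchase'},
]

stats = tally_events(sample_events)
print(stats)
```

Because the function takes any iterable of event dictionaries, the same counting could in principle be applied to the values read from the consumer loop.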
Explanation:
This example shows how to create a producer that sends JSON-encoded user events keyed by user ID, and a consumer in a consumer group that reads them and keeps running counts per action and per user as the events arrive.
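The producer's value_serializer and the consumer's value_deserializer have to be exact inverses of each other, otherwise the consumer cannot read what the producer wrote. The sketch below runs the same two transformations outside Kafka; the function names and the sample event are hypothetical, chosen only for illustration.

```python
import json


def serialize_value(value):
    # Same logic as the producer's value_serializer: dict -> JSON text -> UTF-8 bytes.
    return json.dumps(value).encode('utf-8')


def deserialize_value(raw):
    # Same logic as the consumer's value_deserializer: UTF-8 bytes -> JSON text -> dict.
    return json.loads(raw.decode('utf-8'))


# Hypothetical sample event, shaped like the ones producer.py sends.
event = {'user_id': 'alice', 'action': 'login', 'product_id': 'laptop'}

raw = serialize_value(event)        # what is written to the topic
restored = deserialize_value(raw)   # what message.value looks like in the consumer

print(type(raw).__name__, restored == event)
```

Note that consumer.py sets no key_deserializer, so message.key arrives there as raw bytes rather than as a string.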