Kafka #
Apache Kafka adalah platform distributed event streaming yang dirancang untuk menangani jutaan pesan per detik dengan latensi rendah dan durabilitas tinggi. Berbeda dari message broker tradisional seperti RabbitMQ yang fokus pada routing pesan, Kafka dirancang sebagai log terdistribusi yang persisten — pesan disimpan di disk dan bisa dibaca ulang oleh banyak consumer secara independen. Ini membuatnya ideal untuk event sourcing, data pipeline real-time, audit log, dan integrasi antar microservice dalam skala besar. Memahami konsep Topic, Partition, Consumer Group, dan Offset adalah fondasi sebelum menulis kode Kafka yang andal.
Konsep Dasar Kafka #
Arsitektur Kafka:
Producer Broker (Kafka Server) Consumer
───────── ───────────────────── ────────
App A ──┐ Topic: "orders" ┌── Service X
├─── publish ────► Partition 0: [msg1][msg2][msg3] │ (Group A)
App B ──┘ Partition 1: [msg4][msg5] └── Service Y
Partition 2: [msg6][msg7][msg8]
│
└── Topic bisa punya banyak partition
untuk parallelisme dan skalabilitas
Konsep penting:
Topic -- kategori/saluran pesan (seperti "tabel" di database)
Partition -- subdivisi topic untuk parallelisme; pesan dalam satu partition urut
Offset -- posisi pesan dalam partition (0, 1, 2, ...)
Consumer Group -- sekelompok consumer yang berbagi beban membaca satu topic
Broker -- server Kafka; cluster biasanya terdiri dari 3+ broker
Replication -- setiap partition punya N replika di broker berbeda untuk fault tolerance
Sejak Kafka 3.3+, mode KRaft (Kafka Raft) tersedia sebagai pengganti Zookeeper dan menjadi default di versi terbaru. KRaft menyederhanakan operasi cluster secara signifikan — kamu tidak perlu menjalankan Zookeeper secara terpisah lagi. Untuk instalasi baru, gunakan Kafka 3.x dengan mode KRaft.
Instalasi #
# Library Python untuk Kafka
pip install kafka-python
# Alternatif: confluent-kafka (binding librdkafka, lebih performan untuk produksi)
pip install confluent-kafka
Untuk menjalankan Kafka di lokal, cara termudah menggunakan Docker:
# docker-compose.yml untuk Kafka dengan KRaft (tanpa Zookeeper)
# Simpan sebagai docker-compose.yml dan jalankan: docker compose up -d
version: "3"
services:
kafka:
image: bitnami/kafka:latest
ports:
- "9092:9092"
environment:
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
Producer — Mengirim Pesan #
Producer mengirim pesan ke topic Kafka. Pesan terdiri dari key (opsional) dan value, keduanya berupa bytes. Serialisasi ke JSON perlu dilakukan secara eksplisit.
import json
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError
BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
# ANTI-PATTERN: kirim bytes mentah tanpa serialisasi
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
producer.send("orders", b"{'order_id': 1}") # ✗ -- bukan JSON valid, sulit di-consume
# BENAR: konfigurasi serializer JSON
producer = KafkaProducer(
bootstrap_servers=BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
acks="all", # tunggu konfirmasi dari semua replika (paling aman)
retries=3, # coba ulang jika gagal
retry_backoff_ms=300, # jeda antar retry
request_timeout_ms=30000,
compression_type="gzip" # kompres pesan untuk efisiensi bandwidth
)
Kirim Pesan Sederhana #
def kirim_order(producer: KafkaProducer, order: dict) -> None:
topic = "orders"
# Key menentukan partition -- pesan dengan key sama selalu ke partition yang sama
# Gunakan ID entitas sebagai key untuk menjaga urutan per entitas
key = str(order.get("order_id"))
future = producer.send(topic, key=key, value=order)
try:
record_metadata = future.get(timeout=10) # tunggu konfirmasi
print(
f"Pesan terkirim → topic: {record_metadata.topic}, "
f"partition: {record_metadata.partition}, "
f"offset: {record_metadata.offset}"
)
except KafkaError as e:
print(f"Gagal mengirim pesan: {e}")
# Penggunaan
order = {
"order_id": 1001,
"pengguna_id": 42,
"produk": "Laptop Gaming ASUS",
"total": 18500000,
"status": "pending",
"timestamp": "2024-03-15T10:30:00Z"
}
kirim_order(producer, order)
producer.flush() # pastikan semua pesan terkirim sebelum exit
Producer dengan Callback (Non-blocking) #
def on_send_success(record_metadata):
print(
f"✓ Terkirim → {record_metadata.topic}:"
f"[{record_metadata.partition}]@{record_metadata.offset}"
)
def on_send_error(exc):
print(f"✗ Gagal mengirim: {exc}")
def kirim_event_async(producer: KafkaProducer, topic: str, key: str, event: dict) -> None:
producer.send(topic, key=key, value=event) \
.add_callback(on_send_success) \
.add_errback(on_send_error)
# Kirim banyak event secara asinkron
events = [
{"event": "user_login", "user_id": 1, "ip": "192.168.1.1"},
{"event": "product_view", "user_id": 1, "product_id": 101},
{"event": "add_to_cart", "user_id": 1, "product_id": 101},
]
for event in events:
kirim_event_async(producer, "user-events", str(event["user_id"]), event)
producer.flush() # tunggu semua callback selesai
producer.close()
Consumer — Menerima Pesan #
Consumer membaca pesan dari topic. Pengaturan auto_offset_reset dan enable_auto_commit adalah dua parameter yang paling sering menjadi sumber bug.
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import signal
BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
# ANTI-PATTERN: enable_auto_commit=True tanpa memahami risikonya
consumer = KafkaConsumer(
"orders",
bootstrap_servers=BOOTSTRAP_SERVERS,
enable_auto_commit=True, # ✗ -- commit otomatis sebelum pemrosesan selesai
group_id="order-service" # bisa menyebabkan pesan hilang jika crash saat proses
)
# BENAR: manual commit setelah pemrosesan berhasil
consumer = KafkaConsumer(
"orders",
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id="order-service",
auto_offset_reset="earliest", # mulai dari awal jika group baru / offset tidak ada
# "latest" = hanya baca pesan baru
enable_auto_commit=False, # commit manual setelah pemrosesan berhasil
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
key_deserializer=lambda k: k.decode("utf-8") if k else None,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
max_poll_records=50 # maksimum pesan per poll
)
Consumer Loop dengan Graceful Shutdown #
import signal
import threading
def proses_order(order: dict) -> None:
"""Logika bisnis pemrosesan order."""
print(f"Memproses order #{order['order_id']} — total Rp{order['total']:,.0f}")
# ... simpan ke database, kirim notifikasi, dll.
def jalankan_consumer():
berjalan = True
def handle_shutdown(signum, frame):
nonlocal berjalan
print("Menerima sinyal shutdown, menghentikan consumer...")
berjalan = False
signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
consumer = KafkaConsumer(
"orders",
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id="order-service",
auto_offset_reset="earliest",
enable_auto_commit=False,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
key_deserializer=lambda k: k.decode("utf-8") if k else None,
)
print("Consumer aktif, menunggu pesan...")
try:
while berjalan:
# poll() dengan timeout agar bisa cek flag berjalan secara periodik
records = consumer.poll(timeout_ms=1000, max_records=50)
for topic_partition, messages in records.items():
for msg in messages:
try:
proses_order(msg.value)
# Commit hanya setelah pemrosesan berhasil
consumer.commit()
except Exception as e:
print(f"Error memproses pesan offset {msg.offset}: {e}")
# Jangan commit -- pesan akan dibaca ulang setelah restart
# Tambahkan ke dead letter queue jika perlu
finally:
consumer.close()
print("Consumer ditutup dengan bersih.")
jalankan_consumer()
Consumer Group dan Partition Assignment #
Consumer group memungkinkan beberapa instance consumer berbagi beban membaca satu topic — setiap partition hanya dibaca oleh satu consumer dalam group pada satu waktu.
# Diagram consumer group:
#
# Topic "orders" dengan 3 partition:
#
# Partition 0 ──────► Consumer A ┐
# Partition 1 ──────► Consumer B ├── Group: "order-service"
# Partition 2 ──────► Consumer C ┘
#
# Jika Consumer B mati:
# Partition 0 ──────► Consumer A ┐
# Partition 1 ──────► Consumer A ├── Rebalance otomatis
# Partition 2 ──────► Consumer C ┘
# Subscribe ke beberapa topic sekaligus
consumer.subscribe(["orders", "payments", "notifications"])
# Atau assign ke partition spesifik secara manual (tanpa rebalance)
from kafka import TopicPartition
consumer.assign([
TopicPartition("orders", 0),
TopicPartition("orders", 1),
])
# Cek assignment saat ini
print("Assigned partitions:", consumer.assignment())
# Cek lag (berapa pesan yang belum diproses)
for tp in consumer.assignment():
committed = consumer.committed(tp)
end = consumer.end_offsets([tp])[tp]
lag = end - (committed.offset if committed else 0)
print(f"{tp.topic}[{tp.partition}]: lag={lag}")
Serialisasi Pesan #
Untuk aplikasi production, gunakan format yang lebih efisien dari JSON — seperti Avro atau Protobuf — terutama untuk throughput tinggi.
import json
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class OrderEvent:
order_id: int
user_id: int
total: float
status: str
created_at: str = None
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.utcnow().isoformat() + "Z"
# Producer dengan dataclass
producer = KafkaProducer(
bootstrap_servers=BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(asdict(v), ensure_ascii=False).encode("utf-8"),
key_serializer=lambda k: str(k).encode("utf-8") if k else None,
acks="all"
)
event = OrderEvent(order_id=1001, user_id=42, total=18500000.0, status="created")
producer.send("order-events", key=event.order_id, value=event)
producer.flush()
# Consumer yang me-reconstruct dataclass
consumer = KafkaConsumer(
"order-events",
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id="order-processor",
auto_offset_reset="earliest",
enable_auto_commit=False,
value_deserializer=lambda v: OrderEvent(**json.loads(v.decode("utf-8")))
)
for msg in consumer:
order: OrderEvent = msg.value
print(f"Order #{order.order_id}: Rp{order.total:,.0f} — {order.status}")
consumer.commit()
Manajemen Topic #
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
def buat_topic(
nama: str,
num_partitions: int = 3,
replication_factor: int = 1
) -> None:
admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
topic = NewTopic(
name=nama,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs={
"retention.ms": str(7 * 24 * 60 * 60 * 1000), # simpan 7 hari
"cleanup.policy": "delete",
"compression.type": "gzip"
}
)
try:
admin.create_topics([topic])
print(f"Topic '{nama}' berhasil dibuat dengan {num_partitions} partition.")
except TopicAlreadyExistsError:
print(f"Topic '{nama}' sudah ada.")
finally:
admin.close()
def list_topics() -> list[str]:
admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
topics = admin.list_topics()
admin.close()
return [t for t in topics if not t.startswith("__")] # filter internal topics
def hapus_topic(nama: str) -> None:
admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
admin.delete_topics([nama])
admin.close()
print(f"Topic '{nama}' dihapus.")
# Buat topic untuk aplikasi
buat_topic("orders", num_partitions=3)
buat_topic("order-events", num_partitions=3)
buat_topic("user-events", num_partitions=6)
print("Topics:", list_topics())
Penanganan Error dan Dead Letter Queue #
from kafka import KafkaProducer, KafkaConsumer
import json
DLQ_TOPIC = "orders-dlq" # Dead Letter Queue -- pesan yang gagal diproses
def proses_dengan_retry(
consumer: KafkaConsumer,
dlq_producer: KafkaProducer,
maks_retry: int = 3
) -> None:
for msg in consumer:
retry_count = 0
berhasil = False
while retry_count <= maks_retry and not berhasil:
try:
# Proses pesan
proses_order(msg.value)
consumer.commit()
berhasil = True
except Exception as e:
retry_count += 1
print(f"Retry {retry_count}/{maks_retry} untuk offset {msg.offset}: {e}")
if retry_count > maks_retry:
# Kirim ke Dead Letter Queue
dlq_payload = {
"original_topic": msg.topic,
"original_partition": msg.partition,
"original_offset": msg.offset,
"original_key": msg.key,
"original_value": msg.value,
"error": str(e),
"failed_at": datetime.utcnow().isoformat()
}
dlq_producer.send(DLQ_TOPIC, value=dlq_payload)
dlq_producer.flush()
consumer.commit() # commit agar tidak loop terus
print(f"Pesan dikirim ke DLQ: {DLQ_TOPIC}")
Kapan Memilih Kafka vs Broker Lain #
Pilih Kafka jika:
✓ Throughput sangat tinggi (jutaan pesan/detik)
✓ Perlu replay pesan (consumer bisa baca ulang dari offset manapun)
✓ Banyak consumer independen membaca topic yang sama
✓ Event sourcing atau audit log jangka panjang
✓ Data pipeline real-time antar banyak service
Pilih RabbitMQ jika:
✓ Routing pesan kompleks (exchange, binding, routing key)
✓ Butuh acknowledgment per-pesan yang lebih granular
✓ Ukuran cluster kecil dan tim tidak familiar dengan Kafka
✓ Kebutuhan message TTL dan priority queue
Pilih Amazon SQS / Google Pub/Sub jika:
✓ Ingin managed service tanpa operasional cluster
✓ Sudah di ekosistem AWS / GCP
Ringkasan #
enable_auto_commit=False— selalu gunakan manual commit agar pesan tidak dianggap selesai sebelum benar-benar diproses; auto commit bisa menyebabkan pesan hilang jika crash saat pemrosesan.- Key menentukan partition — pesan dengan key yang sama selalu dikirim ke partition yang sama, menjaga urutan per entitas (misal per
user_idatauorder_id).acks="all"— gunakan di producer untuk memastikan pesan tersimpan di semua replika sebelum dianggap berhasil; mengorbankan latensi untuk durabilitas.auto_offset_reset—"earliest"untuk baca dari awal (consumer baru/group baru);"latest"untuk hanya baca pesan baru setelah consumer aktif.poll()dengan timeout — gunakanconsumer.poll(timeout_ms=1000)daripada iterasi langsungfor msg in consumeragar bisa cek flag shutdown secara periodik.- Consumer Group untuk skalabilitas — jalankan beberapa instance consumer dengan
group_idyang sama; Kafka otomatis mendistribusikan partition ke masing-masing instance.- Serialisasi eksplisit — selalu definisikan
value_serializerdanvalue_deserializerdi constructor; jangan kirim bytes mentah tanpa format yang terdefinisi.- Dead Letter Queue (DLQ) — kirim pesan yang gagal diproses setelah N retry ke topic terpisah agar tidak memblokir pemrosesan pesan lain.
- Mode KRaft — gunakan Kafka 3.x dengan mode KRaft untuk instalasi baru; tidak perlu Zookeeper lagi.
producer.flush()sebelum exit — pastikan semua pesan dalam buffer terkirim sebelum aplikasi berhenti.