Kafka #

Apache Kafka adalah platform distributed event streaming yang dirancang untuk menangani jutaan pesan per detik dengan latensi rendah dan durabilitas tinggi. Berbeda dari message broker tradisional seperti RabbitMQ yang fokus pada routing pesan, Kafka dirancang sebagai log terdistribusi yang persisten — pesan disimpan di disk dan bisa dibaca ulang oleh banyak consumer secara independen. Ini membuatnya ideal untuk event sourcing, data pipeline real-time, audit log, dan integrasi antar microservice dalam skala besar. Memahami konsep Topic, Partition, Consumer Group, dan Offset adalah fondasi sebelum menulis kode Kafka yang andal.

Konsep Dasar Kafka #

Arsitektur Kafka:

Producer                    Broker (Kafka Server)                Consumer
─────────                   ─────────────────────                ────────
App A  ──┐                  Topic: "orders"                  ┌── Service X
         ├─── publish ────► Partition 0: [msg1][msg2][msg3]  │   (Group A)
App B  ──┘                  Partition 1: [msg4][msg5]        └── Service Y
                            Partition 2: [msg6][msg7][msg8]
                                 │
                                 └── Topic bisa punya banyak partition
                                     untuk parallelisme dan skalabilitas

Konsep penting:
  Topic         -- kategori/saluran pesan (seperti "tabel" di database)
  Partition     -- subdivisi topic untuk parallelisme; pesan dalam satu partition urut
  Offset        -- posisi pesan dalam partition (0, 1, 2, ...)
  Consumer Group -- sekelompok consumer yang berbagi beban membaca satu topic
  Broker        -- server Kafka; cluster biasanya terdiri dari 3+ broker
  Replication   -- setiap partition punya N replika di broker berbeda untuk fault tolerance
Sejak Kafka 3.3+, mode KRaft (Kafka Raft) tersedia sebagai pengganti Zookeeper dan menjadi default di versi terbaru. KRaft menyederhanakan operasi cluster secara signifikan — kamu tidak perlu menjalankan Zookeeper secara terpisah lagi. Untuk instalasi baru, gunakan Kafka 3.x dengan mode KRaft.

Instalasi #

# Library Python untuk Kafka
pip install kafka-python

# Alternatif: confluent-kafka (binding librdkafka, lebih performan untuk produksi)
pip install confluent-kafka

Untuk menjalankan Kafka di lokal, cara termudah menggunakan Docker:

# docker-compose.yml untuk Kafka dengan KRaft (tanpa Zookeeper)
# Simpan sebagai docker-compose.yml dan jalankan: docker compose up -d
version: "3"
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

Producer — Mengirim Pesan #

Producer mengirim pesan ke topic Kafka. Pesan terdiri dari key (opsional) dan value, keduanya berupa bytes. Serialisasi ke JSON perlu dilakukan secara eksplisit.

import json
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError

BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# ANTI-PATTERN: kirim bytes mentah tanpa serialisasi
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
producer.send("orders", b"{'order_id': 1}")  # ✗ -- bukan JSON valid, sulit di-consume

# BENAR: konfigurasi serializer JSON
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",             # tunggu konfirmasi dari semua replika (paling aman)
    retries=3,              # coba ulang jika gagal
    retry_backoff_ms=300,   # jeda antar retry
    request_timeout_ms=30000,
    compression_type="gzip" # kompres pesan untuk efisiensi bandwidth
)

Kirim Pesan Sederhana #

def kirim_order(producer: KafkaProducer, order: dict) -> None:
    topic   = "orders"
    # Key menentukan partition -- pesan dengan key sama selalu ke partition yang sama
    # Gunakan ID entitas sebagai key untuk menjaga urutan per entitas
    key     = str(order.get("order_id"))

    future = producer.send(topic, key=key, value=order)

    try:
        record_metadata = future.get(timeout=10)  # tunggu konfirmasi
        print(
            f"Pesan terkirim → topic: {record_metadata.topic}, "
            f"partition: {record_metadata.partition}, "
            f"offset: {record_metadata.offset}"
        )
    except KafkaError as e:
        print(f"Gagal mengirim pesan: {e}")

# Penggunaan
order = {
    "order_id":   1001,
    "pengguna_id": 42,
    "produk":     "Laptop Gaming ASUS",
    "total":      18500000,
    "status":     "pending",
    "timestamp":  "2024-03-15T10:30:00Z"
}

kirim_order(producer, order)
producer.flush()  # pastikan semua pesan terkirim sebelum exit

Producer dengan Callback (Non-blocking) #

def on_send_success(record_metadata):
    print(
        f"✓ Terkirim → {record_metadata.topic}:"
        f"[{record_metadata.partition}]@{record_metadata.offset}"
    )

def on_send_error(exc):
    print(f"✗ Gagal mengirim: {exc}")

def kirim_event_async(producer: KafkaProducer, topic: str, key: str, event: dict) -> None:
    producer.send(topic, key=key, value=event) \
            .add_callback(on_send_success)     \
            .add_errback(on_send_error)

# Kirim banyak event secara asinkron
events = [
    {"event": "user_login",    "user_id": 1, "ip": "192.168.1.1"},
    {"event": "product_view",  "user_id": 1, "product_id": 101},
    {"event": "add_to_cart",   "user_id": 1, "product_id": 101},
]

for event in events:
    kirim_event_async(producer, "user-events", str(event["user_id"]), event)

producer.flush()  # tunggu semua callback selesai
producer.close()

Consumer — Menerima Pesan #

Consumer membaca pesan dari topic. Pengaturan auto_offset_reset dan enable_auto_commit adalah dua parameter yang paling sering menjadi sumber bug.

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import signal

BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# ANTI-PATTERN: enable_auto_commit=True tanpa memahami risikonya
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    enable_auto_commit=True,   # ✗ -- commit otomatis sebelum pemrosesan selesai
    group_id="order-service"   #    bisa menyebabkan pesan hilang jika crash saat proses
)

# BENAR: manual commit setelah pemrosesan berhasil
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id="order-service",
    auto_offset_reset="earliest",   # mulai dari awal jika group baru / offset tidak ada
                                    # "latest" = hanya baca pesan baru
    enable_auto_commit=False,       # commit manual setelah pemrosesan berhasil
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000,
    max_poll_records=50             # maksimum pesan per poll
)

Consumer Loop dengan Graceful Shutdown #

import signal
import threading

def proses_order(order: dict) -> None:
    """Logika bisnis pemrosesan order."""
    print(f"Memproses order #{order['order_id']} — total Rp{order['total']:,.0f}")
    # ... simpan ke database, kirim notifikasi, dll.

def jalankan_consumer():
    berjalan = True

    def handle_shutdown(signum, frame):
        nonlocal berjalan
        print("Menerima sinyal shutdown, menghentikan consumer...")
        berjalan = False

    signal.signal(signal.SIGINT,  handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    consumer = KafkaConsumer(
        "orders",
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id="order-service",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        key_deserializer=lambda k: k.decode("utf-8") if k else None,
    )

    print("Consumer aktif, menunggu pesan...")
    try:
        while berjalan:
            # poll() dengan timeout agar bisa cek flag berjalan secara periodik
            records = consumer.poll(timeout_ms=1000, max_records=50)

            for topic_partition, messages in records.items():
                for msg in messages:
                    try:
                        proses_order(msg.value)
                        # Commit hanya setelah pemrosesan berhasil
                        consumer.commit()
                    except Exception as e:
                        print(f"Error memproses pesan offset {msg.offset}: {e}")
                        # Jangan commit -- pesan akan dibaca ulang setelah restart
                        # Tambahkan ke dead letter queue jika perlu

    finally:
        consumer.close()
        print("Consumer ditutup dengan bersih.")

jalankan_consumer()

Consumer Group dan Partition Assignment #

Consumer group memungkinkan beberapa instance consumer berbagi beban membaca satu topic — setiap partition hanya dibaca oleh satu consumer dalam group pada satu waktu.

# Diagram consumer group:
#
# Topic "orders" dengan 3 partition:
#
# Partition 0 ──────► Consumer A  ┐
# Partition 1 ──────► Consumer B  ├── Group: "order-service"
# Partition 2 ──────► Consumer C  ┘
#
# Jika Consumer B mati:
# Partition 0 ──────► Consumer A  ┐
# Partition 1 ──────► Consumer A  ├── Rebalance otomatis
# Partition 2 ──────► Consumer C  ┘

# Subscribe ke beberapa topic sekaligus
consumer.subscribe(["orders", "payments", "notifications"])

# Atau assign ke partition spesifik secara manual (tanpa rebalance)
from kafka import TopicPartition

consumer.assign([
    TopicPartition("orders", 0),
    TopicPartition("orders", 1),
])

# Cek assignment saat ini
print("Assigned partitions:", consumer.assignment())

# Cek lag (berapa pesan yang belum diproses)
for tp in consumer.assignment():
    committed = consumer.committed(tp)
    end       = consumer.end_offsets([tp])[tp]
    lag       = end - (committed.offset if committed else 0)
    print(f"{tp.topic}[{tp.partition}]: lag={lag}")

Serialisasi Pesan #

Untuk aplikasi production, gunakan format yang lebih efisien dari JSON — seperti Avro atau Protobuf — terutama untuk throughput tinggi.

import json
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class OrderEvent:
    order_id:   int
    user_id:    int
    total:      float
    status:     str
    created_at: str = None

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat() + "Z"

# Producer dengan dataclass
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(asdict(v), ensure_ascii=False).encode("utf-8"),
    key_serializer=lambda k: str(k).encode("utf-8") if k else None,
    acks="all"
)

event = OrderEvent(order_id=1001, user_id=42, total=18500000.0, status="created")
producer.send("order-events", key=event.order_id, value=event)
producer.flush()

# Consumer yang me-reconstruct dataclass
consumer = KafkaConsumer(
    "order-events",
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id="order-processor",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    value_deserializer=lambda v: OrderEvent(**json.loads(v.decode("utf-8")))
)

for msg in consumer:
    order: OrderEvent = msg.value
    print(f"Order #{order.order_id}: Rp{order.total:,.0f}{order.status}")
    consumer.commit()

Manajemen Topic #

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

def buat_topic(
    nama:         str,
    num_partitions: int = 3,
    replication_factor: int = 1
) -> None:
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

    topic = NewTopic(
        name=nama,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
        topic_configs={
            "retention.ms":     str(7 * 24 * 60 * 60 * 1000),  # simpan 7 hari
            "cleanup.policy":   "delete",
            "compression.type": "gzip"
        }
    )

    try:
        admin.create_topics([topic])
        print(f"Topic '{nama}' berhasil dibuat dengan {num_partitions} partition.")
    except TopicAlreadyExistsError:
        print(f"Topic '{nama}' sudah ada.")
    finally:
        admin.close()

def list_topics() -> list[str]:
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
    topics = admin.list_topics()
    admin.close()
    return [t for t in topics if not t.startswith("__")]  # filter internal topics

def hapus_topic(nama: str) -> None:
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
    admin.delete_topics([nama])
    admin.close()
    print(f"Topic '{nama}' dihapus.")

# Buat topic untuk aplikasi
buat_topic("orders",       num_partitions=3)
buat_topic("order-events", num_partitions=3)
buat_topic("user-events",  num_partitions=6)
print("Topics:", list_topics())

Penanganan Error dan Dead Letter Queue #

from kafka import KafkaProducer, KafkaConsumer
import json

DLQ_TOPIC = "orders-dlq"   # Dead Letter Queue -- pesan yang gagal diproses

def proses_dengan_retry(
    consumer: KafkaConsumer,
    dlq_producer: KafkaProducer,
    maks_retry: int = 3
) -> None:

    for msg in consumer:
        retry_count = 0
        berhasil    = False

        while retry_count <= maks_retry and not berhasil:
            try:
                # Proses pesan
                proses_order(msg.value)
                consumer.commit()
                berhasil = True

            except Exception as e:
                retry_count += 1
                print(f"Retry {retry_count}/{maks_retry} untuk offset {msg.offset}: {e}")

                if retry_count > maks_retry:
                    # Kirim ke Dead Letter Queue
                    dlq_payload = {
                        "original_topic":     msg.topic,
                        "original_partition": msg.partition,
                        "original_offset":    msg.offset,
                        "original_key":       msg.key,
                        "original_value":     msg.value,
                        "error":              str(e),
                        "failed_at":          datetime.utcnow().isoformat()
                    }
                    dlq_producer.send(DLQ_TOPIC, value=dlq_payload)
                    dlq_producer.flush()
                    consumer.commit()  # commit agar tidak loop terus
                    print(f"Pesan dikirim ke DLQ: {DLQ_TOPIC}")

Kapan Memilih Kafka vs Broker Lain #

Pilih Kafka jika:
  ✓ Throughput sangat tinggi (jutaan pesan/detik)
  ✓ Perlu replay pesan (consumer bisa baca ulang dari offset manapun)
  ✓ Banyak consumer independen membaca topic yang sama
  ✓ Event sourcing atau audit log jangka panjang
  ✓ Data pipeline real-time antar banyak service

Pilih RabbitMQ jika:
  ✓ Routing pesan kompleks (exchange, binding, routing key)
  ✓ Butuh acknowledgment per-pesan yang lebih granular
  ✓ Ukuran cluster kecil dan tim tidak familiar dengan Kafka
  ✓ Kebutuhan message TTL dan priority queue

Pilih Amazon SQS / Google Pub/Sub jika:
  ✓ Ingin managed service tanpa operasional cluster
  ✓ Sudah di ekosistem AWS / GCP

Ringkasan #

  • enable_auto_commit=False — selalu gunakan manual commit agar pesan tidak dianggap selesai sebelum benar-benar diproses; auto commit bisa menyebabkan pesan hilang jika crash saat pemrosesan.
  • Key menentukan partition — pesan dengan key yang sama selalu dikirim ke partition yang sama, menjaga urutan per entitas (misal per user_id atau order_id).
  • acks="all" — gunakan di producer untuk memastikan pesan tersimpan di semua replika sebelum dianggap berhasil; mengorbankan latensi untuk durabilitas.
  • auto_offset_reset"earliest" untuk baca dari awal (consumer baru/group baru); "latest" untuk hanya baca pesan baru setelah consumer aktif.
  • poll() dengan timeout — gunakan consumer.poll(timeout_ms=1000) daripada iterasi langsung for msg in consumer agar bisa cek flag shutdown secara periodik.
  • Consumer Group untuk skalabilitas — jalankan beberapa instance consumer dengan group_id yang sama; Kafka otomatis mendistribusikan partition ke masing-masing instance.
  • Serialisasi eksplisit — selalu definisikan value_serializer dan value_deserializer di constructor; jangan kirim bytes mentah tanpa format yang terdefinisi.
  • Dead Letter Queue (DLQ) — kirim pesan yang gagal diproses setelah N retry ke topic terpisah agar tidak memblokir pemrosesan pesan lain.
  • Mode KRaft — gunakan Kafka 3.x dengan mode KRaft untuk instalasi baru; tidak perlu Zookeeper lagi.
  • producer.flush() sebelum exit — pastikan semua pesan dalam buffer terkirim sebelum aplikasi berhenti.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact