RabbitMQ #
RabbitMQ is an open-source message broker that implements AMQP (Advanced Message Queuing Protocol), a standard protocol for exchanging messages between applications that covers reliable delivery, routing, and queuing. Unlike Kafka, which is designed as a distributed log, RabbitMQ is a classic broker with a push model: messages are delivered to consumers as soon as they are available and are removed from the queue once acknowledged. RabbitMQ's strengths are flexible routing through exchanges of several types, plus support for message TTL, Dead Letter Exchanges, and priority queues. That makes it a good fit for task queues, job scheduling, and communication between microservices that need complex routing.
Basic RabbitMQ Concepts #
Message flow in RabbitMQ:
Producer           Exchange                 Queue(s)        Consumer
────────           ────────                 ────────        ────────
App A ──publish──► [Exchange] ──binding──► [Queue A] ──► Worker 1
                       │                   [Queue B] ──► Worker 2
                       └─────────────────► [Queue C] ──► Worker 3
Exchange types:
direct   -- routes on an exact routing key match
            (Producer → "order.created" → queues bound with the key "order.created")
fanout   -- broadcasts to ALL bound queues; the routing key is ignored
            (Producer → Exchange → Queue A + Queue B + Queue C)
topic    -- routes by pattern matching with wildcards (* and #)
            (* = exactly one word, # = zero or more words)
            "order.*" → matches "order.created", "order.paid"
            "order.#" → matches "order.created", "order.paid.success"
headers  -- routes on message headers instead of the routing key
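The wildcard rules for topic exchanges can be checked without a broker. The following is a minimal sketch of those rules in plain Python; the function name `topic_matches` is hypothetical and this is not how RabbitMQ implements matching internally, only a model of the `*` and `#` semantics described above.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Illustrative model of the wildcard rules only:
    '*' stands for exactly one word, '#' for zero or more words.
    """
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        # i indexes pattern_words, j indexes key_words
        if i == len(pattern_words):
            return j == len(key_words)
        word = pattern_words[i]
        if word == "#":
            # '#' may consume zero or more of the remaining words
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if word == "*" or word == key_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("order.*", "order.created"))       # True
print(topic_matches("order.*", "order.paid.success"))  # False
print(topic_matches("order.#", "order.paid.success"))  # True
```

The second call shows the practical difference between the two wildcards: `order.*` stops at one word after `order`, while `order.#` accepts any number of words.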
Installation #
pip install pika
To run RabbitMQ locally with Docker:
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=rahasia \
  rabbitmq:3-management
# The management UI is available at http://localhost:15672
Creating a Connection #
import os

import pika

# ANTI-PATTERN: hardcoded connection with no explicit credentials
connection = pika.BlockingConnection(
    pika.ConnectionParameters("localhost")  # ✗ -- no user/password, not configurable
)

# CORRECT: read the URL from an environment variable and use URLParameters
def get_connection() -> pika.BlockingConnection:
    amqp_url = os.getenv(
        "RABBITMQ_URL",
        "amqp://admin:rahasia@localhost:5672/%2F"
        # Format: amqp://user:pass@host:port/vhost
        # %2F is the URL-encoded "/" for the default vhost
    )
    params = pika.URLParameters(amqp_url)
    params.heartbeat = 60  # heartbeat timeout in seconds
    params.blocked_connection_timeout = 300  # give up after 300 s if the broker blocks the connection
    return pika.BlockingConnection(params)

# Test the connection
conn = get_connection()
channel = conn.channel()
print("RabbitMQ connection succeeded.")
conn.close()
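Because the vhost (and possibly the password) must be URL-encoded inside an AMQP URL, it can help to build the URL from its parts instead of writing it by hand. The sketch below assumes a hypothetical helper named `build_amqp_url`; it only uses `urllib.parse.quote` from the standard library and produces a string in the `amqp://user:pass@host:port/vhost` format shown above.

```python
from urllib.parse import quote


def build_amqp_url(
    user: str,
    password: str,
    host: str = "localhost",
    port: int = 5672,
    vhost: str = "/",
) -> str:
    """Build an AMQP URL, percent-encoding user, password, and vhost.

    safe="" makes quote() encode "/" as well, which is what turns the
    default vhost "/" into "%2F".
    """
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


print(build_amqp_url("admin", "rahasia"))
# amqp://admin:rahasia@localhost:5672/%2F
print(build_amqp_url("admin", "p@ss/word"))
# amqp://admin:p%40ss%2Fword@localhost:5672/%2F
```

The resulting string could be exported as `RABBITMQ_URL` so that passwords containing characters such as `@` or `/` do not break URL parsing.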
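Setting Up Exchanges, Queues, and Bindings #
Always declare exchanges and queues on both sides (producer and consumer) with the same parameters. Declaration is idempotent when the parameters are identical; redeclaring an existing exchange or queue with different parameters fails with a PRECONDITION_FAILED channel error.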
from pika.adapters.blocking_connection import BlockingChannel

def setup_infrastruktur(channel: BlockingChannel) -> None:
    """
    Declare the exchanges, queues, and bindings.
    Safe to call repeatedly: the declarations are idempotent.
    """
    # Main exchange for order events
    channel.exchange_declare(
        exchange="order.events",
        exchange_type="topic",
        durable=True  # survives a RabbitMQ restart
    )
    # Dead Letter Exchange -- for messages that fail processing
    channel.exchange_declare(
        exchange="order.dlx",
        exchange_type="direct",
        durable=True
    )
    # Queue for the payment service
    channel.queue_declare(
        queue="payment.queue",
        durable=True,  # the queue survives a restart
        arguments={
            "x-dead-letter-exchange": "order.dlx",  # send to the DLX on failure
            "x-dead-letter-routing-key": "payment.dead",
            "x-message-ttl": 86400000,  # 24-hour TTL (ms)
            "x-max-length": 10000       # at most 10,000 messages
        }
    )
    # Queue for the notification service
    channel.queue_declare(
        queue="notification.queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "order.dlx",
            "x-dead-letter-routing-key": "notification.dead",
        }
    )
    # Dead Letter Queue -- holds the failed messages
    channel.queue_declare(queue="order.dead.queue", durable=True)
    # Bindings: exchange → queue with a routing key
    channel.queue_bind(
        exchange="order.events",
        queue="payment.queue",
        routing_key="order.created"  # only receives "order.created" events
    )
    channel.queue_bind(
        exchange="order.events",
        queue="payment.queue",
        routing_key="order.updated"
    )
    channel.queue_bind(
        exchange="order.events",
        queue="notification.queue",
        routing_key="order.*"  # receives every "order.{anything}" with one word after "order"
    )
    # DLX bindings
    channel.queue_bind(
        exchange="order.dlx",
        queue="order.dead.queue",
        routing_key="payment.dead"
    )
    channel.queue_bind(
        exchange="order.dlx",
        queue="order.dead.queue",
        routing_key="notification.dead"
    )
    print("Exchanges, queues, and bindings declared.")
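The `x-message-ttl` argument is expressed in milliseconds, which makes literals such as `86400000` easy to mistype. As a small sketch, a hypothetical helper `ttl_ms` can derive the value from a `timedelta` so the intent stays readable; the dictionary below mirrors only the TTL and length arguments used for `payment.queue`.

```python
from datetime import timedelta


def ttl_ms(duration: timedelta) -> int:
    """Convert a duration to whole milliseconds for x-message-ttl."""
    return int(duration.total_seconds() * 1000)


# 24 hours = 24 * 60 * 60 * 1000 ms
queue_arguments = {
    "x-message-ttl": ttl_ms(timedelta(hours=24)),
    "x-max-length": 10_000,
}
print(queue_arguments["x-message-ttl"])  # 86400000
```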
Producer: Sending Messages #
import json
from datetime import datetime, timezone

import pika
from pika.adapters.blocking_connection import BlockingChannel

def buat_producer() -> tuple[pika.BlockingConnection, BlockingChannel]:
    conn = get_connection()
    channel = conn.channel()
    setup_infrastruktur(channel)
    return conn, channel

def kirim_event(
    channel: BlockingChannel,
    exchange: str,
    routing_key: str,
    payload: dict
) -> None:
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        properties=pika.BasicProperties(
            content_type="application/json",
            # ✓ the message is stored on disk; on pika versions without
            # this enum, use delivery_mode=2
            delivery_mode=pika.DeliveryMode.Persistent,
            message_id=payload.get("event_id"),
            timestamp=int(datetime.now(timezone.utc).timestamp()),
            headers={
                "source": "order-service",
                "version": "1.0"
            }
        )
    )

# Usage
conn, channel = buat_producer()
try:
    # Send an order.created event
    order_event = {
        "event_id": "evt-001",
        "event": "order.created",
        "order_id": 1001,
        "user_id": 42,
        "total": 18500000,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    kirim_event(channel, "order.events", "order.created", order_event)
    print(f"Event sent: {order_event['event']} (order #{order_event['order_id']})")
    # Send an order.updated event
    update_event = {
        "event_id": "evt-002",
        "event": "order.updated",
        "order_id": 1001,
        "status": "paid",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    kirim_event(channel, "order.events", "order.updated", update_event)
finally:
    conn.close()
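Always use delivery_mode=Persistent for important messages. Without it (delivery_mode=Transient), a message is not written to disk for recovery and is lost if RabbitMQ restarts or crashes. Persistent messages combined with durable=True on the queue are the pair that lets messages survive a broker restart. For a stronger guarantee that the broker has actually accepted each message, the producer would also need publisher confirms.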
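The producer example uses fixed `event_id` values such as `evt-001`, which is fine for a demonstration but would collide in a real service. The following sketch, with hypothetical helper names `build_event` and `encode_event`, shows one way to generate a unique ID and timestamp for every event before it is handed to `kirim_event`; the `evt-` prefix and the field names simply follow the payloads above.

```python
import json
import uuid
from datetime import datetime, timezone


def build_event(event: str, **fields) -> dict:
    """Build an event payload with a unique event_id and a UTC timestamp."""
    return {
        "event_id": f"evt-{uuid.uuid4()}",  # unique per call
        "event": event,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **fields,  # domain data such as order_id, user_id, total
    }


def encode_event(payload: dict) -> bytes:
    """Serialize the payload the same way kirim_event does."""
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


order_event = build_event("order.created", order_id=1001, user_id=42, total=18500000)
print(order_event["event"], order_event["order_id"])  # order.created 1001
```

Since `kirim_event` copies `event_id` into the `message_id` property, a unique ID also gives consumers something to log or deduplicate on.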
Consumer: Receiving Messages #
import json
import signal

import pika

def proses_pembayaran(order: dict) -> None:
    """Business logic for processing a payment."""
    print(f"Processing payment for order #{order['order_id']}: Rp{order['total']:,.0f}")
    # If an exception is raised here, the message is nacked and goes to the DLX

def jalankan_payment_consumer() -> None:
    conn = get_connection()
    channel = conn.channel()
    setup_infrastruktur(channel)
    # QoS: each consumer handles only 1 unacknowledged message at a time,
    # so the load is spread evenly across workers
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        try:
            payload = json.loads(body.decode("utf-8"))
            print(f"Received: {properties.message_id} (routing key: {method.routing_key})")
            proses_pembayaran(payload)
            # ACK -- tell RabbitMQ the message was processed so it is removed from the queue
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"✓ Message ACKed: {properties.message_id}")
        except json.JSONDecodeError as e:
            print(f"✗ Invalid message (JSON error): {e}")
            # Reject without requeue -- goes straight to the DLX
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except Exception as e:
            print(f"✗ Error while processing message: {e}")
            # ANTI-PATTERN: basic_ack on error
            # ch.basic_ack(delivery_tag=method.delivery_tag)  # ✗ -- the message is lost!
            # CORRECT: nack with requeue=False → goes to the Dead Letter Exchange
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # ✓

    channel.basic_consume(
        queue="payment.queue",
        on_message_callback=callback,
        auto_ack=False  # MUST be False -- acknowledge manually once processing is done
    )

    # Graceful shutdown
    def handle_shutdown(signum, frame):
        print("Shutdown signal received, stopping consumer...")
        channel.stop_consuming()

    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)
    print("Payment consumer is running, waiting for messages...")
    try:
        channel.start_consuming()
    finally:
        conn.close()
        print("Consumer closed cleanly.")

if __name__ == "__main__":
    jalankan_payment_consumer()
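The ack-on-success, nack-on-failure rule in the callback above can be factored out and exercised without a broker. This is a minimal sketch: `make_callback` is a hypothetical factory, and `FakeChannel` is a stand-in that only records calls; it is not part of pika, though the `basic_ack` and `basic_nack` keyword arguments match the ones used above.

```python
import json
from types import SimpleNamespace


def make_callback(handler):
    """Wrap a handler so success is ACKed and any failure is dead-lettered."""
    def callback(ch, method, properties, body):
        try:
            payload = json.loads(body.decode("utf-8"))
            handler(payload)
        except Exception:
            # requeue=False sends the message to the DLX if one is configured
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


class FakeChannel:
    """Test double (assumption, not a pika class): records ack/nack calls."""
    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))


def fail(payload):
    raise ValueError("payment gateway timeout")


ch = FakeChannel()
make_callback(lambda payload: None)(ch, SimpleNamespace(delivery_tag=1), None, b'{"order_id": 1001}')
make_callback(fail)(ch, SimpleNamespace(delivery_tag=2), None, b'{"order_id": 1002}')
print(ch.calls)  # [('ack', 1), ('nack', 2, False)]
```

A callback built this way has the `(ch, method, properties, body)` signature that `basic_consume` expects for `on_message_callback`.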
Exchange Types in Practice #
Direct Exchange: Exact Routing #
# Setup
channel.exchange_declare(exchange="notif.direct", exchange_type="direct", durable=True)
channel.queue_declare(queue="email.queue", durable=True)
channel.queue_declare(queue="sms.queue", durable=True)
channel.queue_declare(queue="push.queue", durable=True)
channel.queue_bind(exchange="notif.direct", queue="email.queue", routing_key="email")
channel.queue_bind(exchange="notif.direct", queue="sms.queue", routing_key="sms")
channel.queue_bind(exchange="notif.direct", queue="push.queue", routing_key="push")
# Send to the email queue only
channel.basic_publish(
    exchange="notif.direct",
    routing_key="email",  # only email.queue receives it
    # "user@example.com" is a placeholder address
    body=json.dumps({"to": "user@example.com", "subject": "Order Confirmed"}).encode()
)
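With a direct exchange, a message whose routing key matches no binding is dropped silently unless the publisher sets the mandatory flag or an alternate exchange is configured. One hedged way to guard against typos is to validate the key before publishing. In the sketch below, `publish_notification`, `KNOWN_ROUTING_KEYS`, and `RecordingChannel` are hypothetical names; the recorder stands in for a real pika channel so the example runs without a broker.

```python
import json

# Routing keys bound on the "notif.direct" exchange in the setup above
KNOWN_ROUTING_KEYS = {"email", "sms", "push"}


def publish_notification(channel, kind: str, payload: dict) -> None:
    """Publish to notif.direct, refusing routing keys that have no binding."""
    if kind not in KNOWN_ROUTING_KEYS:
        raise ValueError(f"unknown notification kind: {kind!r}")
    channel.basic_publish(
        exchange="notif.direct",
        routing_key=kind,
        body=json.dumps(payload).encode("utf-8"),
    )


class RecordingChannel:
    """Test double (assumption, not a pika class): records published messages."""
    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body, properties=None):
        self.published.append(
            {"exchange": exchange, "routing_key": routing_key, "body": body}
        )


recorder = RecordingChannel()
publish_notification(recorder, "email", {"subject": "Order Confirmed"})
print(recorder.published[0]["routing_key"])  # email
```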
Fanout Exchange: Broadcast #
# Setup -- the routing key is ignored
channel.exchange_declare(exchange="order.broadcast", exchange_type="fanout", durable=True)
channel.queue_declare(queue="audit.queue", durable=True)
channel.queue_declare(queue="report.queue", durable=True)
channel.queue_declare(queue="analytics.queue", durable=True)
# Bind every queue (no routing key)
for q in ["audit.queue", "report.queue", "analytics.queue"]:
    channel.queue_bind(exchange="order.broadcast", queue=q)
# One publish → all three queues receive it
channel.basic_publish(
    exchange="order.broadcast",
    routing_key="",  # ignored for fanout
    body=json.dumps({"event": "order.completed", "order_id": 1001}).encode()
)
Topic Exchange: Pattern Matching #
# Setup
channel.exchange_declare(exchange="logs", exchange_type="topic", durable=True)
channel.queue_declare(queue="error.queue", durable=True)
channel.queue_declare(queue="warning.queue", durable=True)
channel.queue_declare(queue="all.queue", durable=True)
# * = exactly one word, # = zero or more words
channel.queue_bind(exchange="logs", queue="error.queue", routing_key="*.error")
channel.queue_bind(exchange="logs", queue="warning.queue", routing_key="*.warning")
channel.queue_bind(exchange="logs", queue="all.queue", routing_key="#")  # receives everything
# "payment.error" → error.queue + all.queue
# "auth.warning"  → warning.queue + all.queue
# "order.info"    → all.queue only
channel.basic_publish(exchange="logs", routing_key="payment.error",
                      body=b"Payment gateway timeout")
channel.basic_publish(exchange="logs", routing_key="auth.warning",
                      body=b"Failed login attempt")
Automatic Reconnect #
RabbitMQ connections can drop because of network problems or timeouts. Reconnect logic is important so that a consumer does not die silently.
import logging
import time

import pika

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def jalankan_consumer_dengan_reconnect(
    queue: str,
    callback_fn,
    maks_retry: int = 5,
    jeda_retry: int = 5
) -> None:
    retry = 0
    while True:
        conn = None  # keeps the finally block safe if connecting fails
        try:
            conn = get_connection()
            channel = conn.channel()
            setup_infrastruktur(channel)
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(
                queue=queue,
                on_message_callback=callback_fn,
                auto_ack=False
            )
            logger.info(f"Consumer running on queue '{queue}'")
            retry = 0  # reset the counter after a successful connect
            channel.start_consuming()
            break  # start_consuming() returned normally (e.g. after stop_consuming()), so exit
        except pika.exceptions.AMQPConnectionError as e:
            retry += 1
            if retry > maks_retry:
                logger.error(f"Failed to reconnect after {maks_retry} attempts. Giving up.")
                raise
            logger.warning(
                f"Connection lost ({e}). "
                f"Reconnecting in {jeda_retry}s... (attempt {retry}/{maks_retry})"
            )
            time.sleep(jeda_retry)
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise
        finally:
            try:
                if conn and not conn.is_closed:
                    conn.close()
            except Exception:
                pass
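The reconnect loop above waits a fixed `jeda_retry` seconds between attempts. A common refinement is exponential backoff, where each wait is longer than the last up to a cap. The generator below is a sketch of that idea only; the name `backoff_delays` and all of its parameters are assumptions, not part of pika, and the optional jitter uses the standard library `random` module.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 5.0,
    factor: float = 2.0,
    cap: float = 60.0,
    attempts: int = 5,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one delay per retry: base * factor**n, limited to cap.

    jitter=0.1 adds up to 10% random extra wait so that many consumers
    do not all reconnect at the same instant.
    """
    rng = rng or random.Random()
    for attempt in range(attempts):
        delay = min(cap, base * factor ** attempt)
        if jitter:
            delay += rng.uniform(0, jitter * delay)
        yield delay


print(list(backoff_delays(base=5.0, cap=60.0, attempts=5)))
# [5.0, 10.0, 20.0, 40.0, 60.0]
```

In the loop above, the value passed to `time.sleep` could be taken from such a sequence instead of the constant `jeda_retry`.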
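Error Handling and the Dead Letter Exchange #
A Dead Letter Exchange (DLX) receives messages that could not be processed: because they were nacked or rejected without requeue, because their TTL expired, or because the queue reached its length limit. This keeps such messages from simply disappearing.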
import json

def jalankan_dlq_consumer() -> None:
    """Consumer that monitors and reprocesses messages in the DLQ."""
    conn = get_connection()
    channel = conn.channel()
    setup_infrastruktur(channel)  # make sure order.dead.queue exists before consuming

    def callback_dlq(ch, method, properties, body):
        try:
            payload = json.loads(body.decode("utf-8"))
            headers = properties.headers or {}
            # x-death is a list; the first entry is the most recent dead-lettering event
            kematian = headers.get("x-death", [{}])[0]
            print(f"[DLQ] Dead message from queue: {kematian.get('queue')}")
            print(f"[DLQ] Reason: {kematian.get('reason')}")
            print(f"[DLQ] Death count: {kematian.get('count')}")
            print(f"[DLQ] Payload: {payload}")
            # Strategy: log, alert, or retry manually
            # Once handled, ACK so messages do not pile up in the DLQ
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[DLQ] Error: {e}")
            ch.basic_ack(delivery_tag=method.delivery_tag)  # still ACK in the DLQ

    channel.basic_consume(
        queue="order.dead.queue",
        on_message_callback=callback_dlq,
        auto_ack=False
    )
    print("DLQ consumer is running...")
    channel.start_consuming()
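The "retry manually" strategy mentioned in the callback usually needs a limit so that a permanently broken message is not retried forever. As a sketch under that assumption, the `x-death` header can be used to count how often a message has been dead-lettered. The helper names `total_deaths` and `should_retry` are hypothetical, and the sample header only includes the fields read by the DLQ callback above.

```python
from typing import Optional


def total_deaths(headers: Optional[dict]) -> int:
    """Sum the 'count' fields of all x-death entries (0 if there are none)."""
    deaths = (headers or {}).get("x-death") or []
    return sum(int(entry.get("count", 0)) for entry in deaths)


def should_retry(headers: Optional[dict], max_attempts: int = 3) -> bool:
    """Allow a retry only while the message has died fewer than max_attempts times."""
    return total_deaths(headers) < max_attempts


# Sample header shaped like the entries read in callback_dlq
sample_headers = {
    "x-death": [
        {"queue": "payment.queue", "reason": "rejected", "count": 2},
    ]
}
print(total_deaths(sample_headers))   # 2
print(should_retry(sample_headers))   # True
print(should_retry(sample_headers, max_attempts=2))  # False
```

When `should_retry` returns True, the DLQ consumer could republish the payload to the original exchange; otherwise it would log or alert and ACK the message.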
Summary #
- auto_ack=False is mandatory: always use manual acknowledgment; auto_ack=True acknowledges a message the moment it is delivered, before it is processed, so the message is lost if the consumer crashes.
- delivery_mode=Persistent: use it so messages are stored on disk and survive a RabbitMQ restart; pair it with durable=True on the queue.
- basic_ack() only after success: call it only once processing has succeeded; use basic_nack(requeue=False) to reject failed messages into the DLX.
- prefetch_count=1: set QoS so each consumer holds only one unacknowledged message at a time; this prevents one consumer from being flooded while others sit idle.
- Dead Letter Exchange (DLX): always configure a DLX on production queues so failed messages are not simply lost but land in a queue that can be monitored.
- Exchange type to match the need: direct for exact routing, fanout for broadcast, topic for pattern matching on routing keys.
- durable=True for exchanges and queues: so the broker configuration survives a restart.
- Reconnect logic: implement retries with a delay or backoff so consumers do not die silently when the connection drops.
- x-message-ttl: set a TTL on queues to clear out old, unprocessed messages automatically.
- Idempotent declaration: declare exchanges, queues, and bindings on both sides (producer and consumer) with identical parameters; RabbitMQ does not raise an error if they already exist with the same parameters.