RabbitMQ #

RabbitMQ adalah message broker open-source yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol) — protokol standar untuk pertukaran pesan antar aplikasi yang menjamin pengiriman, routing, dan antrian pesan secara andal. Berbeda dari Kafka yang dirancang sebagai log terdistribusi, RabbitMQ adalah broker klasik dengan model push: pesan diantarkan ke consumer segera saat tersedia, dan dihapus dari queue setelah di-acknowledge. Keunggulan RabbitMQ ada pada fleksibilitas routing melalui Exchange dengan berbagai tipe, dukungan message TTL, Dead Letter Exchange, dan priority queue — membuatnya sangat cocok untuk task queue, job scheduling, dan komunikasi antar microservice yang butuh routing kompleks.

Konsep Dasar RabbitMQ #

Alur pesan di RabbitMQ:

Producer                Exchange               Queue(s)            Consumer
────────                ────────               ────────            ────────
App A  ──publish──►  [Exchange]  ──binding──►  [Queue A]  ──►  Worker 1
                         │                     [Queue B]  ──►  Worker 2
                         └──────────────────►  [Queue C]  ──►  Worker 3

Tipe Exchange:
  direct  -- routing berdasarkan exact match routing key
             (Producer → "order.created" → Queue yang terikat dengan key "order.created")

  fanout  -- broadcast ke SEMUA queue yang terikat, routing key diabaikan
             (Producer → Exchange → Queue A + Queue B + Queue C)

  topic   -- routing berdasarkan pattern matching dengan wildcard (* dan #)
             (* = satu kata, # = nol atau lebih kata)
             "order.*"  → cocok dengan "order.created", "order.paid"
             "order.#"  → cocok dengan "order.created", "order.paid.success"

  headers -- routing berdasarkan header pesan, bukan routing key

Instalasi #

pip install pika

Untuk menjalankan RabbitMQ secara lokal menggunakan Docker:

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=rahasia \
  rabbitmq:3-management
# Management UI tersedia di http://localhost:15672

Membuat Koneksi #

import pika
import os
from urllib.parse import quote

# ANTI-PATTERN: hardcode koneksi tanpa autentikasi
connection = pika.BlockingConnection(
    pika.ConnectionParameters("localhost")  # ✗ -- tidak ada user/pass, tidak fleksibel
)

# BENAR: gunakan environment variable dan URLParameters
def get_connection() -> pika.BlockingConnection:
    amqp_url = os.getenv(
        "RABBITMQ_URL",
        "amqp://admin:rahasia@localhost:5672/%2F"
        # Format: amqp://user:pass@host:port/vhost
        # %2F adalah URL-encoded "/" untuk default vhost
    )
    params = pika.URLParameters(amqp_url)
    params.heartbeat            = 60     # kirim heartbeat tiap 60 detik
    params.blocked_connection_timeout = 300

    return pika.BlockingConnection(params)

# Tes koneksi
conn    = get_connection()
channel = conn.channel()
print("Koneksi RabbitMQ berhasil.")
conn.close()

Setup Exchange, Queue, dan Binding #

Selalu deklarasikan exchange dan queue di kedua sisi (producer dan consumer) dengan parameter yang sama — RabbitMQ bersifat idempotent untuk deklarasi yang identik.

import pika
import json

def setup_infrastruktur(channel: pika.channel.Channel) -> None:
    """
    Deklarasikan exchange, queue, dan binding.
    Aman dipanggil berulang kali — idempotent.
    """

    # Exchange utama untuk order events
    channel.exchange_declare(
        exchange="order.events",
        exchange_type="topic",
        durable=True       # bertahan setelah RabbitMQ restart
    )

    # Dead Letter Exchange -- untuk pesan yang gagal diproses
    channel.exchange_declare(
        exchange="order.dlx",
        exchange_type="direct",
        durable=True
    )

    # Queue untuk service pembayaran
    channel.queue_declare(
        queue="payment.queue",
        durable=True,      # queue bertahan setelah restart
        arguments={
            "x-dead-letter-exchange":    "order.dlx",        # kirim ke DLX jika gagal
            "x-dead-letter-routing-key": "payment.dead",
            "x-message-ttl":             86400000,           # TTL 24 jam (ms)
            "x-max-length":              10000               # maks 10.000 pesan
        }
    )

    # Queue untuk service notifikasi
    channel.queue_declare(
        queue="notification.queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange":    "order.dlx",
            "x-dead-letter-routing-key": "notification.dead",
        }
    )

    # Dead Letter Queue -- tampung pesan yang gagal
    channel.queue_declare(queue="order.dead.queue", durable=True)

    # Binding: exchange → queue dengan routing key
    channel.queue_bind(
        exchange="order.events",
        queue="payment.queue",
        routing_key="order.created"     # hanya terima event "order.created"
    )
    channel.queue_bind(
        exchange="order.events",
        queue="payment.queue",
        routing_key="order.updated"
    )
    channel.queue_bind(
        exchange="order.events",
        queue="notification.queue",
        routing_key="order.*"           # terima semua "order.{apapun}"
    )

    # DLX binding
    channel.queue_bind(
        exchange="order.dlx",
        queue="order.dead.queue",
        routing_key="payment.dead"
    )
    channel.queue_bind(
        exchange="order.dlx",
        queue="order.dead.queue",
        routing_key="notification.dead"
    )

    print("Exchange, queue, dan binding berhasil dideklarasikan.")

Producer — Mengirim Pesan #

import pika
import json
from datetime import datetime, timezone

def buat_producer() -> tuple[pika.BlockingConnection, pika.channel.Channel]:
    conn    = get_connection()
    channel = conn.channel()
    setup_infrastruktur(channel)
    return conn, channel

def kirim_event(
    channel:     pika.channel.Channel,
    exchange:    str,
    routing_key: str,
    payload:     dict
) -> None:
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")

    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        properties=pika.BasicProperties(
            content_type  = "application/json",
            delivery_mode = pika.DeliveryMode.Persistent,  # ✓ pesan tersimpan ke disk
            message_id    = payload.get("event_id"),
            timestamp     = int(datetime.now(timezone.utc).timestamp()),
            headers       = {
                "source":  "order-service",
                "version": "1.0"
            }
        )
    )

# Penggunaan
conn, channel = buat_producer()

try:
    # Kirim event order.created
    order_event = {
        "event_id":  "evt-001",
        "event":     "order.created",
        "order_id":  1001,
        "user_id":   42,
        "total":     18500000,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    kirim_event(channel, "order.events", "order.created", order_event)
    print(f"Event terkirim: {order_event['event']} (order #{order_event['order_id']})")

    # Kirim event order.updated
    update_event = {
        "event_id": "evt-002",
        "event":    "order.updated",
        "order_id": 1001,
        "status":   "paid",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    kirim_event(channel, "order.events", "order.updated", update_event)

finally:
    conn.close()
Selalu gunakan delivery_mode=Persistent untuk pesan penting. Tanpanya (delivery_mode=Transient), pesan hanya disimpan di memori dan akan hilang jika RabbitMQ restart atau crash. Persistent message dikombinasikan dengan durable=True pada queue adalah pasangan yang menjamin durabilitas pesan.

Consumer — Menerima Pesan #

import pika
import json
import signal

def proses_pembayaran(order: dict) -> None:
    """Logika bisnis pemrosesan pembayaran."""
    print(f"Memproses pembayaran order #{order['order_id']} — Rp{order['total']:,.0f}")
    # Jika exception dilempar di sini, pesan akan di-nack dan masuk DLX

def jalankan_payment_consumer() -> None:
    conn    = get_connection()
    channel = conn.channel()
    setup_infrastruktur(channel)

    # QoS: proses hanya 1 pesan per consumer pada satu waktu
    # Pastikan beban terdistribusi merata antar worker
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        try:
            payload = json.loads(body.decode("utf-8"))
            print(f"Menerima: {properties.message_id} — routing: {method.routing_key}")

            proses_pembayaran(payload)

            # ACK -- beritahu RabbitMQ pesan sudah diproses, hapus dari queue
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"✓ Pesan di-ACK: {properties.message_id}")

        except json.JSONDecodeError as e:
            print(f"✗ Pesan tidak valid (JSON error): {e}")
            # Reject tanpa requeue -- langsung ke DLX
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

        except Exception as e:
            print(f"✗ Error memproses pesan: {e}")
            # ANTI-PATTERN: basic_ack saat error
            # ch.basic_ack(delivery_tag=method.delivery_tag)  # ✗ -- pesan hilang!

            # BENAR: nack dengan requeue=False → masuk Dead Letter Exchange
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # ✓

    channel.basic_consume(
        queue="payment.queue",
        on_message_callback=callback,
        auto_ack=False   # WAJIB False -- commit manual setelah proses selesai
    )

    # Graceful shutdown
    def handle_shutdown(signum, frame):
        print("Shutdown signal diterima, menghentikan consumer...")
        channel.stop_consuming()

    signal.signal(signal.SIGINT,  handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    print("Payment consumer aktif, menunggu pesan...")
    try:
        channel.start_consuming()
    finally:
        conn.close()
        print("Consumer ditutup dengan bersih.")

jalankan_payment_consumer()

Exchange Types dalam Praktek #

Direct Exchange — Routing Tepat #

# Setup
channel.exchange_declare(exchange="notif.direct", exchange_type="direct", durable=True)
channel.queue_declare(queue="email.queue",   durable=True)
channel.queue_declare(queue="sms.queue",     durable=True)
channel.queue_declare(queue="push.queue",    durable=True)

channel.queue_bind(exchange="notif.direct", queue="email.queue", routing_key="email")
channel.queue_bind(exchange="notif.direct", queue="sms.queue",   routing_key="sms")
channel.queue_bind(exchange="notif.direct", queue="push.queue",  routing_key="push")

# Kirim hanya ke email queue
channel.basic_publish(
    exchange="notif.direct",
    routing_key="email",   # hanya email.queue yang menerima
    body=json.dumps({"to": "[email protected]", "subject": "Order Confirmed"}).encode()
)

Fanout Exchange — Broadcast #

# Setup -- routing key diabaikan
channel.exchange_declare(exchange="order.broadcast", exchange_type="fanout", durable=True)
channel.queue_declare(queue="audit.queue",   durable=True)
channel.queue_declare(queue="report.queue",  durable=True)
channel.queue_declare(queue="analytics.queue", durable=True)

# Bind semua queue (tanpa routing key)
for q in ["audit.queue", "report.queue", "analytics.queue"]:
    channel.queue_bind(exchange="order.broadcast", queue=q)

# Satu publish → tiga queue menerimanya sekaligus
channel.basic_publish(
    exchange="order.broadcast",
    routing_key="",   # diabaikan untuk fanout
    body=json.dumps({"event": "order.completed", "order_id": 1001}).encode()
)

Topic Exchange — Pattern Matching #

# Setup
channel.exchange_declare(exchange="logs", exchange_type="topic", durable=True)
channel.queue_declare(queue="error.queue",   durable=True)
channel.queue_declare(queue="warning.queue", durable=True)
channel.queue_declare(queue="all.queue",     durable=True)

# * = tepat satu kata, # = nol atau lebih kata
channel.queue_bind(exchange="logs", queue="error.queue",   routing_key="*.error")
channel.queue_bind(exchange="logs", queue="warning.queue", routing_key="*.warning")
channel.queue_bind(exchange="logs", queue="all.queue",     routing_key="#")  # terima semua

# "payment.error"   → error.queue + all.queue
# "auth.warning"    → warning.queue + all.queue
# "order.info"      → all.queue saja
channel.basic_publish(exchange="logs", routing_key="payment.error",
                      body=b"Payment gateway timeout")
channel.basic_publish(exchange="logs", routing_key="auth.warning",
                      body=b"Failed login attempt")

Reconnect Otomatis #

Koneksi RabbitMQ bisa putus karena network issue atau timeout. Penting untuk punya logika reconnect agar consumer tidak mati diam-diam.

import pika
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def jalankan_consumer_dengan_reconnect(
    queue:       str,
    callback_fn,
    maks_retry:  int = 5,
    jeda_retry:  int = 5
) -> None:
    retry = 0

    while True:
        try:
            conn    = get_connection()
            channel = conn.channel()
            setup_infrastruktur(channel)
            channel.basic_qos(prefetch_count=1)

            channel.basic_consume(
                queue=queue,
                on_message_callback=callback_fn,
                auto_ack=False
            )

            logger.info(f"Consumer aktif di queue '{queue}'")
            retry = 0   # reset counter setelah berhasil connect
            channel.start_consuming()

        except pika.exceptions.AMQPConnectionError as e:
            retry += 1
            if retry > maks_retry:
                logger.error(f"Gagal reconnect setelah {maks_retry} percobaan. Berhenti.")
                raise

            logger.warning(
                f"Koneksi putus ({e}). "
                f"Reconnect dalam {jeda_retry}s... (percobaan {retry}/{maks_retry})"
            )
            time.sleep(jeda_retry)

        except Exception as e:
            logger.error(f"Error tidak terduga: {e}")
            raise

        finally:
            try:
                if conn and not conn.is_closed:
                    conn.close()
            except Exception:
                pass

Penanganan Error dan Dead Letter Exchange #

Dead Letter Exchange (DLX) menampung pesan yang gagal diproses — baik karena di-nack, TTL habis, atau queue penuh. Ini mencegah pesan hilang begitu saja.

def jalankan_dlq_consumer() -> None:
    """Consumer untuk memantau dan memproses ulang pesan di DLQ."""
    conn    = get_connection()
    channel = conn.channel()

    def callback_dlq(ch, method, properties, body):
        try:
            payload  = json.loads(body.decode("utf-8"))
            headers  = properties.headers or {}
            kematian = headers.get("x-death", [{}])[0]

            print(f"[DLQ] Pesan mati dari queue: {kematian.get('queue')}")
            print(f"[DLQ] Alasan: {kematian.get('reason')}")
            print(f"[DLQ] Jumlah kematian: {kematian.get('count')}")
            print(f"[DLQ] Payload: {payload}")

            # Strategi: log, alert, atau retry manual
            # Setelah diproses, ACK agar tidak menumpuk di DLQ
            ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            print(f"[DLQ] Error: {e}")
            ch.basic_ack(delivery_tag=method.delivery_tag)  # tetap ACK di DLQ

    channel.basic_consume(
        queue="order.dead.queue",
        on_message_callback=callback_dlq,
        auto_ack=False
    )

    print("DLQ consumer aktif...")
    channel.start_consuming()

Ringkasan #

  • auto_ack=False wajib — selalu gunakan manual acknowledgment; auto_ack=True mengakui pesan segera saat diterima, sebelum diproses, sehingga pesan hilang jika consumer crash.
  • delivery_mode=Persistent — gunakan agar pesan disimpan ke disk dan tidak hilang jika RabbitMQ restart; pasangkan dengan durable=True pada queue.
  • basic_ack() hanya setelah sukses — panggil hanya setelah pemrosesan berhasil; gunakan basic_nack(requeue=False) untuk menolak pesan yang gagal ke DLX.
  • prefetch_count=1 — atur QoS agar setiap consumer hanya menerima satu pesan sekaligus; mencegah satu consumer dibanjiri pesan sementara lainnya idle.
  • Dead Letter Exchange (DLX) — selalu konfigurasi DLX pada queue produksi agar pesan yang gagal tidak hilang begitu saja, melainkan masuk ke antrian monitoring.
  • Exchange type sesuai kebutuhandirect untuk routing tepat, fanout untuk broadcast, topic untuk pattern matching routing key.
  • durable=True untuk exchange dan queue — agar konfigurasi broker bertahan setelah restart.
  • Reconnect logic — implementasikan retry dengan backoff untuk consumer agar tidak mati diam-diam saat koneksi terputus.
  • x-message-ttl — gunakan TTL pada queue untuk membersihkan pesan lama yang tidak diproses secara otomatis.
  • Deklarasi idempotent — deklarasikan exchange, queue, dan binding di kedua sisi (producer dan consumer) dengan parameter identik; RabbitMQ tidak akan error jika sudah ada.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact