Google Pub/Sub

Google Cloud Pub/Sub is a fully managed, distributed messaging service from Google Cloud. It connects services that produce events with the services that consume them, at global scale and with low latency. The Pub/Sub (publish-subscribe) model differs from a traditional queue: a message published to a topic can be received independently by many different subscriptions, and each subscription gets its own copy. This makes Pub/Sub a good fit for event distribution, data pipelines, and integration between microservices in the Google Cloud ecosystem. Understanding the difference between pull and push subscriptions, how the acknowledgment deadline works, and how to configure a dead-letter topic is key to a reliable implementation.

Pub/Sub Basics

Google Pub/Sub architecture:

Publisher              Topic               Subscription(s)          Subscriber
─────────              ─────               ───────────────          ──────────
App A  ──publish──►  [orders]  ──────►  [payment-sub]   ──pull──►  Payment Service
App B  ──publish──►     │      ──────►  [notif-sub]     ──pull──►  Notification Service
                        │      ──────►  [audit-sub]     ──push──►  Cloud Function/Run
                        │
                        └── Each subscription receives a copy of EVERY message
                            (published after the subscription was created)

Key concepts:
  Topic        -- the message channel (publishers write here)
  Subscription -- an attachment to a topic; each subscription has its own message backlog
  Pull         -- the subscriber actively requests messages (the more common mode)
  Push         -- Pub/Sub pushes messages to the subscriber's HTTPS endpoint
  Ack          -- the subscriber confirms the message was processed → it is removed from that subscription
  Nack         -- the subscriber rejects the message → it is redelivered without waiting for the
                  ack deadline (after the retry-policy backoff, if one is configured)
  Ack Deadline -- how long (in seconds) the subscriber has to ack before the message is redelivered
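To make fan-out and ack/nack concrete, here is a tiny in-memory model that uses only the standard library. It is a hypothetical illustration, not the Pub/Sub client API: `InMemoryTopic`, `InMemorySubscription`, `pull`, `ack` and `nack` are names made up for this sketch, and it ignores ack deadlines, retry backoff, delivery order and persistence.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class InMemorySubscription:
    """Hypothetical model: each subscription owns its own backlog."""
    name: str
    backlog: deque = field(default_factory=deque)
    outstanding: dict = field(default_factory=dict)  # msg_id -> data, delivered but not acked

    def pull(self):
        """Deliver the next message (or None) and keep it until ack/nack."""
        if not self.backlog:
            return None
        msg_id, data = self.backlog.popleft()
        self.outstanding[msg_id] = data
        return msg_id, data

    def ack(self, msg_id):
        # Acking removes the message from THIS subscription only
        self.outstanding.pop(msg_id)

    def nack(self, msg_id):
        # Nacked messages become available for redelivery again
        self.backlog.appendleft((msg_id, self.outstanding.pop(msg_id)))


class InMemoryTopic:
    def __init__(self):
        self.subscriptions = []
        self._next_id = 0

    def subscribe(self, name):
        sub = InMemorySubscription(name)
        self.subscriptions.append(sub)
        return sub

    def publish(self, data: bytes) -> str:
        # Fan-out: every existing subscription gets its own copy
        self._next_id += 1
        msg_id = str(self._next_id)
        for sub in self.subscriptions:
            sub.backlog.append((msg_id, data))
        return msg_id


topic = InMemoryTopic()
payment = topic.subscribe("payment-sub")
notif = topic.subscribe("notif-sub")
topic.publish(b'{"order_id": 1001}')

msg_id, _ = payment.pull()
payment.ack(msg_id)                               # payment-sub is done with it...
print(len(payment.backlog), len(notif.backlog))   # 0 1  (...notif-sub still has its copy)

msg_id, _ = notif.pull()
notif.nack(msg_id)                                # the same message can be pulled again
print(notif.pull()[0] == msg_id)                  # True
```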

Installation

pip install google-cloud-pubsub

Authentication

Google Cloud supports several ways to authenticate. Application Default Credentials (ADC) is the recommended approach because the same code works unchanged across environments: the client library locates the credentials automatically.

import os
from google.cloud import pubsub_v1

# ANTI-PATTERN: hardcoding the service-account key path in code
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/sa-key.json"  # ✗

# CORRECT, option 1: ADC via an environment variable set outside the code
# export GOOGLE_APPLICATION_CREDENTIALS="/path/to/sa-key.json"
# The library reads it automatically, so no credentials argument is needed:
publisher  = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# CORRECT, option 2: ADC via the gcloud CLI (for local development)
# gcloud auth application-default login

# CORRECT, option 3: an explicit service account (when credentials must be isolated)
from google.oauth2 import service_account

key_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]  # raises KeyError if unset
credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

publisher  = pubsub_v1.PublisherClient(credentials=credentials)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
On GCP (GKE, Cloud Run, Cloud Functions, Compute Engine), use Workload Identity or a service account attached to the compute resource; no JSON key file is needed at all. ADC automatically obtains credentials from the GCP metadata server. This is the safest option because the access tokens are short-lived and refreshed automatically, and no secret is stored in the code or the environment.
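The order in which ADC looks for credentials can be sketched as follows. This is a simplified, hypothetical helper (`adc_source` is not part of `google-auth`; the real lookup is `google.auth.default()`, which handles more cases), and it assumes the Linux/macOS location of the file written by `gcloud auth application-default login`.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def adc_source(env: Mapping[str, str], home: Optional[Path] = None) -> str:
    """Return which credential source ADC would use first (simplified sketch).

    Order assumed here:
      1. the GOOGLE_APPLICATION_CREDENTIALS environment variable
      2. user credentials from `gcloud auth application-default login`
      3. the metadata server of the GCP compute resource (attached service account)
    """
    key_file = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_file:
        return f"env:{key_file}"

    home = home if home is not None else Path.home()
    # Linux/macOS path; Windows uses %APPDATA%\gcloud instead (not modelled here)
    gcloud_file = home / ".config" / "gcloud" / "application_default_credentials.json"
    if gcloud_file.is_file():
        return f"gcloud:{gcloud_file}"

    return "metadata-server"


print(adc_source(os.environ))
```

On Cloud Run or GKE with no key file and no gcloud login, the sketch falls through to `metadata-server`, which is exactly the case the paragraph above recommends.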

Creating Topics and Subscriptions

import os
from typing import Optional

from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists

PROJECT_ID = os.getenv("GCP_PROJECT_ID", "my-project")
publisher  = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

def buat_topic(topic_id: str) -> str:
    """Create the topic if it does not exist yet; return its full path."""
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    try:
        topic = publisher.create_topic(request={"name": topic_path})
        print(f"Topic created: {topic.name}")
    except AlreadyExists:
        print(f"Topic already exists: {topic_path}")
    return topic_path

def buat_pull_subscription(
    topic_id:        str,
    subscription_id: str,
    ack_deadline:    int = 60,
    dlt_topic_id:    Optional[str] = None
) -> str:
    """Create a pull subscription with a retry policy and an optional dead-letter topic."""
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    sub_path   = subscriber.subscription_path(PROJECT_ID, subscription_id)

    request = {
        "name":                 sub_path,
        "topic":                topic_path,
        "ack_deadline_seconds": ack_deadline,       # allowed range: 10-600 seconds
        "retry_policy": {
            "minimum_backoff": {"seconds": 10},     # allowed range: 0-600 seconds
            "maximum_backoff": {"seconds": 600}     # allowed range: 0-600 seconds
        }
    }

    if dlt_topic_id:
        # The Pub/Sub service agent
        # (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com) needs
        # roles/pubsub.publisher on the DLT and roles/pubsub.subscriber on this
        # subscription; without them, messages are not forwarded to the DLT.
        dlt_path = publisher.topic_path(PROJECT_ID, dlt_topic_id)
        request["dead_letter_policy"] = {
            "dead_letter_topic":     dlt_path,
            "max_delivery_attempts": 5              # allowed range: 5-100
        }

    try:
        sub = subscriber.create_subscription(request=request)
        print(f"Pull subscription created: {sub.name}")
    except AlreadyExists:
        print(f"Subscription already exists: {sub_path}")

    return sub_path

def buat_push_subscription(
    topic_id:        str,
    subscription_id: str,
    push_endpoint:   str
) -> str:
    """Create a push subscription that POSTs each message to push_endpoint (HTTPS)."""
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    sub_path   = subscriber.subscription_path(PROJECT_ID, subscription_id)

    try:
        subscriber.create_subscription(request={
            "name":  sub_path,
            "topic": topic_path,
            # For production, consider authenticated push: add an "oidc_token"
            # entry (with a service_account_email) to push_config and verify
            # the token in the endpoint.
            "push_config": {"push_endpoint": push_endpoint},
            # For push, this is how long Pub/Sub waits for the HTTP response
            "ack_deadline_seconds": 30
        })
        print(f"Push subscription created: {sub_path}")
    except AlreadyExists:
        print(f"Subscription already exists: {sub_path}")

    return sub_path

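# Set up the infrastructure
buat_topic("order-events")
buat_topic("order-events-dlt")
buat_pull_subscription(
    "order-events", "payment-service-sub",
    ack_deadline=60, dlt_topic_id="order-events-dlt"
)
buat_pull_subscription("order-events", "notification-service-sub", ack_deadline=30)
# Messages published to a topic with no subscription are dropped, so the DLT
# needs its own subscription for forwarded messages to be kept and monitored
buat_pull_subscription("order-events-dlt", "order-events-dlt-sub")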
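The numeric settings above have server-side limits. The sketch below checks a request dict against the ranges noted in the comments (ack deadline 10 to 600 s, backoff values 0 to 600 s, 5 to 100 delivery attempts) and prints an illustrative redelivery schedule. Both helpers are hypothetical, and the doubling factor in `backoff_schedule` is an assumption: the retry policy is exponential between the two bounds, but the exact multiplier the service uses is not modelled here.

```python
def validate_subscription_request(request: dict) -> list[str]:
    """Return the problems found in a create_subscription request dict."""
    problems = []

    ack = request.get("ack_deadline_seconds", 10)
    if not 10 <= ack <= 600:
        problems.append(f"ack_deadline_seconds={ack} is outside 10-600")

    retry = request.get("retry_policy", {})
    lo = retry.get("minimum_backoff", {}).get("seconds", 10)
    hi = retry.get("maximum_backoff", {}).get("seconds", 600)
    for name, value in (("minimum_backoff", lo), ("maximum_backoff", hi)):
        if not 0 <= value <= 600:
            problems.append(f"{name}={value}s is outside 0-600")
    if lo > hi:
        problems.append("minimum_backoff is greater than maximum_backoff")

    dlp = request.get("dead_letter_policy")
    if dlp is not None:
        attempts = dlp.get("max_delivery_attempts", 5)
        if not 5 <= attempts <= 100:
            problems.append(f"max_delivery_attempts={attempts} is outside 5-100")
    return problems


def backoff_schedule(min_s: int, max_s: int, attempts: int) -> list[int]:
    """Illustrative delay before each redelivery, ASSUMING the delay doubles."""
    delays, delay = [], min_s
    for _ in range(attempts):
        delays.append(min(delay, max_s))   # never exceed maximum_backoff
        delay *= 2
    return delays


req = {
    "ack_deadline_seconds": 60,
    "retry_policy": {"minimum_backoff": {"seconds": 10},
                     "maximum_backoff": {"seconds": 600}},
    "dead_letter_policy": {"max_delivery_attempts": 5},
}
print(validate_subscription_request(req))   # []
print(backoff_schedule(10, 600, 8))          # [10, 20, 40, 80, 160, 320, 600, 600]
```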

Publisher: Sending Messages

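import json
from datetime import datetime, timezone
from typing import Optional

publisher  = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path(PROJECT_ID, "order-events")

def publish_pesan(payload: dict, attributes: Optional[dict] = None) -> str:
    """Publish one JSON message and block until the server returns its message ID."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")

    # Merge the default attributes with the caller's (caller wins), so a
    # duplicate key such as "source" cannot raise a TypeError in publish().
    # Attribute keys and values must be strings.
    attrs = {
        "event_type": str(payload.get("event", "unknown")),
        "source":     "order-service",
        "version":    "1.0",
        **{key: str(value) for key, value in (attributes or {}).items()},
    }

    future = publisher.publish(TOPIC_PATH, data, **attrs)
    msg_id = future.result()   # raises if the publish failed
    print(f"Message sent, ID: {msg_id}")
    return msg_id

# Example usage
order_event = {
    "event":     "order.created",
    "order_id":  1001,
    "user_id":   42,
    "total":     18500000,
    "timestamp": datetime.now(timezone.utc).isoformat()
}
publish_pesan(order_event)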
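Before calling `publish()`, it can help to check a message locally: the data must be `bytes`, and attribute keys and values must be strings. The hypothetical `build_message` helper below does that and enforces limits we believe the service applies (at most 100 attributes, keys up to 256 bytes, values up to 1024 bytes, 10 MB per message). Treat the numbers as assumptions to check against the current Pub/Sub quota documentation.

```python
import json
from typing import Optional

MAX_ATTRIBUTES = 100                  # assumed service limits, see the lead-in
MAX_KEY_BYTES = 256
MAX_VALUE_BYTES = 1024
MAX_MESSAGE_BYTES = 10 * 1024 * 1024


def build_message(payload: dict, attributes: Optional[dict] = None) -> tuple[bytes, dict]:
    """Encode payload as UTF-8 JSON and validate str -> str attributes."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    attrs = dict(attributes or {})

    if len(attrs) > MAX_ATTRIBUTES:
        raise ValueError(f"too many attributes: {len(attrs)}")
    for key, value in attrs.items():
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError(f"attribute {key!r} must map str -> str")
        if len(key.encode("utf-8")) > MAX_KEY_BYTES:
            raise ValueError(f"attribute key too long: {key[:20]}...")
        if len(value.encode("utf-8")) > MAX_VALUE_BYTES:
            raise ValueError(f"attribute value too long for {key!r}")

    # Rough total: data plus attribute keys and values
    size = len(data) + sum(len(k.encode()) + len(v.encode()) for k, v in attrs.items())
    if size > MAX_MESSAGE_BYTES:
        raise ValueError(f"message too large: {size} bytes")
    return data, attrs


data, attrs = build_message({"event": "order.created", "total": 18500000},
                            {"event_type": "order.created"})
# publisher.publish(TOPIC_PATH, data, **attrs) would then send it
```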

Batch Publishing

def publish_batch(payloads: list[dict]) -> int:
    """Publish many messages asynchronously; the client groups them into batches."""
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024 * 1024,   # send a batch once it reaches 1 MB,
        max_latency=0.1,         # or 0.1 s after its first message was queued,
        max_messages=100         # or once it holds 100 messages, whichever comes first
    )
    # In a long-running service, create this client once and reuse it
    pub   = pubsub_v1.PublisherClient(batch_settings=batch_settings)
    topic = pub.topic_path(PROJECT_ID, "order-events")

    publish_futures = []
    for payload in payloads:
        data   = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        future = pub.publish(topic, data, event_type=payload.get("event", "unknown"))
        publish_futures.append(future)

    # Wait for every future; one failed message does not stop the others
    sukses = 0
    for future in publish_futures:
        try:
            future.result()
            sukses += 1
        except Exception as e:
            print(f"Publish failed: {e}")

    print(f"{sukses}/{len(payloads)} messages published successfully.")
    return sukses

events = [{"event": "order.created", "order_id": i, "total": i * 50000} for i in range(1, 51)]
publish_batch(events)
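The batching rule can be pictured with a small simulation: messages accumulate in a batch, and the batch is closed as soon as adding another message would exceed `max_messages` or `max_bytes`. `plan_batches` is a hypothetical helper that models only those two thresholds; the `max_latency` timer and the client's per-request overhead are left out.

```python
def plan_batches(sizes: list[int], max_messages: int = 100,
                 max_bytes: int = 1024 * 1024) -> list[list[int]]:
    """Group message sizes (in bytes) into batches that respect both thresholds."""
    batches: list[list[int]] = []
    current: list[int] = []
    current_bytes = 0
    for size in sizes:
        # Close the current batch if this message would break either limit
        if current and (len(current) >= max_messages or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


# 50 messages of ~200 bytes: a single batch with the settings above
print(len(plan_batches([200] * 50)))                   # 1
# 250 small messages: split by max_messages=100
print([len(b) for b in plan_batches([200] * 250)])     # [100, 100, 50]
# Large messages: split by max_bytes
print([len(b) for b in plan_batches([400_000] * 5)])   # [2, 2, 1]
```

Raising `max_latency` or `max_messages` trades publish latency for fewer, larger requests, which is the throughput-versus-latency balance the settings control.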

Ordered Messages

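# Create the ordering-enabled client once and reuse it
ordered_publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    )
)
# Assumes the topic "order-events-ordered" exists and its subscription was
# created with "enable_message_ordering": True; only such subscriptions
# deliver messages with the same key in order.
ORDERED_TOPIC = ordered_publisher.topic_path(PROJECT_ID, "order-events-ordered")

def publish_dengan_ordering(payload: dict, ordering_key: str) -> str:
    """Messages with the same ordering_key are delivered in publish order."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    try:
        return ordered_publisher.publish(
            ORDERED_TOPIC, data, ordering_key=ordering_key
        ).result()
    except Exception:
        # After a failure the client pauses this ordering key;
        # resume it before publishing with the same key again.
        ordered_publisher.resume_publish(ORDERED_TOPIC, ordering_key)
        raise

for status in ["created", "paid", "shipped", "delivered"]:
    publish_dengan_ordering(
        {"event": f"order.{status}", "order_id": 1001},
        ordering_key="user-42"
    )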
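Ordering applies per key, not globally. The standard-library sketch below (hypothetical `interleave_by_key` and `per_key_sequences` helpers) simulates an ordering-enabled subscription that interleaves two keys at random while always delivering each key's oldest message first, then checks that every key's own sequence survives even though the global order does not.

```python
import random
from collections import defaultdict, deque


def interleave_by_key(published: list[tuple[str, str]], seed: int = 0) -> list[tuple[str, str]]:
    """Simulate delivery: keys are interleaved at random, each key stays in order."""
    queues: dict[str, deque] = defaultdict(deque)
    for key, event in published:
        queues[key].append((key, event))

    rng = random.Random(seed)
    delivered = []
    while queues:
        key = rng.choice(sorted(queues))          # any key may go next...
        delivered.append(queues[key].popleft())   # ...but only its oldest message
        if not queues[key]:
            del queues[key]
    return delivered


def per_key_sequences(messages: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Collect the events of each ordering key in the order they appear."""
    seqs: dict[str, list[str]] = defaultdict(list)
    for key, event in messages:
        seqs[key].append(event)
    return dict(seqs)


published = [("user-42", "order.created"), ("user-7", "order.created"),
             ("user-42", "order.paid"), ("user-7", "order.paid"),
             ("user-42", "order.shipped"), ("user-42", "order.delivered")]
delivered = interleave_by_key(published, seed=1)
print(per_key_sequences(delivered) == per_key_sequences(published))   # True
```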

Pull Subscriber: Receiving Messages

Streaming Pull (Asynchronous)

import json
import signal

from google.cloud import pubsub_v1

def proses_order(payload: dict) -> None:
    print(f"Processing order #{payload['order_id']}: Rp{payload['total']:,.0f}")

def jalankan_streaming_subscriber(subscription_id: str) -> None:
    sub      = pubsub_v1.SubscriberClient()
    sub_path = sub.subscription_path(PROJECT_ID, subscription_id)

    # Cap how many messages (and bytes) this client holds at once
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=10,
        max_bytes=10 * 1024 * 1024
    )

    # Runs on the client's callback threads, not on the main thread
    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        msg_id = message.message_id
        try:
            payload = json.loads(message.data.decode("utf-8"))
            print(f"Received [{msg_id}]: {message.attributes.get('event_type')}")

            proses_order(payload)
            message.ack()
            print(f"✓ ACK: {msg_id}")

        except (UnicodeDecodeError, json.JSONDecodeError):
            print(f"✗ Invalid message format: {msg_id}")
            message.ack()   # ack malformed messages so they do not loop forever

        except Exception as e:
            print(f"✗ Error processing {msg_id}: {e}")
            # ANTI-PATTERN: message.ack() on error
            # message.ack()  # ✗ -- the message is lost!

            # CORRECT: nack so Pub/Sub redelivers it (after the retry-policy
            # backoff, if configured; to the DLT after max_delivery_attempts)
            message.nack()  # ✓
            print(f"✗ NACK: {msg_id}")

    streaming_pull = sub.subscribe(sub_path, callback=callback, flow_control=flow_control)

    berjalan = True
    def handle_shutdown(signum, frame):
        nonlocal berjalan
        berjalan = False
        streaming_pull.cancel()   # stop pulling; result() below stops blocking

    signal.signal(signal.SIGINT,  handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    print(f"Subscriber active on {sub_path}")
    try:
        streaming_pull.result()   # blocks until cancel() or a fatal error
    except Exception as e:
        if berjalan:
            print(f"Subscriber error: {e}")
    finally:
        sub.close()

jalankan_streaming_subscriber("payment-service-sub")
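The ack/nack decision in the callback can be pulled out into a pure function, so it can be unit-tested without a Pub/Sub connection. `decide_ack` is a hypothetical name for this sketch; it mirrors the callback's rules (malformed data is acked, a processing error is nacked) and returns a string instead of calling `message.ack()` or `message.nack()`.

```python
import json
from typing import Callable


def decide_ack(data: bytes, handler: Callable[[dict], None]) -> str:
    """Return "ack" or "nack" for one message, following the callback's rules."""
    try:
        payload = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "ack"     # malformed: redelivery cannot fix it
    try:
        handler(payload)
    except Exception:
        return "nack"    # processing failed: let Pub/Sub redeliver
    return "ack"


def flaky(payload: dict) -> None:
    raise TimeoutError("payment gateway timeout")


print(decide_ack(b'{"order_id": 1001}', lambda p: None))   # ack
print(decide_ack(b"not json", lambda p: None))              # ack
print(decide_ack(b'{"order_id": 1001}', flaky))             # nack
```

Inside the real callback this becomes `message.ack()` if `decide_ack(message.data, proses_order) == "ack"`, otherwise `message.nack()`.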

Synchronous Pull for Batch Processing

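from google.api_core.exceptions import DeadlineExceeded

def pull_sinkron(subscription_id: str, maks_pesan: int = 10) -> list[dict]:
    """Pull up to maks_pesan messages once; ack the parsed ones, nack the rest."""
    with pubsub_v1.SubscriberClient() as sub:   # the client is closed on exit
        sub_path = sub.subscription_path(PROJECT_ID, subscription_id)

        try:
            response = sub.pull(
                request={"subscription": sub_path, "max_messages": maks_pesan},
                timeout=30
            )
        except DeadlineExceeded:
            return []   # nothing arrived before the timeout

        ack_ids  = []
        nack_ids = []
        hasil    = []

        for received in response.received_messages:
            msg = received.message
            try:
                payload = json.loads(msg.data.decode("utf-8"))
                hasil.append(payload)
                ack_ids.append(received.ack_id)
            except Exception as e:
                print(f"Parse error {msg.message_id}: {e}")
                nack_ids.append(received.ack_id)

        # Note: acking here, before the caller processes `hasil`, means a crash
        # afterwards loses these messages; ack after processing if that matters.
        if ack_ids:
            sub.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
            print(f"✓ Batch ACK: {len(ack_ids)} messages")

        # Synchronous-pull equivalent of nack: an ack deadline of 0 makes the
        # messages eligible for redelivery (and, with a DLT, eventually moved there)
        if nack_ids:
            sub.modify_ack_deadline(request={
                "subscription":         sub_path,
                "ack_ids":              nack_ids,
                "ack_deadline_seconds": 0
            })

        return hasil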
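If messages must not be acked before they are processed, the loop can run the handler first and only then sort each message into "ack" or "nack". A minimal sketch with a hypothetical `split_by_outcome` helper; `SimpleNamespace` objects stand in for the received messages (only `.ack_id` and `.message.data` are used), and the two resulting lists would feed `acknowledge` and `modify_ack_deadline` with `ack_deadline_seconds` set to 0.

```python
import json
from types import SimpleNamespace
from typing import Callable, Iterable


def split_by_outcome(received: Iterable, handler: Callable[[dict], object]) -> tuple[list[str], list[str]]:
    """Process each pulled message first, then file its ack_id under ack or nack."""
    ack_ids, nack_ids = [], []
    for item in received:
        try:
            handler(json.loads(item.message.data.decode("utf-8")))
        except Exception:
            nack_ids.append(item.ack_id)   # parse or processing failure
        else:
            ack_ids.append(item.ack_id)    # fully processed, safe to ack
    return ack_ids, nack_ids


def fake(ack_id: str, data: bytes):
    # Local stand-in for a received message, for testing only
    return SimpleNamespace(ack_id=ack_id, message=SimpleNamespace(data=data))


batch = [fake("a1", b'{"order_id": 1, "total": 50000}'),
         fake("a2", b"broken"),
         fake("a3", b'{"order_id": 3}')]                 # missing "total"
print(split_by_outcome(batch, lambda p: p["total"]))   # (['a1'], ['a2', 'a3'])
```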

Push Subscription: HTTPS Endpoint

In a push subscription, Pub/Sub actively sends each message to your HTTPS endpoint. This suits Cloud Run, Cloud Functions, or any web application that already serves HTTP.
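For reference, the JSON body that Pub/Sub POSTs wraps the message in an envelope: `message.data` is base64-encoded, `message.attributes`, `message.messageId` and `message.publishTime` sit next to it, and the subscription name is at the top level. The sketch below builds such an envelope by hand (all values are invented) and decodes it with the standard library; `decode_push_envelope` is a hypothetical helper.

```python
import base64
import json

# Hypothetical push request body; the values are made up for illustration
envelope = {
    "message": {
        "data": base64.b64encode(
            json.dumps({"order_id": 1001, "total": 18500000}).encode("utf-8")
        ).decode("ascii"),
        "attributes": {"event_type": "order.created"},
        "messageId": "1234567890",
        "publishTime": "2024-01-01T00:00:00Z",
    },
    "subscription": "projects/my-project/subscriptions/audit-sub",
}


def decode_push_envelope(envelope: dict) -> tuple[dict, dict]:
    """Return (payload, attributes) from a push envelope."""
    msg = envelope["message"]
    payload = json.loads(base64.b64decode(msg.get("data", "")).decode("utf-8"))
    return payload, msg.get("attributes", {})


payload, attrs = decode_push_envelope(envelope)
print(payload["order_id"], attrs["event_type"])   # 1001 order.created
```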

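import base64
import binascii
import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/pubsub/push", methods=["POST"])
def pubsub_push_handler():
    envelope = request.get_json(silent=True)   # None instead of an error on invalid JSON

    # Pub/Sub treats 102, 200, 201, 202 and 204 as ACK and anything else as NACK,
    # so even this 400 is retried; it only guards against non-Pub/Sub requests
    if not envelope or "message" not in envelope:
        return "Bad Request", 400

    msg = envelope["message"]

    try:
        data_raw = base64.b64decode(msg.get("data", "")).decode("utf-8")
        payload  = json.loads(data_raw)
    except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError) as e:
        # Malformed message: ack it (2xx) so it is not redelivered forever
        print(f"Invalid message {msg.get('messageId')}: {e}")
        return "", 204

    try:
        proses_order(payload)
        return jsonify({"status": "ok"}), 200   # 2xx = ACK
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"error": str(e)}), 500  # non-2xx = NACK, Pub/Sub will retry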

Extend Ack Deadline

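import json
import threading
import time

def proses_dengan_extend_ack(message: pubsub_v1.subscriber.message.Message) -> None:
    """Periodically extend the ack deadline while a long-running job is in progress.

    Note: the streaming-pull client already extends deadlines automatically for
    the messages it holds (lease management, capped by
    FlowControl.max_lease_duration); an explicit loop like this makes the
    mechanism visible or gives you your own cadence.
    """
    berhenti = threading.Event()

    def extend_loop():
        # Every 30 s, move the deadline 60 s into the future until stopped
        while not berhenti.wait(timeout=30):
            message.modify_ack_deadline(60)
            print(f"Ack deadline extended: {message.message_id}")

    thread = threading.Thread(target=extend_loop, daemon=True)
    thread.start()

    try:
        payload = json.loads(message.data.decode("utf-8"))  # input for the real steps
        for langkah in range(5):
            time.sleep(25)   # simulate one slow processing step
            print(f"Step {langkah + 1}/5 done")
        message.ack()
    except Exception as e:
        print(f"Error: {e}")
        message.nack()
    finally:
        berhenti.set()   # stop the extension loop whether we acked or nacked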
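The extension loop itself is ordinary threading. The self-contained sketch below uses a hypothetical `FakeMessage` that only records `modify_ack_deadline` calls, with intervals shortened to fractions of a second so it runs quickly. It shows that extensions happen while the work runs and stop once the work finishes; `process_with_lease` is likewise a made-up name.

```python
import threading
import time


class FakeMessage:
    """Hypothetical stand-in that records the deadlines it was extended to."""
    def __init__(self):
        self.extensions: list[int] = []
        self.acked = False

    def modify_ack_deadline(self, seconds: int) -> None:
        self.extensions.append(seconds)

    def ack(self) -> None:
        self.acked = True


def process_with_lease(message, work_seconds: float, every: float, extend_to: int) -> None:
    stop = threading.Event()

    def extend_loop():
        # wait() returns False on timeout (extend again) and True once stop is set
        while not stop.wait(timeout=every):
            message.modify_ack_deadline(extend_to)

    t = threading.Thread(target=extend_loop, daemon=True)
    t.start()
    try:
        time.sleep(work_seconds)   # stand-in for slow processing
        message.ack()
    finally:
        stop.set()
        t.join()                   # the loop has exited before we return


msg = FakeMessage()
process_with_lease(msg, work_seconds=0.35, every=0.1, extend_to=60)
print(msg.acked, msg.extensions)
```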

Monitoring the Dead Letter Topic

from google.api_core.exceptions import DeadlineExceeded

def pantau_dead_letter_topic(dlt_subscription_id: str) -> None:
    """Print and ack up to 20 messages from a subscription on the DLT."""
    with pubsub_v1.SubscriberClient() as sub:
        sub_path = sub.subscription_path(PROJECT_ID, dlt_subscription_id)

        try:
            response = sub.pull(
                request={"subscription": sub_path, "max_messages": 20},
                timeout=10
            )
        except DeadlineExceeded:
            response = None

        if response is None or not response.received_messages:
            print("No messages in the Dead Letter Topic.")
            return

        print(f"⚠ {len(response.received_messages)} messages in the Dead Letter Topic:")
        ack_ids = []

        for received in response.received_messages:
            msg   = received.message
            attrs = msg.attributes
            try:
                payload = json.loads(msg.data.decode("utf-8"))
            except Exception:
                payload = msg.data.decode("utf-8", errors="replace")

            # Attributes Pub/Sub adds when it forwards a message to the DLT
            print(f"  ID: {msg.message_id}")
            print(f"  Delivery attempts: {attrs.get('CloudPubSubDeadLetterSourceDeliveryCount', '?')}")
            print(f"  From subscription: {attrs.get('CloudPubSubDeadLetterSourceSubscription', '?')}")
            print(f"  Payload: {payload}\n")
            ack_ids.append(received.ack_id)

        # Acking removes the messages from the DLT subscription; store or
        # republish them first if they still need to be fixed and replayed
        if ack_ids:
            sub.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
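When the DLT fills up, a per-source summary is often more useful than printing each message. The sketch below (hypothetical `summarize_dead_letters`) works on plain attribute dicts such as `msg.attributes`, grouping by the `CloudPubSubDeadLetterSourceSubscription` attribute and tracking the highest `CloudPubSubDeadLetterSourceDeliveryCount`; the sample values are invented.

```python
from collections import Counter


def summarize_dead_letters(attribute_dicts: list[dict]) -> dict[str, dict]:
    """Group dead-lettered messages by source subscription, largest group first."""
    counts: Counter = Counter()
    max_attempts: dict[str, int] = {}
    for attrs in attribute_dicts:
        source = attrs.get("CloudPubSubDeadLetterSourceSubscription", "?")
        attempts = int(attrs.get("CloudPubSubDeadLetterSourceDeliveryCount", "0"))
        counts[source] += 1
        max_attempts[source] = max(max_attempts.get(source, 0), attempts)
    return {src: {"messages": n, "max_attempts": max_attempts[src]}
            for src, n in counts.most_common()}


sample = [  # invented attribute values for illustration
    {"CloudPubSubDeadLetterSourceSubscription": "payment-service-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "5"},
    {"CloudPubSubDeadLetterSourceSubscription": "payment-service-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "5"},
    {"CloudPubSubDeadLetterSourceSubscription": "notification-service-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "5"},
]
print(summarize_dead_letters(sample))
# {'payment-service-sub': {'messages': 2, 'max_attempts': 5},
#  'notification-service-sub': {'messages': 1, 'max_attempts': 5}}
```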

Summary

  • Topics and subscriptions are separate: each subscription independently receives a copy of every message, and one topic can have many subscriptions for different services.
  • message.ack() after success, message.nack() after failure: ack() removes the message from the subscription; nack() asks Pub/Sub to redeliver it without waiting for the ack deadline to expire (subject to the retry policy's backoff).
  • Set flow control explicitly: configure max_messages and max_bytes on the streaming subscriber so it is not flooded with more messages than it can process.
  • Dead Letter Topic (DLT): configure dead_letter_policy on the subscription so messages that fail N times are moved to the DLT automatically for monitoring and debugging; give the DLT its own subscription so forwarded messages are kept.
  • Push vs. pull: use pull for full control over timing; use push if you already have an HTTP endpoint and want Pub/Sub to deliver messages to it actively.
  • Ordering key: use it to guarantee message order per entity (e.g. per user ID); it only takes effect when the publisher client enables message ordering and the subscription is created with message ordering enabled.
  • Batch publisher settings: configure max_messages, max_latency, and max_bytes to balance publish throughput against latency.
  • ADC and Workload Identity: on GCP, use Workload Identity or a service account attached to the compute resource; there is no need to store a JSON credentials file.
  • message.modify_ack_deadline(): for long processing, extend the ack deadline periodically so the message is not redelivered before it finishes (the streaming-pull client also does this automatically, up to max_lease_duration).
  • Retry policy on the subscription: configure minimum_backoff and maximum_backoff to control the interval between redeliveries of failed messages.
