Google Pub/Sub #
Google Cloud Pub/Sub is a fully managed, distributed messaging service from Google Cloud. It connects services that produce events with services that consume them, at global scale and with low latency. The Pub/Sub (Publish-Subscribe) model differs from a traditional queue: a single message published to a Topic can be received independently by many different Subscriptions, and each one gets its own copy. This makes it well suited for event distribution, data pipelines, and integration between microservices in the Google Cloud ecosystem. A reliable implementation depends on three things: understanding the difference between Pull and Push subscriptions, knowing how the acknowledgment deadline works, and configuring a Dead Letter Topic.
Pub/Sub Core Concepts #
Google Pub/Sub architecture:
Publisher          Topic             Subscription(s)         Subscriber
─────────          ─────             ───────────────         ──────────
App A ──publish──► [orders] ──────► [payment-sub] ──pull──► Payment Service
App B ──publish──►    │     ──────► [notif-sub]   ──pull──► Notification Service
                      │     ──────► [audit-sub]   ──push──► Cloud Function/Run
                      │
                      └── Every subscription receives a copy of ALL messages
Key concepts:
Topic        -- the message channel (publishers write here)
Subscription -- a subscription to a topic; each subscription has its own message backlog
Pull         -- the subscriber actively requests messages (the more common mode)
Push         -- Pub/Sub pushes messages to the subscriber's HTTPS endpoint
Ack          -- the subscriber confirms the message was processed → it is removed from the subscription
Nack         -- the subscriber rejects the message → Pub/Sub redelivers it (immediately by default, or after a backoff if the subscription has a retry policy)
Ack Deadline -- the time (10 to 600 seconds) the subscriber has to ack before the message is redelivered
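The fan-out and ack/nack semantics above can be made concrete with a minimal in-memory toy model. It is not the google-cloud-pubsub API. It has no networking, deadlines, or persistence, and the classes ToyTopic and ToySubscription are hypothetical names used only for illustration.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToySubscription:
    """Hypothetical stand-in for a subscription: it owns its backlog."""
    name: str
    backlog: deque = field(default_factory=deque)
    outstanding: dict = field(default_factory=dict)
    _next_ack_id: int = 0

    def pull(self):
        """Hand out the next message with a fresh ack ID, or None if empty."""
        if not self.backlog:
            return None
        self._next_ack_id += 1
        ack_id = f"{self.name}-{self._next_ack_id}"
        self.outstanding[ack_id] = self.backlog.popleft()
        return ack_id, self.outstanding[ack_id]

    def ack(self, ack_id: str) -> None:
        # Ack: the message is removed from THIS subscription only
        del self.outstanding[ack_id]

    def nack(self, ack_id: str) -> None:
        # Nack: the message goes back to the front of the backlog for redelivery
        self.backlog.appendleft(self.outstanding.pop(ack_id))


@dataclass
class ToyTopic:
    """Hypothetical stand-in for a topic that fans out to every subscription."""
    subscriptions: list = field(default_factory=list)

    def publish(self, data: bytes) -> None:
        # Every attached subscription gets its own copy
        for sub in self.subscriptions:
            sub.backlog.append(data)


topic = ToyTopic()
payment, notif = ToySubscription("payment-sub"), ToySubscription("notif-sub")
topic.subscriptions += [payment, notif]
topic.publish(b'{"event": "order.created"}')

ack_id, data = payment.pull()
payment.ack(ack_id)                               # gone from payment-sub...
print(len(payment.backlog), len(notif.backlog))   # ...still queued in notif-sub; prints: 0 1
```

Acking in one subscription has no effect on the other. That independence is what lets payment, notification, and audit services consume the same topic at their own pace.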
Installation #
pip install google-cloud-pubsub
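Authentication #
Google Cloud supports several authentication methods. Application Default Credentials (ADC) is the recommended approach because it works automatically across environments: the same code picks up credentials from an environment variable or the gcloud CLI locally, and from the attached service account on GCP.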
import os
from google.cloud import pubsub_v1
from google.oauth2 import service_account

# ANTI-PATTERN: hardcoding the service account path in code
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/sa-key.json"  # ✗

# CORRECT, option 1: ADC via environment variable
#   export GOOGLE_APPLICATION_CREDENTIALS="/path/to/sa-key.json"
# The library reads it automatically.

# CORRECT, option 2: ADC via the gcloud CLI (for local development)
#   gcloud auth application-default login

# CORRECT, option 3: explicit Service Account (when you need credential isolation)
credentials = service_account.Credentials.from_service_account_file(
    os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
On GCP (GKE, Cloud Run, Cloud Functions, Compute Engine), use Workload Identity or a Service Account attached to the compute resource. No JSON key file is needed. ADC automatically obtains credentials from the GCP metadata server. This is the most secure option: the credentials are short-lived and rotated automatically, and no secret is stored in code or in the environment.
Creating Topics and Subscriptions #
import os
from typing import Optional
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists

PROJECT_ID = os.getenv("GCP_PROJECT_ID", "my-project")
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

def buat_topic(topic_id: str) -> str:
    """Create a topic; an existing topic is not treated as an error."""
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    try:
        topic = publisher.create_topic(request={"name": topic_path})
        print(f"Topic created: {topic.name}")
    except AlreadyExists:
        print(f"Topic already exists: {topic_path}")
    return topic_path

def buat_pull_subscription(
    topic_id: str,
    subscription_id: str,
    ack_deadline: int = 60,
    dlt_topic_id: Optional[str] = None,
) -> str:
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    sub_path = subscriber.subscription_path(PROJECT_ID, subscription_id)
    request = {
        "name": sub_path,
        "topic": topic_path,
        "ack_deadline_seconds": ack_deadline,  # allowed range: 10-600 seconds
        # Exponential backoff for redelivery after a nack or an expired deadline
        "retry_policy": {
            "minimum_backoff": {"seconds": 10},
            "maximum_backoff": {"seconds": 600},
        },
    }
    if dlt_topic_id:
        dlt_path = publisher.topic_path(PROJECT_ID, dlt_topic_id)
        request["dead_letter_policy"] = {
            "dead_letter_topic": dlt_path,
            "max_delivery_attempts": 5,  # allowed range: 5-100
        }
    try:
        sub = subscriber.create_subscription(request=request)
        print(f"Pull subscription created: {sub.name}")
    except AlreadyExists:
        print(f"Subscription already exists: {sub_path}")
    return sub_path

def buat_push_subscription(
    topic_id: str,
    subscription_id: str,
    push_endpoint: str,
) -> str:
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    sub_path = subscriber.subscription_path(PROJECT_ID, subscription_id)
    try:
        subscriber.create_subscription(request={
            "name": sub_path,
            "topic": topic_path,
            "push_config": {"push_endpoint": push_endpoint},  # must be HTTPS
            "ack_deadline_seconds": 30,
        })
        print(f"Push subscription created: {sub_path}")
    except AlreadyExists:
        print(f"Subscription already exists: {sub_path}")
    return sub_path

# Infrastructure setup
buat_topic("order-events")
buat_topic("order-events-dlt")
buat_pull_subscription(
    "order-events", "payment-service-sub",
    ack_deadline=60, dlt_topic_id="order-events-dlt",
)
buat_pull_subscription("order-events", "notification-service-sub", ack_deadline=30)
# The DLT topic needs its own subscription, otherwise dead-lettered messages are not retained.
# Forwarding also requires the Pub/Sub service agent to have the Publisher role on the
# DLT topic and the Subscriber role on the source subscription.
buat_pull_subscription("order-events-dlt", "order-events-dlt-sub")
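The request dictionaries above are subject to server-side limits that are easy to violate by accident. A small pure-Python builder can fail fast before any API call is made. The sketch below assumes that goal and checks the documented ranges: ack deadline 10 to 600 seconds, max_delivery_attempts 5 to 100, and retry backoff 0 to 600 seconds. It produces the same dict shape that create_subscription accepts. The function name build_subscription_request is hypothetical.

```python
from typing import Optional


def build_subscription_request(
    project_id: str,
    topic_id: str,
    subscription_id: str,
    ack_deadline: int = 60,
    dlt_topic_id: Optional[str] = None,
    max_delivery_attempts: int = 5,
    min_backoff: int = 10,
    max_backoff: int = 600,
) -> dict:
    """Hypothetical helper: build a create_subscription request dict with range checks."""
    if not 10 <= ack_deadline <= 600:
        raise ValueError("ack_deadline must be between 10 and 600 seconds")
    if not 0 <= min_backoff <= max_backoff <= 600:
        raise ValueError("backoff must satisfy 0 <= minimum <= maximum <= 600 seconds")
    request = {
        "name": f"projects/{project_id}/subscriptions/{subscription_id}",
        "topic": f"projects/{project_id}/topics/{topic_id}",
        "ack_deadline_seconds": ack_deadline,
        "retry_policy": {
            "minimum_backoff": {"seconds": min_backoff},
            "maximum_backoff": {"seconds": max_backoff},
        },
    }
    if dlt_topic_id:
        if not 5 <= max_delivery_attempts <= 100:
            raise ValueError("max_delivery_attempts must be between 5 and 100")
        request["dead_letter_policy"] = {
            "dead_letter_topic": f"projects/{project_id}/topics/{dlt_topic_id}",
            "max_delivery_attempts": max_delivery_attempts,
        }
    return request


req = build_subscription_request(
    "my-project", "order-events", "payment-service-sub", dlt_topic_id="order-events-dlt"
)
print(req["dead_letter_policy"]["dead_letter_topic"])
# prints: projects/my-project/topics/order-events-dlt
```

Passing the result as subscriber.create_subscription(request=req) would replace the inline dict in buat_pull_subscription, and the validation logic could then be unit-tested without credentials.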
Publisher: Sending Messages #
import json
from datetime import datetime, timezone
from typing import Optional
from google.cloud import pubsub_v1

# PROJECT_ID as defined in the setup section
publisher = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path(PROJECT_ID, "order-events")

def publish_pesan(payload: dict, attributes: Optional[dict] = None) -> str:
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    # Merge into one dict so caller-supplied attributes override the defaults
    # instead of raising "got multiple values for keyword argument".
    # Attribute values must be strings.
    attrs = {
        "event_type": payload.get("event", "unknown"),
        "source": "order-service",
        "version": "1.0",
        **(attributes or {}),
    }
    future = publisher.publish(TOPIC_PATH, data, **attrs)
    msg_id = future.result()  # blocks until the server returns the message ID
    print(f"Message sent, ID: {msg_id}")
    return msg_id

# Example usage
order_event = {
    "event": "order.created",
    "order_id": 1001,
    "user_id": 42,
    "total": 18500000,
    "timestamp": datetime.now(timezone.utc).isoformat(),
}
publish_pesan(order_event)
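Pub/Sub carries the payload as bytes and the attributes as a string-to-string map, and a single message is limited to 10 MB. The helper below is a hypothetical encode_message function, not part of the client library. It centralizes the JSON encoding and attribute normalization used by publish_pesan so they can be unit-tested without a network connection. A matching hypothetical decode_message covers the subscriber side.

```python
import json
from typing import Optional

MAX_MESSAGE_BYTES = 10 * 1024 * 1024  # Pub/Sub per-message size limit (10 MB)


def encode_message(payload: dict, attributes: Optional[dict] = None) -> tuple[bytes, dict[str, str]]:
    """Hypothetical helper: JSON-encode the payload and normalize attributes."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    if len(data) > MAX_MESSAGE_BYTES:
        raise ValueError(f"payload is {len(data)} bytes; the limit is {MAX_MESSAGE_BYTES}")
    attrs = {
        "event_type": payload.get("event", "unknown"),
        "source": "order-service",
        "version": "1.0",
        **(attributes or {}),  # caller-supplied values override the defaults
    }
    # Attribute values must be strings, so convert ints, floats, etc.
    return data, {str(k): str(v) for k, v in attrs.items()}


def decode_message(data: bytes) -> dict:
    """Inverse of encode_message, for the subscriber side."""
    return json.loads(data.decode("utf-8"))


data, attrs = encode_message({"event": "order.created", "order_id": 1001}, {"priority": 1})
print(attrs)
# prints: {'event_type': 'order.created', 'source': 'order-service', 'version': '1.0', 'priority': '1'}
```

With this helper in place, the publish call would become publisher.publish(TOPIC_PATH, data, **attrs).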
Batch Publishing #
def publish_batch(payloads: list[dict]) -> int:
    """Publish many messages asynchronously with automatic batching."""
    # A batch is sent as soon as ANY of these limits is reached
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024 * 1024,  # 1 MB per batch
        max_latency=0.1,        # wait at most 100 ms
        max_messages=100,       # at most 100 messages per batch
    )
    pub = pubsub_v1.PublisherClient(batch_settings=batch_settings)
    topic = pub.topic_path(PROJECT_ID, "order-events")
    # publish() does not block; it returns a future immediately
    publish_futures = []
    for payload in payloads:
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        future = pub.publish(topic, data, event_type=payload.get("event", "unknown"))
        publish_futures.append(future)
    sukses = 0
    for future in publish_futures:
        try:
            future.result()
            sukses += 1
        except Exception as e:
            print(f"Publish failed: {e}")
    print(f"{sukses}/{len(payloads)} messages published successfully.")
    return sukses

events = [{"event": "order.created", "order_id": i, "total": i * 50000} for i in range(1, 51)]
publish_batch(events)
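The three BatchSettings limits interact: a batch is flushed as soon as any one of them is hit. The pure-Python model below ignores max_latency (the time limit) and shows only how message count and byte size split a stream of messages into batches. It is a simplification for intuition, not the client library's actual batching code, and the function name plan_batches is hypothetical.

```python
def plan_batches(
    sizes: list[int], max_messages: int = 100, max_bytes: int = 1024 * 1024
) -> list[list[int]]:
    """Group message sizes into batches, flushing when one more message would exceed a limit."""
    batches: list[list[int]] = []
    current: list[int] = []
    current_bytes = 0
    for size in sizes:
        # Flush first if this message would push the batch over either limit
        if current and (len(current) + 1 > max_messages or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


# 250 small messages are capped by max_messages=100...
print([len(b) for b in plan_batches([100] * 250)])        # prints: [100, 100, 50]
# ...while three 600 KB messages are capped by max_bytes (1 MB)
print([len(b) for b in plan_batches([600 * 1024] * 3)])   # prints: [1, 1, 1]
```

Raising max_messages alone does not help large payloads, because the byte limit governs them first. max_latency then bounds how long a partially filled batch waits.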
Ordered Messages #
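# Create the client ONCE and reuse it; ordering holds for messages with the
# same key published from the same client.
ordered_publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    )
)
ORDERED_TOPIC = ordered_publisher.topic_path(PROJECT_ID, "order-events-ordered")

def publish_dengan_ordering(payload: dict, ordering_key: str) -> str:
    """Messages with the same ordering_key are delivered in publish order.
    The subscription must also be created with enable_message_ordering=True."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    try:
        return ordered_publisher.publish(ORDERED_TOPIC, data, ordering_key=ordering_key).result()
    except Exception:
        # After a failure, publishing for this key is paused until it is resumed
        ordered_publisher.resume_publish(ORDERED_TOPIC, ordering_key)
        raise

for status in ["created", "paid", "shipped", "delivered"]:
    publish_dengan_ordering(
        {"event": f"order.{status}", "order_id": 1001},
        ordering_key="user-42",
    )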
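Ordering is guaranteed only within one ordering key. Messages with different keys may interleave in any order. When testing a consumer, a check like the hypothetical respects_key_order below verifies exactly that property: for each key, the received sequence must match the published sequence. It is written for illustration, not taken from the library, and it assumes no duplicates or losses (redeliveries would need extra handling).

```python
from collections import defaultdict


def respects_key_order(published: list[tuple[str, str]], received: list[tuple[str, str]]) -> bool:
    """Each item is (ordering_key, message). Assumes no duplicates or losses."""
    def by_key(items):
        grouped = defaultdict(list)
        for key, msg in items:
            grouped[key].append(msg)
        return grouped
    # Compare the per-key sequences; interleaving across keys is allowed
    return by_key(published) == by_key(received)


published = [("user-42", "created"), ("user-7", "created"), ("user-42", "paid"), ("user-7", "paid")]
interleaved = [("user-7", "created"), ("user-42", "created"), ("user-7", "paid"), ("user-42", "paid")]
swapped = [("user-42", "paid"), ("user-42", "created"), ("user-7", "created"), ("user-7", "paid")]
print(respects_key_order(published, interleaved))  # prints: True
print(respects_key_order(published, swapped))      # prints: False
```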
Pull Subscriber: Receiving Messages #
Streaming Pull (Asynchronous) #
import json
import signal
from google.cloud import pubsub_v1

def proses_order(payload: dict) -> None:
    print(f"Processing order #{payload['order_id']}: Rp{payload['total']:,.0f}")

def jalankan_streaming_subscriber(subscription_id: str) -> None:
    sub = pubsub_v1.SubscriberClient()
    sub_path = sub.subscription_path(PROJECT_ID, subscription_id)
    # Limit how many messages are outstanding (received but not yet acked/nacked)
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=10,
        max_bytes=10 * 1024 * 1024,
    )

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        msg_id = message.message_id
        try:
            payload = json.loads(message.data.decode("utf-8"))
            print(f"Received [{msg_id}]: {message.attributes.get('event_type')}")
            proses_order(payload)
            message.ack()
            print(f"✓ ACK: {msg_id}")
        except json.JSONDecodeError:
            print(f"✗ Invalid message format: {msg_id}")
            message.ack()  # ack malformed messages so they don't loop forever
        except Exception as e:
            print(f"✗ Error processing {msg_id}: {e}")
            # ANTI-PATTERN: message.ack() on error
            # message.ack()  # ✗ the message is lost!
            # CORRECT: nack so Pub/Sub redelivers it (subject to the retry policy)
            message.nack()  # ✓
            print(f"✗ NACK: {msg_id}")

    # Callbacks run on a thread pool managed by the client library
    streaming_pull = sub.subscribe(sub_path, callback=callback, flow_control=flow_control)
    berjalan = True

    def handle_shutdown(signum, frame):
        nonlocal berjalan
        berjalan = False
        streaming_pull.cancel()  # stop pulling and unblock result() below

    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)
    print(f"Subscriber active on {sub_path}")
    try:
        streaming_pull.result()  # blocks until cancelled or a fatal error occurs
    except Exception as e:
        if berjalan:
            print(f"Subscriber error: {e}")
    finally:
        sub.close()

jalankan_streaming_subscriber("payment-service-sub")
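The callback only calls message.ack() or message.nack(), so its decision logic can be tested without Pub/Sub. Pass in a fake object with the same three members (data, ack, nack). The sketch below moves that logic into a standalone function. FakeMessage and handle_message are hypothetical names, and the ack-on-malformed-JSON policy mirrors the callback above.

```python
import json
from typing import Callable


class FakeMessage:
    """Hypothetical stand-in exposing the subset of Message used by the callback."""
    def __init__(self, data: bytes):
        self.data = data
        self.outcome = None

    def ack(self) -> None:
        self.outcome = "ack"

    def nack(self) -> None:
        self.outcome = "nack"


def handle_message(message, process: Callable[[dict], None]) -> None:
    try:
        payload = json.loads(message.data.decode("utf-8"))
    except json.JSONDecodeError:
        message.ack()  # malformed: redelivery would never succeed
        return
    try:
        process(payload)
    except Exception:
        message.nack()  # possibly transient failure: let Pub/Sub redeliver
    else:
        message.ack()


def failing(payload: dict) -> None:
    raise RuntimeError("payment gateway timeout")


ok, bad, err = FakeMessage(b'{"order_id": 1}'), FakeMessage(b"not json"), FakeMessage(b'{"order_id": 2}')
handle_message(ok, lambda p: None)
handle_message(bad, lambda p: None)
handle_message(err, failing)
print(ok.outcome, bad.outcome, err.outcome)  # prints: ack ack nack
```

One small design choice differs from the original callback: parsing is separated from processing. As a result, a JSONDecodeError raised inside the processing step is not mistaken for a malformed message. In production the same function could be wired in with sub.subscribe(sub_path, callback=lambda m: handle_message(m, proses_order), flow_control=flow_control).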
Synchronous Pull: for Batch Processing #
def pull_sinkron(subscription_id: str, maks_pesan: int = 10) -> list[dict]:
    sub = pubsub_v1.SubscriberClient()
    sub_path = sub.subscription_path(PROJECT_ID, subscription_id)
    try:
        response = sub.pull(
            request={"subscription": sub_path, "max_messages": maks_pesan},
            timeout=30,
        )
        if not response.received_messages:
            return []
        ack_ids = []
        hasil = []
        for received in response.received_messages:
            msg = received.message
            try:
                payload = json.loads(msg.data.decode("utf-8"))
                hasil.append(payload)
                ack_ids.append(received.ack_id)
            except Exception as e:
                # Not acked: redelivered after the ack deadline expires
                print(f"Parse error {msg.message_id}: {e}")
        if ack_ids:
            # One acknowledge call for the whole batch. Note: messages are acked
            # before the caller processes `hasil`; ack after processing instead
            # if losing them on a crash is unacceptable.
            sub.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
            print(f"✓ Batch ACK: {len(ack_ids)} messages")
        return hasil
    finally:
        sub.close()  # also runs on the early return and on errors
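In the synchronous pull above, messages that fail parsing are simply left for their ack deadline to expire. An alternative is to nack them explicitly. With the low-level API, a nack is a modify_ack_deadline call with ack_deadline_seconds set to 0. The sketch below assumes that approach. ack_and_nack is a hypothetical helper, and the example runs it against a recording fake instead of a real SubscriberClient.

```python
import json
from types import SimpleNamespace


def ack_and_nack(subscriber, sub_path: str, received_messages) -> list[dict]:
    """Ack parseable messages in one call; nack the rest via a zero ack deadline."""
    results, ack_ids, nack_ids = [], [], []
    for received in received_messages:
        try:
            results.append(json.loads(received.message.data.decode("utf-8")))
            ack_ids.append(received.ack_id)
        except (UnicodeDecodeError, json.JSONDecodeError):
            nack_ids.append(received.ack_id)
    if ack_ids:
        subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
    if nack_ids:
        # Deadline 0 makes the messages eligible for redelivery (subject to any retry policy)
        subscriber.modify_ack_deadline(
            request={"subscription": sub_path, "ack_ids": nack_ids, "ack_deadline_seconds": 0}
        )
    return results


class RecordingSubscriber:
    """Fake that records requests instead of calling Pub/Sub."""
    def __init__(self):
        self.calls = []

    def acknowledge(self, request):
        self.calls.append(("ack", request["ack_ids"]))

    def modify_ack_deadline(self, request):
        self.calls.append(("modack", request["ack_ids"], request["ack_deadline_seconds"]))


msgs = [
    SimpleNamespace(ack_id="a1", message=SimpleNamespace(data=b'{"order_id": 1}')),
    SimpleNamespace(ack_id="a2", message=SimpleNamespace(data=b"\xff")),  # invalid UTF-8
]
fake = RecordingSubscriber()
results = ack_and_nack(fake, "projects/p/subscriptions/s", msgs)
print(results)     # prints: [{'order_id': 1}]
print(fake.calls)  # prints: [('ack', ['a1']), ('modack', ['a2'], 0)]
```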
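Push Subscription: HTTPS Endpoint #
In a push subscription, Pub/Sub actively pushes messages to your HTTPS endpoint. This suits Cloud Run, Cloud Functions, or web applications. The handler acknowledges a message by returning a success status code (such as 200 or 204). Any other status is treated as a nack, and the message is redelivered.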
import base64
import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/pubsub/push", methods=["POST"])
def pubsub_push_handler():
    envelope = request.get_json(silent=True)  # None instead of an exception on bad JSON
    if not envelope or "message" not in envelope:
        # A non-success status counts as a nack, so Pub/Sub will retry this as well
        return "Bad Request", 400
    msg = envelope["message"]
    # The message data arrives base64-encoded inside the JSON envelope
    data_raw = base64.b64decode(msg.get("data", "")).decode("utf-8")
    try:
        payload = json.loads(data_raw)
        proses_order(payload)
        return jsonify({"status": "ok"}), 200  # success status = ACK
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"error": str(e)}), 500  # 5xx = NACK, Pub/Sub will retry
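The push request body is a JSON envelope. Its message field carries the base64-encoded data together with attributes and messageId. Moving the decoding into a plain function makes it testable without Flask. The function below, decode_push_envelope, is hypothetical, and the envelope in the example is hand-built for illustration.

```python
import base64
import json


def decode_push_envelope(envelope: dict) -> tuple[dict, dict, str]:
    """Return (payload, attributes, message_id) from a push request body."""
    if not envelope or "message" not in envelope:
        raise ValueError("not a Pub/Sub push envelope")
    msg = envelope["message"]
    data = base64.b64decode(msg.get("data", ""))
    payload = json.loads(data.decode("utf-8")) if data else {}
    return payload, msg.get("attributes", {}), msg.get("messageId", "")


# Hand-built envelope mimicking what a push subscription sends
body = {
    "message": {
        "data": base64.b64encode(b'{"order_id": 1001, "total": 18500000}').decode("ascii"),
        "attributes": {"event_type": "order.created"},
        "messageId": "123",
    },
    "subscription": "projects/my-project/subscriptions/audit-sub",
}
payload, attrs, msg_id = decode_push_envelope(body)
print(payload["order_id"], attrs["event_type"], msg_id)  # prints: 1001 order.created 123
```

Inside the Flask handler, decode_push_envelope(request.get_json(silent=True)) would replace the manual decoding, and a ValueError would map to the 400 response.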
Extend Ack Deadline #
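import json
import threading
import time
from google.cloud import pubsub_v1

def proses_dengan_extend_ack(message: pubsub_v1.subscriber.message.Message) -> None:
    """Periodically extend the ack deadline for long-running processing.

    Note: the streaming pull client already extends deadlines on its own, up to
    FlowControl.max_lease_duration (1 hour by default); manual extension mainly
    matters when processing may outlast that window."""
    berhenti = threading.Event()

    def extend_loop():
        # Every 30 s, push the deadline 60 s into the future, until stopped
        while not berhenti.wait(timeout=30):
            message.modify_ack_deadline(60)
            print(f"Ack deadline extended: {message.message_id}")

    thread = threading.Thread(target=extend_loop, daemon=True)
    thread.start()
    try:
        payload = json.loads(message.data.decode("utf-8"))
        # Simulated long-running work on `payload` (5 steps x 25 s)
        for langkah in range(5):
            time.sleep(25)
            print(f"Step {langkah + 1}/5 done")
        message.ack()
    except Exception:
        message.nack()
    finally:
        berhenti.set()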
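The extender thread can be packaged as a context manager, which guarantees it is stopped even when processing raises. This sketch assumes the same modify_ack_deadline(seconds) method used above. LeaseExtender is a hypothetical name. The example drives it with a fake message and short intervals so that it finishes in well under a second.

```python
import threading
import time


class LeaseExtender:
    """Hypothetical helper: call message.modify_ack_deadline periodically in a thread."""

    def __init__(self, message, interval: float = 30.0, extend_to: int = 60):
        self._message = message
        self._interval = interval
        self._extend_to = extend_to
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # wait() returns True once the stop event is set, ending the loop
        while not self._stop.wait(timeout=self._interval):
            self._message.modify_ack_deadline(self._extend_to)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc) -> bool:
        self._stop.set()
        self._thread.join()
        return False  # never swallow exceptions raised inside the with-body


class FakeMessage:
    """Counts extensions instead of talking to Pub/Sub."""
    def __init__(self):
        self.extensions = 0

    def modify_ack_deadline(self, seconds: int) -> None:
        self.extensions += 1


msg = FakeMessage()
with LeaseExtender(msg, interval=0.05):
    time.sleep(0.3)  # simulated long-running work
print(msg.extensions)  # roughly 5; the exact count depends on thread scheduling
```

In a real callback, the work would go inside `with LeaseExtender(message):`, and message.ack() would be called after the block so that extension stops before the ack.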
Monitoring Dead Letter Topic #
def pantau_dead_letter_topic(dlt_subscription_id: str) -> None:
    sub = pubsub_v1.SubscriberClient()
    sub_path = sub.subscription_path(PROJECT_ID, dlt_subscription_id)
    try:
        response = sub.pull(
            request={"subscription": sub_path, "max_messages": 20},
            timeout=10,
        )
        if not response.received_messages:
            print("Dead Letter Topic is empty.")
            return
        print(f"⚠ {len(response.received_messages)} messages in the Dead Letter Topic:")
        ack_ids = []
        for received in response.received_messages:
            msg = received.message
            attrs = msg.attributes
            try:
                payload = json.loads(msg.data.decode("utf-8"))
            except Exception:
                payload = msg.data.decode("utf-8", errors="replace")
            print(f"  ID: {msg.message_id}")
            # Attributes added by Pub/Sub when it forwards a message to the DLT
            print(f"  Delivery attempts: {attrs.get('CloudPubSubDeadLetterSourceDeliveryCount', '?')}x")
            print(f"  From subscription: {attrs.get('CloudPubSubDeadLetterSourceSubscription', '?')}")
            print(f"  Payload: {payload}\n")
            ack_ids.append(received.ack_id)
        if ack_ids:
            # Acking removes the messages from the DLT subscription; skip this
            # if you want to keep them for later reprocessing
            sub.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
    finally:
        sub.close()
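When the DLT backlog grows, a per-subscription tally is often more useful than printing every message. The function below is a hypothetical sketch that works on plain attribute dicts; in the pull loop above, dict(msg.attributes) could be passed in. It uses the same CloudPubSubDeadLetterSourceSubscription and CloudPubSubDeadLetterSourceDeliveryCount attributes that the monitoring code reads. The sample data is made up for illustration.

```python
from collections import Counter


def summarize_dead_letters(attribute_maps: list[dict]) -> dict:
    """Count dead-lettered messages per source subscription and find the max attempts."""
    per_sub = Counter()
    max_attempts = 0
    for attrs in attribute_maps:
        per_sub[attrs.get("CloudPubSubDeadLetterSourceSubscription", "?")] += 1
        count = attrs.get("CloudPubSubDeadLetterSourceDeliveryCount", "")
        # Attribute values are strings; ignore missing or non-numeric counts
        if count.isdigit():
            max_attempts = max(max_attempts, int(count))
    return {"per_subscription": dict(per_sub), "max_delivery_attempts_seen": max_attempts}


sample = [
    {"CloudPubSubDeadLetterSourceSubscription": "payment-service-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "5"},
    {"CloudPubSubDeadLetterSourceSubscription": "payment-service-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "5"},
    {"CloudPubSubDeadLetterSourceSubscription": "audit-sub",
     "CloudPubSubDeadLetterSourceDeliveryCount": "10"},
]
print(summarize_dead_letters(sample))
# prints: {'per_subscription': {'payment-service-sub': 2, 'audit-sub': 1}, 'max_delivery_attempts_seen': 10}
```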
Summary #
- Topics and Subscriptions are separate: each subscription independently receives its own copy of every message, and one topic can have many subscriptions for different services.
- message.ack() after success, message.nack() after failure: ack() removes the message from the subscription; nack() asks Pub/Sub to redeliver it without waiting for the ack deadline to expire (subject to the subscription's retry policy).
- Flow control is essential: set max_messages and max_bytes on the streaming subscriber so it is not flooded with more messages than it can process.
- Dead Letter Topic (DLT): configure dead_letter_policy on the subscription so that messages failing N times are automatically moved to the DLT for monitoring and debugging; the DLT topic needs its own subscription to retain them.
- Push vs Pull: use Pull for full control over timing; use Push if you already have an HTTP endpoint and want Pub/Sub to push messages to it.
- Ordering key: use it to guarantee message order per entity (e.g. per user ID); it only takes effect when the publisher enables message ordering and the subscription is created with message ordering enabled.
- Batch publisher settings: configure max_messages, max_latency, and max_bytes to balance publish throughput against latency.
- ADC and Workload Identity: on GCP, use Workload Identity or a Service Account attached to the compute resource; there is no need to store a JSON credentials file.
- message.modify_ack_deadline(): for long-running processing, extend the ack deadline periodically so the message is not redelivered before processing finishes.
- Retry policy on the subscription: configure minimum_backoff and maximum_backoff to control the interval between redeliveries of failed messages.