Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan antrian pesan terkelola sepenuhnya dari AWS — tanpa server untuk dikelola, tanpa kapasitas yang perlu diprovisioning, dan otomatis skalabel hingga miliaran pesan per hari. SQS sangat cocok jika kamu sudah berada di ekosistem AWS dan ingin integrasi asinkron antar Lambda, EC2, ECS, atau service lain tanpa mengelola infrastruktur broker sendiri. Memahami dua tipe queue (Standard dan FIFO), cara kerja Visibility Timeout, dan pola consumer loop yang benar adalah kunci untuk menghindari pesan yang diproses ganda atau hilang.
Konsep Dasar SQS #
Alur pesan di SQS:
Producer SQS Queue Consumer
──────── ───────── ────────
App A ──send_message──► [msg1][msg2][msg3] ──poll──► Worker 1
App B ──send_message──► (pesan tersimpan ──poll──► Worker 2
di AWS, terkelola)
Dua tipe queue:
Standard Queue
├── Throughput tidak terbatas
├── At-least-once delivery (pesan bisa dikirim lebih dari sekali)
├── Best-effort ordering (urutan tidak dijamin)
└── Cocok untuk: task queue, notifikasi, pemrosesan yang idempoten
FIFO Queue (nama harus berakhiran .fifo)
├── Throughput terbatas (3.000/detik dengan batching, 300/detik tanpa)
├── Exactly-once delivery (tidak ada duplikat)
├── Strict ordering per Message Group ID
└── Cocok untuk: transaksi keuangan, event ordering yang kritis
Visibility Timeout:
Saat pesan di-receive, pesan "disembunyikan" dari consumer lain
selama N detik. Jika tidak di-delete dalam waktu itu, pesan
muncul kembali di queue dan bisa diproses ulang.
[Queue]─receive──►[Worker memproses]──delete──►[Pesan hilang dari queue]
│
└─ jika crash atau timeout: pesan muncul kembali ✓
Instalasi dan Autentikasi #
pip install boto3
AWS membutuhkan kredensial untuk setiap API call. Ada beberapa cara autentikasi — urutan prioritasnya:
1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
2. AWS credentials file (~/.aws/credentials)
3. AWS IAM Role (untuk EC2, Lambda, ECS — cara terbaik di produksi)
4. Parameter eksplisit di kode (JANGAN lakukan ini)
import boto3
import os
# ANTI-PATTERN: hardcode kredensial di kode
sqs = boto3.client(
"sqs",
aws_access_key_id="AKIAXXXXXXXXXXXXXXXX", # ✗ -- jangan pernah ini
aws_secret_access_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
)
# BENAR cara 1: baca dari environment variable
sqs = boto3.client(
"sqs",
region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-1"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_session_token=os.getenv("AWS_SESSION_TOKEN") # jika pakai temporary credentials
)
# BENAR cara 2: boto3 otomatis baca dari ~/.aws/credentials atau IAM Role
sqs = boto3.client("sqs", region_name="ap-southeast-1")
# BENAR cara 3: gunakan resource API (lebih pythonic)
sqs_resource = boto3.resource("sqs", region_name="ap-southeast-1")
Di lingkungan produksi pada EC2, ECS, atau Lambda, gunakan IAM Role yang dilampirkan ke compute resource — tidak perlu menyimpan kredensial sama sekali. Boto3 otomatis mengambil credentials dari instance metadata service. Ini adalah cara paling aman karena credentials di-rotate otomatis.
Membuat Queue #
import boto3
import json
import os
sqs = boto3.client("sqs", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-1"))
def buat_standard_queue(nama: str, retention_detik: int = 86400) -> str:
"""Buat Standard Queue, kembalikan URL."""
response = sqs.create_queue(
QueueName=nama,
Attributes={
"VisibilityTimeout": "30", # detik pesan disembunyikan saat diproses
"MessageRetentionPeriod": str(retention_detik), # berapa lama pesan disimpan (maks 14 hari)
"ReceiveMessageWaitTimeSeconds": "20", # Long Polling -- tunggu hingga 20 detik
"RedrivePolicy": json.dumps({ # Dead Letter Queue setelah 3x gagal
"deadLetterTargetArn": buat_dlq(nama + "-dlq"),
"maxReceiveCount": "3"
})
}
)
print(f"Standard Queue dibuat: {response['QueueUrl']}")
return response["QueueUrl"]
def buat_fifo_queue(nama: str) -> str:
"""Buat FIFO Queue -- nama harus berakhiran .fifo"""
if not nama.endswith(".fifo"):
nama += ".fifo"
response = sqs.create_queue(
QueueName=nama,
Attributes={
"FifoQueue": "true",
"ContentBasedDeduplication": "true", # dedup otomatis berdasarkan isi pesan
"VisibilityTimeout": "60",
"MessageRetentionPeriod": "86400",
"ReceiveMessageWaitTimeSeconds": "20",
}
)
print(f"FIFO Queue dibuat: {response['QueueUrl']}")
return response["QueueUrl"]
def buat_dlq(nama: str) -> str:
"""Buat Dead Letter Queue, kembalikan ARN."""
response = sqs.create_queue(
QueueName=nama,
Attributes={
"MessageRetentionPeriod": str(14 * 24 * 3600) # simpan 14 hari
}
)
url = response["QueueUrl"]
# Ambil ARN dari queue yang baru dibuat
attrs = sqs.get_queue_attributes(QueueUrl=url, AttributeNames=["QueueArn"])
return attrs["Attributes"]["QueueArn"]
def ambil_queue_url(nama: str) -> str:
"""Ambil URL queue yang sudah ada."""
response = sqs.get_queue_url(QueueName=nama)
return response["QueueUrl"]
Mengirim Pesan #
import json
from datetime import datetime, timezone
import uuid
QUEUE_URL = ambil_queue_url("order-queue")
def kirim_pesan(queue_url: str, payload: dict, delay_detik: int = 0) -> str:
"""Kirim satu pesan ke Standard Queue."""
body = json.dumps(payload, ensure_ascii=False)
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=body,
DelaySeconds=delay_detik, # tunda pengiriman (0–900 detik)
MessageAttributes={
"source": {
"DataType": "String",
"StringValue": "order-service"
},
"event_type": {
"DataType": "String",
"StringValue": payload.get("event", "unknown")
}
}
)
msg_id = response["MessageId"]
print(f"Pesan terkirim: {msg_id}")
return msg_id
def kirim_pesan_fifo(queue_url: str, payload: dict, group_id: str) -> str:
"""Kirim pesan ke FIFO Queue dengan Message Group ID."""
body = json.dumps(payload, ensure_ascii=False)
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=body,
MessageGroupId=group_id, # pesan dalam group yang sama dijamin urut
MessageDeduplicationId=str(uuid.uuid4()) # ID unik untuk cegah duplikat
)
return response["MessageId"]
# Contoh penggunaan
order = {
"event": "order.created",
"order_id": 1001,
"user_id": 42,
"total": 18500000,
"timestamp": datetime.now(timezone.utc).isoformat()
}
kirim_pesan(QUEUE_URL, order)
# FIFO -- pesan per user_id dijamin urut
kirim_pesan_fifo(
ambil_queue_url("order-fifo.fifo"),
order,
group_id=f"user-{order['user_id']}" # semua order user 42 diproses berurutan
)
Batch Send — Lebih Efisien #
def kirim_batch(queue_url: str, payloads: list[dict]) -> dict:
"""
Kirim hingga 10 pesan sekaligus.
Lebih efisien dari kirim satu per satu (biaya AWS dihitung per request).
"""
entries = [
{
"Id": str(i),
"MessageBody": json.dumps(p, ensure_ascii=False),
"MessageAttributes": {
"event_type": {
"DataType": "String",
"StringValue": p.get("event", "unknown")
}
}
}
for i, p in enumerate(payloads[:10]) # maks 10 per batch
]
response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
sukses = len(response.get("Successful", []))
gagal = len(response.get("Failed", []))
if response.get("Failed"):
for f in response["Failed"]:
print(f"Gagal kirim ID {f['Id']}: {f['Message']}")
print(f"Batch terkirim: {sukses} sukses, {gagal} gagal")
return response
# Kirim 10 order sekaligus
orders = [{"event": "order.created", "order_id": i, "total": i * 10000} for i in range(1, 11)]
kirim_batch(QUEUE_URL, orders)
Consumer Loop #
SQS menggunakan model pull — consumer harus aktif meminta pesan. Gunakan Long Polling (WaitTimeSeconds=20) untuk mengurangi biaya dan latensi dibanding Short Polling.
import signal
import json
import time
def proses_order(payload: dict) -> None:
"""Logika bisnis — jika raise exception, pesan tidak di-delete."""
print(f"Memproses order #{payload['order_id']} — Rp{payload['total']:,.0f}")
# ... simpan ke database, kirim notifikasi, dll.
def jalankan_consumer(queue_url: str) -> None:
berjalan = True
def handle_shutdown(signum, frame):
nonlocal berjalan
print("Shutdown signal, menghentikan consumer...")
berjalan = False
signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
print(f"Consumer aktif, polling dari queue...")
while berjalan:
try:
# Long Polling: tunggu hingga 20 detik jika queue kosong
# Jauh lebih efisien dari polling cepat (mengurangi biaya API call)
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10, # ambil hingga 10 pesan per request
WaitTimeSeconds=20, # Long Polling
VisibilityTimeout=30, # override visibility timeout untuk batch ini
MessageAttributeNames=["All"],
AttributeNames=["All"]
)
messages = response.get("Messages", [])
if not messages:
continue # queue kosong, poll lagi
for msg in messages:
receipt_handle = msg["ReceiptHandle"]
msg_id = msg["MessageId"]
try:
payload = json.loads(msg["Body"])
proses_order(payload)
# WAJIB: delete setelah berhasil diproses
# Jika tidak di-delete, pesan muncul kembali setelah VisibilityTimeout
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
print(f"✓ Pesan {msg_id} diproses dan dihapus.")
except json.JSONDecodeError as e:
print(f"✗ Format pesan tidak valid: {e}")
# Delete pesan invalid -- tidak bisa diperbaiki dengan retry
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
except Exception as e:
print(f"✗ Error memproses {msg_id}: {e}")
# JANGAN delete -- biarkan VisibilityTimeout habis
# Pesan akan muncul kembali untuk diproses ulang
# Setelah maxReceiveCount kali gagal, masuk ke DLQ
except Exception as e:
print(f"Error polling: {e}")
time.sleep(5) # jeda sebelum retry jika ada error network
print("Consumer berhenti.")
jalankan_consumer(QUEUE_URL)
Selaludelete_message()setelah pemrosesan berhasil. Jika tidak, pesan akan muncul kembali di queue setelahVisibilityTimeouthabis dan diproses ulang oleh consumer lain. Sebaliknya, jangan delete pesan yang gagal diproses — biarkan SQS menanganinya melalui mekanisme retry dan Dead Letter Queue.
Batch Delete — Efisiensi Tinggi #
def consumer_batch_delete(queue_url: str) -> None:
"""
Consumer dengan batch delete -- lebih efisien untuk throughput tinggi.
Proses semua pesan, kumpulkan yang berhasil, delete sekaligus.
"""
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20
)
messages = response.get("Messages", [])
if not messages:
return
berhasil_dihapus = []
for msg in messages:
try:
payload = json.loads(msg["Body"])
proses_order(payload)
berhasil_dihapus.append({
"Id": msg["MessageId"],
"ReceiptHandle": msg["ReceiptHandle"]
})
except Exception as e:
print(f"✗ Gagal proses {msg['MessageId']}: {e}")
# Tidak ditambahkan ke berhasil_dihapus -- akan retry otomatis
# Batch delete untuk pesan yang berhasil
if berhasil_dihapus:
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=berhasil_dihapus
)
print(f"Batch delete: {len(berhasil_dihapus)} pesan dihapus.")
Dead Letter Queue — Monitoring Pesan Gagal #
def proses_dlq(dlq_url: str, kirim_alert: bool = True) -> None:
"""
Baca pesan dari Dead Letter Queue untuk monitoring dan debugging.
Pesan di DLQ adalah pesan yang sudah gagal maxReceiveCount kali.
"""
response = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=5,
AttributeNames=["All"]
)
messages = response.get("Messages", [])
if not messages:
print("DLQ kosong.")
return
print(f"⚠ {len(messages)} pesan di DLQ:")
for msg in messages:
attrs = msg.get("Attributes", {})
receive_count = attrs.get("ApproximateReceiveCount", "?")
try:
payload = json.loads(msg["Body"])
except Exception:
payload = msg["Body"]
print(f" ID: {msg['MessageId']}")
print(f" Sudah dicoba: {receive_count}x")
print(f" Payload: {payload}")
if kirim_alert:
# Kirim notifikasi ke tim (Slack, email, PagerDuty, dll.)
print(f" → Alert dikirim untuk pesan {msg['MessageId']}")
# Setelah dianalisis, bisa:
# 1. Delete pesan dari DLQ (jika tidak bisa dipulihkan)
# 2. Pindahkan kembali ke queue utama setelah bug diperbaiki (redrive)
def redrive_dari_dlq(dlq_url: str, main_queue_url: str) -> int:
"""Pindahkan pesan dari DLQ kembali ke queue utama setelah bug diperbaiki."""
dipindah = 0
while True:
response = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=1
)
messages = response.get("Messages", [])
if not messages:
break
for msg in messages:
sqs.send_message(QueueUrl=main_queue_url, MessageBody=msg["Body"])
sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg["ReceiptHandle"])
dipindah += 1
print(f"{dipindah} pesan dipindahkan dari DLQ ke queue utama.")
return dipindah
Visibility Timeout — Extend saat Pemrosesan Lama #
def proses_dengan_extend_visibility(queue_url: str) -> None:
"""
Untuk pesan yang butuh waktu pemrosesan lama,
extend visibility timeout secara berkala agar tidak muncul kembali.
"""
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=20,
VisibilityTimeout=30 # beri 30 detik awal
)
messages = response.get("Messages", [])
if not messages:
return
msg = messages[0]
receipt_handle = msg["ReceiptHandle"]
try:
payload = json.loads(msg["Body"])
print(f"Memproses pesan besar: {msg['MessageId']}")
for langkah in range(5):
# Simulasi pemrosesan panjang
time.sleep(20)
# Extend visibility timeout sebelum habis
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=30 # tambah 30 detik lagi
)
print(f"Visibility timeout di-extend, langkah {langkah + 1}/5")
# Selesai -- delete pesan
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
print("Pesan berhasil diproses.")
except Exception as e:
print(f"Error: {e}")
# Jangan extend -- biarkan timeout habis dan pesan muncul kembali
Ringkasan #
- Standard vs FIFO — gunakan FIFO (nama harus
.fifo) jika urutan pesan dan exactly-once delivery kritis; gunakan Standard untuk throughput tinggi dengan pemrosesan idempoten.- Long Polling wajib — selalu set
WaitTimeSeconds=20direceive_message(); mengurangi biaya API call dan latensi dibanding Short Polling yang terus-menerus polling meski queue kosong.- Delete setelah sukses, tidak setelah gagal —
delete_message()hanya setelah pemrosesan berhasil; jika gagal, biarkanVisibilityTimeouthabis agar pesan bisa diproses ulang.- Visibility Timeout harus lebih panjang dari waktu proses — set lebih besar dari perkiraan waktu pemrosesan; gunakan
change_message_visibility()untuk extend jika pemrosesan memakan waktu lama.- Dead Letter Queue (DLQ) — selalu konfigurasi DLQ dengan
maxReceiveCountyang tepat agar pesan yang berulang kali gagal tidak memblokir queue utama.- Batch send dan batch delete — gunakan
send_message_batch()dandelete_message_batch()untuk efisiensi; biaya SQS dihitung per API request, bukan per pesan.MaxNumberOfMessages=10— ambil hingga 10 pesan per poll; lebih efisien daripada satu per satu.- IAM Role di produksi — jangan simpan kredensial AWS di kode atau environment variable di EC2/ECS/Lambda; gunakan IAM Role yang dilampirkan ke compute resource.
- Message Attributes — gunakan untuk metadata (event type, source service) yang perlu difilter tanpa parsing body pesan.
- Idempoten consumer — rancang consumer agar aman dijalankan lebih dari sekali untuk pesan yang sama, karena Standard Queue menjamin at-least-once (bukan exactly-once) delivery.