Amazon SQS #

Amazon Simple Queue Service (SQS) adalah layanan antrian pesan terkelola sepenuhnya dari AWS — tanpa server untuk dikelola, tanpa kapasitas yang perlu diprovisioning, dan otomatis skalabel hingga miliaran pesan per hari. SQS sangat cocok jika kamu sudah berada di ekosistem AWS dan ingin integrasi asinkron antar Lambda, EC2, ECS, atau service lain tanpa mengelola infrastruktur broker sendiri. Memahami dua tipe queue (Standard dan FIFO), cara kerja Visibility Timeout, dan pola consumer loop yang benar adalah kunci untuk menghindari pesan yang diproses ganda atau hilang.

Konsep Dasar SQS #

Alur pesan di SQS:

Producer                  SQS Queue                    Consumer
────────                  ─────────                    ────────
App A ──send_message──►  [msg1][msg2][msg3]  ──poll──► Worker 1
App B ──send_message──►  (pesan tersimpan    ──poll──► Worker 2
                          di AWS, terkelola)

Dua tipe queue:

Standard Queue
  ├── Throughput tidak terbatas
  ├── At-least-once delivery (pesan bisa dikirim lebih dari sekali)
  ├── Best-effort ordering (urutan tidak dijamin)
  └── Cocok untuk: task queue, notifikasi, pemrosesan yang idempoten

FIFO Queue (nama harus berakhiran .fifo)
  ├── Throughput terbatas (3.000/detik dengan batching, 300/detik tanpa)
  ├── Exactly-once delivery (tidak ada duplikat)
  ├── Strict ordering per Message Group ID
  └── Cocok untuk: transaksi keuangan, event ordering yang kritis

Visibility Timeout:
  Saat pesan di-receive, pesan "disembunyikan" dari consumer lain
  selama N detik. Jika tidak di-delete dalam waktu itu, pesan
  muncul kembali di queue dan bisa diproses ulang.

  [Queue]─receive──►[Worker memproses]──delete──►[Pesan hilang dari queue]
                     │
                     └─ jika crash atau timeout: pesan muncul kembali ✓

Instalasi dan Autentikasi #

pip install boto3

AWS membutuhkan kredensial untuk setiap API call. Ada beberapa cara autentikasi — urutan prioritasnya:

1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
2. AWS credentials file (~/.aws/credentials)
3. AWS IAM Role (untuk EC2, Lambda, ECS — cara terbaik di produksi)
4. Parameter eksplisit di kode (JANGAN lakukan ini)
import boto3
import os

# ANTI-PATTERN: hardcode kredensial di kode
sqs = boto3.client(
    "sqs",
    aws_access_key_id="AKIAXXXXXXXXXXXXXXXX",      # ✗ -- jangan pernah ini
    aws_secret_access_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
)

# BENAR cara 1: baca dari environment variable
sqs = boto3.client(
    "sqs",
    region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-1"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_session_token=os.getenv("AWS_SESSION_TOKEN")   # jika pakai temporary credentials
)

# BENAR cara 2: boto3 otomatis baca dari ~/.aws/credentials atau IAM Role
sqs = boto3.client("sqs", region_name="ap-southeast-1")

# BENAR cara 3: gunakan resource API (lebih pythonic)
sqs_resource = boto3.resource("sqs", region_name="ap-southeast-1")
Di lingkungan produksi pada EC2, ECS, atau Lambda, gunakan IAM Role yang dilampirkan ke compute resource — tidak perlu menyimpan kredensial sama sekali. Boto3 otomatis mengambil credentials dari instance metadata service. Ini adalah cara paling aman karena credentials di-rotate otomatis.

Membuat Queue #

import boto3
import json
import os

sqs = boto3.client("sqs", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-1"))

def buat_standard_queue(nama: str, retention_detik: int = 86400) -> str:
    """Buat Standard Queue, kembalikan URL."""
    response = sqs.create_queue(
        QueueName=nama,
        Attributes={
            "VisibilityTimeout":      "30",              # detik pesan disembunyikan saat diproses
            "MessageRetentionPeriod": str(retention_detik),  # berapa lama pesan disimpan (maks 14 hari)
            "ReceiveMessageWaitTimeSeconds": "20",       # Long Polling -- tunggu hingga 20 detik
            "RedrivePolicy": json.dumps({               # Dead Letter Queue setelah 3x gagal
                "deadLetterTargetArn": buat_dlq(nama + "-dlq"),
                "maxReceiveCount":     "3"
            })
        }
    )
    print(f"Standard Queue dibuat: {response['QueueUrl']}")
    return response["QueueUrl"]

def buat_fifo_queue(nama: str) -> str:
    """Buat FIFO Queue -- nama harus berakhiran .fifo"""
    if not nama.endswith(".fifo"):
        nama += ".fifo"

    response = sqs.create_queue(
        QueueName=nama,
        Attributes={
            "FifoQueue":                    "true",
            "ContentBasedDeduplication":    "true",   # dedup otomatis berdasarkan isi pesan
            "VisibilityTimeout":            "60",
            "MessageRetentionPeriod":       "86400",
            "ReceiveMessageWaitTimeSeconds": "20",
        }
    )
    print(f"FIFO Queue dibuat: {response['QueueUrl']}")
    return response["QueueUrl"]

def buat_dlq(nama: str) -> str:
    """Buat Dead Letter Queue, kembalikan ARN."""
    response = sqs.create_queue(
        QueueName=nama,
        Attributes={
            "MessageRetentionPeriod": str(14 * 24 * 3600)  # simpan 14 hari
        }
    )
    url = response["QueueUrl"]
    # Ambil ARN dari queue yang baru dibuat
    attrs = sqs.get_queue_attributes(QueueUrl=url, AttributeNames=["QueueArn"])
    return attrs["Attributes"]["QueueArn"]

def ambil_queue_url(nama: str) -> str:
    """Ambil URL queue yang sudah ada."""
    response = sqs.get_queue_url(QueueName=nama)
    return response["QueueUrl"]

Mengirim Pesan #

import json
from datetime import datetime, timezone
import uuid

QUEUE_URL = ambil_queue_url("order-queue")

def kirim_pesan(queue_url: str, payload: dict, delay_detik: int = 0) -> str:
    """Kirim satu pesan ke Standard Queue."""
    body = json.dumps(payload, ensure_ascii=False)

    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=body,
        DelaySeconds=delay_detik,         # tunda pengiriman (0–900 detik)
        MessageAttributes={
            "source": {
                "DataType":    "String",
                "StringValue": "order-service"
            },
            "event_type": {
                "DataType":    "String",
                "StringValue": payload.get("event", "unknown")
            }
        }
    )
    msg_id = response["MessageId"]
    print(f"Pesan terkirim: {msg_id}")
    return msg_id

def kirim_pesan_fifo(queue_url: str, payload: dict, group_id: str) -> str:
    """Kirim pesan ke FIFO Queue dengan Message Group ID."""
    body = json.dumps(payload, ensure_ascii=False)

    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=body,
        MessageGroupId=group_id,               # pesan dalam group yang sama dijamin urut
        MessageDeduplicationId=str(uuid.uuid4())  # ID unik untuk cegah duplikat
    )
    return response["MessageId"]

# Contoh penggunaan
order = {
    "event":    "order.created",
    "order_id": 1001,
    "user_id":  42,
    "total":    18500000,
    "timestamp": datetime.now(timezone.utc).isoformat()
}

kirim_pesan(QUEUE_URL, order)

# FIFO -- pesan per user_id dijamin urut
kirim_pesan_fifo(
    ambil_queue_url("order-fifo.fifo"),
    order,
    group_id=f"user-{order['user_id']}"  # semua order user 42 diproses berurutan
)

Batch Send — Lebih Efisien #

def kirim_batch(queue_url: str, payloads: list[dict]) -> dict:
    """
    Kirim hingga 10 pesan sekaligus.
    Lebih efisien dari kirim satu per satu (biaya AWS dihitung per request).
    """
    entries = [
        {
            "Id":          str(i),
            "MessageBody": json.dumps(p, ensure_ascii=False),
            "MessageAttributes": {
                "event_type": {
                    "DataType":    "String",
                    "StringValue": p.get("event", "unknown")
                }
            }
        }
        for i, p in enumerate(payloads[:10])   # maks 10 per batch
    ]

    response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)

    sukses = len(response.get("Successful", []))
    gagal  = len(response.get("Failed", []))

    if response.get("Failed"):
        for f in response["Failed"]:
            print(f"Gagal kirim ID {f['Id']}: {f['Message']}")

    print(f"Batch terkirim: {sukses} sukses, {gagal} gagal")
    return response

# Kirim 10 order sekaligus
orders = [{"event": "order.created", "order_id": i, "total": i * 10000} for i in range(1, 11)]
kirim_batch(QUEUE_URL, orders)

Consumer Loop #

SQS menggunakan model pull — consumer harus aktif meminta pesan. Gunakan Long Polling (WaitTimeSeconds=20) untuk mengurangi biaya dan latensi dibanding Short Polling.

import signal
import json
import time

def proses_order(payload: dict) -> None:
    """Logika bisnis — jika raise exception, pesan tidak di-delete."""
    print(f"Memproses order #{payload['order_id']} — Rp{payload['total']:,.0f}")
    # ... simpan ke database, kirim notifikasi, dll.

def jalankan_consumer(queue_url: str) -> None:
    berjalan = True

    def handle_shutdown(signum, frame):
        nonlocal berjalan
        print("Shutdown signal, menghentikan consumer...")
        berjalan = False

    signal.signal(signal.SIGINT,  handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    print(f"Consumer aktif, polling dari queue...")

    while berjalan:
        try:
            # Long Polling: tunggu hingga 20 detik jika queue kosong
            # Jauh lebih efisien dari polling cepat (mengurangi biaya API call)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,       # ambil hingga 10 pesan per request
                WaitTimeSeconds=20,           # Long Polling
                VisibilityTimeout=30,         # override visibility timeout untuk batch ini
                MessageAttributeNames=["All"],
                AttributeNames=["All"]
            )

            messages = response.get("Messages", [])
            if not messages:
                continue   # queue kosong, poll lagi

            for msg in messages:
                receipt_handle = msg["ReceiptHandle"]
                msg_id         = msg["MessageId"]

                try:
                    payload = json.loads(msg["Body"])
                    proses_order(payload)

                    # WAJIB: delete setelah berhasil diproses
                    # Jika tidak di-delete, pesan muncul kembali setelah VisibilityTimeout
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                    print(f"✓ Pesan {msg_id} diproses dan dihapus.")

                except json.JSONDecodeError as e:
                    print(f"✗ Format pesan tidak valid: {e}")
                    # Delete pesan invalid -- tidak bisa diperbaiki dengan retry
                    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

                except Exception as e:
                    print(f"✗ Error memproses {msg_id}: {e}")
                    # JANGAN delete -- biarkan VisibilityTimeout habis
                    # Pesan akan muncul kembali untuk diproses ulang
                    # Setelah maxReceiveCount kali gagal, masuk ke DLQ

        except Exception as e:
            print(f"Error polling: {e}")
            time.sleep(5)   # jeda sebelum retry jika ada error network

    print("Consumer berhenti.")

jalankan_consumer(QUEUE_URL)
Selalu delete_message() setelah pemrosesan berhasil. Jika tidak, pesan akan muncul kembali di queue setelah VisibilityTimeout habis dan diproses ulang oleh consumer lain. Sebaliknya, jangan delete pesan yang gagal diproses — biarkan SQS menanganinya melalui mekanisme retry dan Dead Letter Queue.

Batch Delete — Efisiensi Tinggi #

def consumer_batch_delete(queue_url: str) -> None:
    """
    Consumer dengan batch delete -- lebih efisien untuk throughput tinggi.
    Proses semua pesan, kumpulkan yang berhasil, delete sekaligus.
    """
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20
    )
    messages = response.get("Messages", [])
    if not messages:
        return

    berhasil_dihapus = []

    for msg in messages:
        try:
            payload = json.loads(msg["Body"])
            proses_order(payload)
            berhasil_dihapus.append({
                "Id":            msg["MessageId"],
                "ReceiptHandle": msg["ReceiptHandle"]
            })
        except Exception as e:
            print(f"✗ Gagal proses {msg['MessageId']}: {e}")
            # Tidak ditambahkan ke berhasil_dihapus -- akan retry otomatis

    # Batch delete untuk pesan yang berhasil
    if berhasil_dihapus:
        sqs.delete_message_batch(
            QueueUrl=queue_url,
            Entries=berhasil_dihapus
        )
        print(f"Batch delete: {len(berhasil_dihapus)} pesan dihapus.")

Dead Letter Queue — Monitoring Pesan Gagal #

def proses_dlq(dlq_url: str, kirim_alert: bool = True) -> None:
    """
    Baca pesan dari Dead Letter Queue untuk monitoring dan debugging.
    Pesan di DLQ adalah pesan yang sudah gagal maxReceiveCount kali.
    """
    response = sqs.receive_message(
        QueueUrl=dlq_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=5,
        AttributeNames=["All"]
    )
    messages = response.get("Messages", [])

    if not messages:
        print("DLQ kosong.")
        return

    print(f"⚠ {len(messages)} pesan di DLQ:")
    for msg in messages:
        attrs        = msg.get("Attributes", {})
        receive_count = attrs.get("ApproximateReceiveCount", "?")

        try:
            payload = json.loads(msg["Body"])
        except Exception:
            payload = msg["Body"]

        print(f"  ID: {msg['MessageId']}")
        print(f"  Sudah dicoba: {receive_count}x")
        print(f"  Payload: {payload}")

        if kirim_alert:
            # Kirim notifikasi ke tim (Slack, email, PagerDuty, dll.)
            print(f"  → Alert dikirim untuk pesan {msg['MessageId']}")

    # Setelah dianalisis, bisa:
    # 1. Delete pesan dari DLQ (jika tidak bisa dipulihkan)
    # 2. Pindahkan kembali ke queue utama setelah bug diperbaiki (redrive)

def redrive_dari_dlq(dlq_url: str, main_queue_url: str) -> int:
    """Pindahkan pesan dari DLQ kembali ke queue utama setelah bug diperbaiki."""
    dipindah = 0
    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1
        )
        messages = response.get("Messages", [])
        if not messages:
            break

        for msg in messages:
            sqs.send_message(QueueUrl=main_queue_url, MessageBody=msg["Body"])
            sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg["ReceiptHandle"])
            dipindah += 1

    print(f"{dipindah} pesan dipindahkan dari DLQ ke queue utama.")
    return dipindah

Visibility Timeout — Extend saat Pemrosesan Lama #

def proses_dengan_extend_visibility(queue_url: str) -> None:
    """
    Untuk pesan yang butuh waktu pemrosesan lama,
    extend visibility timeout secara berkala agar tidak muncul kembali.
    """
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        VisibilityTimeout=30   # beri 30 detik awal
    )
    messages = response.get("Messages", [])
    if not messages:
        return

    msg            = messages[0]
    receipt_handle = msg["ReceiptHandle"]

    try:
        payload = json.loads(msg["Body"])
        print(f"Memproses pesan besar: {msg['MessageId']}")

        for langkah in range(5):
            # Simulasi pemrosesan panjang
            time.sleep(20)

            # Extend visibility timeout sebelum habis
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=30   # tambah 30 detik lagi
            )
            print(f"Visibility timeout di-extend, langkah {langkah + 1}/5")

        # Selesai -- delete pesan
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
        print("Pesan berhasil diproses.")

    except Exception as e:
        print(f"Error: {e}")
        # Jangan extend -- biarkan timeout habis dan pesan muncul kembali

Ringkasan #

  • Standard vs FIFO — gunakan FIFO (nama harus .fifo) jika urutan pesan dan exactly-once delivery kritis; gunakan Standard untuk throughput tinggi dengan pemrosesan idempoten.
  • Long Polling wajib — selalu set WaitTimeSeconds=20 di receive_message(); mengurangi biaya API call dan latensi dibanding Short Polling yang terus-menerus polling meski queue kosong.
  • Delete setelah sukses, tidak setelah gagaldelete_message() hanya setelah pemrosesan berhasil; jika gagal, biarkan VisibilityTimeout habis agar pesan bisa diproses ulang.
  • Visibility Timeout harus lebih panjang dari waktu proses — set lebih besar dari perkiraan waktu pemrosesan; gunakan change_message_visibility() untuk extend jika pemrosesan memakan waktu lama.
  • Dead Letter Queue (DLQ) — selalu konfigurasi DLQ dengan maxReceiveCount yang tepat agar pesan yang berulang kali gagal tidak memblokir queue utama.
  • Batch send dan batch delete — gunakan send_message_batch() dan delete_message_batch() untuk efisiensi; biaya SQS dihitung per API request, bukan per pesan.
  • MaxNumberOfMessages=10 — ambil hingga 10 pesan per poll; lebih efisien daripada satu per satu.
  • IAM Role di produksi — jangan simpan kredensial AWS di kode atau environment variable di EC2/ECS/Lambda; gunakan IAM Role yang dilampirkan ke compute resource.
  • Message Attributes — gunakan untuk metadata (event type, source service) yang perlu difilter tanpa parsing body pesan.
  • Idempoten consumer — rancang consumer agar aman dijalankan lebih dari sekali untuk pesan yang sama, karena Standard Queue menjamin at-least-once (bukan exactly-once) delivery.

← Sebelumnya: RabbitMQ   Berikutnya: Google Pub/Sub →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact