Redis #

Redis adalah penyimpanan struktur data in-memory yang berfungsi sebagai cache, database, message broker, dan queue sekaligus — dengan latensi sub-milidetik yang tidak bisa ditandingi database disk-based. Kecepatan Redis bukan hanya karena data disimpan di memori, tapi karena arsitektur single-threaded yang menghindari context switching, dan struktur data yang sudah dioptimalkan: String, Hash, List, Set, Sorted Set, Stream, dan lainnya, masing-masing dirancang untuk use case spesifik. Dalam pengembangan aplikasi Python, Redis paling banyak dipakai untuk caching hasil query database, session storage, rate limiting, leaderboard real-time, dan distributed lock antar instance.

Instalasi #

pip install redis

Untuk menjalankan Redis secara lokal:

# Docker (paling mudah)
docker run -d --name redis -p 6379:6379 redis:latest

# Dengan password
docker run -d --name redis -p 6379:6379 redis:latest redis-server --requirepass "rahasia"

Membuat Koneksi #

import redis
import os

# ANTI-PATTERN: koneksi hardcode tanpa password, tanpa decode_responses
r = redis.Redis(host="localhost", port=6379, db=0)
nilai = r.get("kunci")   # ✗ -- nilai berupa bytes: b"data", bukan string

# BENAR: gunakan environment variable, password, dan decode_responses=True
r = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    password=os.getenv("REDIS_PASSWORD"),
    db=int(os.getenv("REDIS_DB", "0")),
    decode_responses=True,    # kembalikan str bukan bytes
    socket_timeout=5,         # timeout saat operasi
    socket_connect_timeout=5  # timeout saat koneksi
)

# Atau menggunakan URL (lebih ringkas)
r = redis.from_url(
    os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    decode_responses=True,
    socket_timeout=5
)

# Tes koneksi
try:
    r.ping()
    print("Koneksi Redis berhasil.")
except redis.ConnectionError as e:
    print(f"Koneksi gagal: {e}")

Connection Pool #

# Connection Pool -- satu pool untuk seluruh aplikasi
pool = redis.ConnectionPool(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    password=os.getenv("REDIS_PASSWORD"),
    db=0,
    decode_responses=True,
    max_connections=20    # batas koneksi dalam pool
)

# Buat client yang menggunakan pool
r = redis.Redis(connection_pool=pool)

# Semua operasi memakai koneksi dari pool yang sama
# Koneksi dikembalikan ke pool otomatis setelah operasi selesai
decode_responses=True adalah parameter penting yang sering dilupakan. Tanpanya, semua nilai yang dikembalikan Redis berupa bytes (b"nilai"), bukan str. Aktifkan selalu kecuali kamu memang bekerja dengan data binary.

String — Tipe Data Paling Dasar #

String di Redis bukan hanya teks — bisa menyimpan integer, float, atau data serialisasi apapun hingga 512MB per key.

import json
from datetime import timedelta

# Set dan Get dasar
r.set("nama", "Budi Santoso")
print(r.get("nama"))   # "Budi Santoso"

# Set dengan TTL (Time To Live) -- kunci otomatis dihapus setelah N detik
r.set("token:abc123", "user-42", ex=3600)          # expire dalam 3600 detik (1 jam)
r.set("otp:081234", "7823", px=300000)             # expire dalam 300000 milidetik (5 menit)
r.setex("session:xyz", timedelta(hours=24), "data")  # menggunakan timedelta

# Cek TTL yang tersisa
print(r.ttl("token:abc123"))   # detik tersisa, -1 jika tidak ada TTL, -2 jika key tidak ada

# Increment / decrement -- atomik
r.set("halaman_dilihat", 0)
r.incr("halaman_dilihat")         # +1
r.incrby("halaman_dilihat", 10)   # +10
r.decr("halaman_dilihat")         # -1
print(r.get("halaman_dilihat"))   # "9"

# Simpan JSON (serialisasi manual)
pengguna = {"id": 1, "nama": "Budi", "email": "[email protected]"}
r.set("pengguna:1", json.dumps(pengguna), ex=3600)

# Ambil JSON
data_raw = r.get("pengguna:1")
pengguna = json.loads(data_raw) if data_raw else None

# Operasi atomik: SET hanya jika key belum ada (NX = Not eXists)
berhasil = r.set("kunci_unik", "nilai", nx=True, ex=60)
print("Set berhasil:", berhasil)   # True jika baru diset, None jika sudah ada

Pola Caching #

Cache-Aside (Lazy Loading) #

Pola paling umum — baca dari cache dahulu, jika miss baru baca dari database dan simpan ke cache.

import json
from typing import Callable, Any

def get_dengan_cache(
    r:          redis.Redis,
    cache_key:  str,
    fetch_fn:   Callable,       # fungsi untuk fetch data dari source (DB, API, dll.)
    ttl_detik:  int = 3600
) -> Any:
    """
    Cache-Aside pattern:
    1. Cek cache
    2. Jika hit → kembalikan dari cache
    3. Jika miss → fetch dari source, simpan ke cache, kembalikan
    """
    # Cek cache
    cached = r.get(cache_key)
    if cached is not None:
        print(f"Cache HIT: {cache_key}")
        return json.loads(cached)

    # Cache miss -- fetch dari source
    print(f"Cache MISS: {cache_key}")
    data = fetch_fn()

    if data is not None:
        r.set(cache_key, json.dumps(data, ensure_ascii=False), ex=ttl_detik)

    return data

# Contoh penggunaan
def ambil_produk_dari_db(produk_id: int) -> dict:
    # Simulasi query database
    return {"id": produk_id, "nama": "Laptop Gaming", "harga": 18500000}

produk = get_dengan_cache(
    r,
    cache_key=f"produk:{101}",
    fetch_fn=lambda: ambil_produk_dari_db(101),
    ttl_detik=1800
)
print(produk)

Cache Invalidation #

def update_produk(r: redis.Redis, produk_id: int, data_baru: dict) -> None:
    """Update database dan invalidasi cache terkait."""
    # 1. Update database (tidak ditampilkan)
    # update_produk_di_db(produk_id, data_baru)

    # 2. Hapus cache yang sudah tidak valid
    r.delete(f"produk:{produk_id}")

    # Atau update cache langsung (write-through)
    r.set(
        f"produk:{produk_id}",
        json.dumps(data_baru, ensure_ascii=False),
        ex=1800
    )

def invalidasi_cache_pattern(r: redis.Redis, pattern: str) -> int:
    """
    Hapus semua key yang cocok dengan pattern.
    Gunakan dengan hati-hati di produksi -- scan bisa lambat untuk data besar.
    """
    keys = r.keys(pattern)   # contoh: "produk:*", "session:user-42:*"
    if keys:
        return r.delete(*keys)
    return 0

# Hapus semua cache produk
jumlah_dihapus = invalidasi_cache_pattern(r, "produk:*")
print(f"{jumlah_dihapus} cache key dihapus.")

Hash — Struktur Mirip Dictionary #

Hash cocok untuk menyimpan objek dengan banyak field — lebih efisien dari menyimpan JSON string jika kamu sering mengakses field tertentu saja.

# Set hash
r.hset("pengguna:42", mapping={
    "nama":   "Budi Santoso",
    "email":  "[email protected]",
    "usia":   "28",
    "aktif":  "1"
})

# Get satu field
nama = r.hget("pengguna:42", "nama")         # "Budi Santoso"

# Get semua field
semua = r.hgetall("pengguna:42")             # dict semua field
print(semua)  # {'nama': 'Budi Santoso', 'email': '...', 'usia': '28', 'aktif': '1'}

# Get beberapa field tertentu
nama, email = r.hmget("pengguna:42", ["nama", "email"])

# Update satu field saja (tanpa mengubah yang lain)
r.hset("pengguna:42", "usia", "29")

# Hapus satu field
r.hdel("pengguna:42", "aktif")

# Set TTL untuk seluruh hash key
r.expire("pengguna:42", 3600)

# Increment field numerik
r.hincrby("pengguna:42", "login_count", 1)

List — Antrian dan Stack #

List Redis adalah linked list — push/pop di kedua ujung berjalan dalam O(1). Cocok untuk task queue, activity feed, atau log terbatas.

# Push ke kanan (FIFO queue)
r.rpush("antrian:email", json.dumps({"to": "[email protected]", "subject": "Welcome"}))
r.rpush("antrian:email", json.dumps({"to": "[email protected]", "subject": "Order"}))

# Pop dari kiri (ambil yang pertama masuk)
item_raw = r.lpop("antrian:email")
item     = json.loads(item_raw) if item_raw else None

# Blocking pop -- tunggu hingga ada item (berguna untuk worker)
item_raw = r.blpop("antrian:email", timeout=30)   # tunggu 30 detik

# Panjang list
print(r.llen("antrian:email"))

# Ambil semua item tanpa menghapus
semua = r.lrange("antrian:email", 0, -1)   # 0 = indeks pertama, -1 = terakhir

# Batasi list ke N item terakhir (sliding window)
r.ltrim("log:aktivitas", 0, 999)   # simpan hanya 1000 item terbaru

Set — Koleksi Unik #

Set cocok untuk menyimpan koleksi unik: tag, followers, item yang sudah dikunjungi, atau anggota grup.

# Tambah ke set
r.sadd("tag:produk:101", "laptop", "gaming", "asus")
r.sadd("tag:produk:102", "laptop", "bisnis", "lenovo")

# Cek keberadaan
print(r.sismember("tag:produk:101", "gaming"))   # True
print(r.sismember("tag:produk:101", "bisnis"))   # False

# Semua anggota
print(r.smembers("tag:produk:101"))   # {'laptop', 'gaming', 'asus'}

# Operasi himpunan
irisan      = r.sinter("tag:produk:101", "tag:produk:102")   # {'laptop'}
gabungan    = r.sunion("tag:produk:101", "tag:produk:102")   # semua tag
perbedaan   = r.sdiff("tag:produk:101", "tag:produk:102")    # {'gaming', 'asus'}

# Hapus dari set
r.srem("tag:produk:101", "asus")

Sorted Set — Leaderboard dan Rate Limiting #

Sorted Set adalah set dengan skor float — anggota diurutkan berdasarkan skor. Cocok untuk leaderboard, rating, antrian prioritas, dan rate limiting.

# Leaderboard skor game
r.zadd("leaderboard:game", {
    "player_alice": 9500,
    "player_budi":  8750,
    "player_sari":  9200,
    "player_andi":  7800
})

# Tambah/update skor
r.zincrby("leaderboard:game", 500, "player_budi")   # +500 untuk budi

# Top 3 pemain (skor tertinggi)
top3 = r.zrange("leaderboard:game", 0, 2, desc=True, withscores=True)
print("Top 3:")
for rank, (pemain, skor) in enumerate(top3, start=1):
    print(f"  #{rank} {pemain}: {int(skor)}")

# Peringkat seorang pemain (0-based)
rank = r.zrevrank("leaderboard:game", "player_budi")
print(f"Peringkat budi: #{rank + 1}")

# Skor seorang pemain
skor = r.zscore("leaderboard:game", "player_budi")
print(f"Skor budi: {int(skor)}")

Rate Limiting dengan Sorted Set #

def cek_rate_limit(r: redis.Redis, user_id: str, maks_request: int = 10, window_detik: int = 60) -> bool:
    """
    Sliding window rate limiter menggunakan Sorted Set.
    Kembalikan True jika request diizinkan, False jika sudah melampaui batas.
    """
    import time

    key     = f"rate_limit:{user_id}"
    sekarang = time.time()
    window_start = sekarang - window_detik

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)   # hapus request di luar window
    pipe.zadd(key, {str(sekarang): sekarang})     # tambahkan request sekarang
    pipe.zcard(key)                               # hitung request dalam window
    pipe.expire(key, window_detik)               # set TTL agar key tidak menumpuk
    results = pipe.execute()

    jumlah_request = results[2]
    return jumlah_request <= maks_request

# Cek rate limit
for i in range(12):
    diizinkan = cek_rate_limit(r, "user-42", maks_request=10, window_detik=60)
    print(f"Request {i+1}: {'✓ diizinkan' if diizinkan else '✗ ditolak'}")

Pipeline — Batch Commands #

Pipeline mengirim banyak perintah sekaligus dalam satu network round-trip, mengurangi latensi secara signifikan.

# ANTI-PATTERN: operasi satu per satu (N network round-trips)
for i in range(100):
    r.set(f"kunci:{i}", f"nilai:{i}")   # ✗ -- 100 round-trip ke Redis

# BENAR: gunakan pipeline (1 round-trip untuk semua)
pipe = r.pipeline(transaction=False)   # transaction=False = lebih cepat, non-atomic
for i in range(100):
    pipe.set(f"kunci:{i}", f"nilai:{i}", ex=3600)
pipe.execute()   # ✓ -- satu round-trip

# Pipeline dengan transaksi atomik (MULTI/EXEC)
with r.pipeline(transaction=True) as pipe:
    pipe.set("saldo:alice", 1000)
    pipe.set("saldo:budi",  500)
    pipe.execute()   # semua atau tidak sama sekali

# Pipeline dengan watch (optimistic locking)
def transfer_saldo(r: redis.Redis, dari: str, ke: str, jumlah: int) -> bool:
    kunci_dari = f"saldo:{dari}"
    kunci_ke   = f"saldo:{ke}"

    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(kunci_dari, kunci_ke)   # pantau perubahan
                saldo_dari = int(pipe.get(kunci_dari) or 0)

                if saldo_dari < jumlah:
                    pipe.unwatch()
                    return False

                pipe.multi()   # mulai transaksi
                pipe.decrby(kunci_dari, jumlah)
                pipe.incrby(kunci_ke,   jumlah)
                pipe.execute()   # commit -- akan gagal jika ada perubahan dari luar
                return True

            except redis.WatchError:
                # Data berubah saat kita sedang proses -- coba lagi
                continue

r.set("saldo:alice", 1000)
r.set("saldo:budi",  500)
berhasil = transfer_saldo(r, "alice", "budi", 300)
print(f"Transfer: {'berhasil' if berhasil else 'gagal'}")
print(f"Alice: {r.get('saldo:alice')}, Budi: {r.get('saldo:budi')}")

Distributed Lock #

Distributed lock memastikan hanya satu instance yang menjalankan operasi kritis pada satu waktu — penting di lingkungan dengan banyak server.

import uuid
import time

def acquire_lock(r: redis.Redis, lock_name: str, ttl_detik: int = 30) -> str | None:
    """
    Acquire distributed lock. Kembalikan lock_value jika berhasil, None jika gagal.
    Gunakan SET NX EX -- atomik, tidak bisa di-race condition.
    """
    lock_key   = f"lock:{lock_name}"
    lock_value = str(uuid.uuid4())   # nilai unik per lock

    # SET hanya jika belum ada (NX), dengan TTL (EX)
    berhasil = r.set(lock_key, lock_value, nx=True, ex=ttl_detik)
    return lock_value if berhasil else None

def release_lock(r: redis.Redis, lock_name: str, lock_value: str) -> bool:
    """
    Release lock. Hanya bisa di-release oleh yang mengacquire (cek lock_value).
    Gunakan Lua script untuk atomicity.
    """
    lock_key = f"lock:{lock_name}"

    # Script Lua: cek value lalu delete secara atomik
    lua_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(lua_script, 1, lock_key, lock_value)
    return bool(result)

# Penggunaan distributed lock
def proses_dengan_lock(r: redis.Redis, operasi_id: str) -> None:
    lock_value = acquire_lock(r, f"operasi:{operasi_id}", ttl_detik=30)

    if not lock_value:
        print(f"Operasi {operasi_id} sedang diproses instance lain, skip.")
        return

    try:
        print(f"Lock acquired untuk operasi {operasi_id}")
        # Proses operasi kritis di sini
        time.sleep(1)
        print(f"Operasi {operasi_id} selesai.")
    finally:
        release_lock(r, f"operasi:{operasi_id}", lock_value)
        print(f"Lock released untuk operasi {operasi_id}")

proses_dengan_lock(r, "generate-report")

Pub/Sub Redis #

Redis juga mendukung Pub/Sub sederhana untuk messaging real-time antar proses.

import threading

def publisher_loop(r: redis.Redis) -> None:
    """Publish pesan ke channel Redis."""
    import time
    for i in range(5):
        payload = json.dumps({"event": "update", "seq": i})
        r.publish("notifikasi:order", payload)
        print(f"Published: {payload}")
        time.sleep(1)

def subscriber_loop(r: redis.Redis) -> None:
    """Subscribe ke channel dan proses pesan."""
    pubsub = r.pubsub()
    pubsub.subscribe("notifikasi:order")

    for message in pubsub.listen():
        if message["type"] == "message":
            payload = json.loads(message["data"])
            print(f"Diterima: {payload}")

# Jalankan subscriber di thread terpisah
sub_thread = threading.Thread(target=subscriber_loop, args=(r,), daemon=True)
sub_thread.start()

# Publisher di main thread
publisher_loop(r)

Penanganan Error #

from redis.exceptions import ConnectionError, TimeoutError, ResponseError

def get_cache_aman(r: redis.Redis, key: str) -> str | None:
    try:
        return r.get(key)
    except ConnectionError:
        print("Redis tidak bisa dijangkau — fallback ke database.")
        return None
    except TimeoutError:
        print("Redis timeout — coba lagi.")
        return None
    except ResponseError as e:
        print(f"Redis response error: {e}")
        return None

# Pola dengan fallback ke database jika Redis down
def ambil_data(r: redis.Redis, key: str, fetch_fn) -> Any:
    try:
        cached = r.get(key)
        if cached:
            return json.loads(cached)
    except (ConnectionError, TimeoutError):
        pass   # Redis tidak tersedia, langsung ke database

    data = fetch_fn()
    try:
        if data:
            r.set(key, json.dumps(data), ex=3600)
    except (ConnectionError, TimeoutError):
        pass   # Tidak bisa simpan cache, tapi data tetap dikembalikan

    return data

Ringkasan #

  • decode_responses=True — selalu aktifkan agar nilai dikembalikan sebagai str bukan bytes; tanpanya semua nilai berupa b"...".
  • Connection Pool — buat satu ConnectionPool untuk seluruh aplikasi; redis-py mengelola koneksi secara otomatis dari pool.
  • TTL untuk semua cache key — selalu set ex (detik) atau px (milidetik) saat menyimpan cache agar memory tidak penuh secara perlahan.
  • Pipeline untuk operasi massal — gunakan r.pipeline() untuk mengirim banyak perintah dalam satu round-trip; mengurangi latensi secara signifikan.
  • Hash vs JSON string — gunakan Hash jika sering mengakses field tertentu saja; gunakan JSON string jika selalu butuh seluruh objek.
  • Sorted Set untuk leaderboard dan rate limiting — skor float memungkinkan pengurutan, peringkat, dan sliding window secara efisien.
  • SET NX EX untuk distributed lock — atomik dan aman; gunakan Lua script untuk release agar cek-dan-hapus juga atomik.
  • BLPOP untuk task queue — blocking pop lebih efisien dari polling aktif; worker tidur sampai ada pekerjaan baru.
  • Cache-Aside pattern — cek cache → hit? kembalikan; miss? fetch dari source, simpan ke cache, kembalikan. Selalu sertakan TTL.
  • Fallback ke database — tangkap ConnectionError dan TimeoutError agar aplikasi tetap berjalan meski Redis down; Redis harus menjadi optimasi, bukan titik kegagalan tunggal.

← Sebelumnya: Google Pub/Sub   Berikutnya: Memcached →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact