Redis #
Redis adalah penyimpanan struktur data in-memory yang berfungsi sebagai cache, database, message broker, dan queue sekaligus — dengan latensi sub-milidetik yang tidak bisa ditandingi database disk-based. Kecepatan Redis bukan hanya karena data disimpan di memori, tapi karena arsitektur single-threaded yang menghindari context switching, dan struktur data yang sudah dioptimalkan: String, Hash, List, Set, Sorted Set, Stream, dan lainnya, masing-masing dirancang untuk use case spesifik. Dalam pengembangan aplikasi Python, Redis paling banyak dipakai untuk caching hasil query database, session storage, rate limiting, leaderboard real-time, dan distributed lock antar instance.
Instalasi #
pip install redis
Untuk menjalankan Redis secara lokal:
# Docker (paling mudah)
docker run -d --name redis -p 6379:6379 redis:latest
# Dengan password
docker run -d --name redis -p 6379:6379 redis:latest redis-server --requirepass "rahasia"
Membuat Koneksi #
import redis
import os
# ANTI-PATTERN: koneksi hardcode tanpa password, tanpa decode_responses
r = redis.Redis(host="localhost", port=6379, db=0)
nilai = r.get("kunci") # ✗ -- nilai berupa bytes: b"data", bukan string
# BENAR: gunakan environment variable, password, dan decode_responses=True
r = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD"),
db=int(os.getenv("REDIS_DB", "0")),
decode_responses=True, # kembalikan str bukan bytes
socket_timeout=5, # timeout saat operasi
socket_connect_timeout=5 # timeout saat koneksi
)
# Atau menggunakan URL (lebih ringkas)
r = redis.from_url(
os.getenv("REDIS_URL", "redis://localhost:6379/0"),
decode_responses=True,
socket_timeout=5
)
# Tes koneksi
try:
r.ping()
print("Koneksi Redis berhasil.")
except redis.ConnectionError as e:
print(f"Koneksi gagal: {e}")
Connection Pool #
# Connection Pool -- satu pool untuk seluruh aplikasi
pool = redis.ConnectionPool(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD"),
db=0,
decode_responses=True,
max_connections=20 # batas koneksi dalam pool
)
# Buat client yang menggunakan pool
r = redis.Redis(connection_pool=pool)
# Semua operasi memakai koneksi dari pool yang sama
# Koneksi dikembalikan ke pool otomatis setelah operasi selesai
decode_responses=Trueadalah parameter penting yang sering dilupakan. Tanpanya, semua nilai yang dikembalikan Redis berupabytes(b"nilai"), bukanstr. Aktifkan selalu kecuali kamu memang bekerja dengan data binary.
String — Tipe Data Paling Dasar #
String di Redis bukan hanya teks — bisa menyimpan integer, float, atau data serialisasi apapun hingga 512MB per key.
import json
from datetime import timedelta
# Set dan Get dasar
r.set("nama", "Budi Santoso")
print(r.get("nama")) # "Budi Santoso"
# Set dengan TTL (Time To Live) -- kunci otomatis dihapus setelah N detik
r.set("token:abc123", "user-42", ex=3600) # expire dalam 3600 detik (1 jam)
r.set("otp:081234", "7823", px=300000) # expire dalam 300000 milidetik (5 menit)
r.setex("session:xyz", timedelta(hours=24), "data") # menggunakan timedelta
# Cek TTL yang tersisa
print(r.ttl("token:abc123")) # detik tersisa, -1 jika tidak ada TTL, -2 jika key tidak ada
# Increment / decrement -- atomik
r.set("halaman_dilihat", 0)
r.incr("halaman_dilihat") # +1
r.incrby("halaman_dilihat", 10) # +10
r.decr("halaman_dilihat") # -1
print(r.get("halaman_dilihat")) # "9"
# Simpan JSON (serialisasi manual)
pengguna = {"id": 1, "nama": "Budi", "email": "[email protected]"}
r.set("pengguna:1", json.dumps(pengguna), ex=3600)
# Ambil JSON
data_raw = r.get("pengguna:1")
pengguna = json.loads(data_raw) if data_raw else None
# Operasi atomik: SET hanya jika key belum ada (NX = Not eXists)
berhasil = r.set("kunci_unik", "nilai", nx=True, ex=60)
print("Set berhasil:", berhasil) # True jika baru diset, None jika sudah ada
Pola Caching #
Cache-Aside (Lazy Loading) #
Pola paling umum — baca dari cache dahulu, jika miss baru baca dari database dan simpan ke cache.
import json
from typing import Callable, Any
def get_dengan_cache(
r: redis.Redis,
cache_key: str,
fetch_fn: Callable, # fungsi untuk fetch data dari source (DB, API, dll.)
ttl_detik: int = 3600
) -> Any:
"""
Cache-Aside pattern:
1. Cek cache
2. Jika hit → kembalikan dari cache
3. Jika miss → fetch dari source, simpan ke cache, kembalikan
"""
# Cek cache
cached = r.get(cache_key)
if cached is not None:
print(f"Cache HIT: {cache_key}")
return json.loads(cached)
# Cache miss -- fetch dari source
print(f"Cache MISS: {cache_key}")
data = fetch_fn()
if data is not None:
r.set(cache_key, json.dumps(data, ensure_ascii=False), ex=ttl_detik)
return data
# Contoh penggunaan
def ambil_produk_dari_db(produk_id: int) -> dict:
# Simulasi query database
return {"id": produk_id, "nama": "Laptop Gaming", "harga": 18500000}
produk = get_dengan_cache(
r,
cache_key=f"produk:{101}",
fetch_fn=lambda: ambil_produk_dari_db(101),
ttl_detik=1800
)
print(produk)
Cache Invalidation #
def update_produk(r: redis.Redis, produk_id: int, data_baru: dict) -> None:
"""Update database dan invalidasi cache terkait."""
# 1. Update database (tidak ditampilkan)
# update_produk_di_db(produk_id, data_baru)
# 2. Hapus cache yang sudah tidak valid
r.delete(f"produk:{produk_id}")
# Atau update cache langsung (write-through)
r.set(
f"produk:{produk_id}",
json.dumps(data_baru, ensure_ascii=False),
ex=1800
)
def invalidasi_cache_pattern(r: redis.Redis, pattern: str) -> int:
"""
Hapus semua key yang cocok dengan pattern.
Gunakan dengan hati-hati di produksi -- scan bisa lambat untuk data besar.
"""
keys = r.keys(pattern) # contoh: "produk:*", "session:user-42:*"
if keys:
return r.delete(*keys)
return 0
# Hapus semua cache produk
jumlah_dihapus = invalidasi_cache_pattern(r, "produk:*")
print(f"{jumlah_dihapus} cache key dihapus.")
Hash — Struktur Mirip Dictionary #
Hash cocok untuk menyimpan objek dengan banyak field — lebih efisien dari menyimpan JSON string jika kamu sering mengakses field tertentu saja.
# Set hash
r.hset("pengguna:42", mapping={
"nama": "Budi Santoso",
"email": "[email protected]",
"usia": "28",
"aktif": "1"
})
# Get satu field
nama = r.hget("pengguna:42", "nama") # "Budi Santoso"
# Get semua field
semua = r.hgetall("pengguna:42") # dict semua field
print(semua) # {'nama': 'Budi Santoso', 'email': '...', 'usia': '28', 'aktif': '1'}
# Get beberapa field tertentu
nama, email = r.hmget("pengguna:42", ["nama", "email"])
# Update satu field saja (tanpa mengubah yang lain)
r.hset("pengguna:42", "usia", "29")
# Hapus satu field
r.hdel("pengguna:42", "aktif")
# Set TTL untuk seluruh hash key
r.expire("pengguna:42", 3600)
# Increment field numerik
r.hincrby("pengguna:42", "login_count", 1)
List — Antrian dan Stack #
List Redis adalah linked list — push/pop di kedua ujung berjalan dalam O(1). Cocok untuk task queue, activity feed, atau log terbatas.
# Push ke kanan (FIFO queue)
r.rpush("antrian:email", json.dumps({"to": "[email protected]", "subject": "Welcome"}))
r.rpush("antrian:email", json.dumps({"to": "[email protected]", "subject": "Order"}))
# Pop dari kiri (ambil yang pertama masuk)
item_raw = r.lpop("antrian:email")
item = json.loads(item_raw) if item_raw else None
# Blocking pop -- tunggu hingga ada item (berguna untuk worker)
item_raw = r.blpop("antrian:email", timeout=30) # tunggu 30 detik
# Panjang list
print(r.llen("antrian:email"))
# Ambil semua item tanpa menghapus
semua = r.lrange("antrian:email", 0, -1) # 0 = indeks pertama, -1 = terakhir
# Batasi list ke N item terakhir (sliding window)
r.ltrim("log:aktivitas", 0, 999) # simpan hanya 1000 item terbaru
Set — Koleksi Unik #
Set cocok untuk menyimpan koleksi unik: tag, followers, item yang sudah dikunjungi, atau anggota grup.
# Tambah ke set
r.sadd("tag:produk:101", "laptop", "gaming", "asus")
r.sadd("tag:produk:102", "laptop", "bisnis", "lenovo")
# Cek keberadaan
print(r.sismember("tag:produk:101", "gaming")) # True
print(r.sismember("tag:produk:101", "bisnis")) # False
# Semua anggota
print(r.smembers("tag:produk:101")) # {'laptop', 'gaming', 'asus'}
# Operasi himpunan
irisan = r.sinter("tag:produk:101", "tag:produk:102") # {'laptop'}
gabungan = r.sunion("tag:produk:101", "tag:produk:102") # semua tag
perbedaan = r.sdiff("tag:produk:101", "tag:produk:102") # {'gaming', 'asus'}
# Hapus dari set
r.srem("tag:produk:101", "asus")
Sorted Set — Leaderboard dan Rate Limiting #
Sorted Set adalah set dengan skor float — anggota diurutkan berdasarkan skor. Cocok untuk leaderboard, rating, antrian prioritas, dan rate limiting.
# Leaderboard skor game
r.zadd("leaderboard:game", {
"player_alice": 9500,
"player_budi": 8750,
"player_sari": 9200,
"player_andi": 7800
})
# Tambah/update skor
r.zincrby("leaderboard:game", 500, "player_budi") # +500 untuk budi
# Top 3 pemain (skor tertinggi)
top3 = r.zrange("leaderboard:game", 0, 2, desc=True, withscores=True)
print("Top 3:")
for rank, (pemain, skor) in enumerate(top3, start=1):
print(f" #{rank} {pemain}: {int(skor)}")
# Peringkat seorang pemain (0-based)
rank = r.zrevrank("leaderboard:game", "player_budi")
print(f"Peringkat budi: #{rank + 1}")
# Skor seorang pemain
skor = r.zscore("leaderboard:game", "player_budi")
print(f"Skor budi: {int(skor)}")
Rate Limiting dengan Sorted Set #
def cek_rate_limit(r: redis.Redis, user_id: str, maks_request: int = 10, window_detik: int = 60) -> bool:
"""
Sliding window rate limiter menggunakan Sorted Set.
Kembalikan True jika request diizinkan, False jika sudah melampaui batas.
"""
import time
key = f"rate_limit:{user_id}"
sekarang = time.time()
window_start = sekarang - window_detik
pipe = r.pipeline()
pipe.zremrangebyscore(key, 0, window_start) # hapus request di luar window
pipe.zadd(key, {str(sekarang): sekarang}) # tambahkan request sekarang
pipe.zcard(key) # hitung request dalam window
pipe.expire(key, window_detik) # set TTL agar key tidak menumpuk
results = pipe.execute()
jumlah_request = results[2]
return jumlah_request <= maks_request
# Cek rate limit
for i in range(12):
diizinkan = cek_rate_limit(r, "user-42", maks_request=10, window_detik=60)
print(f"Request {i+1}: {'✓ diizinkan' if diizinkan else '✗ ditolak'}")
Pipeline — Batch Commands #
Pipeline mengirim banyak perintah sekaligus dalam satu network round-trip, mengurangi latensi secara signifikan.
# ANTI-PATTERN: operasi satu per satu (N network round-trips)
for i in range(100):
r.set(f"kunci:{i}", f"nilai:{i}") # ✗ -- 100 round-trip ke Redis
# BENAR: gunakan pipeline (1 round-trip untuk semua)
pipe = r.pipeline(transaction=False) # transaction=False = lebih cepat, non-atomic
for i in range(100):
pipe.set(f"kunci:{i}", f"nilai:{i}", ex=3600)
pipe.execute() # ✓ -- satu round-trip
# Pipeline dengan transaksi atomik (MULTI/EXEC)
with r.pipeline(transaction=True) as pipe:
pipe.set("saldo:alice", 1000)
pipe.set("saldo:budi", 500)
pipe.execute() # semua atau tidak sama sekali
# Pipeline dengan watch (optimistic locking)
def transfer_saldo(r: redis.Redis, dari: str, ke: str, jumlah: int) -> bool:
kunci_dari = f"saldo:{dari}"
kunci_ke = f"saldo:{ke}"
with r.pipeline() as pipe:
while True:
try:
pipe.watch(kunci_dari, kunci_ke) # pantau perubahan
saldo_dari = int(pipe.get(kunci_dari) or 0)
if saldo_dari < jumlah:
pipe.unwatch()
return False
pipe.multi() # mulai transaksi
pipe.decrby(kunci_dari, jumlah)
pipe.incrby(kunci_ke, jumlah)
pipe.execute() # commit -- akan gagal jika ada perubahan dari luar
return True
except redis.WatchError:
# Data berubah saat kita sedang proses -- coba lagi
continue
r.set("saldo:alice", 1000)
r.set("saldo:budi", 500)
berhasil = transfer_saldo(r, "alice", "budi", 300)
print(f"Transfer: {'berhasil' if berhasil else 'gagal'}")
print(f"Alice: {r.get('saldo:alice')}, Budi: {r.get('saldo:budi')}")
Distributed Lock #
Distributed lock memastikan hanya satu instance yang menjalankan operasi kritis pada satu waktu — penting di lingkungan dengan banyak server.
import uuid
import time
def acquire_lock(r: redis.Redis, lock_name: str, ttl_detik: int = 30) -> str | None:
"""
Acquire distributed lock. Kembalikan lock_value jika berhasil, None jika gagal.
Gunakan SET NX EX -- atomik, tidak bisa di-race condition.
"""
lock_key = f"lock:{lock_name}"
lock_value = str(uuid.uuid4()) # nilai unik per lock
# SET hanya jika belum ada (NX), dengan TTL (EX)
berhasil = r.set(lock_key, lock_value, nx=True, ex=ttl_detik)
return lock_value if berhasil else None
def release_lock(r: redis.Redis, lock_name: str, lock_value: str) -> bool:
"""
Release lock. Hanya bisa di-release oleh yang mengacquire (cek lock_value).
Gunakan Lua script untuk atomicity.
"""
lock_key = f"lock:{lock_name}"
# Script Lua: cek value lalu delete secara atomik
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = r.eval(lua_script, 1, lock_key, lock_value)
return bool(result)
# Penggunaan distributed lock
def proses_dengan_lock(r: redis.Redis, operasi_id: str) -> None:
lock_value = acquire_lock(r, f"operasi:{operasi_id}", ttl_detik=30)
if not lock_value:
print(f"Operasi {operasi_id} sedang diproses instance lain, skip.")
return
try:
print(f"Lock acquired untuk operasi {operasi_id}")
# Proses operasi kritis di sini
time.sleep(1)
print(f"Operasi {operasi_id} selesai.")
finally:
release_lock(r, f"operasi:{operasi_id}", lock_value)
print(f"Lock released untuk operasi {operasi_id}")
proses_dengan_lock(r, "generate-report")
Pub/Sub Redis #
Redis juga mendukung Pub/Sub sederhana untuk messaging real-time antar proses.
import threading
def publisher_loop(r: redis.Redis) -> None:
"""Publish pesan ke channel Redis."""
import time
for i in range(5):
payload = json.dumps({"event": "update", "seq": i})
r.publish("notifikasi:order", payload)
print(f"Published: {payload}")
time.sleep(1)
def subscriber_loop(r: redis.Redis) -> None:
"""Subscribe ke channel dan proses pesan."""
pubsub = r.pubsub()
pubsub.subscribe("notifikasi:order")
for message in pubsub.listen():
if message["type"] == "message":
payload = json.loads(message["data"])
print(f"Diterima: {payload}")
# Jalankan subscriber di thread terpisah
sub_thread = threading.Thread(target=subscriber_loop, args=(r,), daemon=True)
sub_thread.start()
# Publisher di main thread
publisher_loop(r)
Penanganan Error #
from redis.exceptions import ConnectionError, TimeoutError, ResponseError
def get_cache_aman(r: redis.Redis, key: str) -> str | None:
try:
return r.get(key)
except ConnectionError:
print("Redis tidak bisa dijangkau — fallback ke database.")
return None
except TimeoutError:
print("Redis timeout — coba lagi.")
return None
except ResponseError as e:
print(f"Redis response error: {e}")
return None
# Pola dengan fallback ke database jika Redis down
def ambil_data(r: redis.Redis, key: str, fetch_fn) -> Any:
try:
cached = r.get(key)
if cached:
return json.loads(cached)
except (ConnectionError, TimeoutError):
pass # Redis tidak tersedia, langsung ke database
data = fetch_fn()
try:
if data:
r.set(key, json.dumps(data), ex=3600)
except (ConnectionError, TimeoutError):
pass # Tidak bisa simpan cache, tapi data tetap dikembalikan
return data
Ringkasan #
decode_responses=True— selalu aktifkan agar nilai dikembalikan sebagaistrbukanbytes; tanpanya semua nilai berupab"...".- Connection Pool — buat satu
ConnectionPooluntuk seluruh aplikasi; redis-py mengelola koneksi secara otomatis dari pool.- TTL untuk semua cache key — selalu set
ex(detik) ataupx(milidetik) saat menyimpan cache agar memory tidak penuh secara perlahan.- Pipeline untuk operasi massal — gunakan
r.pipeline()untuk mengirim banyak perintah dalam satu round-trip; mengurangi latensi secara signifikan.- Hash vs JSON string — gunakan Hash jika sering mengakses field tertentu saja; gunakan JSON string jika selalu butuh seluruh objek.
- Sorted Set untuk leaderboard dan rate limiting — skor float memungkinkan pengurutan, peringkat, dan sliding window secara efisien.
SET NX EXuntuk distributed lock — atomik dan aman; gunakan Lua script untuk release agar cek-dan-hapus juga atomik.BLPOPuntuk task queue — blocking pop lebih efisien dari polling aktif; worker tidur sampai ada pekerjaan baru.- Cache-Aside pattern — cek cache → hit? kembalikan; miss? fetch dari source, simpan ke cache, kembalikan. Selalu sertakan TTL.
- Fallback ke database — tangkap
ConnectionErrordanTimeoutErroragar aplikasi tetap berjalan meski Redis down; Redis harus menjadi optimasi, bukan titik kegagalan tunggal.