Multi Threading #
Saat sebuah program perlu melakukan banyak hal sekaligus — mengunduh file sambil memperbarui UI, atau memproses banyak request jaringan secara bersamaan — menjalankan semuanya secara berurutan terasa lambat. Threading adalah solusinya: menjalankan beberapa alur eksekusi secara konkuren dalam satu proses. Python menyediakan modul threading yang lengkap untuk keperluan ini, namun ada satu kekhasan penting yang perlu kamu pahami sebelum memakainya — yaitu GIL (Global Interpreter Lock) yang mempengaruhi kapan threading benar-benar memberikan keuntungan performa.
Apa itu Thread dan GIL? #
Sebelum menulis kode, penting memahami model eksekusi Python. Thread adalah unit eksekusi terkecil di dalam sebuah proses — semua thread dalam satu proses berbagi memori yang sama, yang membuat komunikasi antar thread mudah namun juga rawan konflik.
Yang membuat Python unik adalah Global Interpreter Lock (GIL): sebuah mutex yang memastikan hanya satu thread yang bisa menjalankan bytecode Python pada satu waktu. Artinya, threading di Python tidak memberikan paralelisme sejati untuk operasi CPU-bound.
Kapan threading efektif di Python:
✓ I/O-bound tasks → download file, query database, request HTTP
(GIL dilepas saat menunggu I/O, thread lain bisa jalan)
✗ CPU-bound tasks → komputasi berat, image processing, machine learning
(GIL tidak dilepas, thread bergantian tapi tidak paralel)
→ gunakan multiprocessing untuk kasus ini
GIL adalah trade-off desain CPython untuk keamanan memori. Jika kamu membutuhkan paralelisme sejati untuk CPU-bound tasks, gunakan modulmultiprocessingatau library sepertinumpyyang melepas GIL secara internal.
Membuat Thread #
Ada dua cara membuat thread di Python: menggunakan fungsi biasa sebagai target, atau mewarisi kelas threading.Thread. Keduanya valid — pilih yang lebih sesuai dengan kompleksitas logika thread-mu.
Cara 1: Thread dengan Fungsi #
Cara paling ringkas. Cocok untuk logika yang sederhana dan tidak perlu state internal.
import threading
import time
def unduh_file(nama_file):
print(f"[{nama_file}] Mulai mengunduh...")
time.sleep(2) # simulasi I/O
print(f"[{nama_file}] Selesai diunduh.")
# Membuat dua thread yang berjalan bersamaan
thread1 = threading.Thread(target=unduh_file, args=("laporan.pdf",))
thread2 = threading.Thread(target=unduh_file, args=("gambar.png",))
thread1.start()
thread2.start()
# Tunggu keduanya selesai sebelum lanjut
thread1.join()
thread2.join()
print("Semua file selesai diunduh.")
Cara 2: Thread dengan Subkelas #
Cocok jika thread-mu membutuhkan state, metode tambahan, atau logika yang lebih kompleks.
import threading
import time
class ThreadUnduh(threading.Thread):
def __init__(self, nama_file):
super().__init__()
self.nama_file = nama_file
self.hasil = None # bisa menyimpan state
def run(self):
print(f"[{self.nama_file}] Mulai mengunduh...")
time.sleep(2)
self.hasil = f"Konten dari {self.nama_file}"
print(f"[{self.nama_file}] Selesai.")
thread1 = ThreadUnduh("laporan.pdf")
thread2 = ThreadUnduh("gambar.png")
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.hasil) # akses hasil setelah thread selesai
print(thread2.hasil)
Selalu panggilthread.join()jika main thread perlu menunggu thread lain selesai. Tanpajoin(), program bisa exit sebelum thread background selesai bekerja — menyebabkan output terpotong atau resource tidak di-cleanup dengan benar.
Race Condition dan Lock #
Karena semua thread berbagi memori yang sama, masalah muncul saat dua thread mencoba memodifikasi data yang sama secara bersamaan. Ini disebut race condition — hasilnya tidak deterministik dan sering menyebabkan bug yang sulit direproduksi.
import threading
# ANTI-PATTERN: counter tanpa proteksi
counter = 0
def tambah():
global counter
for _ in range(100000):
counter += 1 # operasi ini TIDAK atomic — bisa terjadi race condition
thread1 = threading.Thread(target=tambah)
thread2 = threading.Thread(target=tambah)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(counter) # hasilnya TIDAK selalu 200000 — bisa kurang karena race condition
Solusinya adalah threading.Lock — hanya satu thread yang bisa memegang lock pada satu waktu.
import threading
# BENAR: gunakan Lock untuk melindungi shared state
class Counter:
def __init__(self):
self.nilai = 0
self._lock = threading.Lock()
def tambah(self):
with self._lock: # otomatis acquire dan release
self.nilai += 1
counter = Counter()
def tambah_banyak():
for _ in range(100000):
counter.tambah()
thread1 = threading.Thread(target=tambah_banyak)
thread2 = threading.Thread(target=tambah_banyak)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(counter.nilai) # selalu 200000
Selain Lock, Python juga menyediakan RLock (Reentrant Lock) untuk kasus di mana thread yang sama perlu mengakquire lock yang sudah dipegangnya sendiri — misalnya dalam pemanggilan rekursif.
import threading
# RLock: thread yang sama bisa acquire berkali-kali
rlock = threading.RLock()
def fungsi_rekursif(n):
with rlock:
if n <= 0:
return
print(f"Level {n}")
fungsi_rekursif(n - 1) # Lock biasa akan deadlock di sini
fungsi_rekursif(3)
Komunikasi Antar Thread dengan Event #
Kadang satu thread perlu menunggu sinyal dari thread lain sebelum melanjutkan. threading.Event adalah primitif sinkronisasi untuk kasus ini — sebuah flag boolean yang bisa di-set atau di-clear oleh thread mana pun.
import threading
import time
event_siap = threading.Event()
def worker():
print("Worker: menunggu data siap...")
event_siap.wait() # blok sampai event di-set
print("Worker: data diterima, mulai memproses.")
def persiapan():
print("Persiapan: menyiapkan data...")
time.sleep(3)
print("Persiapan: data siap, memberi sinyal.")
event_siap.set() # kirim sinyal ke worker
t_worker = threading.Thread(target=worker)
t_persiapan = threading.Thread(target=persiapan)
t_worker.start()
t_persiapan.start()
t_worker.join()
t_persiapan.join()
Event juga bisa digunakan sebagai mekanisme stop untuk thread yang berjalan terus-menerus:
import threading
import time
stop_event = threading.Event()
def monitor():
while not stop_event.is_set():
print("Monitor: sistem berjalan normal...")
time.sleep(1)
print("Monitor: dihentikan.")
t = threading.Thread(target=monitor)
t.start()
time.sleep(4)
stop_event.set() # kirim sinyal berhenti
t.join()
print("Program selesai.")
Membatasi Akses dengan Semaphore #
threading.Semaphore berguna saat kamu ingin membatasi berapa banyak thread yang boleh mengakses sumber daya secara bersamaan — misalnya koneksi database, slot API, atau file I/O.
import threading
import time
# Maksimal 3 koneksi database bersamaan
pool_koneksi = threading.Semaphore(3)
def query_database(worker_id):
print(f"Worker-{worker_id}: menunggu slot koneksi...")
with pool_koneksi:
print(f"Worker-{worker_id}: terkoneksi, menjalankan query.")
time.sleep(2) # simulasi query
print(f"Worker-{worker_id}: selesai, melepas koneksi.")
# 7 worker bersaing untuk 3 slot koneksi
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(7)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Semua query selesai.")
ThreadPoolExecutor #
Membuat dan mengelola thread secara manual bisa melelahkan untuk jumlah task yang besar. concurrent.futures.ThreadPoolExecutor menyediakan abstraksi tingkat tinggi: kamu cukup submit task, pool yang mengurus thread lifecycle-nya.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def proses_item(item_id):
time.sleep(1) # simulasi I/O
return f"Hasil item-{item_id}"
# Pool dengan 4 worker thread
with ThreadPoolExecutor(max_workers=4) as executor:
# submit semua task sekaligus
futures = {executor.submit(proses_item, i): i for i in range(8)}
# ambil hasil saat selesai (urutan mungkin berbeda)
for future in as_completed(futures):
item_id = futures[future]
try:
hasil = future.result()
print(f"Item-{item_id} selesai: {hasil}")
except Exception as e:
print(f"Item-{item_id} error: {e}")
map() adalah alternatif lebih ringkas jika kamu tidak perlu error handling per-task:
from concurrent.futures import ThreadPoolExecutor
def unduh(url):
# simulasi unduh
return f"Konten dari {url}"
urls = ["https://api.example.com/data/1",
"https://api.example.com/data/2",
"https://api.example.com/data/3"]
with ThreadPoolExecutor(max_workers=3) as executor:
hasil = list(executor.map(unduh, urls))
for r in hasil:
print(r)
Kapan Menggunakan Threading vs Alternatif #
Gunakan threading jika:
✓ Task-nya I/O-bound (HTTP request, baca/tulis file, query DB)
✓ Membutuhkan shared state antar task
✓ Jumlah task tidak terlalu besar (ratusan, bukan ribuan)
Pertimbangkan multiprocessing jika:
✗ Task-nya CPU-bound (komputasi intensif, image processing)
✗ GIL menjadi bottleneck yang terukur
Pertimbangkan asyncio jika:
✗ Jumlah task sangat besar (ribuan koneksi bersamaan)
✗ Task sepenuhnya I/O-bound dan bisa ditulis async
✗ Ingin efisiensi memori lebih tinggi dari thread pool
Ringkasan #
- GIL membatasi threading untuk CPU-bound tasks — Python hanya menjalankan satu thread bytecode pada satu waktu; untuk komputasi berat, gunakan
multiprocessing.- Threading efektif untuk I/O-bound tasks — saat thread menunggu I/O, GIL dilepas dan thread lain bisa berjalan.
- Gunakan
Lockuntuk melindungi shared state — tanpa Lock, operasi baca-tulis bersamaan bisa menyebabkan race condition yang hasilnya tidak deterministik.RLockuntuk kasus rekursif — thread yang sama bisa mengakquire RLock berkali-kali tanpa deadlock.Eventuntuk sinyal antar thread — gunakanset()untuk memberi sinyal danwait()untuk menunggu.Semaphoreuntuk membatasi akses konkuren — ideal untuk connection pool atau rate limiting.ThreadPoolExecutoruntuk manajemen thread yang lebih mudah — abstraksi tingkat tinggi yang mengurus lifecycle thread secara otomatis.- Selalu
join()thread yang kamu buat — pastikan cleanup terjadi dengan benar sebelum program exit.