Multi Process #
Threading di Python dibatasi oleh GIL — hanya satu thread yang bisa menjalankan bytecode Python pada satu waktu. Untuk tugas yang benar-benar membutuhkan paralelisme CPU, seperti komputasi numerik, pemrosesan gambar, atau encoding video, multiprocessing adalah jawabannya. Setiap proses mendapat interpreter dan memori tersendiri, sehingga GIL bukan lagi penghalang. Konsekuensinya, berbagi data antar proses tidak bisa dilakukan semudah berbagi variabel — dibutuhkan mekanisme khusus seperti Queue, Pipe, dan Manager.
Proses vs Thread #
Sebelum masuk ke kode, penting memahami perbedaan mendasar antara proses dan thread, karena ini mempengaruhi cara kamu mendesain program.
Thread Process
──────────────────────────────────────
Memori Berbagi (shared) Terpisah (isolated)
GIL Terikat GIL Bebas GIL
Overhead Ringan Lebih berat (fork/spawn)
Komunikasi Variabel biasa + Lock Queue / Pipe / Manager
Cocok untuk I/O-bound tasks CPU-bound tasks
Crash isolation Satu thread crash Satu proses crash
bisa pengaruhi lain tidak pengaruhi lain
Di Windows, semua kode yang membuat proses harus berada di dalam blok if __name__ == '__main__':. Ini mencegah proses child melakukan spawn rekursif saat modul di-import ulang. Di Linux/macOS hal ini tidak wajib tapi tetap merupakan praktik yang baik.Membuat Proses #
Sama seperti threading, ada dua cara membuat proses: menggunakan fungsi target atau mewarisi multiprocessing.Process.
Cara 1: Proses dengan Fungsi #
import multiprocessing
import time
def kompresi_file(nama_file):
print(f"[{nama_file}] Mulai kompresi...")
time.sleep(2) # simulasi CPU-bound task
print(f"[{nama_file}] Kompresi selesai.")
if __name__ == '__main__':
p1 = multiprocessing.Process(target=kompresi_file, args=("video_1.mp4",))
p2 = multiprocessing.Process(target=kompresi_file, args=("video_2.mp4",))
p1.start()
p2.start()
p1.join()
p2.join()
print("Semua file selesai dikompresi.")
Cara 2: Proses dengan Subkelas #
Cocok untuk proses yang memerlukan state internal atau metode tambahan.
import multiprocessing
import time
class ProsesKompresi(multiprocessing.Process):
def __init__(self, nama_file):
super().__init__()
self.nama_file = nama_file
def run(self):
print(f"[{self.nama_file}] PID: {self.pid} — mulai kompresi...")
time.sleep(2)
print(f"[{self.nama_file}] Selesai.")
if __name__ == '__main__':
proses = [ProsesKompresi(f"video_{i}.mp4") for i in range(3)]
for p in proses:
p.start()
for p in proses:
p.join()
Process Pool #
Membuat proses satu per satu untuk banyak task tidak efisien karena overhead fork/spawn cukup besar. multiprocessing.Pool mengelola sekumpulan worker process yang siap menerima task — mirip ThreadPoolExecutor tapi untuk proses.
import multiprocessing
import time
def hitung_kuadrat(n):
time.sleep(0.1) # simulasi komputasi
return n * n
if __name__ == '__main__':
data = list(range(20))
# Pool dengan 4 worker process
with multiprocessing.Pool(processes=4) as pool:
hasil = pool.map(hitung_kuadrat, data)
print(hasil)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Untuk kontrol lebih, gunakan pool.apply_async() agar bisa mengambil hasil secara asinkron:
import multiprocessing
def proses_chunk(chunk):
return sum(chunk)
if __name__ == '__main__':
# Bagi data besar menjadi chunk
data = list(range(1000))
ukuran_chunk = 100
chunks = [data[i:i+ukuran_chunk] for i in range(0, len(data), ukuran_chunk)]
with multiprocessing.Pool(processes=4) as pool:
futures = [pool.apply_async(proses_chunk, (chunk,)) for chunk in chunks]
hasil = [f.get() for f in futures] # ambil hasil masing-masing
print(f"Total: {sum(hasil)}") # 499500
ProcessPoolExecutor dari concurrent.futures adalah alternatif API yang lebih modern dan konsisten dengan ThreadPoolExecutor:
from concurrent.futures import ProcessPoolExecutor, as_completed
def faktorisasi(n):
faktor = []
d = 2
while d * d <= n:
while n % d == 0:
faktor.append(d)
n //= d
d += 1
if n > 1:
faktor.append(n)
return faktor
if __name__ == '__main__':
bilangan = [112272535095293, 112582705942171, 112272535095293]
with ProcessPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(faktorisasi, n): n for n in bilangan}
for future in as_completed(futures):
n = futures[future]
print(f"Faktor dari {n}: {future.result()}")
Komunikasi Antar Proses dengan Queue #
Karena proses tidak berbagi memori, data harus dikirim secara eksplisit. multiprocessing.Queue adalah struktur data thread-safe dan process-safe yang ideal untuk pola producer-consumer.
import multiprocessing
import time
def producer(queue, jumlah):
for i in range(jumlah):
item = f"task-{i}"
queue.put(item)
print(f"Producer: mengirim {item}")
time.sleep(0.3)
queue.put(None) # sentinel: tanda selesai
def consumer(queue, worker_id):
while True:
item = queue.get()
if item is None:
queue.put(None) # teruskan sentinel ke consumer lain
break
print(f"Consumer-{worker_id}: memproses {item}")
time.sleep(0.5)
if __name__ == '__main__':
q = multiprocessing.Queue()
p_prod = multiprocessing.Process(target=producer, args=(q, 6))
p_cons1 = multiprocessing.Process(target=consumer, args=(q, 1))
p_cons2 = multiprocessing.Process(target=consumer, args=(q, 2))
p_prod.start()
p_cons1.start()
p_cons2.start()
p_prod.join()
p_cons1.join()
p_cons2.join()
print("Semua task selesai diproses.")
Jangan gunakanqueue.empty()sebagai kondisi berhenti — ada race condition antara pengecekanempty()dan pengambilan item. Selalu gunakan sentinel value (nilai khusus sepertiNone) sebagai penanda bahwa producer sudah selesai mengirim data.
Komunikasi Dua Arah dengan Pipe #
multiprocessing.Pipe() membuat sepasang koneksi yang bisa saling mengirim dan menerima data. Cocok untuk komunikasi point-to-point antara dua proses.
import multiprocessing
def worker_hitung(conn):
data = conn.recv() # terima data dari parent
hasil = [x ** 2 for x in data]
conn.send(hasil) # kirim hasil kembali
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=worker_hitung, args=(child_conn,))
p.start()
parent_conn.send(list(range(10))) # kirim data ke child
hasil = parent_conn.recv() # tunggu dan terima hasil
p.join()
print(f"Hasil kuadrat: {hasil}")
Perbedaan Queue vs Pipe:
Queue:
✓ Bisa diakses oleh banyak proses sekaligus
✓ Thread-safe dan process-safe
✓ Cocok untuk pola producer-consumer dengan N worker
Pipe:
✓ Lebih cepat (overhead lebih rendah)
✓ Cocok untuk komunikasi dua arah point-to-point
✗ Hanya untuk dua endpoint — tidak cocok untuk banyak proses
Shared State dengan Manager #
Proses tidak bisa berbagi variabel Python biasa. Jika kamu perlu beberapa proses membaca dan menulis ke struktur data yang sama, gunakan multiprocessing.Manager yang mengelola objek di proses server terpisah dan membuatnya aksesibel secara proxy.
import multiprocessing
def kumpulkan_hasil(worker_id, hasil_list, hasil_dict, lock):
data = worker_id * 10 # simulasi komputasi
with lock:
hasil_list.append(data)
hasil_dict[f"worker-{worker_id}"] = data
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
shared_list = manager.list()
shared_dict = manager.dict()
lock = manager.Lock()
proses = [
multiprocessing.Process(
target=kumpulkan_hasil,
args=(i, shared_list, shared_dict, lock)
)
for i in range(5)
]
for p in proses:
p.start()
for p in proses:
p.join()
print("Hasil list:", sorted(shared_list))
print("Hasil dict:", dict(shared_dict))
Untuk kebutuhan yang lebih sederhana — counter atau flag boolean — multiprocessing.Value dan multiprocessing.Array lebih efisien dari Manager karena menggunakan shared memory langsung:
import multiprocessing
def tambah_counter(counter, lock):
for _ in range(10000):
with lock:
counter.value += 1
if __name__ == '__main__':
counter = multiprocessing.Value('i', 0) # 'i' = integer
lock = multiprocessing.Lock()
proses = [
multiprocessing.Process(target=tambah_counter, args=(counter, lock))
for _ in range(4)
]
for p in proses:
p.start()
for p in proses:
p.join()
print(f"Counter akhir: {counter.value}") # selalu 40000
Kapan Menggunakan Multiprocessing vs Alternatif #
Gunakan multiprocessing jika:
✓ Task CPU-bound: komputasi berat, encoding, parsing data besar
✓ Perlu paralelisme sejati melewati batas GIL
✓ Task bisa dibagi menjadi chunk independen
Pertimbangkan threading jika:
✗ Task I/O-bound: HTTP request, baca/tulis file, query DB
✗ Perlu berbagi state dengan mudah (threading lebih sederhana)
✗ Overhead proses terlalu besar untuk task yang singkat
Pertimbangkan asyncio jika:
✗ Task I/O-bound dengan concurrency sangat tinggi (ribuan task)
✗ Ingin single-threaded event loop tanpa overhead proses/thread
Ringkasan #
- Multiprocessing mengatasi GIL — setiap proses memiliki interpreter sendiri, sehingga komputasi CPU-bound bisa berjalan paralel sejati.
- Proses tidak berbagi memori — data harus dikirim eksplisit via
Queue,Pipe,Manager,Value, atauArray.Pool.map()untuk batch processing — cara paling ringkas mendistribusikan fungsi ke banyak input secara paralel.Queueuntuk producer-consumer — gunakan sentinel value (None) sebagai penanda selesai, bukanqueue.empty().Pipeuntuk komunikasi point-to-point — lebih cepat dari Queue tapi hanya untuk dua endpoint.Manageruntuk shared object kompleks — list dan dict yang bisa diakses banyak proses, dengan trade-off overhead lebih tinggi.ValuedanArrayuntuk shared memory sederhana — lebih efisien dari Manager untuk tipe data primitif.- Selalu gunakan
if __name__ == '__main__':— wajib di Windows, praktik baik di semua platform.