Multi Process #

Threading di Python dibatasi oleh GIL — hanya satu thread yang bisa menjalankan bytecode Python pada satu waktu. Untuk tugas yang benar-benar membutuhkan paralelisme CPU, seperti komputasi numerik, pemrosesan gambar, atau encoding video, multiprocessing adalah jawabannya. Setiap proses mendapat interpreter dan memori tersendiri, sehingga GIL bukan lagi penghalang. Konsekuensinya, berbagi data antar proses tidak bisa dilakukan semudah berbagi variabel — dibutuhkan mekanisme khusus seperti Queue, Pipe, dan Manager.

Proses vs Thread #

Sebelum masuk ke kode, penting memahami perbedaan mendasar antara proses dan thread, karena ini mempengaruhi cara kamu mendesain program.

                  Thread                      Process
                  ──────────────────────────────────────
Memori            Berbagi (shared)            Terpisah (isolated)
GIL               Terikat GIL                 Bebas GIL
Overhead          Ringan                      Lebih berat (fork/spawn)
Komunikasi        Variabel biasa + Lock       Queue / Pipe / Manager
Cocok untuk       I/O-bound tasks             CPU-bound tasks
Crash isolation   Satu thread crash           Satu proses crash
                  bisa pengaruhi lain         tidak pengaruhi lain
Di Windows, semua kode yang membuat proses harus berada di dalam blok if __name__ == '__main__':. Ini mencegah proses child melakukan spawn rekursif saat modul di-import ulang. Di Linux/macOS hal ini tidak wajib tapi tetap merupakan praktik yang baik.

Membuat Proses #

Sama seperti threading, ada dua cara membuat proses: menggunakan fungsi target atau mewarisi multiprocessing.Process.

Cara 1: Proses dengan Fungsi #

import multiprocessing
import time

def kompresi_file(nama_file):
    print(f"[{nama_file}] Mulai kompresi...")
    time.sleep(2)  # simulasi CPU-bound task
    print(f"[{nama_file}] Kompresi selesai.")

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=kompresi_file, args=("video_1.mp4",))
    p2 = multiprocessing.Process(target=kompresi_file, args=("video_2.mp4",))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("Semua file selesai dikompresi.")

Cara 2: Proses dengan Subkelas #

Cocok untuk proses yang memerlukan state internal atau metode tambahan.

import multiprocessing
import time

class ProsesKompresi(multiprocessing.Process):
    def __init__(self, nama_file):
        super().__init__()
        self.nama_file = nama_file

    def run(self):
        print(f"[{self.nama_file}] PID: {self.pid} — mulai kompresi...")
        time.sleep(2)
        print(f"[{self.nama_file}] Selesai.")

if __name__ == '__main__':
    proses = [ProsesKompresi(f"video_{i}.mp4") for i in range(3)]
    for p in proses:
        p.start()
    for p in proses:
        p.join()

Process Pool #

Membuat proses satu per satu untuk banyak task tidak efisien karena overhead fork/spawn cukup besar. multiprocessing.Pool mengelola sekumpulan worker process yang siap menerima task — mirip ThreadPoolExecutor tapi untuk proses.

import multiprocessing
import time

def hitung_kuadrat(n):
    time.sleep(0.1)  # simulasi komputasi
    return n * n

if __name__ == '__main__':
    data = list(range(20))

    # Pool dengan 4 worker process
    with multiprocessing.Pool(processes=4) as pool:
        hasil = pool.map(hitung_kuadrat, data)

    print(hasil)
    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

Untuk kontrol lebih, gunakan pool.apply_async() agar bisa mengambil hasil secara asinkron:

import multiprocessing

def proses_chunk(chunk):
    return sum(chunk)

if __name__ == '__main__':
    # Bagi data besar menjadi chunk
    data = list(range(1000))
    ukuran_chunk = 100
    chunks = [data[i:i+ukuran_chunk] for i in range(0, len(data), ukuran_chunk)]

    with multiprocessing.Pool(processes=4) as pool:
        futures = [pool.apply_async(proses_chunk, (chunk,)) for chunk in chunks]
        hasil = [f.get() for f in futures]  # ambil hasil masing-masing

    print(f"Total: {sum(hasil)}")  # 499500

ProcessPoolExecutor dari concurrent.futures adalah alternatif API yang lebih modern dan konsisten dengan ThreadPoolExecutor:

from concurrent.futures import ProcessPoolExecutor, as_completed

def faktorisasi(n):
    faktor = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            faktor.append(d)
            n //= d
        d += 1
    if n > 1:
        faktor.append(n)
    return faktor

if __name__ == '__main__':
    bilangan = [112272535095293, 112582705942171, 112272535095293]

    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(faktorisasi, n): n for n in bilangan}
        for future in as_completed(futures):
            n = futures[future]
            print(f"Faktor dari {n}: {future.result()}")

Komunikasi Antar Proses dengan Queue #

Karena proses tidak berbagi memori, data harus dikirim secara eksplisit. multiprocessing.Queue adalah struktur data thread-safe dan process-safe yang ideal untuk pola producer-consumer.

import multiprocessing
import time

def producer(queue, jumlah):
    for i in range(jumlah):
        item = f"task-{i}"
        queue.put(item)
        print(f"Producer: mengirim {item}")
        time.sleep(0.3)
    queue.put(None)  # sentinel: tanda selesai

def consumer(queue, worker_id):
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # teruskan sentinel ke consumer lain
            break
        print(f"Consumer-{worker_id}: memproses {item}")
        time.sleep(0.5)

if __name__ == '__main__':
    q = multiprocessing.Queue()

    p_prod = multiprocessing.Process(target=producer, args=(q, 6))
    p_cons1 = multiprocessing.Process(target=consumer, args=(q, 1))
    p_cons2 = multiprocessing.Process(target=consumer, args=(q, 2))

    p_prod.start()
    p_cons1.start()
    p_cons2.start()

    p_prod.join()
    p_cons1.join()
    p_cons2.join()

    print("Semua task selesai diproses.")
Jangan gunakan queue.empty() sebagai kondisi berhenti — ada race condition antara pengecekan empty() dan pengambilan item. Selalu gunakan sentinel value (nilai khusus seperti None) sebagai penanda bahwa producer sudah selesai mengirim data.

Komunikasi Dua Arah dengan Pipe #

multiprocessing.Pipe() membuat sepasang koneksi yang bisa saling mengirim dan menerima data. Cocok untuk komunikasi point-to-point antara dua proses.

import multiprocessing

def worker_hitung(conn):
    data = conn.recv()  # terima data dari parent
    hasil = [x ** 2 for x in data]
    conn.send(hasil)    # kirim hasil kembali
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=worker_hitung, args=(child_conn,))
    p.start()

    parent_conn.send(list(range(10)))  # kirim data ke child
    hasil = parent_conn.recv()         # tunggu dan terima hasil

    p.join()
    print(f"Hasil kuadrat: {hasil}")

Perbedaan Queue vs Pipe:

Queue:
  ✓ Bisa diakses oleh banyak proses sekaligus
  ✓ Thread-safe dan process-safe
  ✓ Cocok untuk pola producer-consumer dengan N worker

Pipe:
  ✓ Lebih cepat (overhead lebih rendah)
  ✓ Cocok untuk komunikasi dua arah point-to-point
  ✗ Hanya untuk dua endpoint — tidak cocok untuk banyak proses

Shared State dengan Manager #

Proses tidak bisa berbagi variabel Python biasa. Jika kamu perlu beberapa proses membaca dan menulis ke struktur data yang sama, gunakan multiprocessing.Manager yang mengelola objek di proses server terpisah dan membuatnya aksesibel secara proxy.

import multiprocessing

def kumpulkan_hasil(worker_id, hasil_list, hasil_dict, lock):
    data = worker_id * 10  # simulasi komputasi
    with lock:
        hasil_list.append(data)
        hasil_dict[f"worker-{worker_id}"] = data

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        shared_dict = manager.dict()
        lock = manager.Lock()

        proses = [
            multiprocessing.Process(
                target=kumpulkan_hasil,
                args=(i, shared_list, shared_dict, lock)
            )
            for i in range(5)
        ]

        for p in proses:
            p.start()
        for p in proses:
            p.join()

        print("Hasil list:", sorted(shared_list))
        print("Hasil dict:", dict(shared_dict))

Untuk kebutuhan yang lebih sederhana — counter atau flag boolean — multiprocessing.Value dan multiprocessing.Array lebih efisien dari Manager karena menggunakan shared memory langsung:

import multiprocessing

def tambah_counter(counter, lock):
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)  # 'i' = integer
    lock = multiprocessing.Lock()

    proses = [
        multiprocessing.Process(target=tambah_counter, args=(counter, lock))
        for _ in range(4)
    ]

    for p in proses:
        p.start()
    for p in proses:
        p.join()

    print(f"Counter akhir: {counter.value}")  # selalu 40000

Kapan Menggunakan Multiprocessing vs Alternatif #

Gunakan multiprocessing jika:
  ✓ Task CPU-bound: komputasi berat, encoding, parsing data besar
  ✓ Perlu paralelisme sejati melewati batas GIL
  ✓ Task bisa dibagi menjadi chunk independen

Pertimbangkan threading jika:
  ✗ Task I/O-bound: HTTP request, baca/tulis file, query DB
  ✗ Perlu berbagi state dengan mudah (threading lebih sederhana)
  ✗ Overhead proses terlalu besar untuk task yang singkat

Pertimbangkan asyncio jika:
  ✗ Task I/O-bound dengan concurrency sangat tinggi (ribuan task)
  ✗ Ingin single-threaded event loop tanpa overhead proses/thread

Ringkasan #

  • Multiprocessing mengatasi GIL — setiap proses memiliki interpreter sendiri, sehingga komputasi CPU-bound bisa berjalan paralel sejati.
  • Proses tidak berbagi memori — data harus dikirim eksplisit via Queue, Pipe, Manager, Value, atau Array.
  • Pool.map() untuk batch processing — cara paling ringkas mendistribusikan fungsi ke banyak input secara paralel.
  • Queue untuk producer-consumer — gunakan sentinel value (None) sebagai penanda selesai, bukan queue.empty().
  • Pipe untuk komunikasi point-to-point — lebih cepat dari Queue tapi hanya untuk dua endpoint.
  • Manager untuk shared object kompleks — list dan dict yang bisa diakses banyak proses, dengan trade-off overhead lebih tinggi.
  • Value dan Array untuk shared memory sederhana — lebih efisien dari Manager untuk tipe data primitif.
  • Selalu gunakan if __name__ == '__main__': — wajib di Windows, praktik baik di semua platform.

← Sebelumnya: Multi Threading   Berikutnya: Context Manager →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact