unisbadri.com » Python Java Golang Typescript Kotlin Ruby Rust Dart PHP
Kafka

Kafka #

Apache Kafka adalah platform streaming terdistribusi yang digunakan untuk membangun aplikasi data real-time. Kafka memungkinkan publikasi, penyimpanan, dan pemrosesan aliran catatan secara berkelanjutan dan teratur.

1. Instalasi Kafka #

Langkah-langkah Instalasi Kafka di Linux #

  1. Download Kafka:

    wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
    
  2. Extract Kafka:

    tar -xzf kafka_2.13-2.8.0.tgz
    cd kafka_2.13-2.8.0
    
  3. Start Zookeeper: Kafka memerlukan Zookeeper untuk mengelola cluster:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Start Kafka: Setelah Zookeeper berjalan, mulai Kafka server:

    bin/kafka-server-start.sh config/server.properties
    

2. Instalasi Kafka-Python Client #

Untuk menghubungkan dan berinteraksi dengan Kafka dari Python, kita menggunakan library kafka-python. Instal library ini menggunakan pip:

pip install kafka-python

3. Menghubungkan ke Kafka dan Operasi Dasar #

Berikut adalah contoh bagaimana menghubungkan ke Kafka dan melakukan operasi dasar seperti menghasilkan dan mengkonsumsi pesan.

Menghasilkan Pesan (Producer) #

from kafka import KafkaProducer

# Membuat koneksi ke Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Mengirim pesan ke topik
producer.send('my-topic', b'Hello, World!')
producer.flush()  # Memastikan semua pesan telah terkirim

Mengkonsumsi Pesan (Consumer) #

from kafka import KafkaConsumer

# Membuat koneksi ke Kafka dan berlangganan ke topik
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group'
)

# Menerima dan mencetak pesan dari topik
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")

4. Operasi Lanjutan #

Kafka mendukung berbagai operasi lanjutan seperti partisi, offset manajemen, dan pemrosesan stream.

Mengatur Partisi #

Anda bisa mengirim pesan ke partisi tertentu menggunakan key:

producer.send('my-topic', key=b'key', value=b'This message goes to a partition')

Mengelola Offset #

Anda bisa mengelola offset secara manual jika diperlukan:

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='my-group'
)

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")
    consumer.commit()

Pemrosesan Stream #

Untuk pemrosesan stream yang lebih kompleks, Anda bisa menggunakan library seperti faust, yang merupakan pustaka pemrosesan stream berbasis Python yang terintegrasi dengan Kafka.

pip install faust

Contoh aplikasi faust sederhana:

import faust

app = faust.App('myapp', broker='kafka://localhost:9092')

class MyRecord(faust.Record):
    value: str

topic = app.topic('my-topic', value_type=MyRecord)

@app.agent(topic)
async def process(stream):
    async for record in stream:
        print(f'Received message: {record.value}')

if __name__ == '__main__':
    app.main()

Kesimpulan #

Apache Kafka adalah platform yang kuat untuk membangun aplikasi streaming data real-time. Dengan kafka-python dan pustaka pemrosesan stream seperti faust, integrasi Kafka dengan aplikasi Python menjadi lebih mudah dan efisien. Untuk informasi lebih lanjut, Anda bisa merujuk ke dokumentasi resmi Kafka dan kafka-python.