Kafka #
Apache Kafka adalah platform streaming terdistribusi yang digunakan untuk membangun aplikasi data real-time. Kafka memungkinkan publikasi, penyimpanan, dan pemrosesan aliran catatan secara berkelanjutan dan teratur.
1. Instalasi Kafka #
Langkah-langkah Instalasi Kafka di Linux #
-
Download Kafka:
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
-
Extract Kafka:
tar -xzf kafka_2.13-2.8.0.tgz cd kafka_2.13-2.8.0
-
Start Zookeeper: Kafka memerlukan Zookeeper untuk mengelola cluster:
bin/zookeeper-server-start.sh config/zookeeper.properties
-
Start Kafka: Setelah Zookeeper berjalan, mulai Kafka server:
bin/kafka-server-start.sh config/server.properties
2. Instalasi Kafka-Python Client #
Untuk menghubungkan dan berinteraksi dengan Kafka dari Python, kita menggunakan library kafka-python
. Instal library ini menggunakan pip
:
pip install kafka-python
3. Menghubungkan ke Kafka dan Operasi Dasar #
Berikut adalah contoh bagaimana menghubungkan ke Kafka dan melakukan operasi dasar seperti menghasilkan dan mengkonsumsi pesan.
Menghasilkan Pesan (Producer) #
from kafka import KafkaProducer
# Membuat koneksi ke Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Mengirim pesan ke topik
producer.send('my-topic', b'Hello, World!')
producer.flush() # Memastikan semua pesan telah terkirim
Mengkonsumsi Pesan (Consumer) #
from kafka import KafkaConsumer
# Membuat koneksi ke Kafka dan berlangganan ke topik
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
# Menerima dan mencetak pesan dari topik
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")
4. Operasi Lanjutan #
Kafka mendukung berbagai operasi lanjutan seperti partisi, offset manajemen, dan pemrosesan stream.
Mengatur Partisi #
Anda bisa mengirim pesan ke partisi tertentu menggunakan key:
producer.send('my-topic', key=b'key', value=b'This message goes to a partition')
Mengelola Offset #
Anda bisa mengelola offset secara manual jika diperlukan:
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='my-group'
)
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")
consumer.commit()
Pemrosesan Stream #
Untuk pemrosesan stream yang lebih kompleks, Anda bisa menggunakan library seperti faust
, yang merupakan pustaka pemrosesan stream berbasis Python yang terintegrasi dengan Kafka.
pip install faust
Contoh aplikasi faust
sederhana:
import faust
app = faust.App('myapp', broker='kafka://localhost:9092')
class MyRecord(faust.Record):
value: str
topic = app.topic('my-topic', value_type=MyRecord)
@app.agent(topic)
async def process(stream):
async for record in stream:
print(f'Received message: {record.value}')
if __name__ == '__main__':
app.main()
Kesimpulan #
Apache Kafka adalah platform yang kuat untuk membangun aplikasi streaming data real-time. Dengan kafka-python
dan pustaka pemrosesan stream seperti faust
, integrasi Kafka dengan aplikasi Python menjadi lebih mudah dan efisien. Untuk informasi lebih lanjut, Anda bisa merujuk ke dokumentasi resmi Kafka dan kafka-python.