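This post records my recent attempt to set up a Kafka platform that simulates event streaming. The simulated scenario: when someone contracts COVID-19, a Kafka Producer publishes a notification of the event, and a Kafka Consumer receives the message and displays it.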
What is Apache Kafka?
Apache Kafka is an open-source event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
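Kafka was originally developed at LinkedIn, written in Scala and Java, and open-sourced in early 2011. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data.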
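Kafka has a few key concepts:

| Concept | Description |
| --- | --- |
| Broker | A server that stores the data |
| Producer | Publishes (produces) messages |
| Consumer | Reads (consumes) messages |
| Topic | A category that messages are published to |
| Partition | The messages in a Topic are split across several Partitions so they can be processed more efficiently |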
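To make the Topic / Partition relationship concrete, here is a toy, in-memory sketch. It is not how Kafka stores data (real partitions are replicated logs on the brokers' disks); `ToyTopic` and its methods are hypothetical names for illustration, and it hashes keys with `zlib.crc32`, whereas Kafka's default partitioner uses murmur2.

```python
import zlib


class ToyTopic:
    """Toy model of a topic: each partition is an append-only list, and a
    record's offset is its index in that list. Not Kafka's implementation."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key):
        # Same key -> same partition, so records for one key stay in order.
        # (crc32 for illustration only; Kafka's default partitioner uses murmur2.)
        return zlib.crc32(key) % len(self.partitions)

    def append(self, key, value):
        """Store a record and return its (partition, offset)."""
        p = self.partition_for(key)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1

    def read(self, partition, offset):
        """Return every record in `partition` from `offset` onward."""
        return self.partitions[partition][offset:]


topic = ToyTopic('covid-19', num_partitions=3)
for city in [b'taipei', b'tainan', b'taipei']:
    print(city, topic.append(city, b'new case in ' + city))
```

Both `taipei` records land in the same partition with consecutive offsets, which is why ordering in Kafka is guaranteed per partition rather than per topic.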
Installation
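Install the Python client library, kafka-python, with pip (the Kafka server itself is a separate download from the Apache Kafka website, extracted locally; it runs on Java):
```shell
$ pip install kafka-python
```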
Get Started
Change into the root directory of the extracted Kafka distribution.
- Start the ZooKeeper server
```shell
$ bin/zookeeper-server-start.sh config/zookeeper.properties
```
- Start the Kafka server
```shell
$ bin/kafka-server-start.sh config/server.properties
```
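Once both servers start without errors, it can help to confirm that they are accepting connections. The sketch below uses a hypothetical helper, `is_listening` (not part of Kafka or kafka-python), that only checks whether a TCP connection succeeds on the default ports: 2181 (`clientPort` in config/zookeeper.properties) and 9092 (the broker's default listener port). It says nothing about whether the cluster is otherwise healthy.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False


if __name__ == '__main__':
    # Default ports assumed from the stock config files
    for name, port in [('ZooKeeper', 2181), ('Kafka broker', 9092)]:
        status = 'up' if is_listening('localhost', port) else 'DOWN'
        print(f'{name} on localhost:{port}: {status}')
```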
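Note: Apache ZooKeeper is the service that manages the configuration and resource allocation of the distributed Kafka cluster (the Brokers).

If starting a server fails with the following error: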
```shell
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
```
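Following a related Stack Overflow discussion, edit `bin/kafka-run-class.sh`. Find the Java version check:
```shell
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]]
```
and replace the `JAVA_MAJOR_VERSION=...` assignment just above it with:
```shell
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([^.-]*).*/\1/p')
```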
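Why this fixes the error: assuming (as in the Kafka releases affected by this bug) the unpatched line differs only by a trailing `"` after `.*`, `sed` leaves the text after the closing quote in place on newer Java version strings, so the `-ge "9"` test is not true and the script falls back to Java 8 GC flags such as `-XX:+PrintGCDateStamps`. The sketch below approximates `sed -E -n 's/PATTERN/\1/p'` with Python's `re.subn`; Python's regex engine is not sed's, but it should yield the same captures for these simple inputs. The sample version line and helper names are assumptions for illustration.

```python
import re
from typing import Optional

# Assumed unpatched pattern (ends in '.*"') vs. the patched one (ends in '.*')
ORIGINAL = r'.* version "([^.-]*).*"'
PATCHED = r'.* version "([^.-]*).*'


def sed_print_subst(pattern: str, line: str) -> Optional[str]:
    """Approximate `sed -E -n 's/PATTERN/\\1/p'` on one line: replace the first
    match with group 1; return None if nothing matched (sed prints nothing)."""
    result, count = re.subn(pattern, r'\1', line, count=1)
    return result if count else None


def takes_java9_branch(major: Optional[str]) -> bool:
    """Mimic [[ "$JAVA_MAJOR_VERSION" -ge "9" ]]: only a plain integer >= 9
    selects the Java 9+ GC logging options."""
    return major is not None and major.isdigit() and int(major) >= 9


# Assumed first line of `java -version` output on a Java 11 JDK
line = 'openjdk version "11.0.2" 2019-01-15'
for label, pattern in [('original', ORIGINAL), ('patched', PATCHED)]:
    major = sed_print_subst(pattern, line)
    print(label, repr(major), takes_java9_branch(major))
```

With the trailing `"`, the match stops at the closing quote, so ` 2019-01-15` survives and the version becomes `11 2019-01-15`; without it, `.*` consumes the rest of the line and only `11` remains.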
- Create a Topic. The following command creates a Topic named covid-19 with one replica and one partition:
```shell
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic covid-19
```
- Verify that the Topic was created
```shell
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
```
- Producer
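```python
#!/usr/bin/env python3
# coding:utf-8
import time

from kafka import KafkaProducer

# Connect to the local Kafka broker (default listener port 9092)
producer = KafkaProducer(bootstrap_servers='localhost:9092')

counter = 1
try:
    while True:
        msg = f'[Total: {counter}] Someone has contracted COVID-19! Be careful!'
        # Kafka message values are bytes, so encode the string before sending
        producer.send('covid-19', msg.encode('utf-8'))
        time.sleep(5)  # publish one event every 5 seconds
        counter += 1
except KeyboardInterrupt:
    pass  # stop with Ctrl+C
finally:
    # Deliver any buffered messages before exiting
    producer.close()
```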
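Because Kafka stores message values as raw bytes, the producer encodes the string itself and the consumer decodes it. kafka-python can take over that step: `KafkaProducer` accepts a `value_serializer` callable and `KafkaConsumer` a matching `value_deserializer`. Below is a sketch of such a pair, assuming a JSON payload; the event fields and function names are hypothetical, and the kafka-python wiring is shown only as comments because it needs a running broker.

```python
import json


def serialize_event(event: dict) -> bytes:
    """Turn an event dict into the bytes Kafka stores (JSON, UTF-8)."""
    return json.dumps(event).encode('utf-8')


def deserialize_event(raw: bytes) -> dict:
    """Inverse of serialize_event, for the consumer side."""
    return json.loads(raw.decode('utf-8'))


# Wiring with kafka-python (requires a running broker):
#   producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                            value_serializer=serialize_event)
#   producer.send('covid-19', {'total': 1, 'message': '...'})
#   consumer = KafkaConsumer('covid-19', bootstrap_servers='localhost:9092',
#                            value_deserializer=deserialize_event)

event = {'total': 1, 'message': 'Someone has contracted COVID-19! Be careful!'}
raw = serialize_event(event)
print(raw)
print(deserialize_event(raw) == event)
```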
- Consumer
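```python
#!/usr/bin/env python3
# coding:utf-8
from kafka import KafkaConsumer

# Subscribe to the covid-19 topic on the local broker. With the default
# auto_offset_reset='latest', only messages sent after startup are read.
consumer = KafkaConsumer('covid-19', bootstrap_servers='localhost:9092')

# Iteration blocks and yields each record as it arrives; msg.value is bytes
for msg in consumer:
    print(msg.value.decode('utf-8'))
```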
You can see that the Consumer receives every COVID-19 event notification ⬇︎