Apache Kafka Installation and Testing
📆2020-08-26 | 📂Data Science

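This post records my recent attempt at setting up a Kafka platform to simulate event streaming. The simulated scenario: someone has been infected with COVID-19, so a Kafka Producer publishes a notification of that event, and a Kafka Consumer receives the message and displays it.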


What is Apache Kafka?

From the official Apache Kafka website:

Apache Kafka is an open-source event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

From the Chinese Wikipedia page:

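Kafka was originally developed by LinkedIn, written in Scala and Java, and open-sourced in early 2011. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data.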

Kafka has a few key concepts:

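  • Broker: the server that stores the data
  • Producer: publishes (produces) messages
  • Consumer: reads (consumes) messages
  • Topic: the category a message belongs to
  • Partition: each Topic's messages are split across several Partitions so they can be processed in parallel, which improves efficiency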
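To make these terms concrete, here is a minimal toy model in plain Python (no Kafka involved). ToyTopic and ToyConsumer are hypothetical names, not kafka-python classes: each Partition is an append-only list, a message's index in that list is its offset, and the consumer remembers the next offset to read in each Partition. Assumptions: zlib.crc32 stands in for the murmur2 hash that Kafka's default partitioner applies to keyed messages, and keyless messages are spread round-robin as a simplification.

```python
from typing import List, Optional, Tuple
import zlib


class ToyTopic:
    """Hypothetical in-memory model of a Topic: one append-only log per Partition."""

    def __init__(self, name: str, num_partitions: int) -> None:
        self.name = name
        self.partitions: List[List[bytes]] = [[] for _ in range(num_partitions)]
        self._next = 0  # round-robin cursor for messages without a key

    def append(self, value: bytes, key: Optional[bytes] = None) -> Tuple[int, int]:
        """Producer side: store a message and return its (partition, offset)."""
        if key is not None:
            # Same key -> same partition (crc32 is a stand-in for Kafka's partitioner)
            p = zlib.crc32(key) % len(self.partitions)
        else:
            p = self._next
            self._next = (self._next + 1) % len(self.partitions)
        log = self.partitions[p]
        log.append(value)
        return p, len(log) - 1  # the offset is the message's position in its log


class ToyConsumer:
    """Consumer side: remembers the next offset to read in each Partition."""

    def __init__(self, topic: ToyTopic) -> None:
        self.topic = topic
        self.positions = [0] * len(topic.partitions)

    def poll(self) -> List[Tuple[int, int, bytes]]:
        """Return every unread message as (partition, offset, value) and advance."""
        records = []
        for p, log in enumerate(self.topic.partitions):
            for offset in range(self.positions[p], len(log)):
                records.append((p, offset, log[offset]))
            self.positions[p] = len(log)
        return records


topic = ToyTopic("covid-19", num_partitions=2)
topic.append(b"case 1")
topic.append(b"case 2")
consumer = ToyConsumer(topic)
print(consumer.poll())  # [(0, 0, b'case 1'), (1, 0, b'case 2')]
print(consumer.poll())  # [] because nothing new arrived since the last poll
```

Messages with the same key always land in the same Partition, so their relative order is preserved; Kafka only guarantees ordering within a Partition, not across a whole Topic.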

Installation

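```shell
$ pip install kafka-python
```

This installs kafka-python, the Python client used by the Producer and Consumer scripts below. The Kafka server itself is run from a downloaded and extracted Kafka release.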

Get Started

Change into the root directory of the extracted Kafka folder.

  • Start the ZooKeeper server (in its own terminal):
```shell
$ bin/zookeeper-server-start.sh config/zookeeper.properties
```
  • Start the Kafka server (in a second terminal):
```shell
$ bin/kafka-server-start.sh config/server.properties
```
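Before creating Topics, it can help to confirm that both servers are accepting connections. A minimal sketch, assuming the default ports from the stock config files (2181 for ZooKeeper, 9092 for Kafka); wait_for_port is a hypothetical helper, and a successful TCP connection only shows that something is listening on the port, not that the broker is fully healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Connecting only proves that some process is listening on the port
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not up yet (e.g. connection refused); retry shortly
    return False
```

For example, wait_for_port("localhost", 2181) and wait_for_port("localhost", 9092) should both return True once the two servers are up.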

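Apache ZooKeeper is the service that manages the configuration and resource allocation of the distributed Kafka cluster (the Brokers), so it must be running before the Kafka server starts.

  • If starting a server fails with the following error: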

```shell
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
```

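Following a related Stack Overflow discussion, edit the bin/kafka-run-class.sh file. The script parses the Java major version into JAVA_MAJOR_VERSION and uses it in this check to choose its GC-logging flags:

```shell
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]]
```

When the parsed value is not a plain number, the check fails and the script falls back to the Java 8 flags, including -XX:+PrintGCDateStamps, which Java 9 and later reject. Replace the JAVA_MAJOR_VERSION assignment just above the check with:

```shell
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*/\1/p')
```

Capturing only the leading digits yields 1 for Java 8 version strings such as "1.8.0_261" and the plain major number for Java 9 and later.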
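The parsed value decides which branch of the check runs. The sketch below is a hypothetical Python emulation of the sed substitution and the -ge "9" test (major_version and uses_java9_gc_flags are illustrative names), run on typical first lines of java -version output rather than output captured from this setup. It compares a [^.-]* capture with a digits-only [0-9]* capture.

```python
import re
from typing import Optional


def major_version(java_version_output: str, capture: str) -> Optional[str]:
    """Emulate sed -E -n 's/.* version "(CAPTURE).*/\\1/p' on each line of output.

    Returns the captured group of the first matching line, or None when no line
    matches (sed would print nothing, leaving JAVA_MAJOR_VERSION empty).
    """
    pattern = re.compile(r'.* version "(' + capture + r').*')
    for line in java_version_output.splitlines():
        m = pattern.match(line)
        if m:
            return m.group(1)
    return None


def uses_java9_gc_flags(major: Optional[str]) -> bool:
    """Emulate [[ "$JAVA_MAJOR_VERSION" -ge "9" ]]: an empty or non-numeric
    value makes the test fail, so the script falls back to the Java 8 flags."""
    return major is not None and major.isdigit() and int(major) >= 9


# Typical first lines of `java -version` output (illustrative samples)
samples = [
    'java version "1.8.0_261"',
    'openjdk version "11.0.8" 2020-07-14',
    'openjdk version "14" 2020-03-17',
]
for line in samples:
    for capture in ("[^.-]*", "[0-9]*"):
        major = major_version(line, capture)
        print(f"{line!r} with {capture}: {major!r} -> Java 9+ flags: {uses_java9_gc_flags(major)}")
```

For the "14" 2020-03-17 line, [^.-]* captures 14" 2020, which is not a number, so the script would pick the Java 8 flags; [0-9]* captures 14. For "11.0.8" both captures yield 11.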
  • Create a Topic. The following command creates a Topic named covid-19 with one replica and one partition:
```shell
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic covid-19
```
  • List the existing Topics to confirm that covid-19 was created:
```shell
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
```
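  • Producer
```python
#!/usr/bin/env python3
# coding:utf-8

import time
from kafka import KafkaProducer

# Connect to the local Kafka broker (default listener port 9092)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
counter = 1
try:
    while True:
        msg = f'[Total: {counter}] Someone has contracted COVID-19! Be careful!'
        # Without a value_serializer, KafkaProducer.send() expects the value as bytes
        producer.send('covid-19', msg.encode('utf-8'))
        time.sleep(5)  # publish one notification every 5 seconds
        counter += 1
except KeyboardInterrupt:
    pass  # stop with Ctrl+C
finally:
    # send() is asynchronous, so flush any buffered messages before exiting
    producer.flush()
    producer.close()
```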
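  • Consumer
```python
#!/usr/bin/env python3
# coding:utf-8

from kafka import KafkaConsumer

# Subscribe to the covid-19 Topic on the local broker. With the default
# auto_offset_reset='latest', only messages sent after startup are shown.
consumer = KafkaConsumer('covid-19', bootstrap_servers='localhost:9092')
# Iterating blocks and yields each new record as it arrives
for msg in consumer:
    print(msg.value.decode('utf-8'))
```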

With both scripts running, the Consumer prints every COVID-19 event notification the Producer sends.
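To back up that observation, one option is to save the Consumer's output and check that the [Total: N] counters have no gaps. missing_counters is a hypothetical helper; it only checks between the smallest and largest counter seen, because by default (auto_offset_reset='latest') a Consumer started after the Producer does not receive the earlier messages.

```python
import re
from typing import Iterable, List

# Matches the "[Total: N]" prefix the Producer puts on every message
COUNTER = re.compile(r"\[Total: (\d+)\]")


def missing_counters(received: Iterable[str]) -> List[int]:
    """Return counter values absent between the smallest and largest counter seen."""
    seen = set()
    for text in received:
        m = COUNTER.search(text)
        if m:
            seen.add(int(m.group(1)))
    if not seen:
        return []
    return [n for n in range(min(seen), max(seen) + 1) if n not in seen]


lines = [
    "[Total: 3] Someone has contracted COVID-19! Be careful!",
    "[Total: 4] Someone has contracted COVID-19! Be careful!",
    "[Total: 6] Someone has contracted COVID-19! Be careful!",
]
print(missing_counters(lines))  # [5]
```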

Ref. Hello World In Kafka Using Python
