KafkaJS で Kafka Consumer / Producer を書く

| 4 min read
Author: masahiro-kondo masahiro-kondoの画像

Apache Kafka ではクライアントライブラリが様々な言語で開発されています。Confluent 公式では、Java / C++ / Go / .NET / Python がサポートされています。

Kafka クライアント | Confluent Documentation

Node.js では KafkaJS が利用できます。個人開発のプロジェクトですが、記事執筆時点でスター3.2k、コントリビューター117人、採用プロジェクト10k以上とかなりメジャーなライブラリです。ドキュメントも充実しています。

KafkaJS · KafkaJS, a modern Apache Kafka client for Node.js

GitHub - tulios/kafkajs: A modern Apache Kafka client for node.js

Node.js で Kafka のクライアントを書くメリットは以下のようになるでしょう。

  • プロセスの起動が早い
  • コードがシンプルに書ける
  • TypeScript も使える

KafkaJS の公式ドキュメントは以下です。

Getting Started · KafkaJS

KafkaJS のインストール

#

Node.js のプロジェクトを作って KafkaJS をインストールするには以下のようにします。

mkdir kafka-clients && cd kafka-clients
npm init --y
npm i kafkajs

package.json は以下のように指定しました。ES Modules と top-level await を使うために main のファイル拡張子は mjs にしています。

  • package.json
{
"name": "kafka-clients",
"version": "0.1.0",
"main": "index.mjs",
"scripts": {
"start": "node index.mjs"
},
"dependencies": {
"kafkajs": "^2.2.4"
}
}
Information

top-level await を使うと、mjs ファイルのトップレベルで、Promise から値を取り出すことができるので、従来のように async 関数を定義する必要がなくなり、シンプルに書けます。

  • 従来の書き方
(async() => {
// 非同期な処理の呼び出し
})().catch(console.error);

Consumer を書く

#

Consumer のコードサンプルです。

  • index.mjs
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ // 1
clientId: 'my-app',
brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'test-group'});  // 2
await consumer.connect(); // 3
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }); // 4

await consumer.run({
eachMessage: async ({ topic, partition, message }) => { // 5
console.log(message.value.toString()); // 6
const correlationId = message.headers['correlation-id']; // 7
if (correlationId) {
console.log(correlationId.toString());
}
},
});
  1. Kafka クライアントの作成では、clientIdbrokers の指定が必須になっています。クライアント設定の詳細については Client Configuration · KafkaJS を参照してください。
  2. Kafka クライアントの consumer メソッドで groupId を指定して、Consumer を生成します。
  3. Consumer オブジェクトの connect メソッドで Kafka クライアント生成時に定義した Kafka クラスターに接続します。
  4. 対象の Kafka トピックに subscribe します。
  5. Consumer オブジェクトの run メソッドに eachMessage ハンドラーを定義して Kafka トピックから取り出したメッセージを処理する function を書きます。
  6. メッセージを取得します。ログを出力しているだけですが、本来はメッセージに応じたビジネスロジックを実行します。
  7. ヘッダー情報の取り出しです。

eachMessage ハンドラーでは、subscribe している topic、割り当てられている partition、到達した message が取得できます。メッセージの本体は、value プロパティで、ヘッダーは headers プロパティで取り出せます。メッセージが JSON で送信されている場合は、JSON.parse でパースして取り出します。

Information

Client ID はアプリケーションの単位で命名します。Kafka へのリクエストを追跡する際に識別しやすい名前にするのが推奨されています。
Group ID は同一トピックに割り当てられる Consumer のグループに付与します。同一 Group ID の Consumer は同一トピックの異なる partition からメッセージを受信するように割り当てられます。

Consumer 処理の詳細なオプションについては Consuming Messages · KafkaJS を参照してください。

Producer を書く

#

Producer のサンプルです。

  • index.mjs
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});

const producer = kafka.producer(); // 1

await producer.connect(); // 2
await producer.send({ // 3
topic: 'test-topic',
messages: [
{
value: 'Hello KafkaJS user!',
headers: {
'correlation-id': '1234',
},
}
],
});
  1. Kafka クライアントの producer メソッドで、Producer を生成します。
  2. Producer の connect メソッドで Kafka クラスターに接続します。
  3. Producer の send メソッドで、トピックを指定してメッセージを送信します。

send メソッドでは、メッセージを配列で渡します。value にはメッセージの本体、headers にはヘッダー情報を含めます。メッセージが JavaScript オブジェクトの場合、JSON.stringify で変換します。

Producer の処理の詳細なオプションについては Producing Messages · KafkaJS を参照してください。

最後に

#

KafkaJS で Consumer / Producer を実装する方法をご紹介しました。Java の場合、Spring for Apache Kafka を使ってかなりシンプルに実装できます。それでも、Node.js + KafkaJS はプロジェクトを作って動かすまでの手間が格段に少なくライトウェイトです。プロジェクトの事情に合わせて適したクライアントライブラリを選択していただければと思います。

豆蔵デベロッパーサイト - 先週のアクセスランキング
  1. 基本から理解するJWTとJWT認証の仕組み (2022-12-08)
  2. AWS認定資格を12個すべて取得したので勉強したことなどをまとめます (2022-12-12)
  3. Nuxt3入門(第4回) - Nuxtのルーティングを理解する (2022-10-09)
  4. Nuxt3入門(第1回) - Nuxtがサポートするレンダリングモードを理解する (2022-09-25)
  5. Nuxt3入門(第8回) - Nuxt3のuseStateでコンポーネント間で状態を共有する (2022-10-28)
  6. Jest再入門 - 関数・モジュールモック編 (2022-07-03)
  7. 自然言語処理初心者が「GPT2-japanese」で遊んでみた (2022-07-08)
  8. IoT を使ってみる(その6:MQTTブローカー Mosquitto編) (2022-10-08)
  9. Nuxt3入門(第3回) - ユニバーサルフェッチでデータを取得する (2022-10-06)
  10. 統計学で避けて通れない自由度の話 (2022-06-20)