Strimzi - Kubernetes で Kafka を運用するための Operators

| 29 min read
Author: masahiro-kondo masahiro-kondoの画像

Apache Kafka は高速でスケーラブルな pub/sub 型の分散メッセージングシステムです。Kafka クラスターに配置された Topic に Consumer アプリが Subscribe し、Producer が送信するメッセージを順次処理していきます。

Kafka はコンテナや Kubernetes 以前からの歴史があり、近年 Kubernetes で運用する事例も増えてきています。この記事では Kafka を Kubernetes で運用するための OSS の Operator Strimzi についてご紹介します。

Kafka の構成

#

Kafka のスケーラビリティはその構成により実現されています。メッセージが流れる Topic を Partition に分割し、多数の Consumer がメッセージを受信できるようにすると共に、Topic を管理する Broker を複数マシンからなるクラスターとし、異なる Broker で Partition を分散管理することで、冗長化を実現しています。

Broker クラスターを管理する ZooKeeper というサーバー(これもクラスター構成)があり、Broker の状態を監視し、1つの Broker が使えなくなった時に、他の Broker を割り当てるなどの制御を行っています。

Kafka はこのように、かなり複雑な分散システムです。Kubernetes は Web アプリケーションなどのステートレスなワークロードの運用は得意ですが、このような状態を持つ分散システムの運用は難易度が上がります。

bitnami の Helm Chart によるデプロイ構成

#

bitnami から提供されている Helm Chart で Kubernetes 上の Kafka 構成を見てみましょう。

Helm Charts to deploy Apache Kafka in Kubernetes

Minikube に bitnami-kafka という namespace を作ってインストールしました。

helm repo add bitnami https://charts.bitnami.com/bitnami
kubectl create ns bitnami-kafka
helm install sample-cluster bitnami/kafka -n bitnami-kafka

作成されたオブジェクトを見てみます。

$ kubectl get sts,po,svc,pvc,sa -n bitnami-kafka
NAME READY AGE
statefulset.apps/sample-cluster-kafka 1/1 64m
statefulset.apps/sample-cluster-zookeeper 1/1 64m

NAME READY STATUS RESTARTS AGE
pod/sample-cluster-kafka-0 1/1 Running 2 (63m ago) 64m
pod/sample-cluster-zookeeper-0 1/1 Running 0 64m

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/sample-cluster-kafka ClusterIP 10.109.112.100 <none> 9092/TCP 64m
service/sample-cluster-kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 64m
service/sample-cluster-zookeeper ClusterIP 10.99.14.27 <none> 2181/TCP,2888/TCP,3888/TCP 64m
service/sample-cluster-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 64m

NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
persistentvolumeclaim/data-sample-cluster-kafka-0 Bound pvc-a142bf96-fc0a-479d-b773-9f799e637c99 8Gi RWO hostpath 64m
persistentvolumeclaim/data-sample-cluster-zookeeper-0 Bound pvc-1d4d9ab9-a5ad-48cc-85b7-73cb52f2e8a0 8Gi RWO hostpath 64m

NAME SECRETS AGE
serviceaccount/default 1 66m
serviceaccount/sample-cluster-kafka 1 64m

Broker クラスター、ZooKeeper クラスターは、Kubernetes の StatefulSet[1] として作成されます。Broker の Topic 及びメッセージや、ZooKeeper で管理される Broker クラスターの状態などのメタデータは、PV(Persistence Volume) に保存されます。

Topic を作成する(ややメンドウ)

#

Helm Chart でデプロイされた Kubernetes 上の Kafka クラスターの操作について考えると、Kafka クラスターを構成する Broker などのオブジェクトは StatefulSet、Pod、Service などで表現されますが、Topic や Partition などについては Kafka 世界の登場人物であるため、Kubernetes のオブジェクトとして表現されず、kubectl などで操作することはできません。

Topic を作成するには、作業用の Kafka Pod を Kafka のクライアントとして起動して、提供されているシェルを使用する方法があります。まず、bitnami の Kafka のコンテナイメージを利用して、Pod を起動します。

kubectl -n kafka run kafka-client --restart='Never' --image docker.io/bitnami/kafka --command -- sleep infinity

Topic を新規作成するには、この Pod に入って、コマンドを叩くか、次のように here document でコマンドを実行します。

$ kubectl -n kafka exec -i kafka-client -- bash << 'EOS'
kafka-topics.sh --create --bootstrap-server=sample-cluster-kafka:9092 --replication-factor 1 --partitions 2 --topic topic-1
EOS


Created topic topic-1.

作成された Topic をリスト表示してみます。

$ kubectl -n kafka exec -i kafka-client -- bash << 'EOS'
kafka-topics.sh --list --bootstrap-server=sample-cluster-kafka:9092
EOS


topic-1

そんなに難しい操作ではありませんが、Pod を起動したりするところがやや面倒です。

Broker を増やしてリバランスする (かなりメンドウ)

#

Kafka の通常運用としては、以下のようなオペレーションを実行する必要があります。

  • Broker を増やす
  • Topic を追加する
  • Topic の partition を増やす
  • Kafka のバージョンを上げる

Topic は自動作成もできて、Partition 数や replication-factor (Partition をいくつの Broker に分散配置するか) はデフォルト値を設定できるため、ある程度 Kafka にお任せできますが、スケールアウトのため Broker を追加したり、Topic の Partition を増やす場合は、偏りが発生するため、これを手動でリバランスする必要があります。Kafka 提供のシェルスクリプトを使ってリバランスするオペレーションはけっこう煩雑です。たとえば、Broker が3台で、topic-1 が3つの Partition で構成されていて、Topic の状態を出力すると以下のようになっているとします。

$ kafka-topics.sh --describe --bootstrap-server=sample-cluster-kafka:9092 --topic topic-1
Topic: topic-1 TopicId: _MgPr5IFSKK1XjaYlIkhnQ PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: topic-1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-1 Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: topic-1 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0

この出力は3つの Partition にそれぞれ、1台の Leader と 2台の Replica の Broker が割り当てられていることを表しています。Leader が何らかの原因で落ちると、Replica が Leader に昇格します。

Helm Chart の設定で、Broker を 3台から4台に増やして更新します。(values.yaml で replicas を 3 → 4に変更)

helm upgrade -n bitnami-kafka sample-cluster bitnami/kafka -f values.yaml

Broker は増えますが、再度 Topic の状態を見ても配置は変わっておらず、topic-1 にとっては新規追加された Broker は存在しないのと同じになっています。

$ kafka-topics.sh --describe --bootstrap-server=sample-cluster-kafka:9092 --topic topic-1
Topic: topic-1 TopicId: _MgPr5IFSKK1XjaYlIkhnQ PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: topic-1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-1 Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: topic-1 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0

Partition を全ての Broker を使って最適配置するのは以下のような手順となります。

まず、再配置対象の Topic を指定した JSON ファイルを作成します。

cat <<EOF > topics-to-move.json
{
"topics": [{ "topic": "topic-1" }],
"version": 1
}
EOF

kafka-reassign-partitions.sh を実行して、再配置プランを提案してもらいます。--broker-list に列挙しているのが、使用したい broker の ID です。現状の配置に続いて、再配置の提案が出力されます。

$ kafka-reassign-partitions.sh --bootstrap-server=sample-cluster-kafka:9092 --topics-to-move-json-file topics-to-move.json --broker-list=0,1,2,3 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-1","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-1","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-1","partition":2,"replicas":[1,0],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-1","partition":0,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"topic-1","partition":1,"replicas":[3,0],"log_dirs":["any","any"]},{"topic":"topic-1","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}

提案されたプランの JSON をコピーして、再配置用の JSON ファイルを作成します。

cat <<EOF > expand-cluster-reassignment.json
{"version":1,"partitions":[{"topic":"topic-1","partition":0,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"topic-1","partition":1,"replicas":[3,0],"log_dirs":["any","any"]},{"topic":"topic-1","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
EOF

上記の JSON ファイルを入力として、kafka-reassign-partitions.sh を実行し、再配置を実行します。

$ kafka-reassign-partitions.sh --bootstrap-server=sample-cluster-kafka:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
:
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-1-0,topic-1-1,topic-1-2

再配置結果を確認します。全ての Broker が各 Partirion の Leader と Replica に再配置されました。

$ kafka-topics.sh --describe --bootstrap-server=sample-cluster-kafka:9092 --topic topic-1
Topic: topic-1 TopicId: _MgPr5IFSKK1XjaYlIkhnQ PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: topic-1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: topic-1 Partition: 1 Leader: 3 Replicas: 3,0 Isr: 0,3
Topic: topic-1 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1,0

以上「読んでいただいてお疲れ様でした」って書きたくなるほど面倒です。それほど頻度は高くないとはいえ通常運用の範囲ですし、作業用の Pod に入ってコマンドをチマチマ叩くのは効率悪そうです。

Strimzi の導入

#

とても 前置きが長くなってしまいましたが、やっとここで Strimzi を導入して使っていきます。

Strimzi - Apache Kafka on Kubernetes

Strimzi は Cloud Native Computing Foundation の Sandbox project としてホスティングされている Kafka の Operator です。OpenShift をサポートするため RedHat に所属する開発者もいますが、コミュニティベースの開発活動が続けられています。

Quick Start に従って記事執筆時点の最新バージョン(v0.29.0)を設定していきます。

Strimzi Quick Start guide (0.29.0)

Strimzi の GitHub Release ページからダウンロードして展開。ディレクトリには、各種 Strimzi を使用するための各種 Manifest が格納されています。

curl -LO https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.29.0/strimzi-0.29.0.zip
unzip strimzi-0.29.0.zip

今回は、Strimzi を kafka namespace に作成するので、RoleBinding の Manifest に書かれている namespace を置換します。[2]

cd strimzi-0.29.0
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml を編集して、クラスター用の namespace (ここでは my-kafka-project) を直接指定します。

# ...
env:
- name: STRIMZI_NAMESPACE
value: my-kafka-project
# valueFrom:
# fieldRef:
# fieldPath: metadata.namespace
# ...

必要な namespace を作成します。

kubectl create ns kafka
kubectl create ns my-kafka-project

Strimzi の Cluster Operator が namespace my-kafka-project を監視できるよう RoleBinding を my-kafka-project に指定します。

$ kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created

$ kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created

Strimzi の CRD (Cusotm Resouce Definition) と RBAC を namespace kafka にデプロイします。

$ kubectl create -f install/cluster-operator/ -n kafka
serviceaccount/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-client created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-client-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/strimzipodsets.core.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
configmap/strimzi-cluster-operator created
deployment.apps/strimzi-cluster-operator created

これで、Strimzi のインストールは終わりました。次に、CRD Kafka を使って Kafka クラスターを作ります。以下は公式ドキュメントに従って、here document により作成していますが、YAML ファイルを作成して適用しても同じです。

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
EOF

クラスターが Ready になるまで wait しておくと1分ぐらいで Ready になります。

$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n my-kafka-project
kafka.kafka.strimzi.io/my-cluster condition met

Strimzi でデプロイされた Kafka クラスター構成

#

my-kafka-project に作成されたオブジェクトを見てみます。bitnami の chart でデプロイされたものと同じような構成ですが、Strimzi の entity-operator の Deployment、Pod、ServiceAccount ができています。

$ strimzi-0.29.0 kubectl get deploy,sts,po,svc,pvc,sa -n my-kafka-project
NAME READY UP-TO-DATE AVAILABLE AGE
my-cluster-entity-operator 1/1 1 1 2m22s

NAME READY AGE
statefulset.apps/my-cluster-kafka 1/1 2m44s
statefulset.apps/my-cluster-zookeeper 1/1 3m46s

NAME READY STATUS RESTARTS AGE
pod/my-cluster-entity-operator-79d44dd6c-5q9hz 3/3 Running 0 2m22s
pod/my-cluster-kafka-0 1/1 Running 0 2m44s
pod/my-cluster-zookeeper-0 1/1 Running 0 3m46s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/my-cluster-kafka-0 NodePort 10.99.189.164 <none> 9094:31594/TCP 2m44s
service/my-cluster-kafka-bootstrap ClusterIP 10.96.101.128 <none> 9091/TCP,9092/TCP,9093/TCP 2m44s
service/my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 2m44s
service/my-cluster-kafka-external-bootstrap NodePort 10.104.10.45 <none> 9094:31759/TCP 2m44s
service/my-cluster-zookeeper-client ClusterIP 10.111.188.223 <none> 2181/TCP 3m46s
service/my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 3m46s

NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
persistentvolumeclaim/data-0-my-cluster-kafka-0 Bound pvc-ca10ad68-fd23-4782-8ccf-22b87d355d5e 100Gi RWO hostpath 2m44s
persistentvolumeclaim/data-my-cluster-zookeeper-0 Bound pvc-81adf737-195d-4516-a750-047fe5cff30b 100Gi RWO hostpath 3m46s

NAME SECRETS AGE
serviceaccount/default 1 14m
serviceaccount/my-cluster-entity-operator 1 2m22s
serviceaccount/my-cluster-kafka 1 2m44s
serviceaccount/my-cluster-zookeeper 1 3m46s

CRD をデプロイした namespace kafka に作成されたオブジェクトです。

kubectl -n kafka get crd,sa,deploy,po
NAME CREATED AT
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io 2022-05-25T00:22:08Z
customresourcedefinition.apiextensions.k8s.io/strimzipodsets.core.strimzi.io 2022-05-25T00:22:08Z

NAME SECRETS AGE
serviceaccount/default 1 6h8m
serviceaccount/strimzi-cluster-operator 1 5h41m

NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/strimzi-cluster-operator 1/1 1 1 5h41m

NAME READY STATUS RESTARTS AGE
pod/strimzi-cluster-operator-d5f95d997-nvpqm 1/1 Running 38 (6m4s ago) 5h41m

それぞれの namespace のオブジェクトを簡単に図示してみました。

namespace kafka に配置された CRD でさまざまなオペレーションを Kuberenetes のインターフェースを通して操作できるようになります。

Topic を作成する (Strimzi 編)

#

Strimzi では Kafka の Topic は KafkaTopic という CRD で定義されており、Manifest を適用するだけで作成できてしまいます。

$ cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic
labels:
strimzi.io/cluster: "my-cluster"
spec:
partitions: 3
replicas: 1
EOF

kafkatopic.kafka.strimzi.io/my-topic created

Kubernetes のオブジェクトとして、Topic を kubectl で確認できます。

$ kubectl -n my-kafka-project get kafkatopic my-topic       
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
my-topic my-cluster 3 1 True

ここで、bitnami の Helm Chart の時と同様に Strimzi で使われている kafka イメージを使ってクライアント用の Pod を起動して確認していきます。

kubectl -n my-kafka-project run kafka-client --restart='Never' --image quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 --command -- sleep infinity

Topic の Manifest を適用したことで実際に Kafka の Topic が作成されていることを here document を実行して確認してみました。

$ kubectl -n my-kafka-project exec -i kafka-client -- bash << 'EOS'
bin/kafka-topics.sh --describe --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
EOS

Topic: my-topic TopicId: Pan_0la7Q-m4QA8HFvKqWA PartitionCount: 3 ReplicationFactor: 1 Configs: min.insync.replicas=1,message.format.version=3.0-IV1
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

Broker は1個なので、全ての Partition が 同じ Broker に作られ、Replica はない状態です。ついでに Kafka 付属の console-producer と console-consumer を使って my-topic でメッセージをやりとりしてみます。

Producer を起動してメッセージを入力。

$ kubectl -n my-kafka-project exec -it kafka-client -- bash
% bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
>hello

別のターミナルで、Consumer 受信を確認。

$ kubectl -n my-kafka-project exec -it kafka-client -- bash
% bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
hello

Kafka Cluster としてちゃんと動作しているようです。

Topic の追加は Manifest を適用するだけでした。Topic の修正も Manifest を変更して適用するだけです。

kubectl edit で my-topic の partition を4に増やして適用します。

kubectl -n my-kafka-project edit kafkatopic my-topic

Topic を確認、Partition 数が変更されました。

$ kubectl -n my-kafka-project get kafkatopic my-topic
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
my-topic my-cluster 4 1 True

Broker を増やしてリバランスする (Strimzi + Cruise Control 編)

#

あの面倒だったリバランスについては、Strimzi は Cruise Control を導入しており、オペレーションがかなり楽になっています。

Strimzi のドキュメントにはこのようにあります。

Cruise Control は、Kafka クラスター全体のデータ監視とバランシングを簡略化するためのオープンソースプロジェクトです。Cruise Controlは、Kafka クラスターと共にデプロイされて、トラフィックを監視し、よりバランスの取れた Partition 割り当てを提案し、それらの提案に基づいて Partition の再割り当てを実行します。
Cruise Control は、リソース使用率情報を収集して、Kafka クラスターのワークロードをモデル化および分析します。Cruise Control は、最適化目標に基づいて、クラスターを効果的にリバランスする最適化提案を生成します。最適化の提案が承認されると、Cruise Control はリバランスを適用します。
Prometheus は、最適化の提案やリバランス操作に関連するデータを含む、Cruise Control のためのメトリクスデータを抽出できます。Cruise Control 用のサンプル Manifest と Grafana ダッシュボードが Strimzi に付属しています。

Cruise Control は LinkedIn で開発されている Kafka オートメーションの OSS です。

GitHub - linkedin/cruise-control: Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.

Kafka 標準のリバランスと違い、Prometheus のメトリクスも利用できるため、より精度の高い提案が得られそうです。

Strimzi のドキュメントには概要しか書いてありませんが、以下のブログに Cruise Control を使ったリバランスの手順が書かれています。

Cluster balancing with Cruise Control

ということで、Cruise Control を使ったリバランスの手順を確認していきます。上記でデプロイした Kafka クラスターは Cruise Control がない構成だったので、これを破棄し、Cruise Controll を含む構成でデプロイします。Broker が複数必要なので、kafka の replicas を2にしています。Kafka の spec に cruiseControl: {} を追加しています。

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 2
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
cruiseControl: {}
EOF

デプロイされたオブジェクトを確認。 Cruise Control の Deployment と Pod ができています。

$ kubectl get deploy,sts,po,svc -n my-kafka-project
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/my-cluster-cruise-control 1/1 1 1 2m5s
deployment.apps/my-cluster-entity-operator 1/1 1 1 2m27s

NAME READY AGE
statefulset.apps/my-cluster-kafka 2/2 3m2s
statefulset.apps/my-cluster-zookeeper 1/1 3m26s

NAME READY STATUS RESTARTS AGE
pod/my-cluster-cruise-control-857c6d665-4whmw 1/1 Running 0 2m5s
pod/my-cluster-entity-operator-79d44dd6c-tjj9n 3/3 Running 0 2m27s
pod/my-cluster-kafka-0 1/1 Running 0 3m2s
pod/my-cluster-kafka-1 1/1 Running 0 3m2s
pod/my-cluster-zookeeper-0 1/1 Running 0 3m26s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/my-cluster-cruise-control ClusterIP 10.98.47.144 <none> 9090/TCP 2m5s
service/my-cluster-kafka-0 NodePort 10.107.99.207 <none> 9094:30909/TCP 3m3s
service/my-cluster-kafka-1 NodePort 10.99.93.45 <none> 9094:31078/TCP 3m3s
service/my-cluster-kafka-bootstrap ClusterIP 10.107.191.25 <none> 9091/TCP,9092/TCP,9093/TCP 3m3s
service/my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 3m3s
service/my-cluster-kafka-external-bootstrap NodePort 10.102.73.249 <none> 9094:32702/TCP 3m3s
service/my-cluster-zookeeper-client ClusterIP 10.98.8.86 <none> 2181/TCP 3m26s
service/my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 3m26s

この状態から、リバランスを実行していきます。Strimzi の examples ディレクトリにリバランスのサンプルがいくつかあります。この中で、examples/cruise-control/kafka-rebalance-full.yaml を適用。リバランスも KafkaRebalance という CRD になっています。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
labels:
strimzi.io/cluster: my-cluster
# no goals specified, using the default goals from the Cruise Control configuration
spec: {}

この Manifest では、Goal を指定していないので、Goal については Cruise Control のデフォルト値が適用されることになります。

Manifest を適用します。

$ kubectl apply -f kafka-rebalance-full.yaml -n my-kafka-project
kafkarebalance.kafka.strimzi.io/my-rebalance created

適用した KafkaRebalance を describe すると Status/Optimization Result にリバランスの提案が出力されています。Status は ProposalReady です。

$ kubectl describe kafkarebalance my-rebalance -n my-kafka-project
Name: my-rebalance
Namespace: my-kafka-project
Labels: strimzi.io/cluster=my-cluster
Annotations: <none>
API Version: kafka.strimzi.io/v1beta2
Kind: KafkaRebalance
Metadata:
Creation Timestamp: 2022-05-25T09:04:02Z
Generation: 1
(..)
Status:
Conditions:
Last Transition Time: 2022-05-25T09:04:03.507351Z
Status: True
Type: ProposalReady
Observed Generation: 1
Optimization Result:
After Before Load Config Map: my-rebalance
Data To Move MB: 0
Excluded Brokers For Leadership:
Excluded Brokers For Replica Move:
Excluded Topics:
Intra Broker Data To Move MB: 0
Monitored Partitions Percentage: 100
Num Intra Broker Replica Movements: 0
Num Leader Movements: 17
Num Replica Movements: 42
On Demand Balancedness Score After: 83.92687872002556
On Demand Balancedness Score Before: 80.87946050436929
Provision Recommendation:
Provision Status: RIGHT_SIZED
Recent Windows: 1
Session Id: cc73d00a-0d49-41cb-b589-9b9efb042f6d
Events: <none>

この Kafka クラスターではまだ何のイベントも処理していませんので、Intra Broker Data To Move MB は 0MB ですが、42 の Replica の移動が提案されました。

リバランス提案を承認し、適用するには kubectl annotate で approve します。

$ kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve -n my-kafka-project
kafkarebalance.kafka.strimzi.io/my-rebalance annotated

これで、実際にリバランスが開始されます。Status は Reblancing になります。

$ kubectl describe kafkarebalance my-rebalance -n my-kafka-project
(..)
Status:
Conditions:
Last Transition Time: 2022-05-25T09:12:36.842261Z
Status: True
Type: Rebalancing

完了すると Status が Ready になります。

$ kubectl describe kafkarebalance my-rebalance -n my-kafka-project
(..)
Status:
Conditions:
Last Transition Time: 2022-05-25T09:15:22.110294Z
Status: True
Type: Ready

以上のように、Strimzi に組み込まれた Cruise Control により、リバランスのオペレーションはかなり楽になっています。

この例では、デフォルトの Goal を使用しましたが、個別の Goal を設定するサンプルや Broker を追加・削除する際のサンプルなども提供されています。

Strimzi の様々な Operator や Configuration

#

Strimzi の Kafka の運用に必要な様々な Operator や Configurationが実装されています。これらは namespace kafka にデプロイされた CRD により実現されています。

Operators

  • Cluster Operator
  • Entity Operator
  • Topic Operator
  • User Operator

Configuration

  • Cluster configuration
  • MirrorMaker configuration
  • Kafka Connect configuration

以前紹介した Debezium も Kafka Connect であり、Configuration でインストール可能です。

詳細は、ドキュメントを参照してください。

Strimzi Overview guide (0.29.0)

まとめ

#

以上のように、Strimzi は Kafka を Kubernetes で運用する上で必要になる様々な Operator や Configuration が実装されており、面倒なリバランス処理も自動化できる優れた OSS です。Kafka のような複雑な構成のアプリケーションもこういった Operator により、Kubernetes 上で運用でき、コンテナ化の恩恵を得ることができます。


  1. Kubernetes におけるステートフルなアプリケーションのためのワークロード。 ↩︎

  2. デフォルトの namespace は myproject になっています。 ↩︎

豆蔵デベロッパーサイト - 先週のアクセスランキング
  1. 基本から理解するJWTとJWT認証の仕組み (2022-12-08)
  2. AWS認定資格を12個すべて取得したので勉強したことなどをまとめます (2022-12-12)
  3. Nuxt3入門(第4回) - Nuxtのルーティングを理解する (2022-10-09)
  4. Nuxt3入門(第1回) - Nuxtがサポートするレンダリングモードを理解する (2022-09-25)
  5. Nuxt3入門(第8回) - Nuxt3のuseStateでコンポーネント間で状態を共有する (2022-10-28)
  6. Jest再入門 - 関数・モジュールモック編 (2022-07-03)
  7. 自然言語処理初心者が「GPT2-japanese」で遊んでみた (2022-07-08)
  8. IoT を使ってみる(その6:MQTTブローカー Mosquitto編) (2022-10-08)
  9. Nuxt3入門(第3回) - ユニバーサルフェッチでデータを取得する (2022-10-06)
  10. 統計学で避けて通れない自由度の話 (2022-06-20)