注目イベント!
アドベントカレンダー2024開催中!
一年を締めくくる特別なイベント、アドベントカレンダーを今年も開催しています!
初心者からベテランまで楽しめる内容で、毎日新しい技術トピックをお届けします。
詳細はこちらから!
event banner

Debezium によるチェンジデータキャプチャー

| 20 min read
Author: masahiro-kondo masahiro-kondoの画像

Change Data Capture (CDC) は、データベースで発生した変更をキャプチャーして別のシステムに伝播させ、応答できるようにする仕組みです。CDC を利用することで、テーブルの更新をポーリングするバッチ処理などを作り込まずに、イベントドリブンな応答処理を実装できます。CDC はイベントソーシングと並んで分散システムをリアクティブに連携させるためのソリューションとして位置付けられます。[1]

Amazon DynamoDBAzure CosmosDB などのマネージドサービスでは、変更イベントを通知する仕組みを備えているものがあります。Debezium を使うと、MySQL や PostgreSQL などオンプレミス環境で広く使われている DBMS でも CDC を利用できます。

この記事では Debezium の概要を簡単に説明し、PostgreSQL と Debezium を使った開発環境を macOS 上に構築する手順を示します。

Debezium の概要

#

Debezium は、Red Hat が OSS として開発している CDC のためのプラットフォームであり、Kafka Connect として動作します。

Kafka は 高速でスケーラブルな分散型メッセージングシステムであり、Kafka Connect は Kafka と他のシステムでデータをストリーミングするためのフレームワークです。Kafka Connect にはデータソース側の Souce Connector と Consumer 側の Sink Connector があります。Debezium は Souce Connector に位置付けられます。

flowchart LR
    subgraph Source System
      SA[Application]-->|Write|Database
    end
    subgraph Source Connector
      Debezium
    end
    subgraph Kafka Broker
      topic
    end
    subgraph Sink Connector
      Listener
    end
    subgraph Consumer System
      CA[Application]
    end
    Database-->|Capture|Debezium
    topic-->Listener
    Debezium-->topic
    Listener-->CA

Debezium 設定で指定されたテーブルの変更が所定の Kafka topic にメッセージとして投入されます。Consumer システムではその topic に subscribe し、イベント受信時の処理を実装します。

Debezium のドライバーは各 DBMS ごとに提供されています。

  • MySQL
  • MongoDB
  • PostgreSQL
  • SQL Server
  • Oracle[2]
  • Cassandra

Debezium は、ソースとなるアプリケーションのデータベースのトランザクションログを監視して変更をキャプチャーします。PostgreSQL では WAL (Write Ahead Log)、MySQL では row-lebel binlog を監視します。このため、JDBC などのクエリ用のドライバーで実装された Source Connector に比べて DBMS の負荷が小さく高速です。

Debezium による開発環境構築

#

macOS で PostgreSQL と Debezium の環境を構築して動作を確認してみます。HomeBrew を使用します。

PostgreSQL インストールとデータベース作成

#

PostgreSQL をインストールして、コマンドのディレクトリに PATH を通します。

brew install postgresql@13
echo 'export PATH="/opt/postgresql@13/bin:$PATH"' >> ~/.zshrc

postgresql.conf で wal_level を設定します。

vi /opt/homebrew/var/postgresql@13/postgesql.conf
Information

postgresql.conf は環境によって /usr/local/var/postgresql@13 配下に生成されている場合もあります。

wal_level = logical

WAL level logical を指定する必要があり、出力されるログの量は多くなります。

PostgreSQL のサービスを起動します。

brew services start postgresql@13

PostgreSQL に接続して wal_level の設定を確認します。

psql postgres -c 'show wal_level;'
 wal_level 
-----------
 logical
(1 row)

キャプチャー対象のテーブル作成

#

データベースを作成してキャプチャー対象のテーブルを作成します。以下のような DDL を記述したファイルを用意しました。スキーマ dzaccount というシンプルなテーブルを1つ作るだけの DDL です。

DROP SCHEMA IF EXISTS dz CASCADE;
CREATE SCHEMA IF NOT EXISTS dz;

CREATE TABLE dz.account(
  id character(5) NOT NULL,
  name character varying(20) NOT NULL
);

データベース db1 を作成し DDL ファイル(ここでは ddl.sql) を実行します[3]

psql postgres -c 'CREATE USER postgres SUPERUSER;'
psql -U postgres -c 'CREATE DATABASE db1;'
psql -U postgres -d db1 -f ddl.sql

テーブルができていることを確認します。

$ psql -U postgres -d db1 -c '\d dz.account;'
                       Table "dz.account"
 Column |         Type          | Collation | Nullable | Default
--------+-----------------------+-----------+----------+---------
 id     | character(5)          |           | not null |
 name   | character varying(20) |           | not null |

Kafka インストール

#

Kafka をインストールして、ZooKeeper と Kafka のサービスを起動します。ZooKeeper は Kafka Cluster のメタ情報を管理するサービスです。

brew install kafka
brew services start zookeeper
brew services start kafka

記事執筆時点では、Kafka 3.1.0 がインストールされました。

Debezium インストール

#

続いて Debezium をインストールします。

Installing Debezium :: Debezium Documentation

公式ドキュメントのリンク先は Nightly ビルドのアーカイブに向いていますので、このページのリンク先からダウンロードできるのは Stable 版ではありません。記事執筆時点の Stable release は 1.8 で、以下のページのリンクからダウンロード可能です。

https://debezium.io/documentation/reference/1.8/install.html

ここでは、Kafka の plugin ディレクトリを作成し、ダウンロードしたアーカイブを解凍したディレクトリをコピーしました。

curl -LO https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.8.1.Final/debezium-connector-postgres-1.8.1.Final-plugin.tar.gz
tar xfz debezium-connector-postgres-1.8.1.Final-plugin.tar.gz
sudo mkdir -p /usr/local/share/kafka/plugins
sudo mv debezium-connector-postgres /usr/local/share/kafka/plugins

Kafka の connect-distributed.properties ファイルを編集します。

vi /opt/homebrew/etc/kafka/connect-distributed.properties

plugin.path で上記で作成した plugin ディレクトリを指定します。

plugin.path=/usr/local/share/kafka/plugins

connect-distributed コマンドを使って Kafka Connect を起動します。上記で編集した properties ファイルを指定して plugin ディレクトリを認識させる必要があります。

connect-distributed /opt/homebrew/etc/kafka/connect-distributed.properties

8083 ポートで Kafka Connect の REST API エンドポイントが起動するので、plugin の一覧を習得して Debezium が有効化されていることを確認します。

$ curl http://localhost:8083/connector-plugins | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   599  100   599    0     0  31412      0 --:--:-- --:--:-- --:--:-- 46076
[
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "1.8.1.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

Debezium plugin が読み込まれました。Debezium 以外に FileStreamSourceConnector などいくつかの plugin も組み込まれています。

キャプチャー設定登録

#

次に、PostgreSQL から変更データを受け取れるように以下のような JSON ファイル (ここでは、register-postgres.json としました) を作成します。PostgreSQL への接続情報のほか、キャプチャー対象のテーブルを whitelist で指定しています。

{
  "name": "postgres-connector",
  "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "localhost",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname" : "db1",
      "database.server.name": "postgres",
      "table.whitelist": "dz.account",
      "plugin.name": "pgoutput"
  }
}

Kafka Connect の REST API でキャプチャー設定を登録します。成功すると、登録した情報が JSON で返却されます。

$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

HTTP/1.1 201 Created
Date: Mon, 28 Feb 2022 05:27:15 GMT
Location: http://localhost:8083/connectors/postgres-connector
Content-Type: application/json
Content-Length: 409
Server: Jetty(9.4.43.v20210629)

{"name":"postgres-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"localhost","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"db1","database.server.name":"postgres","table.whitelist":"dz.account","plugin.name":"pgoutput","name":"postgres-connector"},"tasks":[],"type":"source"}

kafka-topics コマンドで Kafka cluster に作成された topic の一覧を見ると、Kafka Connect 関連の topic が作成されています。

$ kafka-topics --bootstrap-server=localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status

account テーブルにデータを挿入します。

psql -U postgres -d db1 -c "INSERT INTO dz.account (id, name) VALUES ('00001', 'Alice');"

データ挿入後に topic を確認すると、postgres.dz.account という topic が作成されています。topic 名は <データベース名>.<スキーマ名>.<テーブル名> という規約に従います。

$ kafka-topics --bootstrap-server=localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
postgres.dz.account

Kafka の console consumer で topic のイベントを確認してみます。

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic postgres.dz.account \
  --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"postgres.dz.account.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"postgres.dz.account.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.dz.account.Envelope"},"payload":{"before":null,"after":{"id":"00001","name":"Alice"},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"postgres","ts_ms":1646026975330,"snapshot":"false","db":"db1","sequence":"[null,\"23102744\"]","schema":"dz","table":"account","txId":493,"lsn":23102744,"xmin":null},"op":"c","ts_ms":1646026975842,"transaction":null}}

メタデータが多くて見づらいですが、以下のような payload が送信されたことがわかります。after に挿入されたデータ、source にデータソースの情報が入っています。

"payload": {
    "before": null,
    "after": {
        "id": "00001",
        "name": "Alice"
    },
    "source": {
        "version": "1.8.1.Final",
        "connector": "postgresql",
        "name": "postgres",
        "ts_ms": 1646026975330,
        "snapshot": "false",
        "db": "db1",
        "sequence": "[null,\"23102744\"]",
        "schema": "dz",
        "table": "account",
        "txId": 493,
        "lsn": 23102744,
        "xmin": null
    },
    "op": "c",
    "ts_ms": 1646026975842,
    "transaction": null
}

topic connect-status に送信されたメッセージを見てみると、postgres.dz.account という topic が作成されたことが記録されています。

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic connect-status \
  --from-beginning
{"state":"RUNNING","trace":null,"worker_id":"127.0.0.1:8083","generation":2}
{"state":"RUNNING","trace":null,"worker_id":"127.0.0.1:8083","generation":3}
{"topic":{"name":"postgres.dz.account","connector":"postgres-connector","task":0,"discoverTimestamp":1646026976168}}

イベントデータの調整

#

先ほどの payload の before は、INSERT なので null になっていると思いきや、UPDATE しても null のままです。更新前の情報を取得したいケースもあるでしょう。before を有効化するには、PostgreSQL のテーブルの REPLICA IDENTITYFULLUSING INDEX に設定する必要があります。

ALTER TABLE dz.account REPLICA IDENTITY FULL;

この状態で、レコードを更新します。

psql -U postgres -d db1 -c "UPDATE dz.account SET name = 'Bob' WHERE id = '00001';"

payload に before の値も入るようになります。

"payload": {
    "before": {
        "id": "00001",
        "name": "Alice"
    },
    "after": {
        "id": "00001",
        "name": "Bob"
    },
    "source": {
    },
    "op": "u",
    "ts_ms": 1646028575960,
    "transaction": null
}

送信されたメッセージには payload 以外のスキーマ情報が含まれているため、列が多いほどデータサイズも巨大になります。これは、Debezium の設計思想で、アップストリームのデータベース構造が時間の経過とともに変化してもイベントが意味をなすよう自己記述的になっているためです。スキーマレジストリを利用することでメッセージとメタデータ管理を分離可能です。

Information

スキーマレジストリではメッセージ形式に Avro のような軽量バイナリフォーマットも利用できます。

スキーマレジストリのような、やや複雑な構成を導入せずに、シンプルに payload だけをやり取りしたいケースもあるでしょう。そのためにはキャプチャー設定で JsonConverter のスキーマ情報を無効化するオプションを追加します (key.converter 以降の4行)。

{
  "name": "postgres-connector",
  "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "localhost",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname" : "db1",
      "database.server.name": "postgres",
      "table.whitelist": "dz.account",
      "plugin.name": "pgoutput",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": false,
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": false
  }
}

登録されている postgres-connector の定義を一旦削除します。

curl -X DELETE http://localhost:8083/connectors/postgres-connector

Kafka Connect を再起動します。

Information

connect-distributed を起動したターミナルで Ctrl + C を入力して停止し、再度 connect-distributed を起動します。

そして、上記のファイルを再登録します。

$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
HTTP/1.1 201 Created
Date: Mon, 28 Feb 2022 06:47:22 GMT
Location: http://localhost:8083/connectors/postgres-connector
Content-Type: application/json
Content-Length: 615
Server: Jetty(9.4.43.v20210629)

{"name":"postgres-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"localhost","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"db1","database.server.name":"postgres","table.whitelist":"dz.account","plugin.name":"pgoutput","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","name":"postgres-connector"},"tasks":[],"type":"source"}

これで topic には payload の中身だけが流れるようになります。

{
    "before": {
        "id": "00001",
        "name": "Alice"
    },
    "after": {
        "id": "00001",
        "name": "Bob"
    },
    "source": {
        "version": "1.8.1.Final",
        "connector": "postgresql",
        "name": "postgres",
        "ts_ms": 1646031007028,
        "snapshot": "false",
        "db": "db1",
        "sequence": "[\"23105640\",\"23105696\"]",
        "schema": "dz",
        "table": "account",
        "txId": 497,
        "lsn": 23105696,
        "xmin": null
    },
    "op": "u",
    "ts_ms": 1646031007522,
    "transaction": null
}

アプリケーション開発

#

以上、PostgreSQL テーブルの変更を Debezium でキャプチャーできる環境を構築しました。あとは、topic に subscribe して CDC メッセージを消費する Consumer を書けば CDC に対応したアプリケーションを作成できます。利用したいプログラミング言語に対応した Kafka Client ライブラリが提供されています。

Kafka Clients | Confluent Documentation

参考

#

  1. マイクロサービスのための分散データ 〜 イベントソーシング vs チェンジデータキャプチャ - 赤帽エンジニアブログ ↩︎

  2. Oracle の場合は XStream に依存しています。 ↩︎

  3. ここでは先行して postgres というロールも作成しています。 ↩︎

豆蔵では共に高め合う仲間を募集しています!

recruit

具体的な採用情報はこちらからご覧いただけます。