極光日記

【Java / Spring】Spring for Apache Kafka 入門

作成日

はじめに

SpringとKafkaの勉強のためにQuick Tourに沿ってKafkaへのメッセージの送受信を試せるアプリケーションを作成しました。
SpringでKafkaを利用する機能に関する動作確認に使えます。 ソースコードはGitLabのspring-kafka-demoリポジトリにプッシュしました。

動作確認

上述のリポジトリをクローンして動作確認する手順を記載します。
Java17で動作確認しましたが、それ以上ならだいたい問題ないと思います。

動作確認用にKafkaを立ち上げるcompose.yamlを使い、コンテナを立ち上げます。

docker compose up -d

次にアプリケーションを立ち上げます。

./gradlew bootRun

/sendにアクセスするとSpringアプリケーションがKafkaにメッセージを送信するようにしているため、curlで適当なリクエストを送信します。

curl -X GET "http://localhost:8080/send?message=HelloKafka"

このアプリケーションは送信だけでなく、送信先と同じトピックから受信もしているため、curlでメッセージを送信した瞬間に、送信したメッセージを受信します。受信メッセージはログ出力するので、ログから確認できます。

docker composeはkafka-uiというkafkaの情報を画面から確認できるコンテナも立ち上げており、localhost:8081でアクセスできます。この画面から送信されたメッセージを確認したり、メッセージを新たに送信する事もできます。

実装のポイントについて

docker compose

kafka0が

ports:
  - "9092:9092"

となっているところからわかるように、localhost:9092でKafkaにアクセスできます。
また、kafka-uiにlocalhost:8081でアクセスできるようにしています。
動作確認用のdocker composeの詳細はdocker-composeでKafkaの動作確認をするを参照してください(投稿者は私ですが……)

Java

メッセージ送信

Springの公式ドキュメントのQuick Startにある例ではApplicationRunnerから呼び出され、起動時に1回だけメッセージを送信する方式でしたが、それだと動作確認の観点では不便なため、Controllerを追加し、APIリクエスト経由でメッセージを送信できるように修正しました。

メッセージ受信

Springの公式ドキュメントのQuick Startにある例では

@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
    System.out.println(in);
}

のように引数がStringでした。これを、

@KafkaListener(id = "myId", topics = "topic1")
public void listen(ConsumerRecord<String, String> record) {
    log.info(record.toString());
}

のようにしました。
引数がString型だとメッセージの本体しかメソッド内で取得できませんが、ConsumerRecord<String, String>型だとパーティションなどメタデータも取得できます。動作確認などでは便利です。