【Java / Spring】Spring for Apache Kafka 入門
作成日
はじめに
SpringとKafkaの勉強のためにQuick Tourに沿ってKafkaへのメッセージの送受信を試せるアプリケーションを作成しました。
SpringでKafkaを利用する機能に関する動作確認に使えます。
ソースコードはGitLabのspring-kafka-demoリポジトリにプッシュしました。
動作確認
上述のリポジトリをクローンして動作確認する手順を記載します。
Java17で動作確認しましたが、それ以上ならだいたい問題ないと思います。
動作確認用にKafkaを立ち上げるcompose.yamlを使い、コンテナを立ち上げます。
docker compose up -d
次にアプリケーションを立ち上げます。
./gradlew bootRun
/send
にアクセスするとSpringアプリケーションがKafkaにメッセージを送信するようにしているため、curlで適当なリクエストを送信します。
curl -X GET "http://localhost:8080/send?message=HelloKafka"
このアプリケーションは送信だけでなく、送信先と同じトピックから受信もしているため、curlでメッセージを送信した瞬間に、送信したメッセージを受信します。受信メッセージはログ出力するので、ログから確認できます。
docker composeはkafka-uiというkafkaの情報を画面から確認できるコンテナも立ち上げており、localhost:8081
でアクセスできます。この画面から送信されたメッセージを確認したり、メッセージを新たに送信する事もできます。
実装のポイントについて
docker compose
kafka0が
ports:
- "9092:9092"
となっているところからわかるように、localhost:9092
でKafkaにアクセスできます。
また、kafka-uiにlocalhost:8081
でアクセスできるようにしています。
動作確認用のdocker composeの詳細はdocker-composeでKafkaの動作確認をするを参照してください(投稿者は私ですが……)
Java
メッセージ送信
Springの公式ドキュメントのQuick Startにある例ではApplicationRunnerから呼び出され、起動時に1回だけメッセージを送信する方式でしたが、それだと動作確認の観点では不便なため、Controllerを追加し、APIリクエスト経由でメッセージを送信できるように修正しました。
メッセージ受信
Springの公式ドキュメントのQuick Startにある例では
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
のように引数がStringでした。これを、
@KafkaListener(id = "myId", topics = "topic1")
public void listen(ConsumerRecord<String, String> record) {
log.info(record.toString());
}
のようにしました。
引数がString
型だとメッセージの本体しかメソッド内で取得できませんが、ConsumerRecord<String, String>
型だとパーティションなどメタデータも取得できます。動作確認などでは便利です。