原文はこちら。
This article was written by Daniel Kec (Java Developer, Oracle).
https://medium.com/helidon/helidon-2-0-with-kafka-b97ba4f422bd
Helidon 2.0でReactive Messagingがサポートされ、Helidon SE/MPで利用可能な最初のメッセージングコネクタの一つとしてKafkaコネクタが加わりました。Helidon MPはMicroProfileの仕様に準拠しており、CDIの素晴らしい機能を全て活用しています。Helidon SEでは、シンプルなAPI、明示的な設定、そして魔法ではない(no-magic)アプローチを楽しむことができます。
Configuration
Connecting Kafkaへの接続にあたっては通常自明でない構成が必要で、このような構成のオーケストレーションは難しいことがあります。簡単のため、Helidon SE/MPでは、Reactive Messagingのための外部構成でMicroProfileのReactive Messaging仕様で定義されているものと同じ記法をサポートします。
MicroProfile Reactive Messaging Specification – Configuration
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html#_configuration
以下は簡単な例です。
| mp.messaging: | |
| incoming.from-kafka: | |
| connector: helidon-kafka | |
| topic: messaging-test-topic-1 | |
| auto.offset.reset: latest | |
| enable.auto.commit: true | |
| group.id: example-group-1 | |
| outgoing.to-kafka: | |
| connector: helidon-kafka | |
| topic: messaging-test-topic-1 | |
| connector: | |
| helidon-kafka: | |
| bootstrap.servers: localhost:9092 | |
| key.serializer: org.apache.kafka.common.serialization.StringSerializer | |
| value.serializer: org.apache.kafka.common.serialization.StringSerializer | |
| key.deserializer: org.apache.kafka.common.serialization.StringDeserializer | |
| value.deserializer: org.apache.kafka.common.serialization.StringDeserializer |
この例では、同じトピックに接続しているfrom-kafkaという着信チャネルと、to-kafkaという送出チャネルの簡単な構成を示しています。Helidon SE と MP の両方でまったく同じ設定を使用してみましょう。
Kafka in Helidon MP
Helidon MPはMicroProfile Reactive Messaging仕様を実装しているため、必要な依存関係を追加し、アプリケーションスコープのbeanのメソッドに注釈を付けるだけでOKです。
MicroProfile Reactive Messaging Specification ver.1.0
https://download.eclipse.org/microprofile/microprofile-reactive-messaging-1.0/microprofile-reactive-messaging-spec.html
| <dependency> | |
| <groupId>io.helidon.microprofile.messaging</groupId> | |
| <artifactId>helidon-microprofile-messaging</artifactId> | |
| </dependency> | |
| <dependency> | |
| <groupId>io.helidon.messaging.kafka</groupId> | |
| <artifactId>helidon-messaging-kafka</artifactId> | |
| </dependency> |
メッセージのリスニングはシンプルです。
| @ApplicationScoped | |
| public class ExampleBean { | |
| @Incoming("from-kafka") | |
| public void broadcast(String payload) { | |
| System.out.println("Kafka says: " + payload); | |
| } |
メッセージ送信時には、バックプレッシャーの可能性を考慮する必要があります。このような作業のために、バッファを内蔵したSubmissionPublisherを使いましょう。
| public class ExampleBean { | |
| private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); | |
| public void sendMessage(String message) { | |
| publisher.submit(message); | |
| } | |
| @Outgoing("to-kafka") | |
| public Publisher<String> preparePublisher() { | |
| return ReactiveStreams | |
| .fromPublisher(FlowAdapters.toPublisher(publisher)) | |
| .buildRs(); | |
| } | |
Kafka in Helidon SE
Helidon SEではCDIを使わずにReactive Messagingを利用するための専用APIがあります。このAPIはMicroProfileの仕様の一部ではありませんが、コネクタや構成、メッセージラッピングや確認応答(acknowledgement)と互換性があります。これらすべては、Helidon SE/MP間の移行を可能な限りスムーズにするためのものです。
| <dependency> | |
| <groupId>io.helidon.messaging</groupId> | |
| <artifactId>helidon-messaging</artifactId> | |
| </dependency> | |
| <dependency> | |
| <groupId>io.helidon.messaging.kafka</groupId> | |
| <artifactId>helidon-messaging-kafka</artifactId> | |
| </dependency> |
メッセージのリスニングは、Helidon MPの場合と同様簡単です。
| Channel<String> fromKafka = Channel.create("from-kafka"); | |
| KafkaConnector kafkaConnector = KafkaConnector.create(); | |
| Messaging.builder() | |
| .connector(kafkaConnector) | |
| .listener(fromKafka, payload -> { | |
| System.out.println("Kafka says: " + payload); | |
| }) | |
| .build() | |
| .start(); |
Helidon SEのMessaging APIがEmitterを提供しているので、メッセージを送信するためにSubmissionPublisherをわざわざ使う必要はありません。
| Channel<String> toKafka = Channel.create("to-kafka"); | |
| KafkaConnector kafkaConnector = KafkaConnector.create(); | |
| Emitter<String> emitter = Emitter.create(toKafka); | |
| Messaging.builder() | |
| .emitter(emitter) | |
| .connector(kafkaConnector) | |
| .build() | |
| .start(); | |
| emitter.send("Hello Kafka!"); |
Acknowledgement
メッセージが適切に受信されたことが確実な時点までオフセットコミットを遅延させることが現実的な場合があります。多くは非同期処理中に発生する可能性があり、リアクティブストリームをブロックすることはできませんが、バックプレッシャー以外のフィードバックが必要です。reactive messagingの確認応答機能を使うと非常に便利です。Kafkaコネクタは auto.commit がオフのときに、確認済みメッセージのオフセットのみをコミットしています。
自動コミットをOffにする例です。
| mp.messaging: | |
| incoming.from-kafka: | |
| connector: helidon-kafka | |
| topic: messaging-test-topic-1 | |
| enable.auto.commit: false | |
| ack.timeout.millis: 10000 | |
| group.id: example-group-1 |
この例では、遅延オフセットコミットの原因となっている遅延確認応答を確認できます。
| Channel<String> fromKafka = Channel.create("from-kafka"); | |
| KafkaConnector kafkaConnector = KafkaConnector.create(); | |
| Messaging messaging = Messaging.builder() | |
| .connector(kafkaConnector) | |
| .subscriber(fromKafka, ReactiveStreams.<Message<String>>builder() | |
| //Apply back-pressure, flatMapCompletionStage requests one by one | |
| .flatMapCompletionStage(message -> { | |
| return CompletableFuture.runAsync(() -> { | |
| //Do something lengthy | |
| Multi.timer(300, TimeUnit.MILLISECONDS, executor) | |
| .first() | |
| .await(); | |
| //Acknowledge message has been consumed | |
| message.ack(); | |
| }); | |
| }) | |
| .ignore()) | |
| .build() | |
| .start(); |
この例のように、次から次へとメッセージをリクエストすることでバックプレッシャーをかけているため、利用率はあまり良くありません。非同期メッセージの消費中に致命的な障害が発生した場合、消費されなかったメッセージは確認応答を返さないので、Kafka コネクタはオフセットをコミットしません。それゆえメッセージの消失はありません。リバランス後にメッセージを再び消費します。
これだけのことを20行の読みやすい関数型コードに詰め込んだのに、多くの設定が隠されていることがわかります。ではno-magicアプローチはどうでしょうか?
では、もっと明確にしてみましょう。
Explicit Configuration
Helidon SEのReactive Messagingは任意のチャネルのPublisherコネクタもしくはSubscriberコネクタに対しHelidon Configを直接渡すことができます。以下の例では、Kafka コネクタが独自の Configビルダーを持っていることがわかります。そのため、Kafka に接続するたびに、頻繁に使用するクライアントのプロパティを調べる必要はありません。
| Channel<String> fromKafka = Channel.<String>builder() | |
| .publisherConfig(KafkaConnector.configBuilder() | |
| .bootstrapServers("localhost:9092") | |
| .groupId("example-group-1") | |
| .topic("messaging-test-topic-1") | |
| .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST) | |
| .enableAutoCommit(false) | |
| .keyDeserializer(StringDeserializer.class) | |
| .valueDeserializer(StringDeserializer.class) | |
| .build() | |
| ) | |
| .build(); | |
| // Prepare Kafka connector, can be used by any channel | |
| KafkaConnector kafkaConnector = KafkaConnector.create(); | |
| Messaging.builder() | |
| .connector(kafkaConnector) | |
| .subscriber(fromKafka, ReactiveStreams.<Message<String>>builder() | |
| //Apply back-pressure, flatMapCompletionStage request one by one | |
| .flatMapCompletionStage(message -> { | |
| return CompletableFuture.runAsync(() -> { | |
| //Do something lengthy | |
| Multi.timer(300, TimeUnit.MILLISECONDS, executor) | |
| .first() | |
| .await(); | |
| //Acknowledge message has been consumed | |
| message.ack(); | |
| }); | |
| }) | |
| .ignore()) | |
| .build() | |
| .start(); |
一つのスニペットに全て詰まっています。アブラカダブラ!まさにno-magicですね✨
HelidonにおけるReactive Messagingについて、詳細は以下をどうぞ。
Helidon Messaging with Kafka Examples
https://github.com/oracle/helidon/tree/master/examples/messaging
Reactive Messaging with Helidon 2.0
https://medium.com/helidon/reactive-messaging-with-helidon-2-0-f5de1ca5dc63
https://logico-jp.io/2020/03/29/reactive-messaging-with-helidon-2-0/