Apache Kafka 支援

概觀

Spring Integration for Apache Kafka 是基於 Spring for Apache Kafka 專案

您需要將此依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:6.3.5"

它提供以下元件

輸出通道配接器

輸出通道配接器用於將來自 Spring Integration 通道的訊息發佈到 Apache Kafka 主題。該通道在應用程式上下文中定義,然後連接到將訊息發送到 Apache Kafka 的應用程式。發送者應用程式可以使用 Spring Integration 訊息發佈到 Apache Kafka,這些訊息在內部由輸出通道配接器轉換為 Kafka 記錄,如下所示

  • Spring Integration 訊息的酬載用於填充 Kafka 記錄的酬載。

  • 預設情況下,Spring Integration 訊息的 kafka_messageKey 標頭用於填充 Kafka 記錄的鍵。

您可以自訂發佈訊息的目標主題和分割區,分別透過 kafka_topickafka_partitionId 標頭。

此外,<int-kafka:outbound-channel-adapter> 提供透過在輸出訊息上應用 SpEL 運算式來提取鍵、目標主題和目標分割區的能力。為此,它支援三組互斥的屬性對

  • topictopic-expression

  • message-keymessage-key-expression

  • partition-idpartition-id-expression

這些讓您可以將 topicmessage-keypartition-id 分別指定為配接器上的靜態值,或在執行階段根據請求訊息動態評估其值。

KafkaHeaders 介面 (由 spring-kafka 提供) 包含用於與標頭互動的常數。messageKeytopic 預設標頭現在需要 kafka_ 前綴。當從使用舊標頭的早期版本遷移時,您需要在 <int-kafka:outbound-channel-adapter> 上指定 message-key-expression="headers['messageKey']"topic-expression="headers['topic']"。或者,您可以使用 <header-enricher>MessageBuilder 從上游將標頭變更為來自 KafkaHeaders 的新標頭。如果您使用常數值,也可以透過使用 topicmessage-key 在配接器上設定它們。

注意:如果配接器設定了主題或訊息鍵 (使用常數或運算式),則會使用這些設定,並忽略對應的標頭。如果您希望標頭覆寫設定,則需要在運算式中設定它,例如以下範例

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"

配接器需要 KafkaTemplate,而 KafkaTemplate 又需要適當設定的 KafkaProducerFactory

如果提供了 send-failure-channel (sendFailureChannel) 並且收到 send() 失敗 (同步或非同步),則會將 ErrorMessage 發送到通道。酬載是具有 failedMessagerecord (ProducerRecord) 和 cause 屬性的 KafkaSendFailureException。您可以透過設定 error-message-strategy 屬性來覆寫 DefaultErrorMessageStrategy

如果提供了 send-success-channel (sendSuccessChannel),則在成功發送後,會發送酬載類型為 org.apache.kafka.clients.producer.RecordMetadata 的訊息。

如果您的應用程式使用交易,並且同一個通道配接器用於發佈訊息 (交易由監聽器容器啟動),以及在沒有現有交易的情況下發佈訊息,則您必須在 KafkaTemplate 上設定 transactionIdPrefix,以覆寫容器或交易管理器使用的前綴。容器啟動的交易 (生產者工廠或交易管理器屬性) 使用的前綴在所有應用程式實例上必須相同。僅由生產者啟動的交易使用的前綴在所有應用程式實例上必須是唯一的。

您可以設定必須解析為布林值的 flushExpression。如果您正在使用 linger.msbatch.size Kafka 生產者屬性,則在發送多個訊息後刷新可能很有用;運算式應在最後一條訊息上評估為 Boolean.TRUE,並且將立即發送不完整的批次。預設情況下,運算式會在 KafkaIntegrationHeaders.FLUSH 標頭 (kafka_flush) 中尋找 Boolean 值。如果值為 true,則會發生刷新;如果值為 false 或標頭不存在,則不會發生刷新。

KafkaProducerMessageHandler.sendTimeoutExpression 的預設值已從 10 秒變更為 Kafka 生產者屬性 delivery.timeout.ms + 5000,以便將逾時後的實際 Kafka 錯誤傳播到應用程式,而不是由這個框架產生的逾時。之所以進行此變更是為了保持一致性,因為您可能會遇到意外行為 (Spring 可能使發送逾時,但實際上最終是成功的)。重要事項:該逾時預設為 120 秒,因此您可能希望縮短它以更快地獲得失敗通知。

設定

以下範例顯示如何設定 Apache Kafka 的輸出通道配接器

  • Java DSL

  • Java

  • XML

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

訊息驅動通道配接器

KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) 使用 spring-kafka KafkaMessageListenerContainerConcurrentListenerContainer

此外,mode 屬性可用。它可以接受 recordbatch 值 (預設值:record)。對於 record 模式,每個訊息酬載都是從單個 ConsumerRecord 轉換而來的。對於 batch 模式,酬載是從消費者輪詢返回的所有 ConsumerRecord 實例轉換而來的物件列表。與批次處理的 @KafkaListener 一樣,KafkaHeaders.RECEIVED_KEYKafkaHeaders.RECEIVED_PARTITIONKafkaHeaders.RECEIVED_TOPICKafkaHeaders.OFFSET 標頭也是列表,其位置對應於酬載中的位置。

接收到的訊息會填充某些標頭。有關更多資訊,請參閱 KafkaHeaders 類別

Consumer 物件 (在 kafka_consumer 標頭中) 不是執行緒安全的。您必須僅在配接器內呼叫監聽器的執行緒上調用其方法。如果您將訊息移交給另一個執行緒,則不得調用其方法。

當提供 retry-template 時,交付失敗會根據其重試策略進行重試。如果也提供了 error-channel,則預設的 ErrorMessageSendingRecoverer 將用作重試耗盡後的恢復回呼。您也可以使用 recovery-callback 來指定在這種情況下要採取的其他動作,或將其設定為 null 以將最終異常拋給監聽器容器,以便在那裡處理。

在建構 ErrorMessage 時 (用於 error-channelrecovery-callback),您可以透過設定 error-message-strategy 屬性來自訂錯誤訊息。預設情況下,使用 RawRecordHeaderErrorMessageStrategy,以提供對轉換後的訊息以及原始 ConsumerRecord 的存取權。

這種形式的重試是阻塞的,並且如果所有輪詢記錄的總重試延遲可能超過 max.poll.interval.ms 消費者屬性,則可能導致重新平衡。相反,請考慮將 DefaultErrorHandler 新增到監聽器容器,並使用 KafkaErrorSendingMessageRecoverer 進行設定。

設定

以下範例顯示如何設定訊息驅動通道配接器

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

您也可以使用用於 @KafkaListener 註解的容器工廠來建立 ConcurrentMessageListenerContainer 實例以用於其他用途。有關範例,請參閱 Spring for Apache Kafka 文件

使用 Java DSL 時,容器不必設定為 @Bean,因為 DSL 會將容器註冊為 bean。以下範例顯示如何執行此操作

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            get();
}

請注意,在這種情況下,配接器被賦予了 id (topic2Adapter)。容器在應用程式上下文中註冊,名稱為 topic2Adapter.container。如果配接器沒有 id 屬性,則容器的 bean 名稱是容器的完整類別名稱加上 #n,其中 n 對於每個容器都會遞增。

輸入通道配接器

KafkaMessageSource 提供了一個可輪詢的通道配接器實作。

設定

  • Java DSL

  • Kotlin

  • Java

  • XML

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
                          e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
    integrationFlow(Kafka.inboundChannelAdapter(cf,
        ConsumerProperties(TEST_TOPIC3).also {
            it.groupId = "kotlinMessageSourceGroup"
        }),
        { poller(Pollers.fixedDelay(100)) }) {
        handle { m ->

        }
    }
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
	consumerProperties.setGroupId("myGroupId");
	consumerProperties.setClientId("myClientId");
    retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        consumer-properties="consumerProperties1"
        ack-factory="ackFactory"
        channel="inbound"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>

<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="max.poll.records" value="1"/>
        </map>
    </constructor-arg>
</bean>

<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
    <constructor-arg name="topics" value="topic1"/>
    <property name="groupId" value="group"/>
    <property name="clientId" value="client"/>
</bean>

請參閱 javadocs 以取得可用的屬性。

預設情況下,max.poll.records 必須在消費者工廠中明確設定,否則如果消費者工廠是 DefaultKafkaConsumerFactory,則將強制設定為 1。您可以將屬性 allowMultiFetch 設定為 true 以覆寫此行為。

您必須在 max.poll.interval.ms 內輪詢消費者,以避免重新平衡。如果您將 allowMultiFetch 設定為 true,則必須處理所有擷取的記錄,並在 max.poll.interval.ms 內再次輪詢。

此配接器發出的訊息包含標頭 kafka_remainingRecords,其中包含來自先前輪詢的剩餘記錄計數。

從版本 6.2 開始,KafkaMessageSource 支援消費者屬性中提供的 ErrorHandlingDeserializerDeserializationException 從記錄標頭中提取並拋出給調用者。使用 SourcePollingChannelAdapter,此異常被包裝到 ErrorMessage 中並發佈到其 errorChannel。有關更多資訊,請參閱 ErrorHandlingDeserializer 文件。

輸出閘道

輸出閘道用於請求/回覆操作。它與大多數 Spring Integration 閘道不同之處在於,發送執行緒不會在閘道中阻塞,並且回覆在回覆監聽器容器執行緒上處理。如果您的程式碼在同步 訊息傳遞閘道 後面調用閘道,則使用者執行緒會在那裡阻塞,直到收到回覆 (或發生逾時)。

在回覆容器被指派其主題和分割區之前,閘道不接受請求。建議您將 ConsumerRebalanceListener 新增到範本的回覆容器屬性,並在將訊息發送到閘道之前等待 onPartitionsAssigned 呼叫。

KafkaProducerMessageHandler sendTimeoutExpression 預設值為 Kafka 生產者屬性 delivery.timeout.ms + 5000,以便將逾時後的實際 Kafka 錯誤傳播到應用程式,而不是由這個框架產生的逾時。之所以進行此變更是為了保持一致性,因為您可能會遇到意外行為 (Spring 可能使 send() 逾時,但實際上最終是成功的)。重要事項:該逾時預設為 120 秒,因此您可能希望縮短它以更快地獲得失敗通知。

設定

以下範例顯示如何設定閘道

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>

請參閱 javadocs 以取得可用的屬性。

請注意,使用了與 輸出通道配接器 相同的類別,唯一的區別是傳遞到建構函式的 KafkaTemplateReplyingKafkaTemplate。有關更多資訊,請參閱 Spring for Apache Kafka 文件

輸出主題、分割區、鍵等以與輸出配接器相同的方式確定。回覆主題的確定方式如下

  1. 名為 KafkaHeaders.REPLY_TOPIC 的訊息標頭 (如果存在,則必須具有 Stringbyte[] 值) 會針對範本的回覆容器訂閱的主題進行驗證。

  2. 如果範本的 replyContainer 僅訂閱了一個主題,則會使用該主題。

您也可以指定 KafkaHeaders.REPLY_PARTITION 標頭來確定用於回覆的特定分割區。同樣,這會針對範本的回覆容器的訂閱進行驗證。

或者,您也可以使用類似於以下 bean 的設定

@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}

輸入閘道

輸入閘道用於請求/回覆操作。

設定

以下範例顯示如何設定輸入閘道

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlow
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String>container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>

請參閱 javadocs 以取得可用的屬性。

當提供 RetryTemplate 時,交付失敗會根據其重試策略進行重試。如果也提供了 error-channel,則預設的 ErrorMessageSendingRecoverer 將用作重試耗盡後的恢復回呼。您也可以使用 recovery-callback 來指定在這種情況下要採取的其他動作,或將其設定為 null 以將最終異常拋給監聽器容器,以便在那裡處理。

在建構 ErrorMessage 時 (用於 error-channelrecovery-callback),您可以透過設定 error-message-strategy 屬性來自訂錯誤訊息。預設情況下,使用 RawRecordHeaderErrorMessageStrategy,以提供對轉換後的訊息以及原始 ConsumerRecord 的存取權。

這種形式的重試是阻塞的,並且如果所有輪詢記錄的總重試延遲可能超過 max.poll.interval.ms 消費者屬性,則可能導致重新平衡。相反,請考慮將 DefaultErrorHandler 新增到監聽器容器,並使用 KafkaErrorSendingMessageRecoverer 進行設定。

以下範例顯示如何使用 Java DSL 設定簡單的大寫轉換器

或者,您可以使用類似於以下程式碼來設定大寫轉換器

@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlow
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

您也可以使用用於 @KafkaListener 註解的容器工廠來建立 ConcurrentMessageListenerContainer 實例以用於其他用途。有關範例,請參閱 Spring for Apache Kafka 文件訊息驅動通道配接器

由 Apache Kafka 主題支援的通道

Spring Integration 具有由 Apache Kafka 主題支援的 MessageChannel 實作,用於持久性。

每個通道都需要一個 KafkaTemplate 用於發送端,以及一個監聽器容器工廠 (用於可訂閱通道) 或一個 KafkaMessageSource 用於可輪詢通道。

Java DSL 設定

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source)

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id = "pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />

訊息轉換

提供了一個 StringJsonMessageConverter。有關更多資訊,請參閱 Spring for Apache Kafka 文件

將此轉換器與訊息驅動通道配接器一起使用時,您可以指定要將傳入的酬載轉換為的類型。這可以透過在配接器上設定 payload-type 屬性 (payloadType 屬性) 來實現。以下範例顯示如何在 XML 設定中執行此操作

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Thing"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

以下範例顯示如何在 Java 設定中在配接器上設定 payload-type 屬性 (payloadType 屬性)

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
    return kafkaMessageDrivenChannelAdapter;
}

Null 酬載和日誌壓縮 '墓碑' 記錄

Spring Messaging Message<?> 物件不能有 null 酬載。當您將端點用於 Apache Kafka 時,null 酬載 (也稱為墓碑記錄) 由類型為 KafkaNull 的酬載表示。有關更多資訊,請參閱 Spring for Apache Kafka 文件

Spring Integration 端點的 POJO 方法可以使用真正的 null 值而不是 KafkaNull。為此,請使用 @Payload(required = false) 標記參數。以下範例顯示如何執行此操作

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

KStream 呼叫 Spring Integration 流程

您可以使用 MessagingTransformerKStream 調用整合流程

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer)  transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

當整合流程以介面開始時,建立的代理具有流程 bean 的名稱,並附加 ".gateway",因此如果需要,此 bean 名稱可以用作 @Qualifier

讀取/處理/寫入場景的效能考量

許多應用程式會從一個主題 (topic) 進行消費,執行一些處理,然後寫入另一個主題。在大多數情況下,如果 write 失敗,應用程式會希望拋出例外,以便可以重試傳入的請求,和/或將其發送到死信主題。底層的消息監聽器容器支持此功能,並具有適當配置的錯誤處理程序。但是,為了支持這一點,我們需要阻止監聽器線程,直到寫入操作成功(或失敗),以便可以將任何異常拋給容器。當消費單個記錄時,這可以通過在出站適配器上設置 sync 屬性來實現。但是,當消費批次時,使用 sync 會導致顯著的性能下降,因為應用程式將等待每次發送的結果,然後再發送下一條消息。您也可以執行多次發送,然後等待這些發送的結果。這可以通過向消息處理程序添加 futuresChannel 來實現。要啟用此功能,請將 KafkaIntegrationHeaders.FUTURE_TOKEN 添加到出站消息;然後可以使用它將 Future 與特定的已發送消息關聯起來。以下是如何使用此功能的示例

@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}