訣竅、技巧與秘訣

使用 Kafka 的簡單 DLQ

問題陳述

作為開發人員,我想要編寫一個消費者應用程式,從 Kafka 主題處理記錄。但是,如果處理過程中發生某些錯誤,我不希望應用程式完全停止。相反地,我想要將錯誤記錄發送到 DLT (死信主題),然後繼續處理新記錄。

解決方案

此問題的解決方案是使用 Spring Cloud Stream 中的 DLQ 功能。為了便於討論,讓我們假設以下是我們的處理器函數。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

這是一個非常簡單的函數,它會為處理的所有記錄拋出例外,但您可以採用此函數並將其擴展到任何其他類似情況。

為了將錯誤記錄發送到 DLT,我們需要提供以下組態。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

為了啟用 DLQ,應用程式必須提供群組名稱。匿名消費者無法使用 DLQ 功能。我們還需要透過將 Kafka 消費者綁定上的 enableDLQ 屬性設定為 true 來啟用 DLQ。最後,我們可以選擇性地透過在 Kafka 消費者綁定上提供 dlqName 來提供 DLT 名稱,否則在此案例中預設為 error.input-topic.my-group

請注意,在上面提供的範例消費者中,payload 的類型為 byte[]。預設情況下,Kafka binder 中的 DLQ 生產者期望 payload 的類型為 byte[]。如果不是這種情況,那麼我們需要為正確的序列化器提供組態。例如,讓我們將消費者函數重寫如下

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

現在,我們需要告訴 Spring Cloud Stream,當寫入 DLT 時,我們希望如何序列化資料。以下是此情境的修改後組態

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

具有進階重試選項的 DLQ

問題陳述

這與上面的秘訣類似,但作為開發人員,我想要組態處理重試的方式。

解決方案

如果您遵循上面的秘訣,那麼當處理遇到錯誤時,您將獲得 Kafka binder 中內建的預設重試選項。

預設情況下,binder 會重試最多 3 次,初始延遲為 1 秒,每次退避的乘數為 2.0,最大延遲為 10 秒。您可以如下變更所有這些組態

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果您願意,您也可以透過提供布林值對應來提供可重試例外的清單。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

預設情況下,將重試上面地圖中未列出的任何例外。如果不希望這樣,那麼您可以透過提供以下內容來停用它,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您也可以提供自己的 RetryTemplate 並將其標記為 @StreamRetryTemplate,binder 將掃描並使用它。當您想要更複雜的重試策略和政策時,這非常有用。

如果您有多個 @StreamRetryTemplate bean,那麼您可以使用屬性來指定您的綁定想要哪一個,

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

使用 DLQ 處理反序列化錯誤

問題陳述

我的處理器在 Kafka 消費者中遇到反序列化例外。我預期 Spring Cloud Stream DLQ 機制會捕捉到這種情況,但它沒有。我該如何處理這個問題?

解決方案

當 Kafka 消費者拋出無法復原的反序列化例外時,Spring Cloud Stream 提供的正常 DLQ 機制將無濟於事。這是因為,此例外甚至在消費者的 poll() 方法返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助 binder 處理這種情況。讓我們來探索一下這些方法。

假設這是我們的函數

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

這是一個簡單的函數,它接受 String 參數。

我們想要繞過 Spring Cloud Stream 提供的訊息轉換器,並想要改用原生反序列化器。對於 String 類型的情況,這沒有太大的意義,但對於更複雜的類型 (如 AVRO 等),您必須依賴外部反序列化器,因此想要將轉換委派給 Kafka。

現在,當消費者接收到資料時,讓我們假設有一個錯誤記錄導致反序列化錯誤,例如,有人傳遞了 Integer 而不是 String。在這種情況下,如果您不在應用程式中執行任何操作,例外將會透過鏈傳播,您的應用程式最終將會退出。

為了處理這個問題,您可以新增一個 ListenerContainerCustomizer @Bean,它組態一個 DefaultErrorHandler。這個 DefaultErrorHandler 組態了一個 DeadLetterPublishingRecoverer。我們還需要為消費者組態一個 ErrorHandlingDeserializer。這聽起來像是很多複雜的事情,但實際上,在這種情況下,它歸結為這 3 個 bean。

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

讓我們分析它們中的每一個。第一個是 ListenerContainerCustomizer bean,它接受 DefaultErrorHandler。容器現在使用該特定的錯誤處理程式進行自訂。您可以這裡了解有關容器自訂的更多資訊。

第二個 bean 是 DefaultErrorHandler,它組態為發布到 DLT。請參閱這裡以取得有關 DefaultErrorHandler 的更多詳細資訊。

第三個 bean 是 DeadLetterPublishingRecoverer,它最終負責發送到 DLT。預設情況下,DLT 主題被命名為 ORIGINAL_TOPIC_NAME.DLT。不過,您可以變更它。請參閱文件以取得更多詳細資訊。

我們還需要透過應用程式組態組態一個 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委派給實際的反序列化器。如果發生錯誤,它會將記錄的鍵/值設定為 null,並包含訊息的原始位元組。然後,它會在標頭中設定例外,並將此記錄傳遞給監聽器,然後監聽器呼叫已註冊的錯誤處理程式。

以下是所需的組態

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我們透過綁定上的 configuration 屬性提供 ErrorHandlingDeserializer。我們也指示要委派的實際反序列化器是 StringDeserializer

請記住,上面的 dlq 屬性與本秘訣中的討論無關。它們純粹旨在解決任何應用程式層級的錯誤。

Kafka binder 中的基本偏移量管理

問題陳述

我想要編寫一個 Spring Cloud Stream Kafka 消費者應用程式,但不確定它如何管理 Kafka 消費者偏移量。您可以解釋一下嗎?

解決方案

我們建議您閱讀文件中關於此內容的章節,以全面了解它。

以下是它的要點

Kafka 預設支援兩種偏移量類型以開始使用 - earliestlatest。它們的語義從它們的名稱中即可自我解釋。

假設您是第一次執行消費者。如果您在 Spring Cloud Stream 應用程式中遺漏了 group.id,那麼它將成為匿名消費者。每當您有匿名消費者時,在這種情況下,Spring Cloud Stream 應用程式預設會從主題分割區中 latest 可用的偏移量開始。另一方面,如果您明確指定 group.id,那麼預設情況下,Spring Cloud Stream 應用程式將從主題分割區中 earliest 可用的偏移量開始。

在上述兩種情況下 (具有顯式群組的消費者和匿名群組),可以使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 並將其設定為 earliestlatest 來切換起始偏移量。

現在,假設您之前已經執行過消費者,現在再次啟動它。在這種情況下,上述案例中的起始偏移量語義不適用,因為消費者找到了消費者群組已提交的偏移量 (在匿名消費者的情況下,儘管應用程式未提供 group.id,但 binder 會為您自動產生一個)。它只是從最後提交的偏移量開始接續。即使您提供了 startOffset 值,情況也是如此。

但是,您可以使用 resetOffsets 屬性覆寫消費者從最後提交的偏移量開始的預設行為。為了執行此操作,請將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 設定為 true (預設為 false)。然後確保您提供 startOffset 值 (earliestlatest)。當您執行此操作,然後啟動消費者應用程式時,每次啟動時,它都會像第一次啟動一樣啟動,並忽略分割區的任何已提交偏移量。

在 Kafka 中搜尋任意偏移量

問題陳述

使用 Kafka binder,我知道它可以將偏移量設定為 earliestlatest,但我有一個需求是將偏移量搜尋到中間的某個位置,即任意偏移量。是否有辦法使用 Spring Cloud Stream Kafka binder 來實現這一點?

解決方案

先前我們看到了 Kafka binder 如何讓您處理基本偏移量管理。預設情況下,binder 不允許您倒轉到任意偏移量,至少透過我們在該秘訣中看到的機制不行。但是,binder 提供了一些低階策略來實現此用例。讓我們探索一下它們。

首先,當您想要重設為除了 earliestlatest 之外的任意偏移量時,請確保將 resetOffsets 組態保留為預設值,即 false。然後,您必須提供 KafkaBindingRebalanceListener 類型的自訂 bean,它將注入到所有消費者綁定中。它是一個介面,帶有一些預設方法,但以下是我們感興趣的方法

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

讓我們看看詳細資訊。

本質上,每次在主題分割區的初始指派期間或在重新平衡之後都會調用此方法。為了更好地說明,讓我們假設我們的主題是 foo,它有 4 個分割區。最初,我們僅在群組中啟動單一消費者,並且此消費者將從所有分割區消費。當消費者第一次啟動時,所有 4 個分割區都正在進行初始指派。但是,我們不希望從預設值 (earliest,因為我們定義了一個群組) 開始消費分割區,而是對於每個分割區,我們希望它們在搜尋到任意偏移量後再開始消費。想像一下,您有一個業務案例需要從某些偏移量消費,如下所示。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

這可以透過如下實作上述方法來實現。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

這只是一個基本的實作。真實世界的用例比這複雜得多,您需要相應地調整,但這確實為您提供了一個基本的草圖。當消費者 seek 失敗時,它可能會拋出一些執行階段例外,您需要決定在這些情況下該怎麼做。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我們啟動具有相同群組 ID 的第二個消費者會怎麼樣?

當我們新增第二個消費者時,將會發生重新平衡,並且某些分割區將會移動。假設新消費者獲得分割區 23。當這個新的 Spring Cloud Stream 消費者呼叫此 onPartitionsAssigned 方法時,它會看到這是消費者上分割區 23 的初始指派。因此,由於對 initial 引數的條件檢查,它將執行搜尋操作。在第一個消費者的情況下,它現在只有分割區 01。但是,對於這個消費者來說,這只是一個重新平衡事件,而不是被視為初始指派。因此,由於對 initial 引數的條件檢查,它將不會重新搜尋到給定的偏移量。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 我該如何使用 Kafka binder 手動確認?

問題陳述

使用 Kafka binder,我想要在我的消費者中手動確認訊息。我該如何做到這一點?

解決方案

預設情況下,Kafka binder 委派給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackModebatch。請參閱這裡以取得有關該內容的更多詳細資訊。

在某些情況下,您想要停用此預設提交行為並依賴手動提交。以下步驟允許您執行此操作。

將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE。當它像這樣設定時,消費者方法接收到的訊息中將會有一個名為 kafka_acknowledgment (來自 KafkaHeaders.ACKNOWLEDGMENT) 的標頭。

例如,將此想像為您的消費者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然後,將屬性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 我該如何在 Spring Cloud Stream 中覆寫預設綁定名稱?

問題陳述

Spring Cloud Stream 根據函數定義和簽名建立預設綁定,但我該如何覆寫這些綁定以使用更符合網域友善的名稱?

解決方案

假設以下是您的函數簽名。

@Bean
public Function<String, String> uppercase(){
...
}

預設情況下,Spring Cloud Stream 將建立如下所示的綁定。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下屬性將這些綁定覆寫為其他名稱。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之後,所有綁定屬性都必須在新名稱 my-transformer-inmy-transformer-out 上進行設定。

以下是另一個使用 Kafka Streams 和多個輸入的範例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

預設情況下,Spring Cloud Stream 將為此函數建立三個不同的綁定名稱。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想要在這些綁定上設定某些組態時,都必須使用這些綁定名稱。您不喜歡這樣,並且想要使用更符合網域友善且可讀的綁定名稱,例如,類似於以下名稱。

  1. orders

  2. accounts

  3. enrichedOrders

您可以透過簡單地設定這三個屬性來輕鬆做到這一點

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

一旦您執行此操作,它將覆寫預設綁定名稱,並且您想要在其上設定的任何屬性都必須使用這些新綁定名稱。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 我該如何將訊息鍵作為記錄的一部分發送?

問題陳述

我需要連同記錄的 payload 一起發送一個鍵,是否有辦法在 Spring Cloud Stream 中做到這一點?

解決方案

通常,您可能需要發送關聯性資料結構 (如 map) 作為具有鍵和值的記錄。Spring Cloud Stream 允許您以直接的方式執行此操作。以下是執行此操作的基本藍圖,但您可能需要根據您的特定用例進行調整。

以下是範例生產者方法 (又名 Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

這是一個簡單的函數,它發送一個具有 String payload 的訊息,但也帶有一個鍵。請注意,我們使用 KafkaHeaders.MESSAGE_KEY 將鍵設定為訊息標頭。

如果您想要將鍵從預設的 kafka_messageKey 變更為其他鍵,那麼在組態中,我們需要指定此屬性

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

請注意,我們使用綁定名稱 supplier-out-0,因為那是我們的函數名稱,請相應地更新。

然後,當我們生產訊息時,我們使用這個新鍵。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 我該如何使用原生序列化器和反序列化器,而不是 Spring Cloud Stream 完成的訊息轉換?

問題陳述

我想要在 Kafka 中使用原生 Serializer 和 Deserializer,而不是使用 Spring Cloud Stream 中的訊息轉換器。預設情況下,Spring Cloud Stream 使用其內建的訊息轉換器來處理此轉換。我該如何繞過這個問題並將責任委派給 Kafka?

解決方案

這真的很容易做到。

您所要做的就是提供以下屬性來啟用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然後,您還需要設定序列化器。有幾種方法可以做到這一點。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或使用 binder 組態。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

當使用 binder 方式時,它會應用於所有綁定,而將它們設定在綁定上則是每個綁定。

在反序列化端,您只需要將反序列化器作為組態提供即可。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在 binder 層級設定它們。

有一個可選屬性,您可以設定它來強制執行原生解碼。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,在 Kafka binder 的情況下,這是沒有必要的,因為當它到達 binder 時,Kafka 已經使用已組態的反序列化器對它們進行了反序列化。

說明偏移量重設如何在 Kafka Streams binder 中運作

問題陳述

預設情況下,Kafka Streams binder 始終從新消費者的最早偏移量開始。有時,從最新偏移量開始對應用程式有利或有需要。Kafka Streams binder 允許您執行此操作。

解決方案

在我們查看解決方案之前,讓我們先看看以下情境。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我們有一個 BiConsumer bean,它需要兩個輸入綁定。在這種情況下,第一個綁定用於 KStream,第二個綁定用於 KTable。當第一次執行此應用程式時,預設情況下,兩個綁定都從 earliest 偏移量開始。如果我想要由於某些需求而從 latest 偏移量開始,該怎麼辦?您可以透過啟用以下屬性來執行此操作。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想要一個綁定從 latest 偏移量開始,而另一個綁定從預設的 earliest 消費,那麼請從組態中省略後一個綁定。

請記住,一旦存在已提交的偏移量,這些設定將被採用,並且已提交的偏移量優先。

追蹤在 Kafka 中成功發送記錄 (生產)

問題陳述

我有一個 Kafka 生產者應用程式,並且想要追蹤我的所有成功發送。

解決方案

讓我們假設我們在應用程式中有以下供應商。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然後,我們需要定義一個新的 MessageChannel bean 來捕獲所有成功的發送資訊。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下來,在應用程式組態中定義此屬性,以提供 recordMetadataChannel 的 bean 名稱。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此時,成功的發送資訊將被發送到 fooRecordChannel

您可以編寫一個 IntegrationFlow 如下所示以查看資訊。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,payload 是發送到 Kafka 的內容,而訊息標頭包含一個名為 kafka_recordMetadata 的特殊鍵。它的值是 RecordMetadata,其中包含有關主題分割區、目前偏移量等的資訊。

在 Kafka 中新增自訂標頭對應器

問題陳述

我有一個 Kafka 生產者應用程式,它設定了一些標頭,但它們在消費者應用程式中遺失了。為什麼會這樣?

解決方案

在正常情況下,這應該沒問題。

想像一下,您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消費者端,您仍然應該看到標頭 "foo",並且以下內容不應給您帶來任何問題。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在應用程式中提供自訂標頭對應器,那麼這將不起作用。假設您在應用程式中有一個空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果那是您的實作,那麼您將在消費者端遺失 foo 標頭。您可能在這些 KafkaHeaderMapper 方法內部有一些邏輯。您需要以下內容來填入 foo 標頭。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

這將正確地將 foo 標頭從生產者端填入到消費者端。

關於 id 標頭的特別注意事項

在 Spring Cloud Stream 中,id 標頭是一個特殊的標頭,但有些應用程式可能想要有特殊的自訂 ID 標頭,例如 custom-idIDId。第一個 (custom-id) 將在沒有任何自訂標頭映射器的情況下,從生產者傳播到消費者。但是,如果您使用框架保留的 id 標頭的變體(例如 IDIdiD 等)進行生產,那麼您將遇到框架內部結構的問題。請參閱此 StackOverflow 線程 以獲得關於此用例的更多上下文。在這種情況下,您必須使用自訂的 KafkaHeaderMapper 來映射區分大小寫的 ID 標頭。例如,假設您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的 Id 標頭將從消費者端消失,因為它與框架 id 標頭衝突。您可以提供自訂的 KafkaHeaderMapper 來解決此問題。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

透過這樣做,idId 標頭都將從生產者端傳遞到消費者端。

在事務中生產到多個主題

問題陳述

我該如何將事務消息生產到多個 Kafka 主題?

如需更多背景資訊,請參閱此 StackOverflow 問題

解決方案

在 Kafka binder 中使用事務支持進行事務處理,然後提供 AfterRollbackProcessor。為了生產到多個主題,請使用 StreamBridge API。

以下是此操作的程式碼片段

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

為了進行測試,您可以使用以下配置

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要注意事項

請確保您的應用程式配置中沒有任何 DLQ 設定,因為我們手動配置 DLT(預設情況下,它將發佈到名為 input.DLT 的主題,基於初始消費者函數)。此外,將消費者綁定上的 maxAttempts 重置為 1,以避免 binder 進行重試。在上面的範例中,總共會嘗試 3 次(初始嘗試 + FixedBackoff 中的 2 次嘗試)。

請參閱 StackOverflow 線程 以取得關於如何測試此程式碼的更多詳細資訊。如果您使用 Spring Cloud Stream 透過新增更多消費者函數來測試它,請確保將消費者綁定上的 isolation-level 設定為 read-committed

StackOverflow 線程 也與此討論相關。

執行多個可輪詢消費者時要避免的陷阱

問題陳述

我如何才能執行多個可輪詢消費者的實例,並為每個實例產生唯一的 client.id

解決方案

假設我有以下定義

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

當執行應用程式時,Kafka 消費者會產生一個 client.id(類似於 consumer-my-group-1)。對於正在執行的每個應用程式實例,此 client.id 將會相同,導致意想不到的問題。

為了修正此問題,您可以在每個應用程式實例上新增以下屬性

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

請參閱此 GitHub issue 以取得更多詳細資訊。