主題命名
重試主題和 DLT 的命名方式是在主要主題後綴提供的或預設值,並附加該主題的延遲或索引。
範例
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix"
預設行為是為每次嘗試建立單獨的重試主題,並附加索引值:retry-0、retry-1、…、retry-n。因此,預設情況下,重試主題的數量是設定的 maxAttempts 減 1。 |
重試主題和 DLT 後綴
您可以指定重試主題和 DLT 主題將使用的後綴。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
預設後綴分別為重試主題的 "-retry" 和 dlt 的 "-dlt"。 |
附加主題的索引或延遲
您可以在後綴後附加主題的索引或延遲值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
預設行為是後綴延遲值,但具有多個主題的固定延遲設定除外,在這種情況下,主題會後綴主題的索引。 |
單一主題用於固定延遲重試
如果您使用固定延遲策略(例如 FixedBackOffPolicy
或 NoBackOffPolicy
),則可以使用單一主題來完成非阻塞重試。此主題將後綴提供的或預設的後綴,並且不會附加索引或延遲值。
之前的 FixedDelayStrategy 現在已棄用,可以用 SameIntervalTopicReuseStrategy 替換。 |
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
預設行為是為每次嘗試建立單獨的重試主題,並附加其索引值:retry-0、retry-1、… |
單一主題用於 maxInterval 指數延遲
如果您使用指數退避策略 (`ExponentialBackOffPolicy`),則可以使用單一重試主題來完成延遲為已設定的 `maxInterval` 的嘗試的非阻塞重試。
這個「最終」重試主題將後綴提供的或預設的後綴,並且將附加索引或 maxInterval
值。
透過選擇對具有 maxInterval 延遲的重試使用單一主題,設定長時間保持重試的指數重試策略可能會變得更加可行,因為在這種方法中,您不需要大量主題。 |
從 3.2 開始,預設行為是在使用指數退避時,為相同間隔重複使用重試主題,重試主題後綴延遲值,最後一個重試主題為相同間隔(對應於 maxInterval
延遲)重複使用。
例如,當設定指數退避,initialInterval=1_000
、multiplier=2
和 maxInterval=16_000
時,為了保持嘗試一小時,需要將 maxAttempts
設定為 229,並且預設情況下,需要的重試主題將是
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
當使用重試主題數量等於設定的 maxAttempts
減 1 的策略時,最後一個重試主題(對應於 maxInterval
延遲)後綴附加索引將會是
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
如果需要多個主題,則可以使用以下設定來完成。
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
自訂命名策略
更複雜的命名策略可以透過註冊實作 RetryTopicNamesProviderFactory
的 bean 來完成。預設實作是 SuffixingRetryTopicNamesProviderFactory
,並且可以透過以下方式註冊不同的實作
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
作為範例,以下實作除了標準後綴之外,還在重試/dlt 主題名稱中新增前綴
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}