容器工廠

@KafkaListener 註解 中所述,ConcurrentKafkaListenerContainerFactory 用於為註解方法建立容器。

從 2.2 版開始,您可以使用相同的工廠來建立任何 ConcurrentMessageListenerContainer。如果您想要建立多個具有相似屬性的容器,或者您希望使用一些外部設定的工廠(例如 Spring Boot 自動組態提供的工廠),這可能會很有用。建立容器後,您可以進一步修改其屬性,其中許多屬性是透過使用 container.getContainerProperties() 進行設定的。以下範例設定了 ConcurrentMessageListenerContainer

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
以這種方式建立的容器不會新增至端點登錄檔。它們應該建立為 @Bean 定義,以便在應用程式內容中註冊。

從 2.3.4 版開始,您可以將 ContainerCustomizer 新增至工廠,以在建立和設定每個容器後進一步設定它們。

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

從 3.1 版開始,也可以透過在 KafkaListener 註解上指定 'ContainerPostProcessor' 的 bean 名稱,在單個監聽器上應用相同種類的自訂。

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)