容器工廠
如 @KafkaListener
註解 中所述,ConcurrentKafkaListenerContainerFactory
用於為註解方法建立容器。
從 2.2 版開始,您可以使用相同的工廠來建立任何 ConcurrentMessageListenerContainer
。如果您想要建立多個具有相似屬性的容器,或者您希望使用一些外部設定的工廠(例如 Spring Boot 自動組態提供的工廠),這可能會很有用。建立容器後,您可以進一步修改其屬性,其中許多屬性是透過使用 container.getContainerProperties()
進行設定的。以下範例設定了 ConcurrentMessageListenerContainer
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以這種方式建立的容器不會新增至端點登錄檔。它們應該建立為 @Bean 定義,以便在應用程式內容中註冊。 |
從 2.3.4 版開始,您可以將 ContainerCustomizer
新增至工廠,以在建立和設定每個容器後進一步設定它們。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
從 3.1 版開始,也可以透過在 KafkaListener 註解上指定 'ContainerPostProcessor' 的 bean 名稱,在單個監聽器上應用相同種類的自訂。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)