連線到 Kafka

從 2.5 版開始,這些都擴充了 KafkaResourceFactory。這允許在執行時期透過將 Supplier<String> 新增至其組態來變更啟動伺服器:setBootstrapServersSupplier(() -> …​)。這將在所有新連線中呼叫以取得伺服器列表。消費者和生產者通常是長期存在的。若要關閉現有的生產者,請在 DefaultKafkaProducerFactory 上呼叫 reset()。若要關閉現有的消費者,請在 KafkaListenerEndpointRegistry 上呼叫 stop()(然後 start())和/或在任何其他監聽器容器 bean 上呼叫 stop()start()

為了方便起見,框架也提供了一個 ABSwitchCluster,它支援兩組啟動伺服器;其中一組在任何時候都是活動的。設定 ABSwitchCluster 並透過呼叫 setBootstrapServersSupplier() 將其新增至生產者和消費者工廠以及 KafkaAdmin。當您想要切換時,請呼叫 primary()secondary() 並在生產者工廠上呼叫 reset() 以建立新的連線;對於消費者,stop()start() 所有監聽器容器。當使用 @KafkaListener 時,stop()start() KafkaListenerEndpointRegistry bean。

請參閱 Javadoc 以取得更多資訊。

工廠監聽器

從 2.5 版開始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以使用 Listener 進行組態,以便在每次建立或關閉生產者或消費者時接收通知。

生產者工廠監聽器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消費者工廠監聽器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每種情況下,id 都是透過將 client-id 屬性(從建立後的 metrics() 取得)附加到工廠 beanName 屬性(以 . 分隔)來建立的。

這些監聽器可以用於,例如,在建立新用戶端時建立和繫結 Micrometer KafkaClientMetrics 實例(並在用戶端關閉時關閉它)。

框架提供了完全做到這一點的監聽器;請參閱 Micrometer 原生指標

預設用戶端 ID 前綴

從 3.2 版開始,對於使用 spring.application.name 屬性定義應用程式名稱的 Spring Boot 應用程式,此名稱現在用作這些用戶端類型的自動產生用戶端 ID 的預設前綴

  • 不使用消費者群組的消費者用戶端

  • 生產者用戶端

  • 管理用戶端

這使得在伺服器端更容易識別這些用戶端,以便進行疑難排解或套用配額。

表 1. 具有 spring.application.name=myapp 的 Spring Boot 應用程式的範例用戶端 ID
用戶端類型 沒有應用程式名稱 具有應用程式名稱

沒有消費者群組的消費者

consumer-null-1

myapp-consumer-1

具有消費者群組 "mygroup" 的消費者

consumer-mygroup-1

consumer-mygroup-1

生產者

producer-1

myapp-producer-1

管理員

adminclient-1

myapp-admin-1