連線到 Kafka
從 2.5 版開始,這些都擴充了 KafkaResourceFactory
。這允許在執行時期透過將 Supplier<String>
新增至其組態來變更啟動伺服器:setBootstrapServersSupplier(() -> …)
。這將在所有新連線中呼叫以取得伺服器列表。消費者和生產者通常是長期存在的。若要關閉現有的生產者,請在 DefaultKafkaProducerFactory
上呼叫 reset()
。若要關閉現有的消費者,請在 KafkaListenerEndpointRegistry
上呼叫 stop()
(然後 start()
)和/或在任何其他監聽器容器 bean 上呼叫 stop()
和 start()
。
為了方便起見,框架也提供了一個 ABSwitchCluster
,它支援兩組啟動伺服器;其中一組在任何時候都是活動的。設定 ABSwitchCluster
並透過呼叫 setBootstrapServersSupplier()
將其新增至生產者和消費者工廠以及 KafkaAdmin
。當您想要切換時,請呼叫 primary()
或 secondary()
並在生產者工廠上呼叫 reset()
以建立新的連線;對於消費者,stop()
和 start()
所有監聽器容器。當使用 @KafkaListener
時,stop()
和 start()
KafkaListenerEndpointRegistry
bean。
請參閱 Javadoc 以取得更多資訊。
工廠監聽器
從 2.5 版開始,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
可以使用 Listener
進行組態,以便在每次建立或關閉生產者或消費者時接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每種情況下,id
都是透過將 client-id
屬性(從建立後的 metrics()
取得)附加到工廠 beanName
屬性(以 .
分隔)來建立的。
這些監聽器可以用於,例如,在建立新用戶端時建立和繫結 Micrometer KafkaClientMetrics
實例(並在用戶端關閉時關閉它)。
框架提供了完全做到這一點的監聽器;請參閱 Micrometer 原生指標。
預設用戶端 ID 前綴
從 3.2 版開始,對於使用 spring.application.name
屬性定義應用程式名稱的 Spring Boot 應用程式,此名稱現在用作這些用戶端類型的自動產生用戶端 ID 的預設前綴
-
不使用消費者群組的消費者用戶端
-
生產者用戶端
-
管理用戶端
這使得在伺服器端更容易識別這些用戶端,以便進行疑難排解或套用配額。
用戶端類型 | 沒有應用程式名稱 | 具有應用程式名稱 |
---|---|---|
沒有消費者群組的消費者 |
consumer-null-1 |
myapp-consumer-1 |
具有消費者群組 "mygroup" 的消費者 |
consumer-mygroup-1 |
consumer-mygroup-1 |
生產者 |
producer-1 |
myapp-producer-1 |
管理員 |
adminclient-1 |
myapp-admin-1 |