StreamsBuilderFactoryBean 配置器
通常需要自訂建立 KafkaStreams
物件的 StreamsBuilderFactoryBean
。根據 Spring Kafka 提供的底層支援,binder 允許您自訂 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanConfigurer
來客製化 StreamsBuilderFactoryBean
本身。然後,一旦您透過此配置器存取 StreamsBuilderFactoryBean
,您就可以使用 KafkaStreamsCustomzier
自訂對應的 KafkaStreams
。這兩個自訂器都是 Spring for Apache Kafka 專案的一部分。
以下是使用 StreamsBuilderFactoryBeanConfigurer
的範例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上範例展示了您可以自訂 StreamsBuilderFactoryBean
的事項。您基本上可以從 StreamsBuilderFactoryBean
呼叫任何可用的變更操作來自訂它。此自訂器將在 factory bean 啟動之前由 binder 呼叫。
一旦您存取 StreamsBuilderFactoryBean
,您也可以自訂底層的 KafkaStreams
物件。以下是執行此操作的藍圖。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
將在底層 KafkaStreams
啟動之前由 StreamsBuilderFactoryBeabn
呼叫。
整個應用程式中只能有一個 StreamsBuilderFactoryBeanConfigurer
。那麼,當每個 Kafka Streams 處理器都由個別的 StreamsBuilderFactoryBean
物件支援時,我們該如何處理多個 Kafka Streams 處理器?在這種情況下,如果這些處理器的自訂需要不同,則應用程式需要根據應用程式 ID 套用一些篩選器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 註冊全域狀態儲存
如上所述,binder 未提供註冊全域狀態儲存作為功能的第一級方法。為此,您需要透過 StreamsBuilderFactoryBeanConfigurer
使用自訂器。以下是如何完成此操作。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
必須透過 KafkaStreamsInfrastructureCustomizer
完成對 StreamsBuilder
的任何自訂,如上所示。如果呼叫 StreamsBuilderFactoryBean#getObject()
以存取 StreamsBuilder
物件,則可能無法運作,因為 bean 可能正在初始化,因此會遇到一些循環依賴問題。
如果您有多個處理器,您想要透過使用應用程式 ID 篩選掉其他 StreamsBuilderFactoryBean
物件,將全域狀態儲存附加到正確的 StreamsBuilder
,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 註冊生產異常處理常式
在錯誤處理章節中,我們指出 binder 未提供處理生產異常的第一級方法。儘管如此,您仍然可以使用 StreamsBuilderFacotryBean
自訂器來註冊生產異常處理常式。請參閱下方。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次強調,如果您有多個處理器,您可能需要針對正確的 StreamsBuilderFactoryBean
適當地設定它。您也可以使用組態屬性新增此類生產異常處理常式(請參閱下方以取得更多資訊),但如果您選擇使用程式化方法,這是一個選項。