StreamsBuilderFactoryBean 配置器

通常需要自訂建立 KafkaStreams 物件的 StreamsBuilderFactoryBean。根據 Spring Kafka 提供的底層支援,binder 允許您自訂 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanConfigurer 來客製化 StreamsBuilderFactoryBean 本身。然後,一旦您透過此配置器存取 StreamsBuilderFactoryBean,您就可以使用 KafkaStreamsCustomzier 自訂對應的 KafkaStreams。這兩個自訂器都是 Spring for Apache Kafka 專案的一部分。

以下是使用 StreamsBuilderFactoryBeanConfigurer 的範例。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

以上範例展示了您可以自訂 StreamsBuilderFactoryBean 的事項。您基本上可以從 StreamsBuilderFactoryBean 呼叫任何可用的變更操作來自訂它。此自訂器將在 factory bean 啟動之前由 binder 呼叫。

一旦您存取 StreamsBuilderFactoryBean,您也可以自訂底層的 KafkaStreams 物件。以下是執行此操作的藍圖。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 將在底層 KafkaStreams 啟動之前由 StreamsBuilderFactoryBeabn 呼叫。

整個應用程式中只能有一個 StreamsBuilderFactoryBeanConfigurer。那麼,當每個 Kafka Streams 處理器都由個別的 StreamsBuilderFactoryBean 物件支援時,我們該如何處理多個 Kafka Streams 處理器?在這種情況下,如果這些處理器的自訂需要不同,則應用程式需要根據應用程式 ID 套用一些篩選器。

例如,

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用 StreamsBuilderFactoryBeanConfigurer 註冊全域狀態儲存

如上所述,binder 未提供註冊全域狀態儲存作為功能的第一級方法。為此,您需要透過 StreamsBuilderFactoryBeanConfigurer 使用自訂器。以下是如何完成此操作。

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

必須透過 KafkaStreamsInfrastructureCustomizer 完成對 StreamsBuilder 的任何自訂,如上所示。如果呼叫 StreamsBuilderFactoryBean#getObject() 以存取 StreamsBuilder 物件,則可能無法運作,因為 bean 可能正在初始化,因此會遇到一些循環依賴問題。

如果您有多個處理器,您想要透過使用應用程式 ID 篩選掉其他 StreamsBuilderFactoryBean 物件,將全域狀態儲存附加到正確的 StreamsBuilder,如上所述。

使用 StreamsBuilderFactoryBeanConfigurer 註冊生產異常處理常式

在錯誤處理章節中,我們指出 binder 未提供處理生產異常的第一級方法。儘管如此,您仍然可以使用 StreamsBuilderFacotryBean 自訂器來註冊生產異常處理常式。請參閱下方。

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次強調,如果您有多個處理器,您可能需要針對正確的 StreamsBuilderFactoryBean 適當地設定它。您也可以使用組態屬性新增此類生產異常處理常式(請參閱下方以取得更多資訊),但如果您選擇使用程式化方法,這是一個選項。