在 Spring 中管理的 Producer Interceptor

從 3.0.0 版本開始,當涉及到 producer interceptor 時,您可以讓 Spring 直接將其作為 bean 管理,而不是將 interceptor 的類別名稱提供給 Apache Kafka producer 設定。如果您採用這種方法,則需要在 KafkaTemplate 上設定此 producer interceptor。以下範例使用與上述相同的 MyProducerInterceptor,但變更為不使用內部組態屬性。

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在記錄傳送之前,會先調用 producer interceptor 的 onSend 方法。一旦伺服器傳送關於發布資料的確認,則會調用 onAcknowledgement 方法。onAcknowledgement 會在 producer 調用任何使用者回呼之前調用。

如果您有多個透過 Spring 管理的 producer interceptor 需要應用於 KafkaTemplate,則需要改用 CompositeProducerInterceptorCompositeProducerInterceptor 允許依序新增個別的 producer interceptor。底層 ProducerInterceptor 實作中的方法會按照它們新增到 CompositeProducerInterceptor 的順序調用。