RabbitMQ Stream 外掛程式的初始生產者支援

現在提供對 RabbitMQ Stream 外掛程式 的基本支援。若要啟用此功能,您必須將 spring-rabbit-stream jar 新增至類別路徑 - 它必須與 spring-amqpspring-rabbit 版本相同。

當您將 producerType 屬性設定為 STREAM_SYNCSTREAM_ASYNC 時,不支援上述的生產者屬性。

若要設定 Binder 使用 stream ProducerType,Spring Boot 將從應用程式屬性組態 Environment @Bean。您可以選擇性地新增一個自訂器來自訂訊息處理器。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

請參閱 RabbitMQ Stream Java Client 文件,以取得關於組態環境和生產者建構器的資訊。

RabbitMQ Super Streams 的生產者支援

請參閱 Super Streams 以取得關於 super streams 的資訊。

使用 super streams 允許自動擴增縮減,在 super stream 的每個分割區上都有單一活動消費者。使用 Spring Cloud Stream,您可以透過 AMQP 或使用 stream client 發布到 super stream。

super stream 必須已存在;生產者綁定不支援建立 super stream。

透過 AMQP 發布到 super stream

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用 stream client 發布到 super stream

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

當使用 stream client 時,如果您設定 confirmAckChannel,成功發送訊息的副本將會被發送到該通道。