RabbitMQ Stream 外掛程式的初始生產者支援
現在提供對 RabbitMQ Stream 外掛程式 的基本支援。若要啟用此功能,您必須將 spring-rabbit-stream
jar 新增至類別路徑 - 它必須與 spring-amqp
和 spring-rabbit
版本相同。
當您將 producerType 屬性設定為 STREAM_SYNC 或 STREAM_ASYNC 時,不支援上述的生產者屬性。 |
若要設定 Binder 使用 stream ProducerType
,Spring Boot 將從應用程式屬性組態 Environment
@Bean
。您可以選擇性地新增一個自訂器來自訂訊息處理器。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
請參閱 RabbitMQ Stream Java Client 文件,以取得關於組態環境和生產者建構器的資訊。
RabbitMQ Super Streams 的生產者支援
請參閱 Super Streams 以取得關於 super streams 的資訊。
使用 super streams 允許自動擴增縮減,在 super stream 的每個分割區上都有單一活動消費者。使用 Spring Cloud Stream,您可以透過 AMQP 或使用 stream client 發布到 super stream。
super stream 必須已存在;生產者綁定不支援建立 super stream。 |
透過 AMQP 發布到 super stream
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用 stream client 發布到 super stream
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
當使用 stream client 時,如果您設定 confirmAckChannel
,成功發送訊息的副本將會被發送到該通道。