RabbitMQ Stream 外掛程式的初始消費者支援
現在提供對 RabbitMQ Stream 外掛程式 的基本支援。若要啟用此功能,您必須將 spring-rabbit-stream
jar 新增至類別路徑 - 它必須與 spring-amqp
和 spring-rabbit
版本相同。
當您將 containerType 屬性設定為 stream 時,不支援上述消費者屬性;concurrency 僅支援超級串流。每個繫結只能消費單一串流佇列。 |
若要將 Binder 組態為使用 containerType=stream
,Spring Boot 將自動從應用程式屬性組態 Environment
@Bean
。您可以選擇性地新增自訂器來自訂接聽器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
傳遞至自訂器的 name
引數為 destination + '.' + group + '.container'
。
串流 name()
(用於偏移追蹤目的) 設定為繫結 destination + '.' + group
。可以使用上面顯示的 ConsumerCustomizer
變更它。如果您決定使用手動偏移追蹤,則 Context
可作為訊息標頭使用
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
請參閱 RabbitMQ Stream Java Client 文件,以取得關於組態環境和消費者建構器的資訊。
RabbitMQ 超級串流的消費者支援
請參閱 超級串流,以取得關於超級串流的資訊。
使用超級串流允許自動擴增縮減,在超級串流的每個分割區上使用單一作用中消費者。
組態範例
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
此框架將建立一個名為 super
的超級串流,具有 9 個分割區。最多可以部署此應用程式的 3 個實例。