RabbitMQ Stream 外掛程式的初始消費者支援

現在提供對 RabbitMQ Stream 外掛程式 的基本支援。若要啟用此功能,您必須將 spring-rabbit-stream jar 新增至類別路徑 - 它必須與 spring-amqpspring-rabbit 版本相同。

當您將 containerType 屬性設定為 stream 時,不支援上述消費者屬性;concurrency 僅支援超級串流。每個繫結只能消費單一串流佇列。

若要將 Binder 組態為使用 containerType=stream,Spring Boot 將自動從應用程式屬性組態 Environment @Bean。您可以選擇性地新增自訂器來自訂接聽器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

傳遞至自訂器的 name 引數為 destination + '.' + group + '.container'

串流 name() (用於偏移追蹤目的) 設定為繫結 destination + '.' + group。可以使用上面顯示的 ConsumerCustomizer 變更它。如果您決定使用手動偏移追蹤,則 Context 可作為訊息標頭使用

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

請參閱 RabbitMQ Stream Java Client 文件,以取得關於組態環境和消費者建構器的資訊。

RabbitMQ 超級串流的消費者支援

請參閱 超級串流,以取得關於超級串流的資訊。

使用超級串流允許自動擴增縮減,在超級串流的每個分割區上使用單一作用中消費者。

組態範例

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

此框架將建立一個名為 super 的超級串流,具有 9 個分割區。最多可以部署此應用程式的 3 個實例。