使用 RabbitMQ Stream 外掛程式
2.4 版引入了對 RabbitMQ Stream Plugin Java Client 的初始支援,適用於 RabbitMQ Stream 外掛程式。
-
RabbitStreamTemplate
-
StreamListenerContainer
將 spring-rabbit-stream
相依性新增至您的專案
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.1.6</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.6'
您可以像平常一樣佈建佇列,使用 RabbitAdmin
bean,並使用 QueueBuilder.stream()
方法來指定佇列類型。例如
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,這僅在您也使用非串流組件(例如 SimpleMessageListenerContainer
或 DirectMessageListenerContainer
)時才有效,因為當 AMQP 連線開啟時,admin 會被觸發以宣告定義的 bean。如果您的應用程式僅使用串流組件,或者您希望使用進階串流設定功能,則應改為設定 StreamAdmin
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
有關 StreamCreator
的更多資訊,請參閱 RabbitMQ 文件。
傳送訊息
RabbitStreamTemplate
提供 RabbitTemplate
(AMQP) 功能的子集。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
RabbitStreamTemplate
實作具有以下建構子和屬性
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
MessageConverter
用於 convertAndSend
方法中,將物件轉換為 Spring AMQP Message
。
StreamMessageConverter
用於將 Spring AMQP Message
轉換為原生串流 Message
。
您也可以直接傳送原生串流 Message
;messageBuilder()
方法提供對 Producer
的訊息建構器的存取。
ProducerCustomizer
提供一種機制,可在建立生產者之前自訂生產者。
有關自訂 Environment
和 Producer
的資訊,請參閱 Java Client 文件。
從 3.0 版開始,方法傳回類型為 CompletableFuture 而不是 ListenableFuture 。 |
接收訊息
非同步訊息接收由 StreamListenerContainer
(以及使用 @RabbitListener
時的 StreamRabbitListenerContainerFactory
)提供。
監聽器容器需要 Environment
以及單一串流名稱。
您可以使用傳統的 MessageListener
接收 Spring AMQP Message
,也可以使用新的介面接收原生串流 Message
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有關支援屬性的資訊,請參閱 訊息監聽器容器設定。
與範本類似,容器具有 ConsumerCustomizer
屬性。
有關自訂 Environment
和 Consumer
的資訊,請參閱 Java Client 文件。
使用 @RabbitListener
時,請設定 StreamRabbitListenerContainerFactory
;目前,大多數 @RabbitListener
屬性(concurrency
等)都會被忽略。僅支援 id
、queues
、autoStartup
和 containerFactory
。此外,queues
只能包含一個串流名稱。
範例
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
2.4.5 版將 adviceChain
屬性新增至 StreamListenerContainer
(及其工廠)。還提供了一個新的工廠 bean,用於建立無狀態重試攔截器,其中包含可選的 StreamMessageRecoverer
,用於在取用原始串流訊息時使用。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
此容器不支援有狀態重試。 |
超級串流
超級串流是分割串流的抽象概念,透過將許多串流佇列繫結到具有引數 x-super-stream: true
的交換器來實作。
佈建
為了方便起見,可以透過定義 SuperStream
類型的單一 bean 來佈建超級串流。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
RabbitAdmin
偵測到此 bean,並將宣告交換器 (my.super.stream
) 和 3 個佇列(分割區)- my.super-stream-n
,其中 n
為 0
、1
、2
,並以等於 n
的路由金鑰繫結。
如果您也希望透過 AMQP 發佈到交換器,您可以提供自訂路由金鑰
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
金鑰數量必須等於分割區數量。
生產至超級串流
您必須將 superStreamRoutingFunction
新增至 RabbitStreamTemplate
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您也可以使用 RabbitTemplate
透過 AMQP 發佈。
使用單一作用中消費者取用超級串流
在監聽器容器上調用 superStream
方法,以在超級串流上啟用單一作用中消費者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
目前,當並行性大於 1 時,實際並行性會進一步由 Environment 控制;為了實現完全並行性,請將環境的 maxConsumersByConnection 設定為 1。請參閱 設定環境。 |
Micrometer 觀察
自 3.0.5 版以來,現在支援使用 Micrometer 進行觀察,適用於 RabbitStreamTemplate
和串流監聽器容器。容器現在也支援 Micrometer 計時器(在未啟用觀察時)。
在每個組件上設定 observationEnabled
以啟用觀察;這將停用 Micrometer 計時器,因為計時器現在將透過每個觀察進行管理。使用註解監聽器時,請在容器工廠上設定 observationEnabled
。
有關更多資訊,請參閱 Micrometer Tracing。
若要將標籤新增至計時器/追蹤,請分別為範本或監聽器容器設定自訂 RabbitStreamTemplateObservationConvention
或 RabbitStreamListenerObservationConvention
。
預設實作會為範本觀察新增 name
標籤,並為容器新增 listener.id
標籤。
您可以子類別化 DefaultRabbitStreamTemplateObservationConvention
或 DefaultStreamRabbitListenerObservationConvention
,或提供全新的實作。
有關更多詳細資訊,請參閱 Micrometer 觀察文件。