回覆管理
MessageListenerAdapter
中現有的支援已經讓您的方法可以有非 void 傳回類型。在這種情況下,調用的結果會封裝在訊息中,並傳送到原始訊息的 ReplyToAddress
標頭中指定的位址,或傳送到監聽器上設定的預設位址。您可以使用訊息抽象概念的 @SendTo
註解來設定該預設位址。
假設我們的 processOrder
方法現在應該傳回 OrderStatus
,我們可以將其撰寫如下以自動傳送回覆
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果您需要在傳輸獨立的方式中設定其他標頭,您可以改為傳回 Message
,類似以下內容
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
或者,您可以使用 beforeSendReplyMessagePostProcessors
容器工廠屬性中的 MessagePostProcessor
來新增更多標頭。從 2.2.3 版開始,被呼叫的 bean/方法可在回覆訊息中使用,這可用於訊息後處理器中,將資訊傳達回呼叫者
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
從 2.2.5 版開始,您可以設定 ReplyPostProcessor
在傳送前回覆訊息;它會在設定 correlationId
標頭以符合請求後呼叫。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
從 3.0 版開始,您可以在容器工廠而非註解上設定後處理器。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id
參數是監聽器 ID。
註解上的設定將取代工廠設定。
@SendTo
值被假定為回覆 exchange
和 routingKey
對,其遵循 exchange/routingKey
模式,其中可以省略其中一個部分。有效值如下
-
thing1/thing2
:replyTo
交換器和routingKey
。thing1/
:replyTo
交換器和預設 (空)routingKey
。thing2
或/thing2
:replyTo
routingKey
和預設 (空) 交換器。/
或空:replyTo
預設交換器和預設routingKey
。
此外,您可以使用沒有 value
屬性的 @SendTo
。這種情況等同於空的 sendTo
模式。只有在入站訊息沒有 replyToAddress
屬性時,才會使用 @SendTo
。
從 1.5 版開始,@SendTo
值可以是 bean 初始化 SpEL 運算式,如下列範例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
運算式必須評估為 String
,它可以是簡單的佇列名稱 (傳送到預設交換器) 或具有 exchange/routingKey
形式,如前一個範例之前所討論。
#{…} 運算式在初始化期間評估一次。 |
對於動態回覆路由,訊息傳送者應包含 reply_to
訊息屬性或使用替代執行階段 SpEL 運算式 (在下一個範例之後說明)。
從 1.6 版開始,@SendTo
可以是 SpEL 運算式,在執行階段針對請求和回覆進行評估,如下列範例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 運算式的執行階段性質以 !{…}
分隔符號表示。運算式的評估上下文 #root
物件具有三個屬性
-
request
:o.s.amqp.core.Message
請求物件。 -
source
:轉換後的o.s.messaging.Message<?>
。 -
result
:方法結果。
上下文具有映射屬性存取器、標準類型轉換器和 bean 解析器,這讓上下文中的其他 bean 可以被引用 (例如,@someBeanName.determineReplyQ(request, result)
)。
總之,#{…}
在初始化期間評估一次,其中 #root
物件是應用程式上下文。Bean 按其名稱引用。!{…}
在執行階段針對每個訊息進行評估,其中根物件具有先前列出的屬性。Bean 以其名稱引用,並以 @
作為前綴。
從 2.1 版開始,也支援簡單的屬性佔位符 (例如,${some.reply.to}
)。對於早期版本,可以使用以下方法作為解決方案,如下列範例所示
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}