回覆管理

MessageListenerAdapter 中現有的支援已經讓您的方法可以有非 void 傳回類型。在這種情況下,調用的結果會封裝在訊息中,並傳送到原始訊息的 ReplyToAddress 標頭中指定的位址,或傳送到監聽器上設定的預設位址。您可以使用訊息抽象概念的 @SendTo 註解來設定該預設位址。

假設我們的 processOrder 方法現在應該傳回 OrderStatus,我們可以將其撰寫如下以自動傳送回覆

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果您需要在傳輸獨立的方式中設定其他標頭,您可以改為傳回 Message,類似以下內容

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

或者,您可以使用 beforeSendReplyMessagePostProcessors 容器工廠屬性中的 MessagePostProcessor 來新增更多標頭。從 2.2.3 版開始,被呼叫的 bean/方法可在回覆訊息中使用,這可用於訊息後處理器中,將資訊傳達回呼叫者

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

從 2.2.5 版開始,您可以設定 ReplyPostProcessor 在傳送前回覆訊息;它會在設定 correlationId 標頭以符合請求後呼叫。

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

從 3.0 版開始,您可以在容器工廠而非註解上設定後處理器。

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

id 參數是監聽器 ID。

註解上的設定將取代工廠設定。

@SendTo 值被假定為回覆 exchangeroutingKey 對,其遵循 exchange/routingKey 模式,其中可以省略其中一個部分。有效值如下

  • thing1/thing2replyTo 交換器和 routingKeything1/replyTo 交換器和預設 (空) routingKeything2/thing2replyTo routingKey 和預設 (空) 交換器。 / 或空:replyTo 預設交換器和預設 routingKey

此外,您可以使用沒有 value 屬性的 @SendTo。這種情況等同於空的 sendTo 模式。只有在入站訊息沒有 replyToAddress 屬性時,才會使用 @SendTo

從 1.5 版開始,@SendTo 值可以是 bean 初始化 SpEL 運算式,如下列範例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

運算式必須評估為 String,它可以是簡單的佇列名稱 (傳送到預設交換器) 或具有 exchange/routingKey 形式,如前一個範例之前所討論。

#{…​} 運算式在初始化期間評估一次。

對於動態回覆路由,訊息傳送者應包含 reply_to 訊息屬性或使用替代執行階段 SpEL 運算式 (在下一個範例之後說明)。

從 1.6 版開始,@SendTo 可以是 SpEL 運算式,在執行階段針對請求和回覆進行評估,如下列範例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 運算式的執行階段性質以 !{…​} 分隔符號表示。運算式的評估上下文 #root 物件具有三個屬性

  • requesto.s.amqp.core.Message 請求物件。

  • source:轉換後的 o.s.messaging.Message<?>

  • result:方法結果。

上下文具有映射屬性存取器、標準類型轉換器和 bean 解析器,這讓上下文中的其他 bean 可以被引用 (例如,@someBeanName.determineReplyQ(request, result))。

總之,#{…​} 在初始化期間評估一次,其中 #root 物件是應用程式上下文。Bean 按其名稱引用。!{…​} 在執行階段針對每個訊息進行評估,其中根物件具有先前列出的屬性。Bean 以其名稱引用,並以 @ 作為前綴。

從 2.1 版開始,也支援簡單的屬性佔位符 (例如,${some.reply.to})。對於早期版本,可以使用以下方法作為解決方案,如下列範例所示

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}