註解驅動的監聽器端點
非同步接收訊息最簡單的方式是使用註解驅動的監聽器端點基礎架構。簡而言之,它讓您可以將受管 Bean 的方法公開為 JMS 監聽器端點。以下範例示範如何使用它
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
前述範例的概念是,每當 `jakarta.jms.Destination` `myDestination` 上有訊息可用時,就會相應地調用 `processOrder` 方法(在本例中,使用 JMS 訊息的內容,類似於 `MessageListenerAdapter` 提供的功能)。
註解端點基礎架構會使用 `JmsListenerContainerFactory`,在幕後為每個註解方法建立一個訊息監聽器容器。此容器未向應用程式上下文註冊,但可以使用 `JmsListenerEndpointRegistry` Bean 輕鬆找到以進行管理。
`@JmsListener` 是 Java 8 上可重複的註解,因此您可以透過向其新增額外的 `@JmsListener` 宣告,將多個 JMS 目的地與同一個方法關聯。 |
啟用監聽器端點註解
若要啟用 `@JmsListener` 註解的支援,您可以將 `@EnableJms` 新增至您的 `@Configuration` 類別之一,如下列範例所示
-
Java
-
Kotlin
-
Xml
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DestinationResolver destinationResolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(destinationResolver);
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
}
@Configuration
@EnableJms
class JmsConfiguration {
@Bean
fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
setDestinationResolver(destinationResolver)
setSessionTransacted(true)
setConcurrency("3-10")
}
}
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="sessionTransacted" value="true"/>
<property name="concurrency" value="3-10"/>
</bean>
預設情況下,基礎架構會尋找名為 `jmsListenerContainerFactory` 的 Bean,作為工廠的來源,以用於建立訊息監聽器容器。在本例中(並忽略 JMS 基礎架構設定),您可以使用三個執行緒的核心集區大小和十個執行緒的最大集區大小來調用 `processOrder` 方法。
您可以自訂每個註解要使用的監聽器容器工廠,或者您可以透過實作 `JmsListenerConfigurer` 介面來組態明確的預設值。只有在至少一個端點在未指定容器工廠的情況下註冊時,才需要預設值。如需詳細資訊和範例,請參閱實作 `JmsListenerConfigurer` 的類別的 javadoc。
程式化端點註冊
`JmsListenerEndpoint` 提供了 JMS 端點的模型,並負責為該模型組態容器。除了 `@JmsListener` 註解偵測到的端點之外,基礎架構還允許您以程式化方式組態端點。以下範例示範如何執行此操作
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前述範例中,我們使用了 `SimpleJmsListenerEndpoint`,它提供了要調用的實際 `MessageListener`。但是,您也可以建立自己的端點變體來描述自訂調用機制。
請注意,您可以完全跳過使用 `@JmsListener`,而僅透過 `JmsListenerConfigurer` 以程式化方式註冊您的端點。
註解端點方法簽章
到目前為止,我們一直在端點中注入簡單的 `String`,但它實際上可以具有非常彈性的方法簽章。在以下範例中,我們將其重寫為注入具有自訂標頭的 `Order`
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
您可以注入 JMS 監聽器端點的主要元素如下
-
原始 `jakarta.jms.Message` 或其任何子類別(前提是它符合傳入訊息類型)。
-
`jakarta.jms.Session`,用於選擇性存取原生 JMS API(例如,用於傳送自訂回覆)。
-
代表傳入 JMS 訊息的 `org.springframework.messaging.Message`。請注意,此訊息同時包含自訂標頭和標準標頭(如 `JmsHeaders` 所定義)。
-
`@Header` 註解的方法引數,用於提取特定標頭值,包括標準 JMS 標頭。
-
一個 `@Headers` 註解的引數,也必須可指派給 `java.util.Map`,才能存取所有標頭。
-
未註解且非受支援類型(`Message` 或 `Session`)之一的元素會被視為有效負載。您可以透過使用 `@Payload` 註解參數來明確表示這一點。您也可以透過新增額外的 `@Valid` 來開啟驗證。
注入 Spring 的 `Message` 抽象化的能力對於受益於儲存在傳輸特定訊息中的所有資訊,而無需依賴傳輸特定 API 特別有用。以下範例示範如何執行此操作
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
方法引數的處理由 `DefaultMessageHandlerMethodFactory` 提供,您可以進一步自訂它以支援其他方法引數。您也可以在那裡自訂轉換和驗證支援。
例如,如果我們想確保我們的 `Order` 在處理之前是有效的,我們可以像以下範例所示,使用 `@Valid` 註解有效負載並組態必要的驗證器
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
回應管理
`MessageListenerAdapter` 中的現有支援已允許您的方法具有非 `void` 回傳類型。在這種情況下,調用的結果會封裝在 `jakarta.jms.Message` 中,並在原始訊息的 `JMSReplyTo` 標頭中指定的目的地或在監聽器上組態的預設目的地中傳送。您現在可以使用訊息傳輸抽象化的 `@SendTo` 註解來設定該預設目的地。
假設我們的 `processOrder` 方法現在應該回傳 `OrderStatus`,我們可以編寫它以自動傳送回應,如下列範例所示
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果您有多個 `@JmsListener` 註解的方法,您也可以將 `@SendTo` 註解放在類別層級,以共用預設回覆目的地。 |
如果您需要以與傳輸無關的方式設定其他標頭,您可以改為回傳 `Message`,方法類似於以下內容
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果您需要在執行階段計算回應目的地,您可以將您的回應封裝在 `JmsResponse` 實例中,該實例也提供執行階段要使用的目的地。我們可以將先前的範例重寫如下
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
最後,如果您需要為回應指定一些 QoS 值,例如優先順序或存活時間,您可以相應地組態 `JmsListenerContainerFactory`,如下列範例所示
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
QosSettings replyQosSettings = new QosSettings();
replyQosSettings.setPriority(2);
replyQosSettings.setTimeToLive(10000);
factory.setReplyQosSettings(replyQosSettings);
return factory;
}
}