接收訊息

本節說明如何在 Spring 中使用 JMS 接收訊息。

同步接收

雖然 JMS 通常與非同步處理相關聯,但您可以同步使用訊息。過載的 receive(..)` 方法提供了此功能。在同步接收期間,呼叫執行緒會阻塞,直到訊息可用為止。這可能是一個危險的操作,因為呼叫執行緒可能會無限期地被阻塞。`receiveTimeout` 屬性指定接收器在放棄等待訊息之前應等待多長時間。

非同步接收:訊息驅動 POJO

Spring 也通過使用 `@JmsListener` 註解來支援註解監聽器端點,並提供一個開放的基礎架構來以程式設計方式註冊端點。到目前為止,這是設定非同步接收器最方便的方法。請參閱 啟用監聽器端點註解 以獲取更多詳細資訊。

以類似於 EJB 世界中的訊息驅動 Bean (MDB) 的方式,訊息驅動 POJO (MDP) 充當 JMS 訊息的接收器。對 MDP 的一個限制(但請參閱 使用 MessageListenerAdapter)是它必須實作 `jakarta.jms.MessageListener` 介面。請注意,如果您的 POJO 在多個執行緒上接收訊息,則務必確保您的實作是執行緒安全的。

以下範例顯示 MDP 的簡單實作

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

一旦您實作了您的 `MessageListener`,就該建立訊息監聽器容器了。

以下範例顯示如何定義和配置 Spring 附帶的訊息監聽器容器之一(在本例中為 `DefaultMessageListenerContainer`)

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

請參閱各種訊息監聽器容器的 Spring javadoc(所有這些容器都實作了 MessageListenerContainer),以獲取每個實作支援的功能的完整描述。

使用 SessionAwareMessageListener 介面

SessionAwareMessageListener 介面是一個 Spring 特定的介面,它提供與 JMS `MessageListener` 介面類似的契約,但也使訊息處理方法可以存取接收 `Message` 的 JMS `Session`。以下列表顯示了 `SessionAwareMessageListener` 介面的定義

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果您希望您的 MDP 能夠回應任何接收到的訊息(通過使用 `onMessage(Message, Session)` 方法中提供的 `Session`),您可以選擇讓您的 MDP 實作此介面(優先於標準 JMS `MessageListener` 介面)。所有 Spring 附帶的訊息監聽器容器實作都支援實作 `MessageListener` 或 `SessionAwareMessageListener` 介面的 MDP。實作 `SessionAwareMessageListener` 的類別帶有一個警告,即它們通過介面綁定到 Spring。是否使用它的選擇完全取決於您作為應用程式開發人員或架構師。

請注意,`SessionAwareMessageListener` 介面的 `onMessage(..)` 方法會拋出 `JMSException`。與標準 JMS `MessageListener` 介面相反,當使用 `SessionAwareMessageListener` 介面時,用戶端程式碼有責任處理任何拋出的例外。

使用 MessageListenerAdapter

MessageListenerAdapter 類別是 Spring 非同步訊息支援中的最後一個元件。簡而言之,它允許您將幾乎任何類別公開為 MDP(儘管有一些限制)。

考慮以下介面定義

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

請注意,儘管該介面既沒有擴展 `MessageListener` 介面也沒有擴展 `SessionAwareMessageListener` 介面,但您仍然可以使用 `MessageListenerAdapter` 類別將其用作 MDP。另請注意,各種訊息處理方法如何根據它們可以接收和處理的各種 `Message` 類型的內容進行強類型化。

現在考慮以下 `MessageDelegate` 介面的實作

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特別是,請注意前面的 `MessageDelegate` 介面實作(`DefaultMessageDelegate` 類別)根本沒有 JMS 相依性。它確實是一個 POJO,我們可以通過以下組態將其變成 MDP

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一個範例顯示了另一個 MDP,它只能處理接收 JMS `TextMessage` 訊息。請注意,訊息處理方法實際上被稱為 `receive`(`MessageListenerAdapter` 中的訊息處理方法的名稱預設為 `handleMessage`),但它是可配置的(您可以在本節稍後看到)。另請注意,`receive(..)` 方法是如何被強類型化為僅接收和回應 JMS `TextMessage` 訊息。以下列表顯示了 `TextMessageDelegate` 介面的定義

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下列表顯示了一個實作 `TextMessageDelegate` 介面的類別

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

隨附的 `MessageListenerAdapter` 的組態將如下所示

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

請注意,如果 `messageListener` 接收到類型不是 `TextMessage` 的 JMS `Message`,則會拋出 `IllegalStateException`(並隨後被吞噬)。`MessageListenerAdapter` 類別的另一個功能是,如果處理器方法傳回非 void 值,則能夠自動發送回回應 `Message`。考慮以下介面和類別

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果您將 `DefaultResponsiveTextMessageDelegate` 與 `MessageListenerAdapter` 結合使用,則從執行 `'receive(..)'` 方法傳回的任何非 null 值(在預設配置中)都會轉換為 `TextMessage`。然後將產生的 `TextMessage` 發送到原始 `Message` 的 JMS `Reply-To` 屬性或 `MessageListenerAdapter` 上設定的預設 `Destination`(如果已配置)中定義的 `Destination`(如果存在)。如果未找到 `Destination`,則會拋出 `InvalidDestinationException`(請注意,此例外不會被吞噬並向上傳播到呼叫堆疊)。

在交易中處理訊息

在交易中調用訊息監聽器僅需要重新配置監聽器容器。

您可以通過監聽器容器定義上的 `sessionTransacted` 標誌來啟用本地資源交易。然後,每個訊息監聽器調用都在活動的 JMS 交易中操作,如果監聽器執行失敗,則訊息接收會回滾。發送回應訊息(通過 `SessionAwareMessageListener`)是同一個本地交易的一部分,但任何其他資源操作(例如資料庫存取)都是獨立操作的。這通常需要在監聽器實作中進行重複訊息檢測,以涵蓋資料庫處理已提交但訊息處理提交失敗的情況。

考慮以下 Bean 定義

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要參與外部管理的交易,您需要配置交易管理器並使用支援外部管理交易的監聽器容器(通常為 `DefaultMessageListenerContainer`)。

要為 XA 交易參與配置訊息監聽器容器,您需要配置 `JtaTransactionManager`(預設情況下,它委派給 Jakarta EE 伺服器的交易子系統)。請注意,底層 JMS `ConnectionFactory` 需要具有 XA 功能,並在您的 JTA 交易協調器中正確註冊。(檢查您的 Jakarta EE 伺服器的 JNDI 資源配置。)這使得訊息接收以及(例如)資料庫存取可以成為同一交易的一部分(具有統一的提交語意,但以 XA 交易日誌開銷為代價)。

以下 Bean 定義建立交易管理器

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然後我們需要將其添加到我們之前的容器配置中。容器會處理其餘部分。以下範例顯示如何執行此操作

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>