訊息

Spring Integration Message 是一個通用的資料容器。任何物件都可以作為酬載提供,而且每個 Message 實例都包含標頭,其中包含使用者可擴充的屬性作為鍵值對。

Message 介面

以下清單顯示了 Message 介面的定義

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息傳遞系統可以在不知道資料類型的情況下傳遞它。隨著應用程式發展以支援新類型,或當類型本身被修改或擴充時,訊息傳遞系統不會受到影響。另一方面,當訊息傳遞系統中的某些元件確實需要存取關於 Message 的資訊時,此類中繼資料通常可以儲存到訊息標頭中的中繼資料並從中檢索。

訊息標頭

正如 Spring Integration 允許將任何 Object 用作 Message 的酬載一樣,它也支援任何 Object 類型作為標頭值。實際上,MessageHeaders 類別實作了 java.util.Map_ 介面,如下列類別定義所示

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
即使 MessageHeaders 類別實作了 Map,它實際上是一個唯讀實作。任何嘗試將值 put 到 Map 中都會導致 UnsupportedOperationExceptionremoveclear 也是如此。由於訊息可能會傳遞給多個消費者,因此無法修改 Map 的結構。同樣地,訊息的酬載 Object 在初始建立後也無法 set。但是,標頭值本身(或酬載 Object)的可變性有意留給框架使用者決定。

作為 Map 的實作,可以透過呼叫 get(..) 並使用標頭名稱來檢索標頭。或者,您可以提供預期的 Class 作為額外參數。更好的是,當檢索其中一個預先定義的值時,可以使用方便的 getter。以下範例顯示了這三個選項中的每一個

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了預先定義的訊息標頭

表 1. 預先定義的訊息標頭
標頭名稱 標頭類型 用途
 MessageHeaders.ID
 java.util.UUID

此訊息實例的識別碼。每次訊息變更時都會變更。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

訊息建立的時間。每次訊息變更時都會變更。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

當未設定明確的輸出通道且沒有 ROUTING_SLIPROUTING_SLIP 已耗盡時,回覆 (如果有的話) 要傳送到的通道。如果值為 String,則它必須表示 bean 名稱或已由 ChannelRegistry 產生。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

錯誤要傳送到的通道。如果值為 String,則它必須表示 bean 名稱或已由 ChannelRegistry 產生。

許多輸入和輸出配接器實作也提供或預期某些標頭,您可以設定其他使用者定義的標頭。這些標頭的常數可以在存在此類標頭的模組中找到 — 例如。 AmqpHeadersJmsHeaders 等。

MessageHeaderAccessor API

從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息傳遞抽象已移至 spring-messaging 模組,並且引入了 MessageHeaderAccessor API,以提供對訊息傳遞實作的額外抽象。所有 (核心) Spring Integration 特定的訊息標頭常數現在都在 IntegrationMessageHeaderAccessor 類別中宣告。下表描述了預先定義的訊息標頭

表 2. 預先定義的訊息標頭
標頭名稱 標頭類型 用途
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用於關聯兩個或多個訊息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是具有 SEQUENCE_SIZE 的訊息群組中的序列號,但也可以在 <resequencer/> 中使用,以重新排序無界限的訊息群組。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

關聯訊息群組中的訊息數量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示訊息何時過期。框架不直接使用,但可以使用標頭擴充器設定,並在配置了 UnexpiredMessageSelector<filter/> 中使用。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

訊息優先順序 — 例如,在 PriorityChannel 中。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果訊息被等冪接收器攔截器偵測為重複,則為 True。請參閱 等冪接收器企業整合模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果訊息與 Closeable 相關聯,則此標頭存在,該 Closeable 應在訊息處理完成時關閉。一個範例是與使用 FTP、SFTP 等的串流檔案傳輸相關聯的 Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果訊息驅動通道配接器支援 RetryTemplate 的設定,則此標頭包含目前的傳遞嘗試。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果輸入端點支援,則回呼以接受、拒絕或重新排隊訊息。請參閱 延遲確認輪詢訊息來源MQTT 手動確認

IntegrationMessageHeaderAccessor 類別上提供了其中一些標頭的方便類型化 getter,如下列範例所示

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出現在 IntegrationMessageHeaderAccessor 中的標頭,但通常不被使用者程式碼使用(也就是說,它們通常由 Spring Integration 的內部零件使用 — 此處包含它們是為了完整性)

表 3. 預先定義的訊息標頭
標頭名稱 標頭類型 用途
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

當需要巢狀關聯時使用的關聯資料堆疊 (例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

請參閱 路由滑道

訊息 ID 產生

當訊息在應用程式中轉換時,每次變更 (例如,由轉換器) 時,都會指派新的訊息 ID。訊息 ID 是 UUID。從 Spring Integration 3.0 開始,用於 IS 產生的預設策略比之前的 java.util.UUID.randomUUID() 實作更有效率。它使用基於安全隨機種子的簡單隨機數,而不是每次都建立安全隨機數。

可以透過在應用程式內容中宣告實作 org.springframework.util.IdGenerator 的 bean 來選擇不同的 UUID 產生策略。

在一個類別載入器中只能使用一個 UUID 產生策略。這表示,如果兩個或多個應用程式內容在同一個類別載入器中執行,它們會共用相同的策略。如果其中一個內容變更了策略,則所有內容都會使用它。如果同一個類別載入器中的兩個或多個內容宣告了 org.springframework.util.IdGenerator 類型的 bean,它們都必須是同一個類別的實例。否則,嘗試取代自訂策略的內容將無法初始化。如果策略相同,但已參數化,則使用第一個初始化的內容中的策略。

除了預設策略之外,還提供了兩個額外的 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 機制。當實際上不需要 UUID 且簡單的遞增值就足夠時,可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

唯讀標頭

MessageHeaders.IDMessageHeaders.TIMESTAMP 是唯讀標頭,無法覆寫。

自 4.3.2 版起,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,以自訂不應從上游 Message 複製的標頭清單。預設情況下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是唯讀的。提供全域 spring.integration.readOnly.headers 屬性 (請參閱 全域屬性) 以自訂框架元件的 DefaultMessageBuilderFactory。當您不想由 ObjectToJsonTransformer 填入某些現成的標頭 (例如 contentType) 時,這會很有用 (請參閱 JSON 轉換器)。

當您嘗試使用 MessageBuilder 建置新訊息時,此類標頭會被忽略,並且會向記錄發出特定的 INFO 訊息。

從 5.0 版開始,訊息傳遞閘道標頭擴充器酬載擴充器標頭篩選器 不允許您在使用 DefaultMessageBuilderFactory 時設定 MessageHeaders.IDMessageHeaders.TIMESTAMP 標頭名稱,並且它們會擲回 BeanInitializationException

標頭傳播

當訊息由訊息產生端點 (例如 服務啟動器) 處理 (和修改) 時,一般而言,輸入標頭會傳播到輸出訊息。一個例外是 轉換器,當完整訊息傳回框架時。在這種情況下,使用者程式碼負責整個輸出訊息。當轉換器僅傳回酬載時,輸入標頭會傳播。此外,只有當標頭在輸出訊息中尚不存在時才會傳播標頭,讓您可以根據需要變更標頭值。

從 4.3.10 版開始,您可以設定訊息處理器 (修改訊息並產生輸出) 以抑制特定標頭的傳播。若要設定您不希望複製的標頭,請在 MessageProducingMessageHandler 抽象類別上呼叫 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您也可以透過在 META-INF/spring.integration.properties 中將 readOnlyHeaders 屬性設定為逗號分隔的標頭清單,來全域抑制特定訊息標頭的傳播。

從 5.0 版開始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 實作會套用簡單的模式 (xxx***xxx***xxxxxx*yyy) 以允許篩選具有通用字尾或字首的標頭。如需更多資訊,請參閱 PatternMatchUtils Javadoc。當其中一個模式為 * (星號) 時,不會傳播任何標頭。所有其他模式都會被忽略。在這種情況下,服務啟動器的行為與轉換器相同,任何必要的標頭都必須在從服務方法傳回的 Message 中提供。notPropagatedHeaders() 選項在 Java DSL 的 ConsumerEndpointSpec 中可用。它也適用於 <service-activator> 元件的 XML 設定,作為 not-propagated-headers 屬性。

標頭傳播抑制不適用於那些不修改訊息的端點,例如 橋接器路由器

訊息實作

Message 介面的基本實作是 GenericMessage<T>,它提供了兩個建構子,如下列清單所示

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

建立 Message 時,會產生隨機唯一 ID。接受標頭 Map 的建構子會將提供的標頭複製到新建立的 Message

還有一個方便的 Message 實作,旨在傳達錯誤狀況。此實作採用 Throwable 物件作為其酬載,如下列範例所示

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

請注意,此實作利用了 GenericMessage 基底類別已參數化的事實。因此,如兩個範例所示,在檢索 Message 酬載 Object 時不需要強制轉型。

MessageBuilder 輔助類別

您可能會注意到,Message 介面定義了其酬載和標頭的檢索方法,但未提供 setter。原因是 Message 在初始建立後無法修改。因此,當 Message 實例傳送給多個消費者時 (例如,透過發佈-訂閱通道),如果其中一個消費者需要傳送具有不同酬載類型的回覆,則它必須建立新的 Message。因此,其他消費者不會受到這些變更的影響。請記住,多個消費者可能會存取相同的酬載實例或標頭值,而此類實例本身是否不可變是由您決定的。換句話說,Message 實例的契約類似於不可修改的 Collection,而 MessageHeaders map 進一步說明了這一點。即使 MessageHeaders 類別實作了 java.util.Map,任何嘗試在 MessageHeaders 實例上調用 put 操作 (或 'remove' 或 'clear') 都會導致 UnsupportedOperationException

Spring Integration 提供了一種更方便的方式來建構訊息,而不是要求建立和填入 Map 以傳遞到 GenericMessage 建構子中:MessageBuilderMessageBuilder 提供了兩種工廠方法,用於從現有的 Message 或具有酬載 Object 建立 Message 實例。從現有的 Message 建置時,該 Message 的標頭和酬載會複製到新的 Message,如下列範例所示

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要建立具有新酬載的 Message,但仍想從現有的 Message 複製標頭,則可以使用其中一種 'copy' 方法,如下列範例所示

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

請注意,copyHeadersIfAbsent 方法不會覆寫現有值。此外,在前面的範例中,您可以看到如何使用 setHeader 設定任何使用者定義的標頭。最後,預先定義的標頭以及用於設定任何標頭的非破壞性方法 (MessageHeaders 也定義了預先定義標頭名稱的常數) 都有 set 方法可用。

您也可以使用 MessageBuilder 設定訊息的優先順序,如下列範例所示

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

只有在使用 PriorityChannel 時才會考慮 priority 標頭 (如下一章所述)。它被定義為 java.lang.Integer