訊息
Spring Integration Message
是一個通用的資料容器。任何物件都可以作為酬載提供,而且每個 Message
實例都包含標頭,其中包含使用者可擴充的屬性作為鍵值對。
Message
介面
以下清單顯示了 Message
介面的定義
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message
介面是 API 的核心部分。透過將資料封裝在通用包裝器中,訊息傳遞系統可以在不知道資料類型的情況下傳遞它。隨著應用程式發展以支援新類型,或當類型本身被修改或擴充時,訊息傳遞系統不會受到影響。另一方面,當訊息傳遞系統中的某些元件確實需要存取關於 Message
的資訊時,此類中繼資料通常可以儲存到訊息標頭中的中繼資料並從中檢索。
訊息標頭
正如 Spring Integration 允許將任何 Object
用作 Message
的酬載一樣,它也支援任何 Object
類型作為標頭值。實際上,MessageHeaders
類別實作了 java.util.Map_
介面,如下列類別定義所示
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
即使 MessageHeaders 類別實作了 Map ,它實際上是一個唯讀實作。任何嘗試將值 put 到 Map 中都會導致 UnsupportedOperationException 。remove 和 clear 也是如此。由於訊息可能會傳遞給多個消費者,因此無法修改 Map 的結構。同樣地,訊息的酬載 Object 在初始建立後也無法 set 。但是,標頭值本身(或酬載 Object)的可變性有意留給框架使用者決定。 |
作為 Map
的實作,可以透過呼叫 get(..)
並使用標頭名稱來檢索標頭。或者,您可以提供預期的 Class
作為額外參數。更好的是,當檢索其中一個預先定義的值時,可以使用方便的 getter。以下範例顯示了這三個選項中的每一個
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了預先定義的訊息標頭
標頭名稱 | 標頭類型 | 用途 |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
此訊息實例的識別碼。每次訊息變更時都會變更。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
訊息建立的時間。每次訊息變更時都會變更。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
當未設定明確的輸出通道且沒有 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
錯誤要傳送到的通道。如果值為 |
許多輸入和輸出配接器實作也提供或預期某些標頭,您可以設定其他使用者定義的標頭。這些標頭的常數可以在存在此類標頭的模組中找到 — 例如。 AmqpHeaders
、JmsHeaders
等。
MessageHeaderAccessor
API
從 Spring Framework 4.0 和 Spring Integration 4.0 開始,核心訊息傳遞抽象已移至 spring-messaging
模組,並且引入了 MessageHeaderAccessor
API,以提供對訊息傳遞實作的額外抽象。所有 (核心) Spring Integration 特定的訊息標頭常數現在都在 IntegrationMessageHeaderAccessor
類別中宣告。下表描述了預先定義的訊息標頭
標頭名稱 | 標頭類型 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用於關聯兩個或多個訊息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是具有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
關聯訊息群組中的訊息數量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示訊息何時過期。框架不直接使用,但可以使用標頭擴充器設定,並在配置了 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
訊息優先順序 — 例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果訊息被等冪接收器攔截器偵測為重複,則為 True。請參閱 等冪接收器企業整合模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果訊息與 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果訊息驅動通道配接器支援 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果輸入端點支援,則回呼以接受、拒絕或重新排隊訊息。請參閱 延遲確認輪詢訊息來源 和 MQTT 手動確認。 |
IntegrationMessageHeaderAccessor
類別上提供了其中一些標頭的方便類型化 getter,如下列範例所示
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出現在 IntegrationMessageHeaderAccessor
中的標頭,但通常不被使用者程式碼使用(也就是說,它們通常由 Spring Integration 的內部零件使用 — 此處包含它們是為了完整性)
標頭名稱 | 標頭類型 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
當需要巢狀關聯時使用的關聯資料堆疊 (例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
請參閱 路由滑道。 |
訊息 ID 產生
當訊息在應用程式中轉換時,每次變更 (例如,由轉換器) 時,都會指派新的訊息 ID。訊息 ID 是 UUID
。從 Spring Integration 3.0 開始,用於 IS 產生的預設策略比之前的 java.util.UUID.randomUUID()
實作更有效率。它使用基於安全隨機種子的簡單隨機數,而不是每次都建立安全隨機數。
可以透過在應用程式內容中宣告實作 org.springframework.util.IdGenerator
的 bean 來選擇不同的 UUID 產生策略。
在一個類別載入器中只能使用一個 UUID 產生策略。這表示,如果兩個或多個應用程式內容在同一個類別載入器中執行,它們會共用相同的策略。如果其中一個內容變更了策略,則所有內容都會使用它。如果同一個類別載入器中的兩個或多個內容宣告了 org.springframework.util.IdGenerator 類型的 bean,它們都必須是同一個類別的實例。否則,嘗試取代自訂策略的內容將無法初始化。如果策略相同,但已參數化,則使用第一個初始化的內容中的策略。 |
除了預設策略之外,還提供了兩個額外的 IdGenerators
。org.springframework.util.JdkIdGenerator
使用之前的 UUID.randomUUID()
機制。當實際上不需要 UUID 且簡單的遞增值就足夠時,可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
。
唯讀標頭
MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是唯讀標頭,無法覆寫。
自 4.3.2 版起,MessageBuilder
提供了 readOnlyHeaders(String… readOnlyHeaders)
API,以自訂不應從上游 Message
複製的標頭清單。預設情況下,只有 MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是唯讀的。提供全域 spring.integration.readOnly.headers
屬性 (請參閱 全域屬性) 以自訂框架元件的 DefaultMessageBuilderFactory
。當您不想由 ObjectToJsonTransformer
填入某些現成的標頭 (例如 contentType
) 時,這會很有用 (請參閱 JSON 轉換器)。
當您嘗試使用 MessageBuilder
建置新訊息時,此類標頭會被忽略,並且會向記錄發出特定的 INFO
訊息。
標頭傳播
當訊息由訊息產生端點 (例如 服務啟動器) 處理 (和修改) 時,一般而言,輸入標頭會傳播到輸出訊息。一個例外是 轉換器,當完整訊息傳回框架時。在這種情況下,使用者程式碼負責整個輸出訊息。當轉換器僅傳回酬載時,輸入標頭會傳播。此外,只有當標頭在輸出訊息中尚不存在時才會傳播標頭,讓您可以根據需要變更標頭值。
從 4.3.10 版開始,您可以設定訊息處理器 (修改訊息並產生輸出) 以抑制特定標頭的傳播。若要設定您不希望複製的標頭,請在 MessageProducingMessageHandler
抽象類別上呼叫 setNotPropagatedHeaders()
或 addNotPropagatedHeaders()
方法。
您也可以透過在 META-INF/spring.integration.properties
中將 readOnlyHeaders
屬性設定為逗號分隔的標頭清單,來全域抑制特定訊息標頭的傳播。
從 5.0 版開始,AbstractMessageProducingHandler
上的 setNotPropagatedHeaders()
實作會套用簡單的模式 (xxx*
、**xxx**
、*xxx
或 xxx*yyy
) 以允許篩選具有通用字尾或字首的標頭。如需更多資訊,請參閱 PatternMatchUtils
Javadoc。當其中一個模式為 *
(星號) 時,不會傳播任何標頭。所有其他模式都會被忽略。在這種情況下,服務啟動器的行為與轉換器相同,任何必要的標頭都必須在從服務方法傳回的 Message
中提供。notPropagatedHeaders()
選項在 Java DSL 的 ConsumerEndpointSpec
中可用。它也適用於 <service-activator>
元件的 XML 設定,作為 not-propagated-headers
屬性。
訊息實作
Message
介面的基本實作是 GenericMessage<T>
,它提供了兩個建構子,如下列清單所示
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
建立 Message
時,會產生隨機唯一 ID。接受標頭 Map
的建構子會將提供的標頭複製到新建立的 Message
。
還有一個方便的 Message
實作,旨在傳達錯誤狀況。此實作採用 Throwable
物件作為其酬載,如下列範例所示
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
請注意,此實作利用了 GenericMessage
基底類別已參數化的事實。因此,如兩個範例所示,在檢索 Message
酬載 Object
時不需要強制轉型。
MessageBuilder
輔助類別
您可能會注意到,Message
介面定義了其酬載和標頭的檢索方法,但未提供 setter。原因是 Message
在初始建立後無法修改。因此,當 Message
實例傳送給多個消費者時 (例如,透過發佈-訂閱通道),如果其中一個消費者需要傳送具有不同酬載類型的回覆,則它必須建立新的 Message
。因此,其他消費者不會受到這些變更的影響。請記住,多個消費者可能會存取相同的酬載實例或標頭值,而此類實例本身是否不可變是由您決定的。換句話說,Message
實例的契約類似於不可修改的 Collection
,而 MessageHeaders
map 進一步說明了這一點。即使 MessageHeaders
類別實作了 java.util.Map
,任何嘗試在 MessageHeaders
實例上調用 put
操作 (或 'remove' 或 'clear') 都會導致 UnsupportedOperationException
。
Spring Integration 提供了一種更方便的方式來建構訊息,而不是要求建立和填入 Map 以傳遞到 GenericMessage 建構子中:MessageBuilder
。MessageBuilder
提供了兩種工廠方法,用於從現有的 Message
或具有酬載 Object
建立 Message
實例。從現有的 Message
建置時,該 Message
的標頭和酬載會複製到新的 Message
,如下列範例所示
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要建立具有新酬載的 Message
,但仍想從現有的 Message
複製標頭,則可以使用其中一種 'copy' 方法,如下列範例所示
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
請注意,copyHeadersIfAbsent
方法不會覆寫現有值。此外,在前面的範例中,您可以看到如何使用 setHeader
設定任何使用者定義的標頭。最後,預先定義的標頭以及用於設定任何標頭的非破壞性方法 (MessageHeaders
也定義了預先定義標頭名稱的常數) 都有 set
方法可用。
您也可以使用 MessageBuilder
設定訊息的優先順序,如下列範例所示
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
只有在使用 PriorityChannel
時才會考慮 priority
標頭 (如下一章所述)。它被定義為 java.lang.Integer
。