TCP 訊息關聯
閘道
閘道會自動關聯訊息。但是,您應該將輸出閘道用於相對低量的應用程式。當您設定連線工廠對所有訊息對使用單一共用連線('single-use="false"')時,一次只能處理一則訊息。新訊息必須等到收到先前訊息的回覆後才能處理。當連線工廠設定為每個新訊息都使用新連線('single-use="true"')時,此限制不適用。雖然此設定可以提供比共用連線環境更高的輸送量,但每次訊息對都需要開啟和關閉新連線,這會帶來額外負擔。
因此,對於高容量訊息,請考慮使用協作的通道適配器對。但是,若要這樣做,您需要提供協作邏輯。
Spring Integration 2.2 中引入的另一個解決方案是使用 CachingClientConnectionFactory
,它允許使用共用連線池。
協作輸出和輸入通道適配器
為了實現高容量輸送量(避免使用閘道的缺點,如先前所述),您可以設定一對協作的輸出和輸入通道適配器。您也可以將協作適配器(伺服器端或用戶端)用於完全非同步通訊(而不是請求-回覆語意)。在伺服器端,訊息關聯由適配器自動處理,因為輸入適配器會新增一個標頭,讓輸出適配器能夠判斷在傳送回覆訊息時要使用哪個連線。
在伺服器端,您必須填入 ip_connectionId 標頭,因為它用於將訊息關聯到連線。源自輸入適配器的訊息會自動設定標頭。如果您想要建構其他訊息來傳送,則需要設定標頭。您可以從傳入訊息取得標頭值。 |
在用戶端,如果需要,應用程式必須提供自己的關聯邏輯。您可以使用多種方式來執行此操作。
如果訊息 Payload 具有一些自然關聯資料(例如交易 ID 或訂單號碼),並且您不需要保留來自原始輸出訊息的任何資訊(例如回覆通道標頭),則關聯很簡單,無論如何都會在應用程式層級完成。
如果訊息 Payload 具有一些自然關聯資料(例如交易 ID 或訂單號碼),但您需要保留來自原始輸出訊息的一些資訊(例如回覆通道標頭),您可以保留原始輸出訊息的副本(可能透過使用發布-訂閱通道),並使用聚合器重新組合必要的資料。
對於前述兩種情況,如果 Payload 沒有自然關聯資料,您可以在輸出通道適配器的上游提供一個轉換器,以使用此類資料增強 Payload。此類轉換器可能會將原始 Payload 轉換為一個新物件,其中包含原始 Payload 和訊息標頭的子集。當然,標頭中的即時物件(例如回覆通道)無法包含在轉換後的 Payload 中。
如果您選擇此類策略,則需要確保連線工廠具有適當的序列化器-還原序列化器對,以處理此類 Payload(例如 DefaultSerializer
和 DefaultDeserializer
,它們使用 Java 序列化,或自訂序列化器和還原序列化器)。TCP 連線工廠中提及的 ByteArray*Serializer
選項,包括預設的 ByteArrayCrLfSerializer
,除非轉換後的 Payload 是 String
或 byte[]
,否則它們不支援此類 Payload。
在 2.2 版本之前,當協作通道適配器使用用戶端連線工廠時, 此預設行為在真正的非同步環境中並不適用,因此現在預設為無限逾時。您可以將用戶端連線工廠上的 |
從 5.4 版開始,多個輸出通道適配器和一個 TcpInboundChannelAdapter
可以共用相同的連線工廠。這讓應用程式可以同時支援請求/回覆和任意伺服器 → 用戶端訊息傳遞。請參閱 TCP 閘道 以取得更多資訊。
傳輸標頭
TCP 是一種串流協定。Serializers
和 Deserializers
劃分串流中的訊息。在 3.0 之前,只能透過 TCP 傳輸訊息 Payload(String
或 byte[]
)。從 3.0 開始,您可以傳輸選定的標頭以及 Payload。但是,無法序列化「即時」物件,例如 replyChannel
標頭。
透過 TCP 傳送標頭資訊需要一些額外的設定。
第一步是為 ConnectionFactory
提供一個 MessageConvertingTcpMessageMapper
,它使用 mapper
屬性。此 Mapper 委派給任何 MessageConverter
實作,以將訊息轉換為物件,並從物件轉換回來,該物件可以由設定的 serializer
和 deserializer
序列化和還原序列化。
Spring Integration 提供了一個 MapMessageConverter
,它允許指定新增至 Map
物件的標頭列表,以及 Payload。產生的 Map 具有兩個項目:payload
和 headers
。headers
項目本身是一個 Map
,並且包含選定的標頭。
第二步是提供一個序列化器和一個還原序列化器,它們可以在 Map
和某種線路格式之間進行轉換。這可以是自訂的 Serializer
或 Deserializer
,如果您對等系統不是 Spring Integration 應用程式,則通常需要此類序列化器或還原序列化器。
Spring Integration 提供了一個 MapJsonSerializer
,用於將 Map
轉換為 JSON 和從 JSON 轉換回來。它使用 Spring Integration JsonObjectMapper
。如果需要,您可以提供自訂的 JsonObjectMapper
。預設情況下,序列化器會在物件之間插入換行符號 (0x0a
) 字元。請參閱 Javadoc 以取得更多資訊。
JsonObjectMapper 使用類別路徑中的任何版本的 Jackson 。 |
您也可以使用 Map
的標準 Java 序列化,方法是使用 DefaultSerializer
和 DefaultDeserializer
。
以下範例顯示了連線工廠的設定,該工廠使用 JSON 傳輸 correlationId
、sequenceNumber
和 sequenceSize
標頭
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述設定傳送的訊息,其 Payload 為 'something',在線路上會顯示如下
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}