TCP 訊息關聯

IP 端點的一個目標是提供與 Spring Integration 應用程式以外的系統進行通訊。因此,預設情況下僅傳送和接收訊息 Payload。自 3.0 版起,您可以使用 JSON、Java 序列化或自訂序列化器和還原序列化器來傳輸標頭。請參閱 傳輸標頭 以取得更多資訊。框架未提供訊息關聯(除非使用閘道)或伺服器端的協作通道適配器。本文稍後,我們將討論應用程式可用的各種關聯技術。在大多數情況下,即使訊息 Payload 包含一些自然關聯資料(例如訂單號碼),也需要特定的應用程式層級訊息關聯。

閘道

閘道會自動關聯訊息。但是,您應該將輸出閘道用於相對低量的應用程式。當您設定連線工廠對所有訊息對使用單一共用連線('single-use="false"')時,一次只能處理一則訊息。新訊息必須等到收到先前訊息的回覆後才能處理。當連線工廠設定為每個新訊息都使用新連線('single-use="true"')時,此限制不適用。雖然此設定可以提供比共用連線環境更高的輸送量,但每次訊息對都需要開啟和關閉新連線,這會帶來額外負擔。

因此,對於高容量訊息,請考慮使用協作的通道適配器對。但是,若要這樣做,您需要提供協作邏輯。

Spring Integration 2.2 中引入的另一個解決方案是使用 CachingClientConnectionFactory,它允許使用共用連線池。

協作輸出和輸入通道適配器

為了實現高容量輸送量(避免使用閘道的缺點,如先前所述),您可以設定一對協作的輸出和輸入通道適配器。您也可以將協作適配器(伺服器端或用戶端)用於完全非同步通訊(而不是請求-回覆語意)。在伺服器端,訊息關聯由適配器自動處理,因為輸入適配器會新增一個標頭,讓輸出適配器能夠判斷在傳送回覆訊息時要使用哪個連線。

在伺服器端,您必須填入 ip_connectionId 標頭,因為它用於將訊息關聯到連線。源自輸入適配器的訊息會自動設定標頭。如果您想要建構其他訊息來傳送,則需要設定標頭。您可以從傳入訊息取得標頭值。

在用戶端,如果需要,應用程式必須提供自己的關聯邏輯。您可以使用多種方式來執行此操作。

如果訊息 Payload 具有一些自然關聯資料(例如交易 ID 或訂單號碼),並且您不需要保留來自原始輸出訊息的任何資訊(例如回覆通道標頭),則關聯很簡單,無論如何都會在應用程式層級完成。

如果訊息 Payload 具有一些自然關聯資料(例如交易 ID 或訂單號碼),但您需要保留來自原始輸出訊息的一些資訊(例如回覆通道標頭),您可以保留原始輸出訊息的副本(可能透過使用發布-訂閱通道),並使用聚合器重新組合必要的資料。

對於前述兩種情況,如果 Payload 沒有自然關聯資料,您可以在輸出通道適配器的上游提供一個轉換器,以使用此類資料增強 Payload。此類轉換器可能會將原始 Payload 轉換為一個新物件,其中包含原始 Payload 和訊息標頭的子集。當然,標頭中的即時物件(例如回覆通道)無法包含在轉換後的 Payload 中。

如果您選擇此類策略,則需要確保連線工廠具有適當的序列化器-還原序列化器對,以處理此類 Payload(例如 DefaultSerializerDefaultDeserializer,它們使用 Java 序列化,或自訂序列化器和還原序列化器)。TCP 連線工廠中提及的 ByteArray*Serializer 選項,包括預設的 ByteArrayCrLfSerializer,除非轉換後的 Payload 是 Stringbyte[],否則它們不支援此類 Payload。

在 2.2 版本之前,當協作通道適配器使用用戶端連線工廠時,so-timeout 屬性預設為預設回覆逾時(10 秒)。這表示,如果在輸入適配器在此期間內未收到任何資料,則會關閉 Socket。

此預設行為在真正的非同步環境中並不適用,因此現在預設為無限逾時。您可以將用戶端連線工廠上的 so-timeout 屬性設定為 10000 毫秒,以恢復先前的預設行為。

從 5.4 版開始,多個輸出通道適配器和一個 TcpInboundChannelAdapter 可以共用相同的連線工廠。這讓應用程式可以同時支援請求/回覆和任意伺服器 → 用戶端訊息傳遞。請參閱 TCP 閘道 以取得更多資訊。

傳輸標頭

TCP 是一種串流協定。SerializersDeserializers 劃分串流中的訊息。在 3.0 之前,只能透過 TCP 傳輸訊息 Payload(Stringbyte[])。從 3.0 開始,您可以傳輸選定的標頭以及 Payload。但是,無法序列化「即時」物件,例如 replyChannel 標頭。

透過 TCP 傳送標頭資訊需要一些額外的設定。

第一步是為 ConnectionFactory 提供一個 MessageConvertingTcpMessageMapper,它使用 mapper 屬性。此 Mapper 委派給任何 MessageConverter 實作,以將訊息轉換為物件,並從物件轉換回來,該物件可以由設定的 serializerdeserializer 序列化和還原序列化。

Spring Integration 提供了一個 MapMessageConverter,它允許指定新增至 Map 物件的標頭列表,以及 Payload。產生的 Map 具有兩個項目:payloadheadersheaders 項目本身是一個 Map,並且包含選定的標頭。

第二步是提供一個序列化器和一個還原序列化器,它們可以在 Map 和某種線路格式之間進行轉換。這可以是自訂的 SerializerDeserializer,如果您對等系統不是 Spring Integration 應用程式,則通常需要此類序列化器或還原序列化器。

Spring Integration 提供了一個 MapJsonSerializer,用於將 Map 轉換為 JSON 和從 JSON 轉換回來。它使用 Spring Integration JsonObjectMapper。如果需要,您可以提供自訂的 JsonObjectMapper。預設情況下,序列化器會在物件之間插入換行符號 (0x0a) 字元。請參閱 Javadoc 以取得更多資訊。

JsonObjectMapper 使用類別路徑中的任何版本的 Jackson

您也可以使用 Map 的標準 Java 序列化,方法是使用 DefaultSerializerDefaultDeserializer

以下範例顯示了連線工廠的設定,該工廠使用 JSON 傳輸 correlationIdsequenceNumbersequenceSize 標頭

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用上述設定傳送的訊息,其 Payload 為 'something',在線路上會顯示如下

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}