TCP 連線工廠

總覽

對於 TCP,底層連線的設定是透過使用連線工廠來提供的。提供了兩種連線工廠類型:用戶端連線工廠和伺服器連線工廠。用戶端連線工廠建立連出連線。伺服器連線工廠監聽連入連線。

輸出通道配接器使用用戶端連線工廠,但您也可以提供對用戶端連線工廠的參考給輸入通道配接器。該配接器接收在輸出配接器建立的連線上接收到的任何連入訊息。

輸入通道配接器或閘道器使用伺服器連線工廠。(事實上,沒有伺服器連線工廠,連線工廠就無法運作)。您也可以提供對伺服器連線工廠的參考給輸出配接器。然後,您可以使用該配接器在相同連線上傳送回覆給連入訊息。

只有在回覆包含連線工廠插入到原始訊息中的 ip_connectionId 標頭時,回覆訊息才會路由到連線。
這是在輸入和輸出配接器之間共用連線工廠時執行的訊息關聯程度。這種共用允許透過 TCP 進行非同步雙向通訊。預設情況下,僅使用 TCP 傳輸有效負載資訊。因此,任何訊息關聯都必須由下游元件(例如彙集器或其他端點)執行。版本 3.0 中引入了對傳輸選定標頭的支援。如需更多資訊,請參閱 TCP 訊息關聯

您最多可以為每種類型的一個配接器提供對連線工廠的參考。

Spring 整合提供了使用 java.net.Socketjava.nio.channel.SocketChannel 的連線工廠。

以下範例顯示了一個簡單的伺服器連線工廠,它使用 java.net.Socket 連線

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下範例顯示了一個簡單的伺服器連線工廠,它使用 java.nio.channel.SocketChannel 連線

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
從 Spring 整合版本 4.2 開始,如果伺服器設定為監聽隨機埠(透過將埠設定為 0),您可以使用 getPort() 取得作業系統選擇的實際埠。此外,getServerSocketAddress() 可讓您取得完整的 SocketAddress。請參閱 TcpServerConnectionFactory 介面的 Javadoc 以取得更多資訊。
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下範例顯示了一個用戶端連線工廠,它使用 java.net.Socket 連線並為每個訊息建立一個新連線

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

從版本 5.2 開始,用戶端連線工廠支援屬性 connectTimeout,以秒為單位指定,預設為 60。

訊息劃分(序列化器和還原序列化器)

TCP 是一種串流協定。這表示必須為透過 TCP 傳輸的資料提供一些結構,以便接收者可以將資料劃分為離散的訊息。連線工廠設定為使用序列化器和還原序列化器,以在訊息有效負載和透過 TCP 傳送的位元之間進行轉換。這是透過為連入和連出訊息分別提供還原序列化器和序列化器來完成的。Spring 整合提供了許多標準序列化器和還原序列化器。

ByteArrayCrlfSerializer* 將位元組陣列轉換為位元組串流,後跟歸位字元和換行字元 (\r\n)。這是預設的序列化器(和還原序列化器),可以與 telnet 作為用戶端一起使用(例如)。

ByteArraySingleTerminatorSerializer* 將位元組陣列轉換為位元組串流,後跟單個終止字元(預設值為 0x00)。

ByteArrayLfSerializer* 將位元組陣列轉換為位元組串流,後跟單個換行字元 (0x0a)。

ByteArrayStxEtxSerializer* 將位元組陣列轉換為位元組串流,前面加上 STX (0x02),後面跟著 ETX (0x03)。

ByteArrayLengthHeaderSerializer 將位元組陣列轉換為位元組串流,前面加上網路位元組順序(大端序)的二進位長度。這是一種有效的還原序列化器,因為它不必剖析每個位元組來尋找終止字元序列。它也可以用於包含二進位資料的有效負載。前面的序列化器僅支援有效負載中的文字。長度標頭的預設大小為四個位元組(一個整數),允許訊息最大為 (2^31 - 1) 個位元組。但是,對於最大 255 個位元組的訊息,length 標頭可以是單個位元組(無符號),或者對於最大 (2^16 - 1) 個位元組的訊息,可以是無符號短整數(2 個位元組)。如果您需要標頭的任何其他格式,您可以子類別化 ByteArrayLengthHeaderSerializer 並為 readHeaderwriteHeader 方法提供實作。絕對最大資料大小為 (2^31 - 1) 個位元組。從版本 5.2 開始,標頭值可以包含標頭的長度以及有效負載。設定 inclusive 屬性以啟用該機制(對於生產者和消費者,它必須設定為相同)。

ByteArrayRawSerializer*,將位元組陣列轉換為位元組串流,並且不新增其他訊息劃分資料。使用此序列化器(和還原序列化器),訊息的結尾由用戶端以有序方式關閉 Socket 來指示。使用此序列化器時,訊息接收會暫停,直到用戶端關閉 Socket 或發生逾時。逾時不會產生訊息。當正在使用此序列化器且用戶端是 Spring 整合應用程式時,用戶端必須使用設定為 single-use="true" 的連線工廠。這樣做會導致配接器在傳送訊息後關閉 Socket。序列化器本身不會關閉連線。您應該僅將此序列化器與通道配接器(而非閘道器)使用的連線工廠一起使用,並且連線工廠應由輸入或輸出配接器使用,但不能同時使用。另請參閱本節稍後的 ByteArrayElasticRawDeserializer。但是,自版本 5.2 以來,輸出閘道器有一個新的屬性 closeStreamAfterSend;這允許使用原始序列化器/還原序列化器,因為 EOF 會訊號通知伺服器,同時保持連線開啟以接收回覆。

在版本 4.2.2 之前,當使用非封鎖 I/O (NIO) 時,此序列化器將逾時(在讀取期間)視為檔案結尾,並且到目前為止讀取的資料會作為訊息發出。這是不可靠的,不應用於分隔訊息。它現在將這種情況視為例外。在不太可能發生的情況下,您以這種方式使用它,您可以透過將 treatTimeoutAsEndOfMessage 建構子引數設定為 true 來還原先前的行為。

每個都是 AbstractByteArraySerializer 的子類別,它同時實作了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。為了向後相容性,使用 AbstractByteArraySerializer 的任何子類別進行序列化的連線也接受首先轉換為位元組陣列的 String。這些序列化器和還原序列化器中的每一個都會將包含相應格式的輸入串流轉換為位元組陣列有效負載。

為了避免由於行為不端的用戶端(不遵守已設定序列化器的協定)而導致記憶體耗盡,這些序列化器強制執行最大訊息大小。如果連入訊息超過此大小,則會擲回例外。預設最大訊息大小為 2048 個位元組。您可以透過設定 maxMessageSize 屬性來增加它。如果您使用預設序列化器或還原序列化器,並且希望增加最大訊息大小,則必須將最大訊息大小宣告為具有設定的 maxMessageSize 屬性的明確 Bean,並設定連線工廠以使用該 Bean。

本節前面標有 * 的類別使用中間緩衝區,並將解碼的資料複製到正確大小的最終緩衝區。從版本 4.3 開始,您可以透過設定 poolSize 屬性來設定這些緩衝區,以讓這些原始緩衝區可以重複使用,而不是為每個訊息配置和丟棄,這是預設行為。將屬性設定為負值會建立一個沒有界限的集區。如果集區有界限,您也可以設定 poolWaitTimeout 屬性(以毫秒為單位),在此之後,如果沒有可用的緩衝區,則會擲回例外。它預設為無限大。這樣的例外會導致 Socket 被關閉。

如果您希望在自訂還原序列化器中使用相同的機制,您可以擴充 AbstractPooledBufferByteArraySerializer(而不是其父類別 AbstractByteArraySerializer)並實作 doDeserialize() 而不是 deserialize()。緩衝區會自動傳回集區。AbstractPooledBufferByteArraySerializer 還提供了一個方便的實用方法:copyToSizedArray()

版本 5.0 新增了 ByteArrayElasticRawDeserializer。這與上述 ByteArrayRawSerializer 的還原序列化器端類似,只是不必設定 maxMessageSize。在內部,它使用 ByteArrayOutputStream,讓緩衝區可以根據需要成長。用戶端必須以有序方式關閉 Socket 以訊號通知訊息結尾。

此還原序列化器應僅在信任對等方時使用;它容易受到由於記憶體不足情況而導致的 DoS 攻擊。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之間進行轉換。您可以將此序列化器與 MessageConvertingTcpMessageMapperMapMessageConverter 結合使用,以傳輸 JSON 中選定的標頭和有效負載。

Jackson ObjectMapper 無法劃分串流中的訊息。因此,MapJsonSerializer 需要委派給另一個序列化器或還原序列化器來處理訊息劃分。預設情況下,使用 ByteArrayLfSerializer,導致線路上的訊息格式為 <json><LF>,但您可以將其設定為改用其他格式。(下一個範例說明如何執行此操作。)

最終標準序列化器是 org.springframework.core.serializer.DefaultSerializer,您可以使用它透過 Java 序列化轉換可序列化物件。org.springframework.core.serializer.DefaultDeserializer 用於串流連入還原序列化,這些串流包含可序列化物件。

如果您不希望使用預設序列化器和還原序列化器 (ByteArrayCrLfSerializer),則必須在連線工廠上設定 serializerdeserializer 屬性。以下範例顯示如何執行此操作

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

使用 java.net.Socket 連線並在线路上使用 Java 序列化的伺服器連線工廠。

如需連線工廠上可用屬性的完整詳細資訊,請參閱本節末尾的 參考資料

預設情況下,不會對連入封包執行反向 DNS 查閱:在未設定 DNS 的環境中(例如 Docker 容器),這可能會導致連線延遲。若要將 IP 位址轉換為主機名稱以用於訊息標頭中,可以透過將 lookup-host 屬性設定為 true 來覆寫預設行為。

您也可以修改 Socket 和 Socket 工廠的屬性。如需更多資訊,請參閱 SSL/TLS 支援。如那裡所述,如果正在使用 SSL 或未使用 SSL,則可以進行此類修改。

自訂序列化器和還原序列化器

如果您的資料不是標準還原序列化器之一支援的格式,您可以實作自己的還原序列化器;您也可以實作自訂序列化器。

若要實作自訂序列化器和還原序列化器對,請實作 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 介面。

當還原序列化器偵測到訊息之間已關閉的輸入串流時,它必須擲回 SoftEndOfStreamException;這是框架的訊號,指示關閉是「正常」的。如果在解碼訊息時串流已關閉,則應改為擲回其他一些例外狀況。

從版本 5.2 開始,SoftEndOfStreamException 現在是 RuntimeException 而不是擴充 IOException

TCP 快取用戶端連線工廠

先前所述,TCP Socket 可以是「單次使用」(一個請求或回應)或共用的。共用 Socket 在高容量環境中無法與輸出閘道器良好運作,因為 Socket 一次只能處理一個請求或回應。

若要提高效能,您可以使用協作通道配接器而不是閘道器,但這需要應用程式層級的訊息關聯。如需更多資訊,請參閱 TCP 訊息關聯

Spring 整合 2.2 引入了快取用戶端連線工廠,它使用共用 Socket 集區,讓閘道器可以使用共用連線集區處理多個並行請求。

TCP 容錯移轉用戶端連線工廠

您可以設定支援容錯移轉到一個或多個其他伺服器的 TCP 連線工廠。傳送訊息時,工廠會反覆運算所有已設定的工廠,直到可以傳送訊息或找不到連線為止。最初,使用已設定清單中的第一個工廠。如果連線隨後失敗,則下一個工廠會變成目前的工廠。以下範例顯示如何設定容錯移轉用戶端連線工廠

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
使用容錯移轉連線工廠時,singleUse 屬性在工廠本身及其設定為使用的工廠清單之間必須一致。

連線工廠具有兩個與失敗回復相關的屬性,當與共用連線一起使用時 (singleUse=false)

  • refreshSharedInterval

  • closeOnRefresh

考慮以下基於上述設定的案例:假設 clientFactory1 無法建立連線,但 clientFactory2 可以。當在 refreshSharedInterval 經過後呼叫 failCF getConnection() 方法時,我們將再次嘗試使用 clientFactory1 進行連線;如果成功,則與 clientFactory2 的連線將會關閉。如果 closeOnRefreshfalse,則「舊」連線將保持開啟,並且如果第一個工廠再次失敗,則將來可能會重複使用。

設定 refreshSharedInterval 以僅在該時間過期後嘗試與第一個工廠重新連線;如果您只想在目前連線失敗時失敗回復到第一個工廠,則將其設定為 Long.MAX_VALUE(預設值)。

設定 closeOnRefresh 以在重新整理實際建立新連線後關閉「舊」連線。

如果任何委派工廠是 CachingClientConnectionFactory,則這些屬性不適用,因為連線快取在那裡處理;在這種情況下,將始終諮詢連線工廠清單以取得連線。

從版本 5.3 開始,這些預設為 Long.MAX_VALUEtrue,因此工廠僅在目前連線失敗時嘗試失敗回復。若要還原為先前版本的預設行為,請將它們設定為 0false

另請參閱 測試連線

TCP 執行緒親和性連線工廠

Spring 整合版本 5.0 引入了此連線工廠。它將連線繫結到呼叫執行緒,並且每次該執行緒傳送訊息時都會重複使用相同的連線。這會持續到連線關閉(由伺服器或網路關閉)或執行緒呼叫 releaseConnection() 方法為止。連線本身由另一個用戶端工廠實作提供,該實作必須設定為提供非共用(單次使用)連線,以便每個執行緒都取得連線。

以下範例顯示如何設定 TCP 執行緒親和性連線工廠

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}