關於非阻塞 I/O (NIO)

使用 NIO (請參閱IP 設定屬性中的 using-nio) 可以避免為每個 socket 專門配置一個執行緒來讀取。對於少量 socket,您可能會發現不使用 NIO,結合非同步交接 (例如到 QueueChannel),效能與使用 NIO 一樣好,甚至更好。

當處理大量連線時,您應該考慮使用 NIO。然而,使用 NIO 還有其他影響。執行緒池 (在任務執行器中) 在所有 socket 之間共用。每個傳入的訊息都會被組裝,並作為一個獨立的工作單元,在從該池中選取的執行緒上傳送到設定的通道。到達同一個 socket 的兩個循序訊息可能會由不同的執行緒處理。這表示訊息傳送到通道的順序是不確定的。抵達 socket 的訊息的嚴格排序不會被維護。

對於某些應用程式,這不是問題。對於其他應用程式,這是一個問題。如果您需要嚴格排序,請考慮將 using-nio 設定為 false 並使用非同步交接。

或者,您可以將重排序器插入輸入端點的下游,以將訊息返回到正確的順序。如果您在連線工廠上將 apply-sequence 設定為 true,則抵達 TCP 連線的訊息會設定 sequenceNumbercorrelationId 標頭。重排序器使用這些標頭將訊息返回到正確的順序。

從 5.1.4 版本開始,優先考慮接受新連線,而不是從現有連線讀取。除非您有非常高的新傳入連線速率,否則這通常影響不大。如果您希望恢復到先前優先考慮讀取的行為,請將 TcpNioServerConnectionFactory 上的 multiAccept 屬性設定為 false

池大小

池大小屬性已不再使用。先前,當未指定 task-executor 時,它指定預設執行緒池的大小。它也用於設定伺服器 socket 上的連線待辦項目。第一個功能已不再需要 (請參閱下一段)。第二個功能已由 backlog 屬性取代。

先前,當使用固定執行緒池任務執行器 (這是預設值) 和 NIO 時,可能會發生死鎖且處理會停止。問題發生在緩衝區已滿時,從 socket 讀取的執行緒嘗試將更多資料新增到緩衝區,且沒有可用的執行緒在緩衝區中建立空間。這僅在非常小的池大小下發生,但在極端條件下可能會發生。自 2.2 版以來,兩項變更消除了此問題。首先,預設任務執行器是快取執行緒池執行器。其次,已新增死鎖偵測邏輯,以便在發生執行緒飢餓時,不會發生死鎖,而是擲回例外狀況,從而釋放死鎖的資源。

現在預設任務執行器是無限制的,如果訊息處理需要較長時間,則在高傳入訊息速率下,可能會發生記憶體不足的情況。如果您的應用程式出現這種行為,您應該使用具有適當池大小的池化任務執行器,但請參閱下一節

具有 CALLER_RUNS 策略的執行緒池任務執行器

當您將固定執行緒池與 CallerRunsPolicy (使用 <task/> 命名空間時為 CALLER_RUNS) 搭配使用且佇列容量很小時,您應該記住一些重要的考量。

如果您未使用固定執行緒池,則以下內容不適用。

使用 NIO 連線時,有三種不同的任務類型。I/O selector 處理在一個專用執行緒上執行 (偵測事件、接受新連線,以及使用任務執行器將 I/O 讀取作業分派到其他執行緒)。當 I/O 讀取器執行緒 (讀取作業分派到此執行緒) 讀取資料時,它會交接給另一個執行緒來組裝傳入的訊息。大型訊息可能需要多次讀取才能完成。這些「組裝器」執行緒可能會在等待資料時被封鎖。當新的讀取事件發生時,讀取器會判斷此 socket 是否已具有組裝器,如果沒有,則執行新的組裝器。當組裝程序完成時,組裝器執行緒會返回到池中。

當池耗盡、正在使用 CALLER_RUNS 拒絕策略且任務佇列已滿時,這可能會導致死鎖。當池為空且佇列中沒有空間時,IO selector 執行緒會收到 OP_READ 事件,並使用執行器分派讀取。佇列已滿,因此 selector 執行緒本身會啟動讀取程序。現在它偵測到此 socket 沒有組裝器,並且在執行讀取之前,會啟動一個組裝器。佇列再次已滿,且 selector 執行緒變成組裝器。組裝器現在被封鎖,等待讀取資料,但永遠不會發生。連線工廠現在已死鎖,因為 selector 執行緒無法處理新事件。

為了避免此死鎖,我們必須避免 selector (或讀取器) 執行緒執行組裝任務。我們希望為 IO 和組裝作業使用單獨的池。

框架提供 CompositeExecutor,允許設定兩個不同的執行器:一個用於執行 IO 作業,另一個用於訊息組裝。在此環境中,IO 執行緒永遠不會變成組裝器執行緒,且不會發生死鎖。

此外,任務執行器應設定為使用 AbortPolicy (使用 <task> 時為 ABORT)。當 I/O 任務無法完成時,它會延遲一段時間,並持續重試,直到可以完成並配置組裝器為止。預設情況下,延遲為 100 毫秒,但您可以透過在連線工廠上設定 readDelay 屬性 (使用 XML 命名空間設定時為 read-delay) 來變更它。

以下三個範例示範如何設定複合執行器

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>