關於非阻塞 I/O (NIO)
使用 NIO (請參閱IP 設定屬性中的 using-nio
) 可以避免為每個 socket 專門配置一個執行緒來讀取。對於少量 socket,您可能會發現不使用 NIO,結合非同步交接 (例如到 QueueChannel
),效能與使用 NIO 一樣好,甚至更好。
當處理大量連線時,您應該考慮使用 NIO。然而,使用 NIO 還有其他影響。執行緒池 (在任務執行器中) 在所有 socket 之間共用。每個傳入的訊息都會被組裝,並作為一個獨立的工作單元,在從該池中選取的執行緒上傳送到設定的通道。到達同一個 socket 的兩個循序訊息可能會由不同的執行緒處理。這表示訊息傳送到通道的順序是不確定的。抵達 socket 的訊息的嚴格排序不會被維護。
對於某些應用程式,這不是問題。對於其他應用程式,這是一個問題。如果您需要嚴格排序,請考慮將 using-nio
設定為 false
並使用非同步交接。
或者,您可以將重排序器插入輸入端點的下游,以將訊息返回到正確的順序。如果您在連線工廠上將 apply-sequence
設定為 true
,則抵達 TCP 連線的訊息會設定 sequenceNumber
和 correlationId
標頭。重排序器使用這些標頭將訊息返回到正確的順序。
從 5.1.4 版本開始,優先考慮接受新連線,而不是從現有連線讀取。除非您有非常高的新傳入連線速率,否則這通常影響不大。如果您希望恢復到先前優先考慮讀取的行為,請將 TcpNioServerConnectionFactory 上的 multiAccept 屬性設定為 false 。 |
池大小
池大小屬性已不再使用。先前,當未指定 task-executor 時,它指定預設執行緒池的大小。它也用於設定伺服器 socket 上的連線待辦項目。第一個功能已不再需要 (請參閱下一段)。第二個功能已由 backlog
屬性取代。
先前,當使用固定執行緒池任務執行器 (這是預設值) 和 NIO 時,可能會發生死鎖且處理會停止。問題發生在緩衝區已滿時,從 socket 讀取的執行緒嘗試將更多資料新增到緩衝區,且沒有可用的執行緒在緩衝區中建立空間。這僅在非常小的池大小下發生,但在極端條件下可能會發生。自 2.2 版以來,兩項變更消除了此問題。首先,預設任務執行器是快取執行緒池執行器。其次,已新增死鎖偵測邏輯,以便在發生執行緒飢餓時,不會發生死鎖,而是擲回例外狀況,從而釋放死鎖的資源。
現在預設任務執行器是無限制的,如果訊息處理需要較長時間,則在高傳入訊息速率下,可能會發生記憶體不足的情況。如果您的應用程式出現這種行為,您應該使用具有適當池大小的池化任務執行器,但請參閱下一節。 |
具有 CALLER_RUNS
策略的執行緒池任務執行器
當您將固定執行緒池與 CallerRunsPolicy
(使用 <task/>
命名空間時為 CALLER_RUNS
) 搭配使用且佇列容量很小時,您應該記住一些重要的考量。
如果您未使用固定執行緒池,則以下內容不適用。
使用 NIO 連線時,有三種不同的任務類型。I/O selector 處理在一個專用執行緒上執行 (偵測事件、接受新連線,以及使用任務執行器將 I/O 讀取作業分派到其他執行緒)。當 I/O 讀取器執行緒 (讀取作業分派到此執行緒) 讀取資料時,它會交接給另一個執行緒來組裝傳入的訊息。大型訊息可能需要多次讀取才能完成。這些「組裝器」執行緒可能會在等待資料時被封鎖。當新的讀取事件發生時,讀取器會判斷此 socket 是否已具有組裝器,如果沒有,則執行新的組裝器。當組裝程序完成時,組裝器執行緒會返回到池中。
當池耗盡、正在使用 CALLER_RUNS
拒絕策略且任務佇列已滿時,這可能會導致死鎖。當池為空且佇列中沒有空間時,IO selector 執行緒會收到 OP_READ
事件,並使用執行器分派讀取。佇列已滿,因此 selector 執行緒本身會啟動讀取程序。現在它偵測到此 socket 沒有組裝器,並且在執行讀取之前,會啟動一個組裝器。佇列再次已滿,且 selector 執行緒變成組裝器。組裝器現在被封鎖,等待讀取資料,但永遠不會發生。連線工廠現在已死鎖,因為 selector 執行緒無法處理新事件。
為了避免此死鎖,我們必須避免 selector (或讀取器) 執行緒執行組裝任務。我們希望為 IO 和組裝作業使用單獨的池。
框架提供 CompositeExecutor
,允許設定兩個不同的執行器:一個用於執行 IO 作業,另一個用於訊息組裝。在此環境中,IO 執行緒永遠不會變成組裝器執行緒,且不會發生死鎖。
此外,任務執行器應設定為使用 AbortPolicy
(使用 <task>
時為 ABORT
)。當 I/O 任務無法完成時,它會延遲一段時間,並持續重試,直到可以完成並配置組裝器為止。預設情況下,延遲為 100 毫秒,但您可以透過在連線工廠上設定 readDelay
屬性 (使用 XML 命名空間設定時為 read-delay
) 來變更它。
以下三個範例示範如何設定複合執行器
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>