串流支援
在許多情況下,應用程式資料是從串流中取得的。不建議將串流的參考作為訊息酬載傳送給消費者。相反地,訊息是從輸入串流中讀取的資料建立的,而訊息酬載會逐一寫入輸出串流。
您需要將此依賴項包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.3.5"
從串流讀取
Spring Integration 提供了兩個用於串流的配接器。ByteStreamReadingMessageSource
和 CharacterStreamReadingMessageSource
都實作了 MessageSource
。透過在 channel-adapter 元素中設定其中一個,可以設定輪詢週期,並且訊息匯流排可以自動偵測和排程它們。位元組串流版本需要 InputStream
,而字元串流版本需要 Reader
作為單一建構子引數。ByteStreamReadingMessageSource
也接受 'bytesPerMessage' 屬性,以決定它嘗試讀取到每個 Message
中的位元組數。預設值為 1024
。以下範例建立一個輸入串流,該串流建立每個包含 2048 個位元組的訊息
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
CharacterStreamReadingMessageSource
將讀取器包裝在 BufferedReader
中(如果它還不是的話)。您可以在第二個建構子引數中設定緩衝讀取器使用的緩衝區大小。從版本 5.0 開始,第三個建構子引數 (blockToDetectEOF
) 控制 CharacterStreamReadingMessageSource
的行為。當 false
(預設值)時,receive()
方法會檢查讀取器是否 ready()
,如果不是,則傳回 null。在這種情況下,不會偵測到 EOF(檔案結尾)。當 true
時,receive()
方法會封鎖,直到資料可用或在基礎串流上偵測到 EOF 為止。當偵測到 EOF 時,會發佈 StreamClosedEvent
(應用程式事件)。您可以使用實作 ApplicationListener<StreamClosedEvent>
的 Bean 來取用此事件。
為了方便 EOF 偵測,輪詢器執行緒會在 receive() 方法中封鎖,直到資料到達或偵測到 EOF 為止。 |
一旦偵測到 EOF,輪詢器會繼續在每次輪詢時發佈事件。應用程式接聽器可以停止配接器以防止這種情況。事件是在輪詢器執行緒上發佈的。停止配接器會導致執行緒中斷。如果您打算在停止配接器後執行一些可中斷的任務,您必須在不同的執行緒上執行 stop() ,或為那些下游活動使用不同的執行緒。請注意,傳送到 QueueChannel 是可中斷的,因此,如果您希望從接聽器傳送訊息,請在停止配接器之前執行此操作。 |
這有助於將資料「管道傳輸」或重新導向到 stdin
,如下列兩個範例所示
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
這種方法讓應用程式在管道關閉時停止。
有四種方便的工廠方法可用
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
寫入串流
對於目標串流,您可以使用兩種實作之一:ByteStreamWritingMessageHandler
或 CharacterStreamWritingMessageHandler
。每個都需要單一建構子引數(位元組串流的 OutputStream
或字元串流的 Writer
),並且每個都提供第二個建構子,該建構子新增了可選的 'bufferSize'。由於這兩者最終都實作了 MessageHandler
介面,因此您可以從 channel-adapter
設定中參考它們,如 通道配接器 中所述。
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
串流命名空間支援
Spring Integration 定義了一個命名空間,以減少串流相關通道配接器所需的設定。需要以下綱要位置才能使用它
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下程式碼片段顯示了支援的不同設定選項,用於設定輸入通道配接器
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
從版本 5.0 開始,您可以設定 detect-eof
屬性,該屬性設定 blockToDetectEOF
屬性。有關更多資訊,請參閱 從串流讀取。
若要設定輸出通道配接器,您也可以使用命名空間支援。以下範例顯示了輸出通道配接器的不同設定
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>