資料緩衝區與編碼器
Java NIO 提供了 ByteBuffer
,但許多程式庫在其之上建構了自己的位元組緩衝區 API,特別是對於網路操作,其中重複使用緩衝區和/或使用直接緩衝區對於效能是有益的。例如,Netty 具有 ByteBuf
階層,Undertow 使用 XNIO,Jetty 使用帶有回呼以供釋放的集區位元組緩衝區等等。spring-core
模組提供了一組抽象概念,用於處理各種位元組緩衝區 API,如下所示:
-
DataBufferFactory
抽象化了資料緩衝區的建立。 -
DataBuffer
代表位元組緩衝區,可能是集區化的。 -
DataBufferUtils
提供了資料緩衝區的實用方法。 -
編碼器 將資料緩衝區串流解碼或編碼為更高等級的物件。
DataBufferFactory
DataBufferFactory
用於以兩種方式之一建立資料緩衝區:
-
配置新的資料緩衝區,如果已知容量,可以選擇預先指定容量,這樣更有效率,即使
DataBuffer
的實作可以根據需求成長和縮小。 -
包裝現有的
byte[]
或java.nio.ByteBuffer
,這會使用DataBuffer
實作裝飾給定的資料,並且不涉及配置。
請注意,WebFlux 應用程式不會直接建立 DataBufferFactory
,而是透過伺服器端的 ServerHttpResponse
或客户端的 ClientHttpRequest
來存取它。工廠的類型取決於底層的客户端或伺服器,例如,Reactor Netty 使用 NettyDataBufferFactory
,其他使用 DefaultDataBufferFactory
。
DataBuffer
DataBuffer
介面提供了與 java.nio.ByteBuffer
類似的操作,但也帶來了一些額外的好處,其中一些靈感來自 Netty ByteBuf
。以下是部分優點列表:
-
使用獨立的位置進行讀取和寫入,即不需要呼叫
flip()
在讀取和寫入之間切換。 -
容量可根據需求擴充,如同
java.lang.StringBuilder
。 -
集區化的緩衝區和透過
PooledDataBuffer
進行參考計數。 -
將緩衝區視為
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
確定給定位元組的索引或最後一個索引。
PooledDataBuffer
如 ByteBuffer 的 Javadoc 中所述,位元組緩衝區可以是直接的或非直接的。直接緩衝區可能駐留在 Java 堆積之外,這消除了本機 I/O 操作複製的需要。這使得直接緩衝區對於透過 Socket 接收和傳送資料特別有用,但它們的建立和釋放成本也更高,這導致了集區化緩衝區的想法。
PooledDataBuffer
是 DataBuffer
的擴充,有助於參考計數,這對於位元組緩衝區集區化至關重要。它是如何運作的?當配置 PooledDataBuffer
時,參考計數為 1。呼叫 retain()
會遞增計數,而呼叫 release()
會遞減計數。只要計數大於 0,就保證不會釋放緩衝區。當計數減少到 0 時,可以釋放集區化的緩衝區,這在實務上可能意味著緩衝區的保留記憶體已返回到記憶體集區。
請注意,在大多數情況下,與其直接操作 PooledDataBuffer
,不如使用 DataBufferUtils
中的便利方法,這些方法僅在 DataBuffer
是 PooledDataBuffer
的實例時才套用釋放或保留。
DataBufferUtils
DataBufferUtils
提供了許多實用方法來操作資料緩衝區:
-
將資料緩衝區串流聯結成單個緩衝區,可能透過零複製,例如,如果底層位元組緩衝區 API 支援,則透過複合緩衝區。
-
將
InputStream
或 NIOChannel
轉換為Flux<DataBuffer>
,反之亦然,將Publisher<DataBuffer>
轉換為OutputStream
或 NIOChannel
。 -
如果緩衝區是
PooledDataBuffer
的實例,則釋放或保留DataBuffer
的方法。 -
跳過或從位元組串流中取得,直到達到特定的位元組計數。
編碼器
org.springframework.core.codec
套件提供了以下策略介面:
-
Encoder
將Publisher<T>
編碼為資料緩衝區串流。 -
Decoder
將Publisher<DataBuffer>
解碼為更高等級物件的串流。
spring-core
模組提供了 byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
編碼器和解碼器實作。spring-web
模組新增了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他編碼器和解碼器。請參閱 WebFlux 章節中的 編碼器。
使用 DataBuffer
當使用資料緩衝區時,必須特別注意確保緩衝區被釋放,因為它們可能是集區化的。我們將使用編碼器來說明其運作方式,但這些概念更普遍適用。讓我們看看編碼器在內部必須做些什麼來管理資料緩衝區。
Decoder
是最後一個讀取輸入資料緩衝區的,在建立更高等級的物件之前,因此它必須按如下方式釋放它們:
-
如果
Decoder
只是簡單地讀取每個輸入緩衝區並準備立即釋放它,則可以透過DataBufferUtils.release(dataBuffer)
這樣做。 -
如果
Decoder
正在使用Flux
或Mono
運算子(例如flatMap
、reduce
和其他在內部預先提取和快取資料項目的運算子),或者正在使用運算子(例如filter
、skip
和其他省略項目的運算子),則必須將doOnDiscard(DataBuffer.class, DataBufferUtils::release)
新增至組合鏈,以確保在這些緩衝區被丟棄之前(也可能是由於錯誤或取消訊號的結果)被釋放。 -
如果
Decoder
以任何其他方式保留一個或多個資料緩衝區,則必須確保在完全讀取時或在錯誤或取消訊號發生在快取的資料緩衝區被讀取和釋放之前釋放它們。
請注意,DataBufferUtils#join
提供了一種安全有效的方法,可將資料緩衝區串流聚合為單個資料緩衝區。同樣地,skipUntilByteCount
和 takeUntilByteCount
是解碼器使用的其他安全方法。
Encoder
配置資料緩衝區,其他人必須讀取(和釋放)。因此,Encoder
沒有太多事情要做。但是,如果發生序列化錯誤,導致在用資料填充緩衝區時出錯,Encoder
必須注意釋放資料緩衝區。例如:
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder
的消費者負責釋放它接收到的資料緩衝區。在 WebFlux 應用程式中,Encoder
的輸出用於寫入 HTTP 伺服器回應,或寫入客户端 HTTP 請求,在這種情況下,釋放資料緩衝區是寫入伺服器回應或客户端請求的程式碼的責任。
請注意,當在 Netty 上執行時,有一些偵錯選項可用於 疑難排解緩衝區洩漏。