資料緩衝區與編碼器

Java NIO 提供了 ByteBuffer,但許多程式庫在其之上建構了自己的位元組緩衝區 API,特別是對於網路操作,其中重複使用緩衝區和/或使用直接緩衝區對於效能是有益的。例如,Netty 具有 ByteBuf 階層,Undertow 使用 XNIO,Jetty 使用帶有回呼以供釋放的集區位元組緩衝區等等。spring-core 模組提供了一組抽象概念,用於處理各種位元組緩衝區 API,如下所示:

DataBufferFactory

DataBufferFactory 用於以兩種方式之一建立資料緩衝區:

  1. 配置新的資料緩衝區,如果已知容量,可以選擇預先指定容量,這樣更有效率,即使 DataBuffer 的實作可以根據需求成長和縮小。

  2. 包裝現有的 byte[]java.nio.ByteBuffer,這會使用 DataBuffer 實作裝飾給定的資料,並且不涉及配置。

請注意,WebFlux 應用程式不會直接建立 DataBufferFactory,而是透過伺服器端的 ServerHttpResponse 或客户端的 ClientHttpRequest 來存取它。工廠的類型取決於底層的客户端或伺服器,例如,Reactor Netty 使用 NettyDataBufferFactory,其他使用 DefaultDataBufferFactory

DataBuffer

DataBuffer 介面提供了與 java.nio.ByteBuffer 類似的操作,但也帶來了一些額外的好處,其中一些靈感來自 Netty ByteBuf。以下是部分優點列表:

  • 使用獨立的位置進行讀取和寫入,即不需要呼叫 flip() 在讀取和寫入之間切換。

  • 容量可根據需求擴充,如同 java.lang.StringBuilder

  • 集區化的緩衝區和透過 PooledDataBuffer 進行參考計數。

  • 將緩衝區視為 java.nio.ByteBufferInputStreamOutputStream

  • 確定給定位元組的索引或最後一個索引。

PooledDataBuffer

ByteBuffer 的 Javadoc 中所述,位元組緩衝區可以是直接的或非直接的。直接緩衝區可能駐留在 Java 堆積之外,這消除了本機 I/O 操作複製的需要。這使得直接緩衝區對於透過 Socket 接收和傳送資料特別有用,但它們的建立和釋放成本也更高,這導致了集區化緩衝區的想法。

PooledDataBufferDataBuffer 的擴充,有助於參考計數,這對於位元組緩衝區集區化至關重要。它是如何運作的?當配置 PooledDataBuffer 時,參考計數為 1。呼叫 retain() 會遞增計數,而呼叫 release() 會遞減計數。只要計數大於 0,就保證不會釋放緩衝區。當計數減少到 0 時,可以釋放集區化的緩衝區,這在實務上可能意味著緩衝區的保留記憶體已返回到記憶體集區。

請注意,在大多數情況下,與其直接操作 PooledDataBuffer,不如使用 DataBufferUtils 中的便利方法,這些方法僅在 DataBufferPooledDataBuffer 的實例時才套用釋放或保留。

DataBufferUtils

DataBufferUtils 提供了許多實用方法來操作資料緩衝區:

  • 將資料緩衝區串流聯結成單個緩衝區,可能透過零複製,例如,如果底層位元組緩衝區 API 支援,則透過複合緩衝區。

  • InputStream 或 NIO Channel 轉換為 Flux<DataBuffer>,反之亦然,將 Publisher<DataBuffer> 轉換為 OutputStream 或 NIO Channel

  • 如果緩衝區是 PooledDataBuffer 的實例,則釋放或保留 DataBuffer 的方法。

  • 跳過或從位元組串流中取得,直到達到特定的位元組計數。

編碼器

org.springframework.core.codec 套件提供了以下策略介面:

  • EncoderPublisher<T> 編碼為資料緩衝區串流。

  • DecoderPublisher<DataBuffer> 解碼為更高等級物件的串流。

spring-core 模組提供了 byte[]ByteBufferDataBufferResourceString 編碼器和解碼器實作。spring-web 模組新增了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他編碼器和解碼器。請參閱 WebFlux 章節中的 編碼器

使用 DataBuffer

當使用資料緩衝區時,必須特別注意確保緩衝區被釋放,因為它們可能是集區化的。我們將使用編碼器來說明其運作方式,但這些概念更普遍適用。讓我們看看編碼器在內部必須做些什麼來管理資料緩衝區。

Decoder 是最後一個讀取輸入資料緩衝區的,在建立更高等級的物件之前,因此它必須按如下方式釋放它們:

  1. 如果 Decoder 只是簡單地讀取每個輸入緩衝區並準備立即釋放它,則可以透過 DataBufferUtils.release(dataBuffer) 這樣做。

  2. 如果 Decoder 正在使用 FluxMono 運算子(例如 flatMapreduce 和其他在內部預先提取和快取資料項目的運算子),或者正在使用運算子(例如 filterskip 和其他省略項目的運算子),則必須將 doOnDiscard(DataBuffer.class, DataBufferUtils::release) 新增至組合鏈,以確保在這些緩衝區被丟棄之前(也可能是由於錯誤或取消訊號的結果)被釋放。

  3. 如果 Decoder 以任何其他方式保留一個或多個資料緩衝區,則必須確保在完全讀取時或在錯誤或取消訊號發生在快取的資料緩衝區被讀取和釋放之前釋放它們。

請注意,DataBufferUtils#join 提供了一種安全有效的方法,可將資料緩衝區串流聚合為單個資料緩衝區。同樣地,skipUntilByteCounttakeUntilByteCount 是解碼器使用的其他安全方法。

Encoder 配置資料緩衝區,其他人必須讀取(和釋放)。因此,Encoder 沒有太多事情要做。但是,如果發生序列化錯誤,導致在用資料填充緩衝區時出錯,Encoder 必須注意釋放資料緩衝區。例如:

  • Java

  • Kotlin

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// serialize and populate buffer..
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
	// serialize and populate buffer..
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer

Encoder 的消費者負責釋放它接收到的資料緩衝區。在 WebFlux 應用程式中,Encoder 的輸出用於寫入 HTTP 伺服器回應,或寫入客户端 HTTP 請求,在這種情況下,釋放資料緩衝區是寫入伺服器回應或客户端請求的程式碼的責任。

請注意,當在 Netty 上執行時,有一些偵錯選項可用於 疑難排解緩衝區洩漏