Null Payload 與 ‘墓碑’ 記錄的日誌壓縮

當使用日誌壓縮時,您可以傳送和接收帶有 null payload 的訊息,以識別金鑰的刪除。您也可能因為其他原因收到 null 值,例如,當反序列化器無法反序列化值時可能會傳回 null

產生 Null Payload

您可以使用 ReactivePulsarTemplate 傳送 null 值,方法是將 null 訊息參數值傳遞給其中一個 send 方法,例如

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
當傳送 null 值時,您必須指定 schema 類型,因為系統無法從 null payload 判斷訊息的類型。

消費 Null Payload

對於 @ReactivePularListenernull payload 會根據其訊息參數的類型傳遞到 listener 方法中,如下所示

參數類型 傳入值

primitive (基本型別)

null

user-defined (使用者定義型別)

null

org.apache.pulsar.client.api.Message<T>

non-null Pulsar 訊息,其 getValue() 傳回 null

org.springframework.messaging.Message<T>

non-null Spring 訊息,其 getPayload() 傳回 PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

non-null flux,其條目為 non-null Pulsar 訊息,且這些訊息的 getValue() 傳回 null

Flux<org.springframework.messaging.Message<T>>

non-null flux,其條目為 non-null Spring 訊息,且這些訊息的 getPayload() 傳回 PulsarNull

當傳入值為 null 時 (即,具有 primitive 或 user-defined 類型的單筆記錄 listener),您必須使用 @Payload 參數註解,並設定 required = false
當使用 Spring org.springframework.messaging.Message 作為您的 listener payload 類型時,其泛型類型資訊必須夠廣泛,才能接受 Message<PulsarNull> (例如 MessageMessage<?>Message<Object>)。這是因為 Spring Message 不允許其 payload 使用 null 值,而是使用 PulsarNull 佔位符。

如果這是壓縮日誌的墓碑訊息,您通常也需要金鑰,以便您的應用程式可以判斷哪個金鑰被「deleted」(刪除)。以下範例顯示了這樣的配置

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
當使用串流訊息 listener (Flux) 時,標頭支援受到限制,因此在日誌壓縮情境中較不實用。