Debezium 支援

Debezium Engine,變更資料擷取 (CDC) 輸入通道配接器。DebeziumMessageProducer 允許擷取資料庫變更事件,將其轉換為訊息,然後串流至輸出通道。

您需要將 spring integration Debezium 相依性包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.3.5"

您還需要為您的輸入資料庫包含 debezium 連接器 相依性。例如,若要將 Debezium 與 PostgreSQL 搭配使用,您將需要 postgres debezium 連接器

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

debezium-version 替換為與正在使用的 spring-integration-debezium 版本相容的版本。

輸入 Debezium 通道配接器

Debezium 配接器預期預先設定的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 實例。

debezium-supplier 提供開箱即用的 DebeziumEngine.Builder Spring Boot 自動設定,以及方便的 DebeziumProperties 設定抽象概念。

Debezium Java DSL 可以從提供的 DebeziumEngine.Builder 以及純 Debezium 設定 (例如 java.util.Properties) 建立 DebeziumMessageProducer 實例。後者對於具有意見導向設定和序列化格式的一些常見使用案例可能很方便。

此外,可以使用下列設定屬性調整 DebeziumMessageProducer

  • contentType - 允許處理 JSON (預設值)、AVROPROTOBUF 訊息內容。contentType 必須 與為提供的 DebeziumEngine.Builder 設定的 SerializationFormat 對齊。

  • enableBatch - 設定為 false (預設值) 時,debezium 配接器會為從來源資料庫接收的每個 ChangeEvent 資料變更事件傳送新的 Message。如果設定為 true,則配接器會為從 Debezium 引擎接收的每個 ChangeEvent 批次向下游傳送單一 Message。此類 Payload 無法序列化,並且需要自訂序列化/反序列化實作。

  • enableEmptyPayload - 啟用對 Tombstone (又稱刪除) 訊息的支援。在資料庫列刪除時,Debezium 可以傳送 Tombstone 變更事件,該事件具有與已刪除列相同的索引鍵,以及 Optional.empty 的值。預設為 false

  • headerMapper - 自訂 HeaderMapper 實作,允許選擇和轉換 ChangeEvent 標頭為 Message 標頭。預設 DefaultDebeziumHeaderMapper 實作提供 setHeaderNamesToMap 的 Setter。依預設,會映射所有標頭。

  • taskExecutor - 為 Debezium 引擎設定自訂 TaskExecutor

下列程式碼片段示範此通道配接器的各種設定

使用 Java 設定進行設定

下列 Spring Boot 應用程式顯示如何使用 Java 設定來設定輸入配接器的範例

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 事件預期目的地的邏輯目的地名稱。通常,目的地由 topic.prefix 設定選項、資料庫名稱和表格名稱組成。例如:my-topic.inventory.orders
2 包含已變更表格索引鍵和已變更列實際索引鍵的結構描述。索引鍵結構描述及其對應的索引鍵 Payload 都包含欄位,用於在連接器建立事件時,已變更表格 PRIMARY KEY (或唯一約束) 中的每個資料行。
3 與索引鍵類似,Payload 具有結構描述區段和 Payload 值區段。結構描述區段包含描述 Payload 值區段之 Envelope 結構的結構描述,包括其巢狀欄位。用於建立、更新或刪除資料之作業的變更事件都具有具有 Envelope 結構的值 Payload。

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允許分別停用索引鍵或 Payload 的訊息內結構描述內容。

同樣地,我們可以設定 DebeziumMessageProducer 以批次處理傳入的變更事件

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL 支援

spring-integration-debezium 透過 Debezium 工廠和 DebeziumMessageProducerSpec 實作提供方便的 Java DSL Fluent API。

Debezium Java DSL 的輸入通道配接器為

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或從原生 debezium 設定屬性建立 DebeziumMessageProducerSpec 實例,並預設為 JSON 序列化格式。

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

下列 Spring Boot 應用程式提供使用 Java DSL 設定輸入配接器的範例

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}