Debezium 支援
Debezium Engine,變更資料擷取 (CDC) 輸入通道配接器。DebeziumMessageProducer
允許擷取資料庫變更事件,將其轉換為訊息,然後串流至輸出通道。
您需要將 spring integration Debezium 相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.3.5"
您還需要為您的輸入資料庫包含 debezium 連接器 相依性。例如,若要將 Debezium 與 PostgreSQL 搭配使用,您將需要 postgres debezium 連接器
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
將 |
輸入 Debezium 通道配接器
Debezium 配接器預期預先設定的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
實例。
debezium-supplier 提供開箱即用的 |
Debezium Java DSL 可以從提供的 |
此外,可以使用下列設定屬性調整 DebeziumMessageProducer
-
contentType
- 允許處理JSON
(預設值)、AVRO
和PROTOBUF
訊息內容。contentType必須
與為提供的DebeziumEngine.Builder
設定的SerializationFormat
對齊。 -
enableBatch
- 設定為false
(預設值) 時,debezium 配接器會為從來源資料庫接收的每個ChangeEvent
資料變更事件傳送新的Message
。如果設定為true
,則配接器會為從 Debezium 引擎接收的每個ChangeEvent
批次向下游傳送單一Message
。此類 Payload 無法序列化,並且需要自訂序列化/反序列化實作。 -
enableEmptyPayload
- 啟用對 Tombstone (又稱刪除) 訊息的支援。在資料庫列刪除時,Debezium 可以傳送 Tombstone 變更事件,該事件具有與已刪除列相同的索引鍵,以及Optional.empty
的值。預設為false
。 -
headerMapper
- 自訂HeaderMapper
實作,允許選擇和轉換ChangeEvent
標頭為Message
標頭。預設DefaultDebeziumHeaderMapper
實作提供setHeaderNamesToMap
的 Setter。依預設,會映射所有標頭。 -
taskExecutor
- 為 Debezium 引擎設定自訂TaskExecutor
。
下列程式碼片段示範此通道配接器的各種設定
使用 Java 設定進行設定
下列 Spring Boot 應用程式顯示如何使用 Java 設定來設定輸入配接器的範例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | 事件預期目的地的邏輯目的地名稱。通常,目的地由 topic.prefix 設定選項、資料庫名稱和表格名稱組成。例如:my-topic.inventory.orders 。 |
2 | 包含已變更表格索引鍵和已變更列實際索引鍵的結構描述。索引鍵結構描述及其對應的索引鍵 Payload 都包含欄位,用於在連接器建立事件時,已變更表格 PRIMARY KEY (或唯一約束) 中的每個資料行。 |
3 | 與索引鍵類似,Payload 具有結構描述區段和 Payload 值區段。結構描述區段包含描述 Payload 值區段之 Envelope 結構的結構描述,包括其巢狀欄位。用於建立、更新或刪除資料之作業的變更事件都具有具有 Envelope 結構的值 Payload。 |
|
同樣地,我們可以設定 DebeziumMessageProducer
以批次處理傳入的變更事件
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支援
spring-integration-debezium
透過 Debezium
工廠和 DebeziumMessageProducerSpec
實作提供方便的 Java DSL Fluent API。
Debezium Java DSL 的輸入通道配接器為
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或從原生 debezium 設定屬性建立 DebeziumMessageProducerSpec
實例,並預設為 JSON
序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
下列 Spring Boot 應用程式提供使用 Java DSL 設定輸入配接器的範例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}