Spring Cloud Stream Schema Registry

簡介

當組織採用基於訊息的發布/訂閱架構,且多個生產者和消費者微服務相互通訊時,通常所有這些微服務都需要就基於綱要的合約達成一致。當此類綱要需要發展以適應新的業務需求時,現有組件仍需繼續運作。Spring Cloud Stream 提供了獨立綱要註冊伺服器的支援,使用該伺服器可以註冊上述綱要並供應用程式使用。Spring Cloud Stream 綱要註冊支援還提供基於 Avro 的綱要註冊用戶端的支援,這些用戶端本質上提供了訊息轉換器,可與綱要註冊中心通訊,以便在訊息轉換期間協調綱要。Spring Cloud Stream 提供的綱要演進支援適用於上述獨立綱要註冊中心以及 Confluent 提供的專門用於 Apache Kafka 的綱要註冊中心。

Spring Cloud Stream Schema Registry 概觀

Spring Cloud Stream Schema Registry 提供了綱要演進的支援,以便資料可以隨著時間推移而演進,並且仍然可以與較舊或較新的生產者和消費者協同運作,反之亦然。大多數序列化模型,尤其是那些旨在跨不同平台和語言實現可移植性的模型,都依賴於描述資料如何在二進位酬載中序列化的綱要。為了序列化資料然後解釋它,發送方和接收方都必須存取描述二進位格式的綱要。在某些情況下,綱要可以從序列化時的酬載類型或反序列化時的目標類型推斷出來。然而,許多應用程式受益於存取描述二進位資料格式的顯式綱要。綱要註冊中心可讓您以文字格式(通常為 JSON)儲存綱要資訊,並使需要它的各種應用程式可以存取該資訊,以便以二進位格式接收和發送資料。綱要可以作為一個元組引用,其中包含:

  • 主旨,即綱要的邏輯名稱

  • 綱要版本

  • 綱要格式,描述資料的二進位格式

Spring Cloud Stream Schema Registry 提供以下組件

  • 獨立綱要註冊伺服器

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • 綱要註冊用戶端,能夠透過與綱要註冊中心通訊來進行訊息編組。

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

綱要註冊用戶端

用於與綱要註冊伺服器互動的用戶端抽象化是 SchemaRegistryClient 介面,其結構如下

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供了開箱即用的實作,用於與其自身的綱要伺服器以及與 Confluent Schema Registry 互動。

可以使用 @EnableSchemaRegistryClient 組態 Spring Cloud Stream 綱要註冊中心的用戶端,如下所示

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
預設轉換器經過最佳化,不僅可以快取來自遠端伺服器的綱要,還可以快取 parse()toString() 方法,這些方法非常耗費資源。因此,它使用不快取回應的 DefaultSchemaRegistryClient。如果您打算變更預設行為,您可以直接在您的程式碼中使用用戶端並覆寫它以達到所需的結果。為此,您必須將屬性 spring.cloud.stream.schemaRegistryClient.cached=true 新增至您的應用程式屬性。

綱要註冊用戶端屬性

綱要註冊用戶端支援以下屬性

spring.cloud.stream.schemaRegistryClient.endpoint

綱要伺服器的位置。設定此項時,請使用完整 URL,包括協定 (httphttps)、連接埠和內容路徑。

預設值

localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

用戶端是否應快取綱要伺服器回應。通常設定為 false,因為快取發生在訊息轉換器中。使用綱要註冊用戶端的用戶端應將此項設定為 true

預設值

false

Avro 綱要註冊用戶端訊息轉換器

對於在應用程式內容中註冊了 SchemaRegistryClient Bean 的應用程式,Spring Cloud Stream 會自動組態 Apache Avro 訊息轉換器以進行綱要管理。這簡化了綱要演進,因為接收訊息的應用程式可以輕鬆存取寫入器綱要,該綱要可以與它們自己的讀取器綱要協調。

對於出站訊息,如果繫結的內容類型設定為 application/*+avro,則 MessageConverter 會被啟動,如下列範例所示

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站轉換期間,訊息轉換器會嘗試推斷每個出站訊息的綱要(基於其類型),並使用 SchemaRegistryClient 將其註冊到主旨(基於酬載類型)。如果已找到相同的綱要,則會擷取對其的參考。否則,綱要會被註冊,並提供新的版本號碼。訊息會使用下列方案透過 contentType 標頭發送:application/[prefix].[subject].v[version]+avro,其中 prefix 是可組態的,而 subject 是從酬載類型推導出來的。

例如,類型為 User 的訊息可能會以二進位酬載形式發送,內容類型為 application/vnd.user.v2+avro,其中 user 是主旨,而 2 是版本號碼。

接收訊息時,轉換器會從傳入訊息的標頭推斷綱要參考,並嘗試擷取它。該綱要**在反序列化過程中用作寫入器綱要。**

Avro 綱要註冊訊息轉換器屬性

如果您已透過設定 spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro 啟用基於 Avro 的綱要註冊用戶端,則可以透過設定下列屬性來自訂註冊行為。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

如果您希望轉換器使用反射從 POJO 推斷綱要,請啟用此項。

預設值:false

spring.cloud.stream.schema.avro.readerSchema

Avro 透過查看寫入器綱要(原始酬載)和讀取器綱要(您的應用程式酬載)來比較綱要版本。有關更多資訊,請參閱 Avro 文件。如果設定,這將覆寫綱要伺服器上的任何查找,並使用本機綱要作為讀取器綱要。預設值:null

spring.cloud.stream.schema.avro.schemaLocations

向綱要伺服器註冊此屬性中列出的任何 .avsc 檔案。

預設值:empty

spring.cloud.stream.schema.avro.prefix

要在 Content-Type 標頭上使用的前綴。

預設值:vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

決定用於在綱要註冊中心註冊 Avro 綱要的主旨名稱。有兩種實作可用,org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主旨是綱要名稱,以及 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它使用 Avro 綱要命名空間和名稱傳回完全合格的主旨。可以透過實作 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy 來建立自訂策略。

預設值:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

忽略任何綱要註冊中心通訊。對於測試目的很有用,以便在執行單元測試時,它不會不必要地嘗試連接到綱要註冊伺服器。

預設值:false

Apache Avro 訊息轉換器

Spring Cloud Stream 透過其 spring-cloud-stream-schema-registry-client 模組提供對基於綱要的訊息轉換器的支援。目前,基於綱要的訊息轉換器開箱即用支援的唯一序列化格式是 Apache Avro,未來版本將新增更多格式。

spring-cloud-stream-schema-registry-client 模組包含兩種可用於 Apache Avro 序列化的訊息轉換器

  • 使用序列化或反序列化物件的類別資訊或啟動時已知位置的綱要的轉換器。

  • 使用綱要註冊中心的轉換器。它們在執行階段定位綱要,並在網域物件演進時動態註冊新綱要。

具有綱要支援的轉換器

AvroSchemaMessageConverter 支援透過使用預定義的綱要或使用類別中可用的綱要資訊(以反射方式或包含在 SpecificRecord 中)來序列化和反序列化訊息。如果您提供自訂轉換器,則不會建立預設的 AvroSchemaMessageConverter Bean。以下範例顯示了自訂轉換器

若要使用自訂轉換器,您可以簡單地將其新增至應用程式內容,選擇性地指定一個或多個要與其關聯的 MimeType。預設 MimeTypeapplication/avro

如果轉換的目標類型是 GenericRecord,則必須設定綱要。

以下範例顯示如何在接收器應用程式中組態轉換器,方法是不使用預定義的綱要來註冊 Apache Avro MessageConverter。在此範例中,請注意 mime 類型值為 avro/bytes,而不是預設的 application/avro

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反地,下列應用程式使用預定義的綱要(在 classpath 上找到)註冊轉換器

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

綱要註冊伺服器

Spring Cloud Stream 提供了綱要註冊伺服器實作。若要使用它,您可以下載最新的 spring-cloud-stream-schema-registry-server 版本並將其作為獨立應用程式執行

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

您可以將綱要註冊中心嵌入到您現有的 Spring Boot Web 應用程式中。若要執行此操作,請將 spring-cloud-stream-schema-registry-core 構件新增至您的專案,並使用 @EnableSchemaRegistryServer 註解,這會將綱要註冊伺服器 REST 控制器新增至您的應用程式。以下範例顯示了啟用綱要註冊中心的 Spring Boot 應用程式

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path 屬性可用於控制綱要伺服器的根路徑(尤其是在它嵌入到其他應用程式中時)。spring.cloud.stream.schema.server.allowSchemaDeletion 布林屬性啟用綱要的刪除。預設情況下,此項已停用。

綱要註冊伺服器使用關聯式資料庫來儲存綱要。預設情況下,它使用嵌入式資料庫。您可以使用 Spring Boot SQL 資料庫和 JDBC 組態選項來自訂綱要儲存。

綱要註冊伺服器 API

綱要註冊伺服器 API 包含以下操作

註冊新綱要

若要註冊新綱要,請將 POST 請求發送到 / 端點。

/ 接受具有下列欄位的 JSON 酬載

  • subject:綱要主旨

  • format:綱要格式

  • definition:綱要定義

其回應是 JSON 格式的綱要物件,具有下列欄位

  • id:綱要 ID

  • subject:綱要主旨

  • format:綱要格式

  • version:綱要版本

  • definition:綱要定義

依主旨、格式和版本擷取現有綱要

若要依主旨、格式和版本擷取現有綱要,請將 GET 請求發送到 {subject}/{format}/{version} 端點。

其回應是 JSON 格式的綱要物件,具有下列欄位

  • id:綱要 ID

  • subject:綱要主旨

  • format:綱要格式

  • version:綱要版本

  • definition:綱要定義

依主旨和格式擷取現有綱要

若要依主旨和格式擷取現有綱要,請將 GET 請求發送到 /subject/format 端點。

其回應是綱要清單,其中每個綱要物件都是 JSON 格式,具有下列欄位

  • id:綱要 ID

  • subject:綱要主旨

  • format:綱要格式

  • version:綱要版本

  • definition:綱要定義

依 ID 擷取現有綱要

若要依其 ID 擷取綱要,請將 GET 請求發送到 /schemas/{id} 端點。

其回應是 JSON 格式的綱要物件,具有下列欄位

  • id:綱要 ID

  • subject:綱要主旨

  • format:綱要格式

  • version:綱要版本

  • definition:綱要定義

依主旨、格式和版本刪除綱要

若要刪除由其主旨、格式和版本識別的綱要,請將 DELETE 請求發送到 {subject}/{format}/{version} 端點。

依 ID 刪除綱要

若要依其 ID 刪除綱要,請將 DELETE 請求發送到 /schemas/{id} 端點。

依主旨刪除綱要

DELETE /{subject}

依其主旨刪除現有綱要。

此注意事項僅適用於 Spring Cloud Stream 1.1.0.RELEASE 的使用者。Spring Cloud Stream 1.1.0.RELEASE 使用表名稱 schema 來儲存 Schema 物件。Schema 是許多資料庫實作中的關鍵字。為了避免未來發生任何衝突,從 1.1.1.RELEASE 開始,我們選擇了名稱 SCHEMA_REPOSITORY 作為儲存表。任何升級的 Spring Cloud Stream 1.1.0.RELEASE 使用者都應在升級之前將其現有綱要移轉到新表。

使用 Confluent 的綱要註冊中心

預設組態會建立 DefaultSchemaRegistryClient Bean。如果您想使用 Confluent 綱要註冊中心,您需要建立 ConfluentSchemaRegistryClient 類型的 Bean,這將取代架構預設組態的 Bean。以下範例顯示如何建立此類 Bean

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient 已針對 Confluent 平台 4.0.0 版進行測試。

綱要註冊與解析

為了更好地理解 Spring Cloud Stream 如何註冊和解析新綱要以及其 Avro 綱要比較功能的使用,我們提供了兩個獨立的小節

綱要註冊流程(序列化)

註冊流程的第一部分是從透過通道發送的酬載中提取綱要。Avro 類型(例如 SpecificRecordGenericRecord)已經包含綱要,可以直接從實例中擷取。對於 POJO,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled 屬性設定為 true(預設值),則會推斷綱要。

取得綱要後,轉換器會從遠端伺服器載入其元資料(版本)。首先,它會查詢本機快取。如果找不到結果,它會將資料提交到伺服器,伺服器會回覆版本資訊。轉換器始終快取結果,以避免為每個需要序列化的新訊息查詢綱要伺服器的額外負荷。

有了綱要版本資訊,轉換器會設定訊息的 contentType 標頭以攜帶版本資訊 — 例如:application/vnd.user.v1+avro

綱要解析流程(反序列化)

當讀取包含版本資訊的訊息時(也就是說,contentType 標頭具有類似於綱要註冊流程(序列化)下描述的方案),轉換器會查詢綱要伺服器以擷取訊息的寫入器綱要。一旦它找到傳入訊息的正確綱要,它就會擷取讀取器綱要,並透過使用 Avro 的綱要解析支援,將其讀取到讀取器定義中(設定預設值和任何遺失的屬性)。

您應該了解寫入器綱要(寫入訊息的應用程式)和讀取器綱要(接收應用程式)之間的差異。我們建議您花一些時間閱讀 Avro 術語並了解該流程。Spring Cloud Stream 始終擷取寫入器綱要以確定如何讀取訊息。如果您想讓 Avro 的綱要演進支援正常運作,您需要確保為您的應用程式正確設定了 readerSchema