Apache Pulsar 的 Spring Cloud Stream Binder

適用於 Apache Pulsar 的 Spring 提供 Spring Cloud Stream 的 binder,我們可以使用它來建構使用發布-訂閱模式的事件驅動微服務。在本節中,我們將介紹此 binder 的基本詳細資訊。

用法

我們需要在您的應用程式中包含以下相依性,才能使用適用於 Spring Cloud Stream 的 Apache Pulsar binder。

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概觀

適用於 Apache Pulsar 的 Spring Cloud Stream binder 讓應用程式可以專注於業務邏輯,而無需處理管理和維護 Pulsar 的較低層級細節。binder 為應用程式開發人員處理所有這些細節。Spring Cloud Stream 帶來基於 Spring Cloud Function 的強大程式設計模型,讓應用程式開發人員可以使用函數式風格編寫複雜的事件驅動應用程式。應用程式可以從中介軟體中立的方式開始,然後透過 Spring Boot 組態屬性將 Pulsar 主題對應為 Spring Cloud Stream 中的目的地。Spring Cloud Stream 建構於 Spring Boot 之上,當您使用 Spring Cloud Stream 編寫事件驅動微服務時,您實際上是在編寫一個 Boot 應用程式。以下是一個簡單的 Spring Cloud Stream 應用程式。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上面的範例應用程式是一個功能完整的 Spring Boot 應用程式,值得一些解釋。但是,初步來看,您可以看到這只是純粹的 Java 和一些 Spring 與 Spring Boot 註解。我們在這裡有三個 Bean 方法 - 一個 java.util.function.Supplier、一個 java.util.function.Function,以及最後一個 java.util.function.Consumer。供應商產生目前的毫秒時間,函數取得此時間,然後透過新增一些隨機資料來增強它,然後消費者記錄增強的時間。

為了簡潔起見,我們省略了所有匯入,但在整個應用程式中沒有任何 Spring Cloud Stream 特有的東西。它如何變成與 Apache Pulsar 互動的 Spring Cloud Stream 應用程式?您必須在應用程式中包含上述 binder 的相依性。新增該相依性後,您必須提供以下組態屬性。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

有了這個,上面的 Spring Boot 應用程式已成為基於 Spring Cloud Stream 的端對端事件驅動應用程式。由於我們的 classpath 上有 Pulsar binder,因此應用程式會與 Apache Pulsar 互動。如果應用程式中只有一個函數,則我們不需要告知 Spring Cloud Stream 啟動該函數以供執行,因為它預設會執行此操作。如果應用程式中有超過一個此類函數,如我們的範例所示,我們需要指示 Spring Cloud Stream 我們想要啟動哪些函數。在我們的案例中,我們需要啟動所有這些函數,而我們透過 spring.cloud.function.definition 屬性來完成此操作。預設情況下,bean 名稱會成為 Spring Cloud Stream 繫結名稱的一部分。繫結是 Spring Cloud Stream 中一個基本抽象概念,框架使用它與中介軟體目的地進行通訊。Spring Cloud Stream 執行的幾乎所有操作都發生在具體的繫結上。供應商只有輸出繫結;函數具有輸入和輸出繫結,而消費者只有輸入繫結。讓我們以我們的供應商 bean - timeSupplier 為例。此供應商的預設繫結名稱將為 timeSupplier-out-0。同樣地,timeProcessor 函數的預設繫結名稱在輸入端將為 timeProcessor-in-0,在輸出端將為 timeProcessor-out-0。請參閱 Spring Cloud Stream 參考文件,以取得有關如何變更預設繫結名稱的詳細資訊。在大多數情況下,使用預設繫結名稱就足夠了。我們在繫結名稱上設定目的地,如上所示。如果未提供目的地,則繫結名稱會成為目的地的值,如 timeSupplier-out-0 的情況。

執行上述應用程式時,您應該會看到供應商每秒執行一次,然後由函數消費,並增強記錄器消費者所消費的時間。

基於 Binder 的應用程式中的訊息轉換

在上面的範例應用程式中,我們沒有提供訊息轉換的結構描述資訊。這是因為,預設情況下,Spring Cloud Stream 使用其訊息轉換機制,使用透過 Spring Messaging 專案在 Spring Framework 中建立的訊息傳遞支援。除非另有指定,否則 Spring Cloud Stream 會使用 application/json 作為輸入和輸出繫結訊息轉換的 content-type。在輸出端,資料會序列化為 byte[],然後 Pulsar binder 使用 Schema.BYTES 將其透過網路傳送到 Pulsar 主題。同樣地,在輸入端,資料會以 byte[] 的形式從 Pulsar 主題消費,然後使用適當的訊息轉換器轉換為目標類型。

在 Pulsar 中使用原生轉換 (使用 Pulsar 結構描述)

雖然預設是使用框架提供的訊息轉換,但 Spring Cloud Stream 允許每個 binder 決定應如何轉換訊息。假設應用程式選擇採用此路徑。在這種情況下,Spring Cloud Stream 會避開使用任何 Spring 提供的訊息轉換設施,並傳遞它接收或產生的資料。Spring Cloud Stream 中的此功能稱為生產者端的原生編碼和消費者端的原生解碼。這表示編碼和解碼原生發生在目標中介軟體上,在我們的案例中,是在 Apache Pulsar 上。對於上面的應用程式,我們可以使用以下組態來繞過框架轉換並使用原生編碼和解碼。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生產者端啟用原生編碼的屬性是來自核心 Spring Cloud Stream 的繫結層級屬性。您在生產者繫結上設定它 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding 並將其設定為 true。同樣地,對於消費者繫結,使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 並將其設定為 true。如果我們決定使用原生編碼和解碼,在 Pulsar 的情況下,我們需要設定對應的結構描述和底層訊息類型資訊。此資訊以擴充繫結屬性的形式提供。如您在上面的組態中所見,屬性為 - spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type (用於結構描述資訊) 和 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type (用於實際目標類型)。如果您的訊息同時具有鍵和值,則可以使用 message-key-typemessage-value-type 來指定其目標類型。

當省略 schema-type 屬性時,將諮詢任何已組態的自訂結構描述對應。

訊息標頭轉換

每個訊息通常都有標頭資訊,需要在訊息透過 Spring Cloud Stream 輸入和輸出繫結在 Pulsar 和 Spring Messaging 之間傳輸時一併攜帶。為了支援此傳輸,框架會處理必要的訊息標頭轉換。

自訂標頭對應器

Pulsar binder 已組態預設標頭對應器,可以透過提供您自己的 PulsarHeaderMapper bean 來覆寫它。

在以下範例中,組態了 JSON 標頭對應器,其

  • 對應所有輸入標頭 (除了鍵為 “top” 或 “secret” 的標頭)

  • 對應輸出標頭 (除了鍵為 “id”、“timestamp” 或 “userId” 的標頭)

  • 僅信任 “com.acme” 套件中的物件以進行輸出反序列化

  • 使用簡單的 toString() 編碼對任何 “com.acme.Money” 標頭值進行去/序列化

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在 Binder 中使用 Pulsar 屬性

binder 使用適用於 Apache Pulsar 框架的 Spring 中的基本元件來建構其生產者和消費者繫結。由於基於 binder 的應用程式是 Spring Boot 應用程式,因此 binder 預設會使用適用於 Apache Pulsar 的 Spring Boot 自動組態。因此,核心框架層級提供的所有 Pulsar Spring Boot 屬性也可透過 binder 取得。例如,您可以使用具有字首 spring.pulsar.producer…​spring.pulsar.consumer…​ 等的屬性。此外,您也可以在 binder 層級設定這些 Pulsar 屬性。例如,這也適用 - spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​

上述任一種方法都可以,但當使用這類屬性時,它會套用至整個應用程式。如果您的應用程式中有多個函數,它們都會取得相同的屬性。您也可以在擴充繫結屬性層級設定這些 Pulsar 屬性來解決此問題。擴充繫結屬性會套用至繫結本身。例如,如果您有輸入和輸出繫結,而且兩者都需要一組不同的 Pulsar 屬性,則必須在擴充繫結上設定它們。生產者繫結的模式為 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。同樣地,對於消費者繫結,模式為 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。透過這種方式,您可以為同一個應用程式中的不同繫結套用一組不同的 Pulsar 屬性。

最高優先順序是擴充繫結屬性。在 binder 中套用屬性的優先順序為 擴充繫結屬性 → binder 屬性 → Spring Boot 屬性。(從最高到最低)。

以下是一些資源,您可以參考這些資源來尋找有關透過 Pulsar binder 提供的屬性的更多資訊。

Pulsar 生產者繫結組態。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.producer 字首。所有 Spring Boot 提供的 Pulsar 生產者屬性 也可透過此組態類別取得。

Pulsar 消費者繫結組態。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.consumer 字首。所有 Spring Boot 提供的 Pulsar 消費者屬性 也可透過此組態類別取得。

如需常見的 Pulsar binder 特定組態屬性,請參閱此處。這些屬性需要 spring.cloud.stream.pulsar.binder 字首。上述指定的生產者和消費者屬性 (包括 Spring Boot 屬性) 可以在 binder 上使用,字首為 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer

Pulsar 主題佈建器

適用於 Apache Pulsar 的 Spring Cloud Stream binder 隨附適用於 Pulsar 主題的現成佈建器。當執行應用程式時,如果缺少必要的主題,Pulsar 將為您建立主題。但是,這是一個基本非分割主題,如果您想要進階功能 (例如建立分割主題),您可以依賴 binder 中的主題佈建器。Pulsar 主題佈建器使用框架中的 PulsarAdministration,後者使用 PulsarAdminBuilder。因此,您需要設定 spring.pulsar.administration.service-url 屬性,除非您在預設伺服器和埠上執行 Pulsar。

在建立主題時指定分割區計數

建立主題時,您可以使用兩種方式設定分割區計數。首先,您可以使用屬性 spring.cloud.stream.pulsar.binder.partition-count 在 binder 層級設定它。如我們在上面看到的,這樣做會使應用程式建立的所有主題繼承此屬性。假設您想要在繫結層級進行精細控制以設定分割區。在這種情況下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 為每個繫結設定 partition-count 屬性。透過這種方式,同一個應用程式中不同函數建立的各種主題將根據應用程式需求而具有不同的分割區。