Apache Pulsar 支援

Apache Pulsar 透過提供 Spring for Apache Pulsar 專案的自動組態來支援。

當類別路徑中存在 org.springframework.pulsar:spring-pulsar 時,Spring Boot 將會自動組態並註冊經典(命令式)Spring for Apache Pulsar 组件。當類別路徑中存在 org.springframework.pulsar:spring-pulsar-reactive 時,它也會對反應式组件執行相同的操作。

分別有 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive 啟動器,方便收集命令式和反應式使用的依賴項。

連線到 Pulsar

當您使用 Pulsar 啟動器時,Spring Boot 將會自動組態並註冊 PulsarClient Bean。

預設情況下,應用程式會嘗試連線到位於 pulsar://127.0.0.1:6650 的本機 Pulsar 執行個體。可以透過將 spring.pulsar.client.service-url 屬性設定為不同的值來調整此設定。

該值必須是有效的 Pulsar Protocol URL

您可以透過指定任何具有 spring.pulsar.client.* 字首的應用程式屬性來組態用戶端。

如果您需要對組態進行更多控制,請考慮註冊一個或多個 PulsarClientBuilderCustomizer Bean。

驗證

若要連線到需要驗證的 Pulsar 叢集,您需要透過設定 pluginClassName 和外掛程式所需的任何參數,來指定要使用的驗證外掛程式。您可以將參數設定為參數名稱到參數值的映射。以下範例顯示如何組態 AuthenticationOAuth2 外掛程式。

  • 屬性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要確保在 spring.pulsar.client.authentication.param.* 下定義的名稱與您的驗證外掛程式預期的名稱完全相符(通常為駝峰式命名)。Spring Boot 不會嘗試對這些條目進行任何形式的寬鬆綁定。

例如,如果您想要為 AuthenticationOAuth2 驗證外掛程式組態發行者 URL,則必須使用 spring.pulsar.client.authentication.param.issuerUrl。如果您使用其他形式,例如 issuerurlissuer-url,則設定將不會套用至外掛程式。

這種缺乏寬鬆綁定的情況也使得使用環境變數進行驗證參數變得棘手,因為大小寫敏感性會在轉換過程中遺失。如果您使用環境變數作為參數,則需要遵循 這些步驟,在 Spring for Apache Pulsar 參考文件中,才能使其正常運作。

SSL

預設情況下,Pulsar 用戶端以純文字與 Pulsar 服務通訊。您可以遵循 這些步驟,在 Spring for Apache Pulsar 參考文件中,以啟用 TLS 加密。

如需用戶端和驗證的完整詳細資訊,請參閱 Spring for Apache Pulsar 參考文件

反應式地連線到 Pulsar

當反應式自動組態被啟用時,Spring Boot 將會自動組態並註冊 ReactivePulsarClient Bean。

ReactivePulsarClient 適應先前描述的 PulsarClient 的執行個體。因此,請遵循上一節來組態 ReactivePulsarClient 使用的 PulsarClient

連線到 Pulsar 管理

Spring for Apache Pulsar 的 PulsarAdministration 用戶端也會自動組態。

預設情況下,應用程式會嘗試連線到位於 https://127.0.0.1:8080 的本機 Pulsar 執行個體。可以透過將 spring.pulsar.admin.service-url 屬性設定為不同的值(格式為 (http|https)://<host>:<port>)來調整此設定。

如果您需要對組態進行更多控制,請考慮註冊一個或多個 PulsarAdminBuilderCustomizer Bean。

驗證

當存取需要驗證的 Pulsar 叢集時,管理用戶端需要與常規 Pulsar 用戶端相同的安全性組態。您可以透過將 spring.pulsar.client.authentication 替換為 spring.pulsar.admin.authentication,來使用上述的 驗證組態

若要在啟動時建立主題,請新增類型為 PulsarTopic 的 Bean。如果主題已存在,則會忽略該 Bean。

傳送訊息

Spring 的 PulsarTemplate 已自動組態,您可以如以下範例所示使用它來傳送訊息

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依賴 PulsarProducerFactory 來建立底層 Pulsar 生產者。Spring Boot 自動組態也提供了此生產者工廠,預設情況下,它會快取它建立的生產者。您可以透過指定任何具有 spring.pulsar.producer.*spring.pulsar.producer.cache.* 字首的應用程式屬性,來組態生產者工廠和快取設定。

如果您需要對生產者工廠組態進行更多控制,請考慮註冊一個或多個 ProducerBuilderCustomizer Bean。這些自訂器會套用至所有建立的生產者。您也可以在傳送訊息時傳入 ProducerBuilderCustomizer,以僅影響目前的生產者。

如果您需要對要傳送的訊息進行更多控制,則可以在傳送訊息時傳入 TypedMessageBuilderCustomizer

反應式地傳送訊息

當反應式自動組態被啟用時,Spring 的 ReactivePulsarTemplate 會自動組態,您可以如以下範例所示使用它來傳送訊息

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依賴 ReactivePulsarSenderFactory 來實際建立底層的傳送器。Spring Boot 自動組態也提供了此傳送器工廠,預設情況下,它會快取它建立的生產者。您可以透過指定任何具有 spring.pulsar.producer.*spring.pulsar.producer.cache.* 字首的應用程式屬性,來組態傳送器工廠和快取設定。

如果您需要對傳送器工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageSenderBuilderCustomizer Bean。這些自訂器會套用至所有建立的傳送器。您也可以在傳送訊息時傳入 ReactiveMessageSenderBuilderCustomizer,以僅影響目前的傳送器。

如果您需要對要傳送的訊息進行更多控制,則可以在傳送訊息時傳入 MessageSpecBuilderCustomizer

接收訊息

當 Apache Pulsar 基礎架構存在時,任何 Bean 都可以使用 @PulsarListener 進行註解,以建立監聽器端點。以下组件會在 someTopic 主題上建立監聽器端點

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自動組態提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用來建構底層 Pulsar 消費者的消費者工廠。您可以透過指定任何具有 spring.pulsar.listener.*spring.pulsar.consumer.* 字首的應用程式屬性,來組態這些组件。

如果您需要對消費者工廠組態進行更多控制,請考慮註冊一個或多個 ConsumerBuilderCustomizer Bean。這些自訂器會套用至工廠建立的所有消費者,以及所有 @PulsarListener 執行個體。您也可以透過設定 @PulsarListener 註解的 consumerCustomizer 屬性來自訂單一監聽器。

反應式地接收訊息

當 Apache Pulsar 基礎架構存在且反應式自動組態被啟用時,任何 Bean 都可以使用 @ReactivePulsarListener 進行註解,以建立反應式監聽器端點。以下组件會在 someTopic 主題上建立反應式監聽器端點

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自動組態提供了 ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 以及它用來建構底層反應式 Pulsar 消費者的消費者工廠。您可以透過指定任何具有 spring.pulsar.listener.*spring.pulsar.consumer.* 字首的應用程式屬性,來組態這些组件。

如果您需要對消費者工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageConsumerBuilderCustomizer Bean。這些自訂器會套用至工廠建立的所有消費者,以及所有 @ReactivePulsarListener 執行個體。您也可以透過設定 @ReactivePulsarListener 註解的 consumerCustomizer 屬性來自訂單一監聽器。

讀取訊息

Pulsar 讀取器介面讓應用程式能夠手動管理游標。當您使用讀取器連線到主題時,您需要指定當讀取器連線到主題時,從哪個訊息開始讀取。

當 Apache Pulsar 基礎架構存在時,任何 Bean 都可以使用 @PulsarReader 進行註解,以使用讀取器來消費訊息。以下组件會建立一個讀取器端點,該端點從 someTopic 主題的開頭開始讀取訊息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依賴 PulsarReaderFactory 來建立底層 Pulsar 讀取器。Spring Boot 自動組態提供了此讀取器工廠,可以透過設定任何具有 spring.pulsar.reader.* 字首的應用程式屬性來自訂。

如果您需要對讀取器工廠組態進行更多控制,請考慮註冊一個或多個 ReaderBuilderCustomizer Bean。這些自訂器會套用至工廠建立的所有讀取器,以及所有 @PulsarReader 執行個體。您也可以透過設定 @PulsarReader 註解的 readerCustomizer 屬性來自訂單一監聽器。

反應式地讀取訊息

當 Apache Pulsar 基礎架構存在且反應式自動組態被啟用時,Spring 的 ReactivePulsarReaderFactory 會被提供,您可以使用它來建立讀取器,以便以反應式方式讀取訊息。以下组件使用提供的工廠建立一個讀取器,並從 someTopic 主題中讀取 5 分鐘前的單一訊息

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自動組態提供了此讀取器工廠,可以透過設定任何具有 spring.pulsar.reader.* 字首的應用程式屬性來自訂。

如果您需要對讀取器工廠組態進行更多控制,請考慮在使用工廠建立讀取器時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer 執行個體。

如果您需要對讀取器工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageReaderBuilderCustomizer Bean。這些自訂器會套用至所有建立的讀取器。您也可以在建立讀取器時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer,以僅將自訂套用至建立的讀取器。

如需上述任何组件的更多詳細資訊,以及探索其他可用的功能,請參閱 Spring for Apache Pulsar 參考文件

交易支援

當使用 PulsarTemplate@PulsarListener 時,Spring for Apache Pulsar 支援交易。

目前在使用反應式變體時,不支援交易。

spring.pulsar.transaction.enabled 屬性設定為 true 將會

  • 組態 PulsarTransactionManager Bean

  • PulsarTemplate 啟用交易支援

  • @PulsarListener 方法啟用交易支援

@PulsarListenertransactional 屬性可用於微調何時應將交易與監聽器一起使用。

若要更精細地控制 Spring for Apache Pulsar 交易功能,您應該定義自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果預設自動組態的 PulsarTransactionManager 不適合,您也可以定義 PulsarAwareTransactionManager Bean。

其他 Pulsar 屬性

附錄的 整合屬性 區段中顯示了自動組態支援的屬性。請注意,在大多數情況下,這些屬性(連字符或駝峰式命名)直接映射到 Apache Pulsar 組態屬性。請參閱 Apache Pulsar 文件以取得詳細資訊。

只有 Pulsar 支援的屬性子集可透過 PulsarProperties 類別直接取得。如果您希望使用未直接支援的其他屬性來調整自動組態的组件,則可以使用每個前述组件支援的自訂器。