Apache Pulsar 支援
Apache Pulsar 透過提供 Spring for Apache Pulsar 專案的自動組態來支援。
當類別路徑中存在 org.springframework.pulsar:spring-pulsar
時,Spring Boot 將會自動組態並註冊經典(命令式)Spring for Apache Pulsar 组件。當類別路徑中存在 org.springframework.pulsar:spring-pulsar-reactive
時,它也會對反應式组件執行相同的操作。
分別有 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
啟動器,方便收集命令式和反應式使用的依賴項。
連線到 Pulsar
當您使用 Pulsar 啟動器時,Spring Boot 將會自動組態並註冊 PulsarClient
Bean。
預設情況下,應用程式會嘗試連線到位於 pulsar://127.0.0.1:6650
的本機 Pulsar 執行個體。可以透過將 spring.pulsar.client.service-url
屬性設定為不同的值來調整此設定。
該值必須是有效的 Pulsar Protocol URL |
您可以透過指定任何具有 spring.pulsar.client.*
字首的應用程式屬性來組態用戶端。
如果您需要對組態進行更多控制,請考慮註冊一個或多個 PulsarClientBuilderCustomizer
Bean。
驗證
若要連線到需要驗證的 Pulsar 叢集,您需要透過設定 pluginClassName
和外掛程式所需的任何參數,來指定要使用的驗證外掛程式。您可以將參數設定為參數名稱到參數值的映射。以下範例顯示如何組態 AuthenticationOAuth2
外掛程式。
-
屬性
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
您需要確保在 例如,如果您想要為 這種缺乏寬鬆綁定的情況也使得使用環境變數進行驗證參數變得棘手,因為大小寫敏感性會在轉換過程中遺失。如果您使用環境變數作為參數,則需要遵循 這些步驟,在 Spring for Apache Pulsar 參考文件中,才能使其正常運作。 |
反應式地連線到 Pulsar
當反應式自動組態被啟用時,Spring Boot 將會自動組態並註冊 ReactivePulsarClient
Bean。
ReactivePulsarClient
適應先前描述的 PulsarClient
的執行個體。因此,請遵循上一節來組態 ReactivePulsarClient
使用的 PulsarClient
。
連線到 Pulsar 管理
Spring for Apache Pulsar 的 PulsarAdministration
用戶端也會自動組態。
預設情況下,應用程式會嘗試連線到位於 https://127.0.0.1:8080
的本機 Pulsar 執行個體。可以透過將 spring.pulsar.admin.service-url
屬性設定為不同的值(格式為 (http|https)://<host>:<port>
)來調整此設定。
如果您需要對組態進行更多控制,請考慮註冊一個或多個 PulsarAdminBuilderCustomizer
Bean。
驗證
當存取需要驗證的 Pulsar 叢集時,管理用戶端需要與常規 Pulsar 用戶端相同的安全性組態。您可以透過將 spring.pulsar.client.authentication
替換為 spring.pulsar.admin.authentication
,來使用上述的 驗證組態。
若要在啟動時建立主題,請新增類型為 PulsarTopic 的 Bean。如果主題已存在,則會忽略該 Bean。 |
傳送訊息
Spring 的 PulsarTemplate
已自動組態,您可以如以下範例所示使用它來傳送訊息
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate
依賴 PulsarProducerFactory
來建立底層 Pulsar 生產者。Spring Boot 自動組態也提供了此生產者工廠,預設情況下,它會快取它建立的生產者。您可以透過指定任何具有 spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
字首的應用程式屬性,來組態生產者工廠和快取設定。
如果您需要對生產者工廠組態進行更多控制,請考慮註冊一個或多個 ProducerBuilderCustomizer
Bean。這些自訂器會套用至所有建立的生產者。您也可以在傳送訊息時傳入 ProducerBuilderCustomizer
,以僅影響目前的生產者。
如果您需要對要傳送的訊息進行更多控制,則可以在傳送訊息時傳入 TypedMessageBuilderCustomizer
。
反應式地傳送訊息
當反應式自動組態被啟用時,Spring 的 ReactivePulsarTemplate
會自動組態,您可以如以下範例所示使用它來傳送訊息
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate
依賴 ReactivePulsarSenderFactory
來實際建立底層的傳送器。Spring Boot 自動組態也提供了此傳送器工廠,預設情況下,它會快取它建立的生產者。您可以透過指定任何具有 spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
字首的應用程式屬性,來組態傳送器工廠和快取設定。
如果您需要對傳送器工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageSenderBuilderCustomizer
Bean。這些自訂器會套用至所有建立的傳送器。您也可以在傳送訊息時傳入 ReactiveMessageSenderBuilderCustomizer
,以僅影響目前的傳送器。
如果您需要對要傳送的訊息進行更多控制,則可以在傳送訊息時傳入 MessageSpecBuilderCustomizer
。
接收訊息
當 Apache Pulsar 基礎架構存在時,任何 Bean 都可以使用 @PulsarListener
進行註解,以建立監聽器端點。以下组件會在 someTopic
主題上建立監聽器端點
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自動組態提供了 PulsarListener
所需的所有组件,例如 PulsarListenerContainerFactory
以及它用來建構底層 Pulsar 消費者的消費者工廠。您可以透過指定任何具有 spring.pulsar.listener.*
和 spring.pulsar.consumer.*
字首的應用程式屬性,來組態這些组件。
如果您需要對消費者工廠組態進行更多控制,請考慮註冊一個或多個 ConsumerBuilderCustomizer
Bean。這些自訂器會套用至工廠建立的所有消費者,以及所有 @PulsarListener
執行個體。您也可以透過設定 @PulsarListener
註解的 consumerCustomizer
屬性來自訂單一監聽器。
反應式地接收訊息
當 Apache Pulsar 基礎架構存在且反應式自動組態被啟用時,任何 Bean 都可以使用 @ReactivePulsarListener
進行註解,以建立反應式監聽器端點。以下组件會在 someTopic
主題上建立反應式監聽器端點
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自動組態提供了 ReactivePulsarListener
所需的所有组件,例如 ReactivePulsarListenerContainerFactory
以及它用來建構底層反應式 Pulsar 消費者的消費者工廠。您可以透過指定任何具有 spring.pulsar.listener.*
和 spring.pulsar.consumer.*
字首的應用程式屬性,來組態這些组件。
如果您需要對消費者工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageConsumerBuilderCustomizer
Bean。這些自訂器會套用至工廠建立的所有消費者,以及所有 @ReactivePulsarListener
執行個體。您也可以透過設定 @ReactivePulsarListener
註解的 consumerCustomizer
屬性來自訂單一監聽器。
讀取訊息
Pulsar 讀取器介面讓應用程式能夠手動管理游標。當您使用讀取器連線到主題時,您需要指定當讀取器連線到主題時,從哪個訊息開始讀取。
當 Apache Pulsar 基礎架構存在時,任何 Bean 都可以使用 @PulsarReader
進行註解,以使用讀取器來消費訊息。以下组件會建立一個讀取器端點,該端點從 someTopic
主題的開頭開始讀取訊息
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader
依賴 PulsarReaderFactory
來建立底層 Pulsar 讀取器。Spring Boot 自動組態提供了此讀取器工廠,可以透過設定任何具有 spring.pulsar.reader.*
字首的應用程式屬性來自訂。
如果您需要對讀取器工廠組態進行更多控制,請考慮註冊一個或多個 ReaderBuilderCustomizer
Bean。這些自訂器會套用至工廠建立的所有讀取器,以及所有 @PulsarReader
執行個體。您也可以透過設定 @PulsarReader
註解的 readerCustomizer
屬性來自訂單一監聽器。
反應式地讀取訊息
當 Apache Pulsar 基礎架構存在且反應式自動組態被啟用時,Spring 的 ReactivePulsarReaderFactory
會被提供,您可以使用它來建立讀取器,以便以反應式方式讀取訊息。以下组件使用提供的工廠建立一個讀取器,並從 someTopic
主題中讀取 5 分鐘前的單一訊息
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自動組態提供了此讀取器工廠,可以透過設定任何具有 spring.pulsar.reader.*
字首的應用程式屬性來自訂。
如果您需要對讀取器工廠組態進行更多控制,請考慮在使用工廠建立讀取器時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer
執行個體。
如果您需要對讀取器工廠組態進行更多控制,請考慮註冊一個或多個 ReactiveMessageReaderBuilderCustomizer
Bean。這些自訂器會套用至所有建立的讀取器。您也可以在建立讀取器時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer
,以僅將自訂套用至建立的讀取器。
如需上述任何组件的更多詳細資訊,以及探索其他可用的功能,請參閱 Spring for Apache Pulsar 參考文件。 |
交易支援
當使用 PulsarTemplate
和 @PulsarListener
時,Spring for Apache Pulsar 支援交易。
目前在使用反應式變體時,不支援交易。 |
將 spring.pulsar.transaction.enabled
屬性設定為 true
將會
-
組態
PulsarTransactionManager
Bean -
為
PulsarTemplate
啟用交易支援 -
為
@PulsarListener
方法啟用交易支援
@PulsarListener
的 transactional
屬性可用於微調何時應將交易與監聽器一起使用。
若要更精細地控制 Spring for Apache Pulsar 交易功能,您應該定義自己的 PulsarTemplate
和/或 ConcurrentPulsarListenerContainerFactory
Bean。如果預設自動組態的 PulsarTransactionManager
不適合,您也可以定義 PulsarAwareTransactionManager
Bean。
其他 Pulsar 屬性
附錄的 整合屬性 區段中顯示了自動組態支援的屬性。請注意,在大多數情況下,這些屬性(連字符或駝峰式命名)直接映射到 Apache Pulsar 組態屬性。請參閱 Apache Pulsar 文件以取得詳細資訊。
只有 Pulsar 支援的屬性子集可透過 PulsarProperties
類別直接取得。如果您希望使用未直接支援的其他屬性來調整自動組態的组件,則可以使用每個前述组件支援的自訂器。