Apache Kafka 支援
Apache Kafka 透過提供 spring-kafka
專案的自動組態來支援。
Kafka 組態由 spring.kafka.*
中的外部組態屬性控制。例如,您可以在 application.properties
中宣告以下區段
-
屬性
-
YAML
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
若要在啟動時建立主題,請新增 NewTopic 類型的 bean。如果主題已存在,則會忽略該 bean。 |
請參閱 KafkaProperties
以取得更多支援的選項。
發送訊息
Spring 的 KafkaTemplate
已自動組態,您可以將其直接自動注入到您自己的 bean 中,如下列範例所示
-
Java
-
Kotlin
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
// ...
fun someMethod() {
kafkaTemplate.send("someTopic", "Hello")
}
}
如果定義了屬性 spring.kafka.producer.transaction-id-prefix ,則會自動組態 KafkaTransactionManager 。此外,如果定義了 RecordMessageConverter bean,它會自動關聯到自動組態的 KafkaTemplate 。 |
接收訊息
當 Apache Kafka 基礎架構存在時,任何 bean 都可以使用 @KafkaListener
註解來建立監聽器端點。如果沒有定義 KafkaListenerContainerFactory
,則會使用 spring.kafka.listener.*
中定義的鍵自動組態一個預設的工廠。
以下組件在 someTopic
主題上建立一個監聽器端點
-
Java
-
Kotlin
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果定義了 KafkaTransactionManager
bean,它會自動關聯到容器工廠。同樣地,如果定義了 RecordFilterStrategy
、CommonErrorHandler
、AfterRollbackProcessor
或 ConsumerAwareRebalanceListener
bean,它會自動關聯到預設工廠。
根據監聽器類型,RecordMessageConverter
或 BatchMessageConverter
bean 會關聯到預設工廠。如果批次監聽器僅存在 RecordMessageConverter
bean,則會將其包裝在 BatchMessageConverter
中。
自訂 ChainedKafkaTransactionManager 必須標記為 @Primary ,因為它通常參考自動組態的 KafkaTransactionManager bean。 |
Kafka Streams
Spring for Apache Kafka 提供了一個工廠 bean 來建立 StreamsBuilder
物件並管理其串流的生命週期。只要 kafka-streams
在類別路徑上,並且透過 @EnableKafkaStreams
註解啟用 Kafka Streams,Spring Boot 就會自動組態所需的 KafkaStreamsConfiguration
bean。
啟用 Kafka Streams 表示必須設定應用程式 ID 和啟動伺服器。前者可以使用 spring.kafka.streams.application-id
進行組態,如果未設定,則預設為 spring.application.name
。後者可以全域設定,也可以僅針對串流進行特定覆寫。
可以使用專用屬性來設定幾個額外的屬性;其他任意 Kafka 屬性可以使用 spring.kafka.streams.properties
命名空間進行設定。另請參閱 其他 Kafka 屬性 以取得更多資訊。
若要使用工廠 bean,請將 StreamsBuilder
注入到您的 @Bean
中,如下列範例所示
-
Java
-
Kotlin
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
預設情況下,由 StreamBuilder
物件管理的串流會自動啟動。您可以使用 spring.kafka.streams.auto-startup
屬性來自訂此行為。
其他 Kafka 屬性
附錄的 整合屬性 區段中顯示了自動組態支援的屬性。請注意,在大多數情況下,這些屬性(連字符或駝峰式命名)直接對應到 Apache Kafka 點狀屬性。請參閱 Apache Kafka 文件以取得詳細資訊。
名稱中不包含客戶端類型(producer
、consumer
、admin
或 streams
)的屬性被視為通用屬性,適用於所有客戶端。如果需要,大多數這些通用屬性可以針對一個或多個客戶端類型進行覆寫。
Apache Kafka 將屬性指定為 HIGH、MEDIUM 或 LOW 的重要性。Spring Boot 自動組態支援所有 HIGH 重要性屬性、一些選定的 MEDIUM 和 LOW 屬性,以及任何沒有預設值的屬性。
只有 Kafka 支援的屬性子集可透過 KafkaProperties
類別直接使用。如果您希望使用未直接支援的其他屬性來組態個別客戶端類型,請使用以下屬性
-
屬性
-
YAML
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
這會將通用 prop.one
Kafka 屬性設定為 first
(適用於生產者、消費者、管理員和串流)、prop.two
管理員屬性設定為 second
、prop.three
消費者屬性設定為 third
、prop.four
生產者屬性設定為 fourth
,以及 prop.five
串流屬性設定為 fifth
。
您也可以如下組態 Spring Kafka JsonDeserializer
-
屬性
-
YAML
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同樣地,您可以停用 JsonSerializer
在標頭中發送類型資訊的預設行為
-
屬性
-
YAML
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以此方式設定的屬性會覆寫 Spring Boot 明確支援的任何組態項目。 |
使用嵌入式 Kafka 進行測試
Spring for Apache Kafka 提供了一種方便的方式來使用嵌入式 Apache Kafka Broker 測試專案。若要使用此功能,請使用來自 spring-kafka-test
模組的 @EmbeddedKafka
註解測試類別。如需更多資訊,請參閱 Spring for Apache Kafka 參考手冊。
為了使 Spring Boot 自動組態與上述嵌入式 Apache Kafka Broker 搭配運作,您需要將嵌入式 Broker 位址(由 EmbeddedKafkaBroker
填充)的系統屬性重新對應到 Apache Kafka 的 Spring Boot 組態屬性。有幾種方法可以做到這一點
-
提供一個系統屬性,將嵌入式 Broker 位址對應到測試類別中的
spring.kafka.bootstrap-servers
-
Java
-
Kotlin
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka
註解上組態屬性名稱
-
Java
-
Kotlin
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在組態屬性中使用佔位符
-
屬性
-
YAML
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"