設定主題

如果您在應用程式上下文中定義了 KafkaAdmin bean,它可以自動將主題新增至 Broker。為此,您可以為每個主題在應用程式上下文中新增一個 NewTopic @Bean。版本 2.3 引入了一個新的類別 TopicBuilder,使建立此類 Bean 更加方便。以下範例示範如何操作

  • Java

  • Kotlin

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

從版本 2.6 開始,您可以省略 partitions() 和/或 replicas(),Broker 預設值將會套用至這些屬性。Broker 版本必須至少為 2.4.0 才能支援此功能 - 請參閱 KIP-464

  • Java

  • Kotlin

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

從版本 2.7 開始,您可以在單個 KafkaAdmin.NewTopics bean 定義中宣告多個 NewTopic

  • Java

  • Kotlin

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
當使用 Spring Boot 時,KafkaAdmin bean 會自動註冊,因此您只需要 NewTopic (和/或 NewTopics) @Bean

預設情況下,如果 Broker 不可用,則會記錄訊息,但上下文會繼續載入。您可以透過程式設計方式調用 admin 的 initialize() 方法稍後重試。如果您希望將此條件視為致命,請將 admin 的 fatalIfBrokerNotAvailable 屬性設定為 true。然後上下文將無法初始化。

如果 Broker 支援 (1.0.0 或更高版本),如果發現現有主題的分區少於 NewTopic.numPartitions,則 admin 會增加分區數量。

從版本 2.7 開始,KafkaAdmin 提供了在運行時建立和檢查主題的方法。

  • createOrModifyTopics

  • describeTopics

對於更進階的功能,您可以直接使用 AdminClient。以下範例示範如何操作

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

從版本 2.9.10、3.0.9 開始,您可以提供一個 Predicate<NewTopic>,可用於判斷是否應考慮建立或修改特定的 NewTopic bean。這很有用,例如,如果您有多個 KafkaAdmin 實例指向不同的叢集,並且您希望選擇每個 admin 應建立或修改的主題。

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));