使用 Kafka 綁定器進行分區
Apache Kafka 原生支援主題分區。
有時將資料發送到特定分區是有利的,例如,當您想要嚴格排序訊息處理時 (特定客戶的所有訊息都應發送到同一個分區)。
以下範例示範如何設定生產者和消費者端
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
重要的是要記住,由於 Apache Kafka 原生支援分區,因此除非您像範例中那樣使用自訂分區金鑰,或使用涉及酬載本身的表達式,否則無需依賴如上所述的綁定器分區。否則,綁定器提供的分區選擇適用於不支援原生分區的中介軟體技術。請注意,在上述範例中,我們正在使用名為 partitionKey 的自訂金鑰,這將是分區的決定因素,因此在這種情況下,使用綁定器分區是適當的。當使用原生 Kafka 分區時,即當您不提供 partition-key-expression 時,Apache Kafka 將選擇一個分區,預設情況下,該分區將是記錄金鑰在可用分區數上的雜湊值。若要將金鑰新增至輸出記錄,請在 spring-messaging Message<?> 中將 KafkaHeaders.KEY 標頭設定為所需的金鑰值。預設情況下,當未提供記錄金鑰時,Apache Kafka 將根據 Apache Kafka 文件 中描述的邏輯選擇分區。 |
主題必須佈建足夠的分區,才能為所有消費者群組實現所需的並行性。上述設定最多支援 12 個消費者實例 (如果其 concurrency 為 2,則為 6 個;如果其 concurrency 為 3,則為 4 個,依此類推)。通常最好「過度佈建」分區,以便未來增加消費者或並行性。 |
前述設定使用預設分區 ( `key.hashCode() % partitionCount` )。根據金鑰值,這可能無法提供適當平衡的演算法。特別注意,此分區策略與獨立 Kafka 生產者 (例如 Kafka Streams 使用的生產者) 使用的預設值不同,這表示當這些用戶端產生時,相同的金鑰值在不同分區之間可能會以不同的方式平衡。您可以使用 `partitionSelectorExpression` 或 `partitionSelectorClass` 屬性覆寫此預設值。 |
由於分區由 Kafka 原生處理,因此消費者端無需特殊設定。Kafka 會在實例之間分配分區。
Kafka 主題的 partitionCount 可能會在執行階段變更 (例如,由於管理任務)。之後計算出的分區將會有所不同 (例如,屆時將使用新的分區)。自 Spring Cloud Stream 4.0.3 起,將支援分區計數的執行階段變更。另請參閱參數 'spring.kafka.producer.properties.metadata.max.age.ms' 以設定更新間隔。由於某些限制,無法使用引用訊息 'payload' 的 'partition-key-expression',在這種情況下機制將會停用。整體行為預設為停用,並且可以使用設定參數 'producer.dynamicPartitionUpdatesEnabled=true' 啟用。 |
以下 Spring Boot 應用程式會監聽 Kafka 串流,並將每個訊息前往的分區 ID 列印 (到主控台)
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根據需要新增實例。Kafka 會重新平衡分區分配。如果實例計數 (或 `instance count * concurrency` ) 超過分區數,則某些消費者會處於閒置狀態。