使用 RabbitMQ Binder 進行分割

RabbitMQ 本身不支援分割。

有時,將資料發送到特定分割區會很有利 — 例如,當您想要嚴格排序訊息處理時,特定客戶的所有訊息都應發送到相同的分割區。

RabbitMessageChannelBinder 透過將每個分割區的佇列繫結到目的地交換器來提供分割。

以下 Java 和 YAML 範例示範如何組態生產者

生產者
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

先前範例中的組態使用預設分割 (key.hashCode() % partitionCount)。這可能無法提供適當平衡的演算法,取決於金鑰值。您可以使用 partitionSelectorExpressionpartitionSelectorClass 屬性來覆寫此預設值。

只有在您需要在部署生產者時佈建消費者佇列,才需要 required-groups 屬性。否則,發送到分割區的任何訊息都會遺失,直到部署對應的消費者為止。

以下組態佈建主題交換器

part exchange

以下佇列繫結到該交換器

part queues

以下繫結將佇列關聯到交換器

part bindings

以下 Java 和 YAML 範例延續先前的範例,並示範如何組態消費者

消費者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
RabbitMessageChannelBinder 不支援動態擴展。每個分割區必須至少有一個消費者。消費者的 instanceIndex 用於指示消費哪個分割區。諸如 Cloud Foundry 之類的平台只能有一個具有 instanceIndex 的實例。