測試應用程式

spring-kafka-test jar 包含一些有用的工具,可協助您測試應用程式。

嵌入式 Kafka Broker

提供了兩種實作方式

  • EmbeddedKafkaZKBroker - 傳統實作方式,啟動嵌入式 Zookeeper 實例(使用 EmbeddedKafka 時仍然是預設值)。

  • EmbeddedKafkaKraftBroker - 在組合控制器和 Broker 模式中使用 Kraft 而不是 Zookeeper(自 3.1 起)。

有幾種設定 Broker 的技術,將在以下章節中討論。

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils 提供了許多靜態輔助方法,用於消費記錄、檢索各種記錄偏移量等。請參閱其 Javadocs 以取得完整詳細資訊。

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 也提供了一些靜態方法來設定 Producer 和 Consumer 屬性。以下列表顯示了這些方法的簽章

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

從 2.5 版開始,consumerProps 方法將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 設定為 earliest。這是因為在大多數情況下,您希望 Consumer 消費測試案例中發送的任何訊息。ConsumerConfig 預設值為 latest,這表示在 Consumer 啟動之前,測試已發送的訊息將不會收到這些記錄。若要恢復先前的行為,請在呼叫方法後將屬性設定為 latest

使用嵌入式 Broker 時,最佳實務通常是為每個測試使用不同的主題,以防止串音。如果由於某些原因無法做到這一點,請注意 consumeFromEmbeddedTopics 方法的預設行為是在分配後將分配的分區搜尋到開頭。由於它無法存取 Consumer 屬性,因此您必須使用接受 seekToEnd boolean 參數的重載方法,以搜尋到結尾而不是開頭。

提供了 EmbeddedKafkaZKBroker 的 JUnit 4 @Rule 包裝器,用於建立嵌入式 Kafka 和嵌入式 Zookeeper 伺服器。(有關將 @EmbeddedKafka 與 JUnit 5 搭配使用的資訊,請參閱 @EmbeddedKafka 註解)。以下列表顯示了這些方法的簽章

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支援 EmbeddedKafkaKraftBroker

EmbeddedKafkaBroker 類別有一個實用方法,可讓您消費它建立的所有主題。以下範例示範如何使用它

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些實用方法可以從 Consumer 擷取結果。以下列表顯示了這些方法的簽章

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下範例示範如何使用 KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

當嵌入式 Kafka 和嵌入式 Zookeeper 伺服器由 EmbeddedKafkaBroker 啟動時,名為 spring.embedded.kafka.brokers 的系統屬性會設定為 Kafka Broker 的位址,而名為 spring.embedded.zookeeper.connect 的系統屬性會設定為 Zookeeper 的位址。為此屬性提供了方便的常數 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

Kafka Broker 的位址可以公開給任何任意且方便的屬性,而不是預設的 spring.embedded.kafka.brokers 系統屬性。為此,可以在啟動嵌入式 Kafka 之前設定 spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) 系統屬性。例如,對於 Spring Boot,預期會設定 spring.kafka.bootstrap-servers 設定屬性以自動設定 Kafka 用戶端。因此,在使用隨機埠上的嵌入式 Kafka 執行測試之前,我們可以將 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 設定為系統屬性 - 並且 EmbeddedKafkaBroker 將使用它來公開其 Broker 位址。這現在是此屬性的預設值(從 3.0.10 版開始)。

使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以為 Kafka 伺服器提供其他屬性。有關可能的 Broker 屬性的更多資訊,請參閱 Kafka Config

設定主題

以下範例設定會建立名為 cathat 的主題,各有五個分割區;一個名為 thing1 的主題,有 10 個分割區;以及一個名為 thing2 的主題,有 15 個分割區

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

預設情況下,當出現問題時(例如新增已存在的主題),addTopics 會擲回例外狀況。2.6 版新增了該方法的新版本,該版本傳回 Map<String, Exception>;鍵是主題名稱,值在成功時為 null,失敗時為 Exception

為多個測試類別使用相同的 Broker

您可以使用類似以下內容為多個測試類別使用相同的 Broker

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

這假設是 Spring Boot 環境,並且嵌入式 Broker 取代了 bootstrap servers 屬性。

然後,在每個測試類別中,您可以使用類似以下內容

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您未使用 Spring Boot,則可以使用 broker.getBrokersAsString() 取得 bootstrap servers。

前面的範例沒有提供在所有測試完成後關閉 Broker 的機制。如果您在 Gradle daemon 中執行測試,這可能會是個問題。在這種情況下,您不應使用此技術,或者您應該在測試完成時使用某些東西來呼叫 EmbeddedKafkaBroker 上的 destroy()

從 3.0 版開始,框架為 JUnit Platform 公開了 GlobalEmbeddedKafkaTestExecutionListener;預設情況下已停用。這需要 JUnit Platform 1.8 或更高版本。此監聽器的目的是為整個測試計畫啟動一個全域 EmbeddedKafkaBroker,並在計畫結束時停止它。若要啟用此監聽器,並因此為專案中的所有測試擁有單一全域嵌入式 Kafka 叢集,則必須透過系統屬性或 JUnit Platform 設定將 spring.kafka.global.embedded.enabled 屬性設定為 true。此外,還可以提供以下屬性

  • spring.kafka.embedded.count - 要管理的 Kafka Broker 數量;

  • spring.kafka.embedded.ports - 每個要啟動的 Kafka Broker 的埠(逗號分隔值),如果偏好隨機埠,則為 0;值的數量必須等於上面提到的計數;

  • spring.kafka.embedded.topics - 要在啟動的 Kafka 叢集中建立的主題(逗號分隔值);

  • spring.kafka.embedded.partitions - 為建立的主題佈建的分割區數量;

  • spring.kafka.embedded.broker.properties.location - 其他 Kafka Broker 設定屬性的檔案位置;此屬性的值必須遵循 Spring 資源抽象模式;

  • spring.kafka.embedded.kraft - 預設為 false,如果為 true,則使用 EmbeddedKafkaKraftBroker 而不是 EmbeddedKafkaZKBroker

本質上,這些屬性模仿了 @EmbeddedKafka 的某些屬性。

有關設定屬性以及如何在 JUnit 5 User Guide 中提供它們的更多資訊。例如,spring.embedded.kafka.brokers.property=my.bootstrap-servers 項目可以新增至測試類別路徑中的 junit-platform.properties 檔案。從 3.0.10 版開始,Broker 預設會自動將其設定為 spring.kafka.bootstrap-servers,以用於 Spring Boot 應用程式的測試。

建議不要在單一測試套件中組合全域嵌入式 Kafka 和每個測試類別。它們都共用相同的系統屬性,因此很可能會導致非預期的行為。
spring-kafka-testjunit-jupiter-apijunit-platform-launcher 具有傳遞相依性(後者用於支援全域嵌入式 Broker)。如果您希望使用嵌入式 Broker 並且未使用 JUnit,您可能希望排除這些相依性。

@EmbeddedKafka 註解

我們通常建議您將規則用作 @ClassRule,以避免在測試之間啟動和停止 Broker(並為每個測試使用不同的主題)。從 2.0 版開始,如果您使用 Spring 的測試應用程式內容快取,您也可以宣告 EmbeddedKafkaBroker Bean,以便可以在多個測試類別中使用單一 Broker。為了方便起見,我們提供了一個稱為 @EmbeddedKafka 的測試類別層級註解,以註冊 EmbeddedKafkaBroker Bean。以下範例示範如何使用它

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

從 2.2.4 版開始,您也可以使用 @EmbeddedKafka 註解來指定 Kafka 埠屬性。

從 3.2 版開始,將 kraft 屬性設定為 true 以使用 EmbeddedKafkaKraftBroker 而不是 EmbeddedKafkaZKBroker

以下範例設定了 @EmbeddedKafkatopicsbrokerPropertiesbrokerPropertiesLocation 屬性,以支援屬性佔位符解析

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://127.0.0.1:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的範例中,屬性佔位符 ${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port} 是從 Spring Environment 解析的。此外,Broker 屬性是從 brokerPropertiesLocation 指定的 broker.properties 類別路徑資源載入的。屬性佔位符會針對 brokerPropertiesLocation URL 和資源中找到的任何屬性佔位符進行解析。brokerProperties 定義的屬性會覆寫 brokerPropertiesLocation 中找到的屬性。

您可以將 @EmbeddedKafka 註解與 JUnit 4 或 JUnit 5 搭配使用。

搭配 JUnit5 的 @EmbeddedKafka 註解

從 2.3 版開始,有兩種方法可以將 @EmbeddedKafka 註解與 JUnit5 搭配使用。當與 @SpringJunitConfig 註解搭配使用時,嵌入式 Broker 會新增至測試應用程式內容。您可以將 Broker 自動注入到您的測試中,在類別或方法層級,以取得 Broker 位址清單。

當不使用 Spring 測試內容時,EmbdeddedKafkaCondition 會建立 Broker;條件包含參數解析器,因此您可以在測試方法中存取 Broker。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

除非以 @EmbeddedKafka 註解的類別也以 ExtendWith(SpringExtension.class) 註解(或 meta-annotated),否則將會建立獨立的 Broker(在 Spring 的 TestContext 之外)。@SpringJunitConfig@SpringBootTest 都是如此 meta-annotated,並且當這些註解中的任何一個也存在時,將會使用基於內容的 Broker。

當有可用的 Spring 測試應用程式內容時,主題和 Broker 屬性可以包含屬性佔位符,只要屬性在某處定義,就會解析這些佔位符。如果沒有可用的 Spring 內容,則不會解析這些佔位符。

@SpringBootTest 註解中的嵌入式 Broker

Spring Initializr 現在會自動將 spring-kafka-test 相依性新增至專案設定中的測試範圍。

如果您的應用程式在 spring-cloud-stream 中使用 Kafka binder,並且您想要為測試使用嵌入式 Broker,則必須移除 spring-cloud-stream-test-support 相依性,因為它會將真實的 binder 替換為測試案例的測試 binder。如果您希望某些測試使用測試 binder,而某些測試使用嵌入式 Broker,則使用真實 binder 的測試需要透過排除測試類別中的 binder 自動設定來停用測試 binder。以下範例示範如何執行此操作

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

在 Spring Boot 應用程式測試中,有幾種方法可以使用嵌入式 Broker。

它們包括

JUnit4 類別規則

以下範例示範如何使用 JUnit4 類別規則來建立嵌入式 Broker

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

請注意,由於這是 Spring Boot 應用程式,因此我們覆寫了 Broker 清單屬性以設定 Spring Boot 的屬性。

搭配 @SpringJunitConfig@EmbeddedKafka

@EmbeddedKafka@SpringJUnitConfig 搭配使用時,建議在測試類別上使用 @DirtiesContext。這是為了防止在測試套件中執行多個測試後,JVM 關閉期間可能發生的競爭條件。例如,如果不使用 @DirtiesContextEmbeddedKafkaBroker 可能會較早關閉,而應用程式內容仍然需要其中的資源。由於每個 EmbeddedKafka 測試執行都會建立自己的暫時目錄,因此當發生此競爭條件時,它會產生錯誤日誌訊息,指出它嘗試刪除或清理的檔案已不再可用。新增 @DirtiesContext 將確保應用程式內容在每次測試後都已清理,且未快取,使其不易受到像這樣的潛在資源競爭條件的影響。

@EmbeddedKafka 註解或 EmbeddedKafkaBroker Bean

以下範例示範如何使用 @EmbeddedKafka 註解來建立嵌入式 Broker

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
從 3.0.10 版開始,bootstrapServersProperty 預設會自動設定為 spring.kafka.bootstrap-servers

Hamcrest Matchers

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供了以下 matchers

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ Conditions

您可以使用以下 AssertJ conditions

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

範例

以下範例將本章涵蓋的大部分主題整合在一起

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的範例使用 Hamcrest matchers。使用 AssertJ,最後一部分看起來像以下程式碼

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock Consumer 和 Producer

kafka-clients 程式庫提供了 MockConsumerMockProducer 類別,用於測試目的。

如果您希望在某些測試中將這些類別與監聽器容器或 KafkaTemplate 分別搭配使用,從 3.0.7 版開始,框架現在提供了 MockConsumerFactoryMockProducerFactory 實作。

這些工廠可以用於監聽器容器和範本中,而不是需要運作中(或嵌入式)Broker 的預設工廠。

這是一個傳回單一 Consumer 的簡單實作範例

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望使用並行性進行測試,則工廠建構函式中的 Supplier lambda 每次都需要建立一個新實例。

使用 MockProducerFactory,有兩個建構函式;一個用於建立簡單工廠,另一個用於建立支援交易的工廠。

這裡是範例

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

請注意在第二種情況下,lambda 是一個 BiFunction<Boolean, String>,其中如果呼叫者想要交易式 Producer,則第一個參數為 true;選用的第二個參數包含交易 ID。這可以是預設值(如建構函式中所提供),或者如果如此設定,則可以由 KafkaTransactionManager(或 KafkaTemplate 用於本機交易)覆寫。提供交易 ID 是為了讓您在希望根據此值使用不同的 MockProducer 時使用。

如果您在多執行緒環境中使用 Producer,則 BiFunction 應傳回多個 Producer(可能使用 ThreadLocal 進行執行緒繫結)。

交易式 MockProducer 必須透過呼叫 initTransaction() 初始化交易。

使用 MockProducer 時,如果您不想在每次傳送後關閉 Producer,則您可以提供自訂 MockProducer 實作,以覆寫不會從超類別呼叫 close 方法的 close 方法。這對於測試在同一個 Producer 上驗證多個發布而不關閉它是很方便的。

這是一個範例

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}