測試應用程式
spring-kafka-test
jar 包含一些有用的工具,可協助您測試應用程式。
嵌入式 Kafka Broker
提供了兩種實作方式
-
EmbeddedKafkaZKBroker
- 傳統實作方式,啟動嵌入式Zookeeper
實例(使用EmbeddedKafka
時仍然是預設值)。 -
EmbeddedKafkaKraftBroker
- 在組合控制器和 Broker 模式中使用 Kraft 而不是 Zookeeper(自 3.1 起)。
有幾種設定 Broker 的技術,將在以下章節中討論。
KafkaTestUtils
org.springframework.kafka.test.utils.KafkaTestUtils
提供了許多靜態輔助方法,用於消費記錄、檢索各種記錄偏移量等。請參閱其 Javadocs 以取得完整詳細資訊。
JUnit
org.springframework.kafka.test.utils.KafkaTestUtils
也提供了一些靜態方法來設定 Producer 和 Consumer 屬性。以下列表顯示了這些方法的簽章
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
從 2.5 版開始, 使用嵌入式 Broker 時,最佳實務通常是為每個測試使用不同的主題,以防止串音。如果由於某些原因無法做到這一點,請注意 |
提供了 EmbeddedKafkaZKBroker
的 JUnit 4 @Rule
包裝器,用於建立嵌入式 Kafka 和嵌入式 Zookeeper 伺服器。(有關將 @EmbeddedKafka
與 JUnit 5 搭配使用的資訊,請參閱 @EmbeddedKafka 註解)。以下列表顯示了這些方法的簽章
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支援 EmbeddedKafkaKraftBroker 。 |
EmbeddedKafkaBroker
類別有一個實用方法,可讓您消費它建立的所有主題。以下範例示範如何使用它
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
KafkaTestUtils
有一些實用方法可以從 Consumer 擷取結果。以下列表顯示了這些方法的簽章
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下範例示範如何使用 KafkaTestUtils
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
當嵌入式 Kafka 和嵌入式 Zookeeper 伺服器由 EmbeddedKafkaBroker
啟動時,名為 spring.embedded.kafka.brokers
的系統屬性會設定為 Kafka Broker 的位址,而名為 spring.embedded.zookeeper.connect
的系統屬性會設定為 Zookeeper 的位址。為此屬性提供了方便的常數 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
和 EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
)。
Kafka Broker 的位址可以公開給任何任意且方便的屬性,而不是預設的 spring.embedded.kafka.brokers
系統屬性。為此,可以在啟動嵌入式 Kafka 之前設定 spring.embedded.kafka.brokers.property
(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY
) 系統屬性。例如,對於 Spring Boot,預期會設定 spring.kafka.bootstrap-servers
設定屬性以自動設定 Kafka 用戶端。因此,在使用隨機埠上的嵌入式 Kafka 執行測試之前,我們可以將 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
設定為系統屬性 - 並且 EmbeddedKafkaBroker
將使用它來公開其 Broker 位址。這現在是此屬性的預設值(從 3.0.10 版開始)。
使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
,您可以為 Kafka 伺服器提供其他屬性。有關可能的 Broker 屬性的更多資訊,請參閱 Kafka Config。
設定主題
以下範例設定會建立名為 cat
和 hat
的主題,各有五個分割區;一個名為 thing1
的主題,有 10 個分割區;以及一個名為 thing2
的主題,有 15 個分割區
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
預設情況下,當出現問題時(例如新增已存在的主題),addTopics
會擲回例外狀況。2.6 版新增了該方法的新版本,該版本傳回 Map<String, Exception>
;鍵是主題名稱,值在成功時為 null
,失敗時為 Exception
。
為多個測試類別使用相同的 Broker
您可以使用類似以下內容為多個測試類別使用相同的 Broker
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
這假設是 Spring Boot 環境,並且嵌入式 Broker 取代了 bootstrap servers 屬性。
然後,在每個測試類別中,您可以使用類似以下內容
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果您未使用 Spring Boot,則可以使用 broker.getBrokersAsString()
取得 bootstrap servers。
前面的範例沒有提供在所有測試完成後關閉 Broker 的機制。如果您在 Gradle daemon 中執行測試,這可能會是個問題。在這種情況下,您不應使用此技術,或者您應該在測試完成時使用某些東西來呼叫 EmbeddedKafkaBroker 上的 destroy() 。 |
從 3.0 版開始,框架為 JUnit Platform 公開了 GlobalEmbeddedKafkaTestExecutionListener
;預設情況下已停用。這需要 JUnit Platform 1.8 或更高版本。此監聽器的目的是為整個測試計畫啟動一個全域 EmbeddedKafkaBroker
,並在計畫結束時停止它。若要啟用此監聽器,並因此為專案中的所有測試擁有單一全域嵌入式 Kafka 叢集,則必須透過系統屬性或 JUnit Platform 設定將 spring.kafka.global.embedded.enabled
屬性設定為 true
。此外,還可以提供以下屬性
-
spring.kafka.embedded.count
- 要管理的 Kafka Broker 數量; -
spring.kafka.embedded.ports
- 每個要啟動的 Kafka Broker 的埠(逗號分隔值),如果偏好隨機埠,則為0
;值的數量必須等於上面提到的計數; -
spring.kafka.embedded.topics
- 要在啟動的 Kafka 叢集中建立的主題(逗號分隔值); -
spring.kafka.embedded.partitions
- 為建立的主題佈建的分割區數量; -
spring.kafka.embedded.broker.properties.location
- 其他 Kafka Broker 設定屬性的檔案位置;此屬性的值必須遵循 Spring 資源抽象模式; -
spring.kafka.embedded.kraft
- 預設為 false,如果為 true,則使用EmbeddedKafkaKraftBroker
而不是EmbeddedKafkaZKBroker
。
本質上,這些屬性模仿了 @EmbeddedKafka
的某些屬性。
有關設定屬性以及如何在 JUnit 5 User Guide 中提供它們的更多資訊。例如,spring.embedded.kafka.brokers.property=my.bootstrap-servers
項目可以新增至測試類別路徑中的 junit-platform.properties
檔案。從 3.0.10 版開始,Broker 預設會自動將其設定為 spring.kafka.bootstrap-servers
,以用於 Spring Boot 應用程式的測試。
建議不要在單一測試套件中組合全域嵌入式 Kafka 和每個測試類別。它們都共用相同的系統屬性,因此很可能會導致非預期的行為。 |
spring-kafka-test 對 junit-jupiter-api 和 junit-platform-launcher 具有傳遞相依性(後者用於支援全域嵌入式 Broker)。如果您希望使用嵌入式 Broker 並且未使用 JUnit,您可能希望排除這些相依性。 |
@EmbeddedKafka
註解
我們通常建議您將規則用作 @ClassRule
,以避免在測試之間啟動和停止 Broker(並為每個測試使用不同的主題)。從 2.0 版開始,如果您使用 Spring 的測試應用程式內容快取,您也可以宣告 EmbeddedKafkaBroker
Bean,以便可以在多個測試類別中使用單一 Broker。為了方便起見,我們提供了一個稱為 @EmbeddedKafka
的測試類別層級註解,以註冊 EmbeddedKafkaBroker
Bean。以下範例示範如何使用它
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
從 2.2.4 版開始,您也可以使用 @EmbeddedKafka
註解來指定 Kafka 埠屬性。
從 3.2 版開始,將 kraft
屬性設定為 true
以使用 EmbeddedKafkaKraftBroker
而不是 EmbeddedKafkaZKBroker
。
以下範例設定了 @EmbeddedKafka
的 topics
、brokerProperties
和 brokerPropertiesLocation
屬性,以支援屬性佔位符解析
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://127.0.0.1:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的範例中,屬性佔位符 ${kafka.topics.another-topic}
、${kafka.broker.logs-dir}
和 ${kafka.broker.port}
是從 Spring Environment 解析的。此外,Broker 屬性是從 brokerPropertiesLocation
指定的 broker.properties
類別路徑資源載入的。屬性佔位符會針對 brokerPropertiesLocation
URL 和資源中找到的任何屬性佔位符進行解析。brokerProperties
定義的屬性會覆寫 brokerPropertiesLocation
中找到的屬性。
您可以將 @EmbeddedKafka
註解與 JUnit 4 或 JUnit 5 搭配使用。
搭配 JUnit5 的 @EmbeddedKafka
註解
從 2.3 版開始,有兩種方法可以將 @EmbeddedKafka
註解與 JUnit5 搭配使用。當與 @SpringJunitConfig
註解搭配使用時,嵌入式 Broker 會新增至測試應用程式內容。您可以將 Broker 自動注入到您的測試中,在類別或方法層級,以取得 Broker 位址清單。
當不使用 Spring 測試內容時,EmbdeddedKafkaCondition
會建立 Broker;條件包含參數解析器,因此您可以在測試方法中存取 Broker。
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
除非以 @EmbeddedKafka
註解的類別也以 ExtendWith(SpringExtension.class)
註解(或 meta-annotated),否則將會建立獨立的 Broker(在 Spring 的 TestContext 之外)。@SpringJunitConfig
和 @SpringBootTest
都是如此 meta-annotated,並且當這些註解中的任何一個也存在時,將會使用基於內容的 Broker。
當有可用的 Spring 測試應用程式內容時,主題和 Broker 屬性可以包含屬性佔位符,只要屬性在某處定義,就會解析這些佔位符。如果沒有可用的 Spring 內容,則不會解析這些佔位符。 |
@SpringBootTest
註解中的嵌入式 Broker
Spring Initializr 現在會自動將 spring-kafka-test
相依性新增至專案設定中的測試範圍。
如果您的應用程式在
|
在 Spring Boot 應用程式測試中,有幾種方法可以使用嵌入式 Broker。
它們包括
JUnit4 類別規則
以下範例示範如何使用 JUnit4 類別規則來建立嵌入式 Broker
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
請注意,由於這是 Spring Boot 應用程式,因此我們覆寫了 Broker 清單屬性以設定 Spring Boot 的屬性。
搭配 @SpringJunitConfig
的 @EmbeddedKafka
將 @EmbeddedKafka
與 @SpringJUnitConfig
搭配使用時,建議在測試類別上使用 @DirtiesContext
。這是為了防止在測試套件中執行多個測試後,JVM 關閉期間可能發生的競爭條件。例如,如果不使用 @DirtiesContext
,EmbeddedKafkaBroker
可能會較早關閉,而應用程式內容仍然需要其中的資源。由於每個 EmbeddedKafka
測試執行都會建立自己的暫時目錄,因此當發生此競爭條件時,它會產生錯誤日誌訊息,指出它嘗試刪除或清理的檔案已不再可用。新增 @DirtiesContext
將確保應用程式內容在每次測試後都已清理,且未快取,使其不易受到像這樣的潛在資源競爭條件的影響。
@EmbeddedKafka
註解或 EmbeddedKafkaBroker
Bean
以下範例示範如何使用 @EmbeddedKafka
註解來建立嵌入式 Broker
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
從 3.0.10 版開始,bootstrapServersProperty 預設會自動設定為 spring.kafka.bootstrap-servers 。 |
Hamcrest Matchers
org.springframework.kafka.test.hamcrest.KafkaMatchers
提供了以下 matchers
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
AssertJ Conditions
您可以使用以下 AssertJ conditions
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
範例
以下範例將本章涵蓋的大部分主題整合在一起
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的範例使用 Hamcrest matchers。使用 AssertJ,最後一部分看起來像以下程式碼
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
Mock Consumer 和 Producer
kafka-clients
程式庫提供了 MockConsumer
和 MockProducer
類別,用於測試目的。
如果您希望在某些測試中將這些類別與監聽器容器或 KafkaTemplate
分別搭配使用,從 3.0.7 版開始,框架現在提供了 MockConsumerFactory
和 MockProducerFactory
實作。
這些工廠可以用於監聽器容器和範本中,而不是需要運作中(或嵌入式)Broker 的預設工廠。
這是一個傳回單一 Consumer 的簡單實作範例
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果您希望使用並行性進行測試,則工廠建構函式中的 Supplier
lambda 每次都需要建立一個新實例。
使用 MockProducerFactory
,有兩個建構函式;一個用於建立簡單工廠,另一個用於建立支援交易的工廠。
這裡是範例
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
請注意在第二種情況下,lambda 是一個 BiFunction<Boolean, String>
,其中如果呼叫者想要交易式 Producer,則第一個參數為 true;選用的第二個參數包含交易 ID。這可以是預設值(如建構函式中所提供),或者如果如此設定,則可以由 KafkaTransactionManager
(或 KafkaTemplate
用於本機交易)覆寫。提供交易 ID 是為了讓您在希望根據此值使用不同的 MockProducer 時使用。
如果您在多執行緒環境中使用 Producer,則 BiFunction
應傳回多個 Producer(可能使用 ThreadLocal 進行執行緒繫結)。
交易式 MockProducer 必須透過呼叫 initTransaction() 初始化交易。 |
使用 MockProducer 時,如果您不想在每次傳送後關閉 Producer,則您可以提供自訂 MockProducer 實作,以覆寫不會從超類別呼叫 close
方法的 close
方法。這對於測試在同一個 Producer 上驗證多個發布而不關閉它是很方便的。
這是一個範例
@Bean
MockProducer<String, String> mockProducer() {
return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
@Override
public void close() {
}
};
}
@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
return new MockProducerFactory<>(() -> mockProducer);
}