使用應用程式事件

為了盡可能保持應用程式模組彼此解耦,它們之間的主要互動方式應該是事件的發布和消費。這樣可以避免原始模組了解所有潛在的利害關係人,這是啟用應用程式模組整合測試的關鍵方面(請參閱整合測試應用程式模組)。

我們經常會發現像這樣定義的應用程式組件

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法創建了功能上的重力,它會吸引相關的功能,以及與其他應用程式模組中定義的 Spring Bean 的互動。這尤其使組件更難測試,因為我們需要擁有那些依賴的 Bean 的實例,才能創建 OrderManagement 的實例(請參閱處理外向依賴)。這也意味著,每當我們想將更多功能與業務事件訂單完成整合時,我們都必須修改這個類別。

我們可以如下更改應用程式模組的互動方式

透過 Spring 的 ApplicationEventPublisher 發布應用程式事件
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

請注意,我們沒有依賴其他應用程式模組的 Spring Bean,而是使用 Spring 的 ApplicationEventPublisher 在我們完成主要聚合的狀態轉換後發布領域事件。若要採用更以聚合驅動的方法來發布事件,請參閱Spring Data 的應用程式事件發布機制以了解詳細資訊。由於事件發布預設以同步方式發生,因此整體安排的交易語義與上面的範例保持不變。這既有好處,因為我們獲得了一個非常簡單的一致性模型(訂單狀態的變更庫存更新要么都成功,要么都不成功),也有壞處,因為更多觸發的相關功能會擴大交易邊界,並可能導致整個交易失敗,即使導致錯誤的功能並非至關重要。

另一種處理方法是將事件消費移至交易提交時的非同步處理,並將次要功能視為那樣

非同步、交易式事件監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

現在,這有效地將原始交易與監聽器的執行解耦。雖然這避免了原始業務交易的擴展,但也產生了一個風險:如果監聽器因任何原因失敗,事件發布將會遺失,除非每個監聽器都實際實作了自己的安全網。更糟糕的是,即使這樣也不能完全奏效,因為系統可能在方法甚至被調用之前就失敗了。

應用程式模組監聽器

為了在交易本身中運行交易式事件監聽器,反過來也需要使用 @Transactional 註解它。

在交易本身中運行的非同步、交易式事件監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

為了簡化聲明應該描述透過事件整合模組的預設方式,Spring Modulith 提供了 @ApplicationModuleListener 作為快捷方式。

應用程式模組監聽器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

事件發布註冊表

Spring Modulith 附帶一個事件發布註冊表,它掛鉤到 Spring Framework 的核心事件發布機制中。在事件發布時,它會找出將接收事件的交易式事件監聽器,並將每個監聽器的條目(深藍色)寫入事件發布日誌,作為原始業務交易的一部分。

event publication registry start
圖 1. 執行前的交易式事件監聽器安排

每個交易式事件監聽器都被包裝在一個方面中,如果監聽器執行成功,則將該日誌條目標記為已完成。如果監聽器失敗,則日誌條目保持未觸動狀態,以便可以根據應用程式的需求部署重試機制。可以透過 spring.modulith.republish-outstanding-events-on-restart 屬性啟用事件的自動重新發布。

event publication registry end
圖 2. 執行後的交易式事件監聽器安排

Spring Boot 事件註冊表啟動器

使用交易式事件發布日誌需要添加到應用程式中的工件組合。為了簡化此任務,Spring Modulith 提供了啟動器 POM,它們以要使用的持久性技術為中心,並預設為基於 Jackson 的 EventSerializer 實作。以下啟動器可用

持久性技術 工件 描述

JPA

spring-modulith-starter-jpa

使用 JPA 作為持久性技術。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作為持久性技術。也適用於基於 JPA 的應用程式,但繞過 JPA 提供者進行實際的事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 MongoDB 作為持久性技術。也啟用 MongoDB 交易,並且需要伺服器的副本集設定才能互動。可以透過將 spring.modulith.events.mongobd.transaction-management.enabled 屬性設定為 false 來停用交易自動配置。

Neo4j

spring-modulith-starter-neo4j

在 Spring Data Neo4j 後面使用 Neo4j。

管理事件發布

在應用程式的運行時期間,事件發布可能需要以各種方式進行管理。不完整的發布可能需要在給定的時間後重新提交給相應的監聽器。另一方面,已完成的發布很可能需要從資料庫中清除或移動到封存儲存區。由於這種類型的內務處理需求因應用程式而異,因此 Spring Modulith 提供了一個 API 來處理這兩種發布類型。該 API 可透過您可以添加到應用程式中的 spring-modulith-events-api 工件獲得

使用 Spring Modulith Events API 工件
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.2.5</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:1.2.5'
}

此工件包含兩個主要的抽象,應用程式程式碼可以將它們作為 Spring Bean 使用

  • CompletedEventPublications — 此介面允許存取所有已完成的事件發布,並提供一個 API 以立即從資料庫中清除所有這些發布,或清除早於給定持續時間(例如,1 分鐘)的已完成發布。

  • IncompleteEventPublications — 此介面允許存取所有未完成的事件發布,以重新提交符合給定謂詞的發布,或重新提交早於相對於原始發布日期的給定 Duration 的發布。

事件發布儲存庫

為了實際寫入事件發布日誌,Spring Modulith 公開了一個 EventPublicationRepository SPI 和流行的持久性技術(例如 JPA、JDBC 和 MongoDB)的實作,這些技術支援交易。您可以透過將相應的 JAR 添加到 Spring Modulith 應用程式來選擇要使用的持久性技術。我們準備了專用的啟動器來簡化此任務。

當相應的配置屬性 (spring.modulith.events.jdbc.schema-initialization.enabled) 設定為 true 時,基於 JDBC 的實作可以為事件發布日誌建立專用表。有關詳細資訊,請參閱附錄中的架構概觀

事件序列化器

每個日誌條目都包含序列化格式的原始事件。spring-modulith-events-core 中包含的 EventSerializer 抽象允許插入不同的策略,以將事件實例轉換為適合資料儲存區的格式。Spring Modulith 通過 spring-modulith-events-jackson 工件提供基於 Jackson 的 JSON 實作,該工件預設通過標準 Spring Boot 自動配置註冊一個 JacksonEventSerializer,它使用 ObjectMapper

自訂事件發布日期

預設情況下,事件發布註冊表將使用 Clock.systemUTC() 返回的日期作為事件發布日期。如果您想自訂此日期,請在應用程式上下文中註冊一個時鐘類型的 Bean

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

外部化事件

應用程式模組之間交換的某些事件可能對外部系統感興趣。Spring Modulith 允許將選定的事件發布到各種消息代理。若要使用該支援,您需要採取以下步驟

  1. 特定於代理的 Spring Modulith 工件添加到您的專案中。

  2. 通過使用 Spring Modulith 或 jMolecules 的 @Externalized 註解來選擇要外部化的事件類型。

  3. 在註解的值中指定特定於代理的路由目標。

要了解如何使用其他方式選擇要外部化的事件,或自訂它們在代理中的路由,請查看事件外部化的基本原理

支援的基礎架構

代理 工件 描述

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 與代理互動。邏輯路由鍵將用作 Kafka 的主題和消息鍵。

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 與任何兼容的代理互動。例如,需要為 Spring Rabbit 顯式聲明依賴項。邏輯路由鍵將用作 AMQP 路由鍵。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支援。不支援路由鍵。

SQS

spring-modulith-events-aws-sqs

使用 Spring Cloud AWS SQS 支援。邏輯路由鍵將用作 SQS 消息組 ID。當設定路由鍵時,需要將 SQS 佇列配置為 FIFO 佇列。

SNS

spring-modulith-events-aws-sns

使用 Spring Cloud AWS SNS 支援。邏輯路由鍵將用作 SNS 消息組 ID。當設定路由鍵時,需要將 SNS 配置為啟用基於內容重複數據刪除的 FIFO 主題。

事件外部化的基本原理

事件外部化對每個發布的應用程式事件執行三個步驟。

  1. 確定事件是否應該被外部化 — 我們將其稱為「事件選擇」。預設情況下,只有位於 Spring Boot 自動配置套件中並使用其中一個受支援的 @Externalized 註解註解的事件類型才被選中進行外部化。

  2. 映射事件(可選) — 預設情況下,事件使用應用程式中存在的 Jackson ObjectMapper 序列化為 JSON,並按原樣發布。映射步驟允許開發人員自訂表示形式,甚至完全將原始事件替換為適合外部合作夥伴的表示形式。請注意,映射步驟先於要發布物件的實際序列化。

  3. 確定路由目標 — 消息代理客戶端需要一個邏輯目標來發布消息。目標通常識別物理基礎架構(主題、交換或佇列,具體取決於代理),並且通常從事件類型靜態派生。除非在 @Externalized 註解中明確定義,否則 Spring Modulith 使用應用程式本地類型名稱作為目標。換句話說,在基礎套件為 com.acme.app 的 Spring Boot 應用程式中,事件類型 com.acme.app.sample.SampleEvent 將發布到 sample.SampleEvent

    某些代理也允許定義相當動態的路由鍵,該路由鍵用於實際目標中的不同目的。預設情況下,不使用路由鍵。

基於註解的事件外部化配置

若要透過 @Externalized 註解定義自訂路由鍵,可以使用 $target::$key 模式用於每個特定註解中可用的目標/值屬性。鍵可以是 SpEL 表達式,它將事件實例配置為根物件。

透過 SpEL 表達式定義動態路由鍵
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated 事件透過存取器方法公開客戶的姓氏。然後,在目標宣告的 :: 分隔符之後的鍵表達式中使用該方法 #this.getLastname()

如果鍵計算變得更加複雜,建議將其委託給將事件作為參數的 Spring Bean

調用 Spring Bean 以計算路由鍵
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

程式化的事件外部化配置

spring-modulith-events-api 工件包含 EventExternalizationConfiguration,允許開發人員自訂上述所有步驟。

以程式方式配置事件外部化
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, event -> …)                                  (3)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (4)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class.java) { event -> … }                           (3)
      .routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey)         (4)
      .build()
  }
}
1 我們首先建立 EventExternalizationConfiguration 的預設實例。
2 我們透過調用先前調用返回的 Selector 實例上的 select(…) 方法之一來自訂事件選擇。此步驟從根本上禁用了應用程式基礎套件篩選器,因為我們現在只查找註解。方便的方法可以輕鬆地按類型、按套件、套件和註解選擇事件。此外,還有一個快捷方式可以在一個步驟中定義選擇和路由。
3 我們為 SomeEvent 實例定義一個映射步驟。請注意,除非您額外調用路由器上的 ….routeMapped(),否則路由仍將由原始事件實例確定。
4 我們最終透過定義方法句柄以提取事件實例的值來確定路由鍵。或者,可以透過使用先前調用返回的 Router 實例上的通用 route(…) 方法,為個別事件產生完整的 RoutingKey

測試已發布的事件

以下章節描述了一種僅專注於追蹤 Spring 應用程式事件的測試方法。若要對使用 @ApplicationModuleListener 的模組進行更全面的測試,請查看 Scenario API

Spring Modulith 的 @ApplicationModuleTest 啟用了將 PublishedEvents 實例注入到測試方法中的能力,以驗證在測試的業務操作過程中是否已發布一組特定的事件。

應用程式模組安排的基於事件的整合測試
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    val matchingMapped = events.ofType(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

請注意,PublishedEvents 如何公開一個 API 來選擇符合特定標準的事件。驗證以 AssertJ 斷言結束,該斷言驗證預期的元素數量。如果您無論如何都在使用 AssertJ 進行這些斷言,您也可以使用 AssertablePublishedEvents 作為測試方法參數類型,並使用透過它提供的流暢斷言 API。

使用 AssertablePublishedEvents 驗證事件發布
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

請注意,assertThat(…) 表達式返回的類型允許直接在已發布的事件上定義約束。