Spring 資料整合歷程簡史

Spring 在資料整合的旅程始於 Spring Integration。憑藉其程式設計模型,它為建構應用程式提供了連貫的開發者體驗,這些應用程式可以採用 企業整合模式 來連接外部系統,例如資料庫、訊息代理等等。

快轉到雲端時代,微服務已在企業環境中變得突出。Spring Boot 改變了開發人員建構應用程式的方式。透過 Spring 的程式設計模型和由 Spring Boot 處理的執行期責任,開發獨立、生產級的基於 Spring 的微服務變得無縫。

為了將其擴展到資料整合工作負載,Spring Integration 和 Spring Boot 被整合到一個新專案中。Spring Cloud Stream 應運而生。

透過 Spring Cloud Stream,開發人員可以

  • 隔離地建構、測試和部署以資料為中心的應用程式。

  • 應用現代微服務架構模式,包括透過訊息傳遞進行組合。

  • 透過以事件為中心的思維來解耦應用程式職責。事件可以代表在時間上發生的某件事,下游消費者應用程式可以在不知道事件的來源或生產者的身分的情況下對其做出反應。

  • 將業務邏輯移植到訊息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 依賴框架對常見用例的自動內容類型支援。擴展到不同的資料轉換類型是可能的。

  • 以及更多...

快速開始

即使在您深入了解任何細節之前,您也可以在不到 5 分鐘內嘗試 Spring Cloud Stream,只需按照這三個步驟的指南即可。

我們將向您展示如何建立一個 Spring Cloud Stream 應用程式,該應用程式接收來自您選擇的訊息傳遞中介軟體(稍後會詳細介紹)的訊息,並將接收到的訊息記錄到主控台。我們稱之為 LoggingConsumer。雖然不是很實用,但它為一些主要概念和抽象概念提供了很好的介紹,使您更容易理解本使用者指南的其餘部分。

這三個步驟如下

使用 Spring Initializr 建立範例應用程式

若要開始,請造訪 Spring Initializr。從那裡,您可以產生我們的 LoggingConsumer 應用程式。若要執行此操作

  1. Dependencies 區段中,開始輸入 stream。當「Cloud Stream」選項出現時,請選取它。

  2. 開始輸入 'kafka' 或 'rabbit'。

  3. 選取「Kafka」或「RabbitMQ」。

    基本上,您選擇您的應用程式繫結到的訊息傳遞中介軟體。我們建議使用您已安裝或感覺更舒適安裝和執行的那一個。此外,正如您從 Initializer 畫面中看到的那樣,您可以選擇其他一些選項。例如,您可以選擇 Gradle 作為您的建置工具,而不是 Maven(預設)。

  4. Artifact 欄位中,輸入 'logging-consumer'。

    Artifact 欄位的值會變成應用程式名稱。如果您為中介軟體選擇了 RabbitMQ,則您的 Spring Initializr 現在應如下所示

spring initializr
  1. 按一下 Generate Project 按鈕。

    這樣做會將產生專案的 zipped 版本下載到您的硬碟。

  2. 將檔案解壓縮到您要用作專案目錄的資料夾中。

我們鼓勵您探索 Spring Initializr 中提供的許多可能性。它可讓您建立許多不同種類的 Spring 應用程式。

將專案匯入到您的 IDE

現在您可以將專案匯入到您的 IDE 中。請記住,根據 IDE 的不同,您可能需要遵循特定的匯入程序。例如,根據專案的產生方式(Maven 或 Gradle),您可能需要遵循特定的匯入程序(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。

匯入後,專案不得有任何類型的錯誤。此外,src/main/java 應包含 com.example.loggingconsumer.LoggingConsumerApplication

從技術上講,此時您可以執行應用程式的主要類別。它已經是一個有效的 Spring Boot 應用程式。但是,它沒有執行任何操作,因此我們想要新增一些程式碼。

新增訊息處理常式、建置和執行

修改 com.example.loggingconsumer.LoggingConsumerApplication 類別,使其看起來如下所示

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如您從先前的清單中看到的那樣

  • 我們正在使用函數式程式設計模型(請參閱 [Spring Cloud Function 支援])將單一訊息處理常式定義為 Consumer

  • 我們依賴框架慣例將此處理常式繫結到 binder 公開的輸入目的地繫結。

這樣做還可以讓您看到框架的核心功能之一:它嘗試自動將傳入訊息的 payload 轉換為 Person 類型。

您現在有一個功能完整的 Spring Cloud Stream 應用程式,可以監聽訊息。從這裡開始,為了簡單起見,我們假設您在 步驟一 中選取了 RabbitMQ。假設您已安裝並執行 RabbitMQ,您可以透過在您的 IDE 中執行其 main 方法來啟動應用程式。

您應該看到以下輸出

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

前往 RabbitMQ 管理主控台或任何其他 RabbitMQ 用戶端,並將訊息傳送到 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分代表群組名稱,是產生的,因此在您的環境中必定會有所不同。為了更可預測,您可以透過設定 spring.cloud.stream.bindings.input.group=hello(或您喜歡的任何名稱)來使用顯式群組名稱。

訊息的內容應該是 Person 類別的 JSON 表示法,如下所示

{"name":"Sam Spade"}

然後,在您的主控台中,您應該會看到

Received: Sam Spade

您也可以建置您的應用程式並將其封裝到 boot jar 中(透過使用 ./mvnw clean install),並使用 java -jar 命令執行建置的 JAR。

現在您有一個可運作的(雖然非常基本)Spring Cloud Stream 應用程式。

Spring 運算式語言 (SpEL) 在串流資料的環境中

在本參考手冊中,您將遇到許多功能和範例,您可以在其中使用 Spring 運算式語言 (SpEL)。務必了解在使用它時的某些限制。

SpEL 可讓您存取目前的訊息以及您正在執行的應用程式環境。但是,務必了解 SpEL 可以看到什麼類型的資料,尤其是在傳入訊息的環境中。從代理程式,訊息以 byte[] 的形式到達。然後,binder 將其轉換為 Message<byte[]>,而您可以查看訊息的 payload 保持其原始形式。訊息的標頭為 <String, Object>,其中值通常是另一個基本類型或基本類型的集合/陣列,因此為 Object。這是因為 binder 不知道所需的輸入類型,因為它無法存取使用者程式碼(函數)。因此,binder 有效地傳遞了一個包含 payload 和一些可讀中繼資料的信封,形式為訊息標頭,就像郵件傳遞的信件一樣。這表示雖然可以存取訊息的 payload,但您只能以原始資料(即 byte[])的形式存取它。雖然開發人員很常要求能夠讓 SpEL 以具體類型(例如 Foo、Bar 等)存取 payload 物件的欄位,但您可以了解實現這一點有多困難甚至不可能。以下是一個範例來說明問題;假設您有一個路由運算式,可根據 payload 類型路由到不同的函數。此需求將暗示從 byte[] 到特定類型的 payload 轉換,然後應用 SpEL。但是,為了執行此轉換,我們需要知道要傳遞給轉換器的實際類型,而該類型來自我們不知道的函數簽章。解決此需求的更好方法是將類型資訊作為訊息標頭傳遞(例如,application/json;type=foo.bar.Baz)。您將獲得清晰可讀的字串值,可以在一年內存取和評估,並且易於閱讀 SpEL 運算式。

此外,將 payload 用於路由決策被認為是非常糟糕的做法,因為 payload 被認為是特權資料 - 僅供其最終接收者讀取的資料。再次,使用郵件傳遞類比,您不希望郵差打開您的信封並閱讀信件內容以做出一些傳遞決策。相同的概念適用於此處,尤其是在產生訊息時相對容易包含此類資訊的情況下。它強制執行與要透過網路傳輸的資料設計相關的某些程度的規範,以及哪些資料片段可以被視為公開的,哪些是特權的。