ZeroMQ 支援
Spring Integration 提供元件來支援應用程式中的 ZeroMQ 通訊。此實作基於 JeroMQ 程式庫的良好支援 Java API。所有元件都封裝 ZeroMQ Socket 生命周期,並在內部管理它們的執行緒,使與這些元件的互動無鎖定且執行緒安全。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.5"
ZeroMQ Proxy
ZeroMqProxy
是內建 ZMQ.proxy()
函數 的 Spring 友善包裝器。它封裝 Socket 生命周期和執行緒管理。此 Proxy 的用戶端仍然可以使用標準 ZeroMQ Socket 連線和互動 API。與標準 ZContext
一起,它需要其中一種廣為人知的 ZeroMQ Proxy 模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。這樣一來,一對適當的 ZeroMQ Socket 類型將用於 Proxy 的前端和後端。詳細資訊請參閱 ZeroMqProxy.Type
。
ZeroMqProxy
實作 SmartLifecycle
,以建立、綁定和設定 Socket,並從 Executor
(如果有的話)在專用執行緒中啟動 ZMQ.proxy()
。前端和後端 Socket 的綁定是透過 tcp://
協定在所有可用的網路介面上完成的,並提供連接埠。否則,它們會綁定到隨機連接埠,稍後可以透過各自的 getFrontendPort()
和 getBackendPort()
API 方法取得。
控制 Socket 作為 SocketType.PAIR
公開,在 "inproc://" + beanName + ".control"
位址上進行執行緒間傳輸;可以透過 getControlAddress()
取得。它應與來自另一個 SocketType.PAIR
Socket 的相同應用程式一起使用,以傳送 ZMQ.PROXY_TERMINATE
、ZMQ.PROXY_PAUSE
和/或 ZMQ.PROXY_RESUME
命令。當為其生命週期呼叫 stop()
時,ZeroMqProxy
會執行 ZMQ.PROXY_TERMINATE
命令,以終止 ZMQ.proxy()
迴圈並優雅地關閉所有綁定的 Socket。
setExposeCaptureSocket(boolean)
選項會導致此元件綁定額外的執行緒間 Socket 與 SocketType.PUB
,以擷取並發布前端和後端 Socket 之間的所有通訊,如同 ZMQ.proxy()
實作中所述。此 Socket 綁定到 "inproc://" + beanName + ".capture"
位址,並且不期望任何特定的訂閱進行篩選。
前端和後端 Socket 可以使用其他屬性進行自訂,例如讀取/寫入逾時或安全性。此自訂可透過分別的 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回呼取得。
可以將 ZeroMqProxy
作為簡單的 Bean 提供,如下所示
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有用戶端節點都應透過 tcp://
連接到此 Proxy 的主機,並使用其感興趣的相應連接埠。
ZeroMQ 訊息通道
ZeroMqChannel
是一個 SubscribableChannel
,它使用一對 ZeroMQ Socket 來連接發布者和訂閱者以進行訊息傳遞互動。它可以工作在 PUB/SUB 模式(預設為 PUSH/PULL);它也可以用作本機執行緒間通道(使用 PAIR
Socket)- 在這種情況下未提供 connectUrl
。在分散式模式下,它必須連接到外部管理的 ZeroMQ Proxy,在其中它可以與連接到同一 Proxy 的其他類似通道交換訊息。連線 URL 選項是標準的 ZeroMQ 連線字串,其中包含協定和主機,以及 ZeroMQ Proxy 的前端和後端 Socket 的一對連接埠(以冒號分隔)。為了方便起見,如果 ZeroMqProxy
實例與 Proxy 在同一個應用程式中設定,則可以使用 ZeroMqProxy
實例而不是連線字串來提供通道。
傳送和接收 Socket 都在其自己的專用執行緒中管理,使此通道具有並行性友善性。這樣,我們可以從不同的執行緒發布和使用 ZeroMqChannel
,而無需同步。
預設情況下,ZeroMqChannel
使用 EmbeddedJsonHeadersMessageMapper
,以使用 Jackson JSON 處理器將 Message
(包括標頭)從/序列化為 byte[]
。此邏輯可以透過 setMessageMapper(BytesMessageMapper)
進行設定。
傳送和接收 Socket 可以針對任何選項(讀取/寫入逾時、安全性等)進行自訂,透過各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回呼。
ZeroMqChannel
的內部邏輯基於透過 Project Reactor Flux
和 Mono
運算子的反應式串流。這提供了更輕鬆的執行緒控制,並允許無鎖定的並行發布和使用到/從通道。本機 PUB/SUB 邏輯實作為 Flux.publish()
運算子,以允許此通道的所有本機訂閱者接收相同的已發布訊息,如同 PUB
Socket 的分散式訂閱者一樣。
以下是 ZeroMqChannel
設定的簡單範例
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://127.0.0.1:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 輸入通道配接器
ZeroMqMessageProducer
是具有反應式語義的 MessageProducerSupport
實作。它不斷地以非阻塞方式從 ZeroMQ Socket 讀取資料,並將訊息發布到無限 Flux
,該 Flux
由 FluxMessageChannel
訂閱,或者如果輸出通道不是反應式的,則在 start()
方法中顯式訂閱。當 Socket 上未收到資料時,將套用 consumeDelay
(預設為 1 秒),然後再進行下一次讀取嘗試。
ZeroMqMessageProducer
僅支援 SocketType.PAIR
、SocketType.PULL
和 SocketType.SUB
。此元件可以連線到遠端 Socket 或使用提供的或隨機的連接埠綁定到 TCP 協定。在啟動此元件並綁定 ZeroMQ Socket 後,可以透過 getBoundPort()
取得實際的連接埠。Socket 選項(例如安全性或寫入逾時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回呼進行設定。
如果 receiveRaw
選項設定為 true
,則從 Socket 使用的 ZMsg
會按原樣在產生的 Message
的 Payload 中傳送:由下游流程來剖析和轉換 ZMsg
。否則,將使用 InboundMessageMapper
將使用的資料轉換為 Message
。如果接收到的 ZMsg
是多幀的,則第一幀將被視為此 ZeroMQ 訊息發布到的 ZeroMqHeaders.TOPIC
標頭。
如果 unwrapTopic
選項設定為 false
,則傳入訊息被認為由兩個幀組成:主題和 ZeroMQ 訊息。否則,預設情況下,ZMsg
被認為由三個幀組成:第一個幀包含主題,最後一個幀包含訊息,中間有一個空幀。
對於 SocketType.SUB
,ZeroMqMessageProducer
使用提供的 topics
選項進行訂閱;預設為訂閱所有主題。可以使用 subscribeToTopics()
和 unsubscribeFromTopics()
@ManagedOperation
s 在執行階段調整訂閱。
這是 ZeroMqMessageProducer
設定的範例
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 輸出通道配接器
ZeroMqMessageHandler
是一個 ReactiveMessageHandler
實作,用於將發布訊息產生到 ZeroMQ Socket 中。僅支援 SocketType.PAIR
、SocketType.PUSH
和 SocketType.PUB
。ZeroMqMessageHandler
僅支援連線 ZeroMQ Socket;不支援綁定。當使用 SocketType.PUB
時,如果 topicExpression
不為 Null,則會針對請求訊息評估 topicExpression
,以將主題幀注入到 ZeroMQ 訊息中。訂閱者端 (SocketType.SUB
) 必須先接收主題幀,然後才能剖析實際資料。
如果 wrapTopic
選項設定為 false
,則在注入的主題之後傳送 ZeroMQ 訊息幀(如果存在)。預設情況下,在主題和訊息之間傳送一個額外的空幀。
當請求訊息的 Payload 是 ZMsg
時,不會執行轉換或主題擷取:ZMsg
會按原樣傳送到 Socket 中,並且不會為了可能的進一步重複使用而銷毀。否則,將使用 OutboundMessageMapper<byte[]>
將請求訊息(或僅其 Payload)轉換為要發布的 ZeroMQ 幀。預設情況下,使用 ConfigurableCompositeMessageConverter
提供的 ConvertingBytesMessageMapper
。Socket 選項(例如安全性或寫入逾時)可以透過 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回呼進行設定。
這是 ZeroMqMessageHandler
設定的範例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://127.0.0.1:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支援
spring-integration-zeromq
透過 ZeroMq
工廠和上述元件的 IntegrationComponentSpec
實作,提供方便的 Java DSL Fluent API。
這是 ZeroMqChannel
的 Java DSL 範例
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://127.0.0.1:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的輸入通道配接器是
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://127.0.0.1:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的輸出通道配接器是
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://127.0.0.1:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}