MQTT 支援
Spring Integration 提供輸入和輸出通道配接器,以支援訊息佇列遙測傳輸 (MQTT) 協定。
您需要將此依賴項包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.5"
目前的實作使用 Eclipse Paho MQTT Client 程式庫。
XML 設定和本章的大部分內容是關於 MQTT v3.1 協定支援和相關的 Paho Client。請參閱 MQTT v5 支援 段落以了解相關的協定支援。 |
兩個配接器的設定都是透過使用 DefaultMqttPahoClientFactory
來完成。有關設定選項的更多資訊,請參閱 Paho 文件。
我們建議設定一個 MqttConnectOptions 物件,並將其注入到工廠中,而不是在工廠本身上設定(已棄用的)選項。 |
輸入 (訊息驅動) 通道配接器
輸入通道配接器由 MqttPahoMessageDrivenChannelAdapter
實作。為了方便起見,您可以使用命名空間來設定它。最小的設定可能如下所示
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下列表顯示了可用的屬性
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://127.0.0.1:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
1 | 用戶端 ID。 |
2 | Broker URL。 |
3 | 以此配接器接收訊息的主題的逗號分隔列表。 |
4 | QoS 值的逗號分隔列表。它可以是應用於所有主題的單個值,也可以是每個主題的值(在這種情況下,列表的長度必須相同)。 |
5 | MqttMessageConverter (可選)。預設情況下,預設的 DefaultPahoMessageConverter 會產生具有 String Payload 的訊息,以及以下標頭
|
6 | 用戶端工廠。 |
7 | send() 超時。它僅在通道可能阻塞時適用(例如,目前已滿的有界 QueueChannel )。 |
8 | 錯誤通道。下游異常會以 ErrorMessage 的形式傳送到此通道(如果已提供)。Payload 是 MessagingException ,其中包含失敗的訊息和原因。 |
9 | 恢復間隔。它控制配接器在失敗後嘗試重新連線的間隔。預設值為 10000ms (十秒)。 |
10 | 確認模式;設定為 true 以進行手動確認。 |
從 4.1 版開始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactory 的 serverURIs 屬性中提供伺服器 URI。這樣做可以實現,例如,連線到高可用性 (HA) 叢集。 |
從 4.2.2 版開始,當配接器成功訂閱主題時,會發布 MqttSubscribedEvent
。當連線或訂閱失敗時,會發布 MqttConnectionFailedEvent
事件。這些事件可以由實作 ApplicationListener
的 Bean 接收。
此外,一個名為 recoveryInterval
的新屬性控制配接器在失敗後嘗試重新連線的間隔。預設值為 10000ms
(十秒)。
在 4.2.3 版之前,當配接器停止時,用戶端始終取消訂閱。這是錯誤的,因為如果用戶端 QOS 大於 0,我們需要保持訂閱處於活動狀態,以便在配接器停止時到達的訊息在下次啟動時傳遞。這也需要將用戶端工廠上的 從 4.2.3 版開始,如果 此行為可以透過設定工廠上的 若要還原為 4.2.3 之前的行為,請使用 |
從 5.0 版開始, |
在執行階段新增和移除主題
從 4.1 版開始,您可以透過程式設計方式變更配接器訂閱的主題。Spring Integration 提供 addTopic()
和 removeTopic()
方法。新增主題時,您可以選擇性地指定 QoS
(預設值:1)。您也可以透過將適當的訊息傳送到具有適當 Payload 的 <control-bus/>
來修改主題,例如:"myMqttAdapter.addTopic('foo', 1)"
。
停止和啟動配接器對主題列表沒有影響(它不會還原為設定中的原始設定)。變更不會超出應用程式內容的生命週期。新的應用程式內容會還原為設定的設定。
當配接器停止(或與 Broker 斷線)時變更主題,會在下次建立連線時生效。
手動確認
從 5.3 版開始,您可以將 manualAcks
屬性設定為 true。通常用於非同步確認傳遞。當設定為 true
時,標頭 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
) 會新增到訊息中,其值為 SimpleAcknowledgment
。您必須調用 acknowledge()
方法才能完成傳遞。有關更多資訊,請參閱 IMqttClient
setManualAcks()
和 messageArrivedComplete()
的 Javadoc。為了方便起見,提供了標頭存取器
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
從 5.2.11
版開始,當訊息轉換器從 MqttMessage
轉換中擲回異常或傳回 null
時,MqttPahoMessageDrivenChannelAdapter
會將 ErrorMessage
傳送到 errorChannel
(如果已提供)。否則,將此轉換錯誤重新擲回 MQTT 用戶端回呼中。
使用 Java 設定進行設定
以下 Spring Boot 應用程式示範如何使用 Java 設定來設定輸入配接器
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 進行設定
以下 Spring Boot 應用程式提供如何使用 Java DSL 設定輸入配接器的範例
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
輸出通道配接器
輸出通道配接器由 MqttPahoMessageHandler
實作,它封裝在 ConsumerEndpoint
中。為了方便起見,您可以使用命名空間來設定它。
從 4.1 版開始,配接器支援非同步傳送操作,避免阻塞直到傳遞得到確認。您可以發出應用程式事件,以使應用程式能夠在需要時確認傳遞。
以下列表顯示了輸出通道配接器可用的屬性
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" (1)
url="tcp://127.0.0.1:1883" (2)
converter="myConverter" (3)
client-factory="clientFactory" (4)
default-qos="1" (5)
qos-expression="" (6)
default-retained="true" (7)
retained-expression="" (8)
default-topic="bar" (9)
topic-expression="" (10)
async="false" (11)
async-events="false" (12)
channel="target" />
1 | 用戶端 ID。 |
2 | Broker URL。 |
3 | MqttMessageConverter (可選)。預設的 DefaultPahoMessageConverter 識別以下標頭
|
4 | 用戶端工廠。 |
5 | 預設服務品質。如果未找到 mqtt_qos 標頭或 qos-expression 傳回 null ,則使用它。如果您提供自訂 converter ,則不使用它。 |
6 | 用於評估以確定 qos 的運算式。預設值為 headers[mqtt_qos] 。 |
7 | 保留標誌的預設值。如果未找到 mqtt_retained 標頭,則使用它。如果提供了自訂 converter ,則不使用它。 |
8 | 用於評估以確定保留布林值的運算式。預設值為 headers[mqtt_retained] 。 |
9 | 訊息傳送到的預設主題(如果未找到 mqtt_topic 標頭,則使用此主題)。 |
10 | 用於評估以確定目的地主題的運算式。預設值為 headers['mqtt_topic'] 。 |
11 | 當 true 時,調用者不會阻塞。相反,它會等待傳遞確認後再傳送訊息。預設值為 false (傳送會阻塞,直到傳遞得到確認)。 |
12 | 當 async 和 async-events 都為 true 時,會發出 MqttMessageSentEvent (請參閱 事件)。它包含訊息、主題、用戶端程式庫產生的 messageId 、clientId 和 clientInstance (每次用戶端連線時都會遞增)。當用戶端程式庫確認傳遞時,會發出 MqttMessageDeliveredEvent 。它包含 messageId 、clientId 和 clientInstance ,使傳遞能夠與 send() 相關聯。任何 ApplicationListener 或事件輸入通道配接器都可以接收這些事件。請注意,MqttMessageDeliveredEvent 可能會在 MqttMessageSentEvent 之前收到。預設值為 false 。 |
從 4.1 版開始,可以省略 URL。相反,伺服器 URI 可以在 DefaultMqttPahoClientFactory 的 serverURIs 屬性中提供。這可以實現,例如,連線到高可用性 (HA) 叢集。 |
使用 Java 設定進行設定
以下 Spring Boot 應用程式示範如何使用 Java 設定來設定輸出配接器
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 進行設定
以下 Spring Boot 應用程式提供如何使用 Java DSL 設定輸出配接器的範例
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些應用程式事件由配接器發布。
-
MqttConnectionFailedEvent
- 如果我們連線失敗或連線隨後遺失,則由兩個配接器發布。對於 MQTT v5 Paho 用戶端,當伺服器執行正常斷線時,也會發出此事件,在這種情況下,遺失連線的cause
為null
。 -
MqttMessageSentEvent
- 當訊息已傳送時,由輸出配接器發布(如果在非同步模式下執行)。 -
MqttMessageDeliveredEvent
- 當用戶端指示訊息已傳遞時,由輸出配接器發布(如果在非同步模式下執行)。 -
MqttSubscribedEvent
- 在訂閱主題後,由輸入配接器發布。
這些事件可以由 ApplicationListener<MqttIntegrationEvent>
或 @EventListener
方法接收。
若要確定事件的來源,請使用以下方法;您可以檢查 Bean 名稱和/或連線選項(以存取伺服器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支援
從 5.5.5 版開始,spring-integration-mqtt
模組為 MQTT v5 協定提供通道配接器實作。org.eclipse.paho:org.eclipse.paho.mqttv5.client
是一個 optional
依賴項,因此必須在目標專案中明確包含。
由於 MQTT v5 協定在 MQTT 訊息中支援額外的任意屬性,因此引入了 MqttHeaderMapper
實作,以便在發布和接收操作時對應到/從標頭。預設情況下(透過 *
模式),它會對應所有收到的 PUBLISH
框架屬性(包括使用者屬性)。在輸出端,它會對應 PUBLISH
框架的此標頭子集:contentType
、mqtt_messageExpiryInterval
、mqtt_responseTopic
、mqtt_correlationData
。
MQTT v5 協定的輸出通道配接器以 Mqttv5PahoMessageHandler
的形式存在。它需要 clientId
和 MQTT Broker URL 或 MqttConnectionOptions
參考。它支援 MqttClientPersistence
選項,可以是 async
,並且在這種情況下可以發出 MqttIntegrationEvent
物件(請參閱 asyncEvents
選項)。如果請求訊息 Payload 是 org.eclipse.paho.mqttv5.common.MqttMessage
,則會透過內部 IMqttAsyncClient
按原樣發布。如果 Payload 是 byte[]
,則按原樣用於要發布的目標 MqttMessage
Payload。如果 Payload 是 String
,則會轉換為 byte[]
以進行發布。其餘用例委託給提供的 MessageConverter
,它是應用程式內容中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
ConfigurableCompositeMessageConverter
Bean。以下 Java DSL 設定範例示範如何在整合流程中使用此通道配接器
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageHandler 一起使用,因為其合約僅針對 MQTT v3 協定。 |
如果在啟動或執行階段連線失敗,Mqttv5PahoMessageHandler
會嘗試在產生到此處理器的下一個訊息上重新連線。如果此手動重新連線失敗,則連線異常會擲回給調用者。在這種情況下,會套用標準 Spring Integration 錯誤處理程序,包括請求處理器 Advice,例如重試或斷路器。
請參閱 Mqttv5PahoMessageHandler
Javadoc 及其父類別中的更多資訊。
MQTT v5 協定的輸入通道配接器以 Mqttv5PahoMessageDrivenChannelAdapter
的形式存在。它需要 clientId
和 MQTT Broker URL 或 MqttConnectionOptions
參考,以及要訂閱和使用的主題。它支援 MqttClientPersistence
選項,預設情況下為記憶體內。可以設定預期的 payloadType
(預設情況下為 byte[]
),並將其傳播到提供的 SmartMessageConverter
,以便從收到的 MqttMessage
的 byte[]
進行轉換。如果設定了 manualAck
選項,則會將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
標頭新增到要產生的訊息中,作為 SimpleAcknowledgment
的實例。HeaderMapper<MqttProperties>
用於將 PUBLISH
框架屬性(包括使用者屬性)對應到目標訊息標頭。標準 MqttMessage
屬性,例如 qos
、id
、dup
、retained
,加上接收到的主題,始終會對應到標頭。有關更多資訊,請參閱 MqttHeaders
。
從 6.3 版開始,Mqttv5PahoMessageDrivenChannelAdapter
提供了基於 MqttSubscription
的建構函式,用於細微的設定,而不是純粹的主題名稱。當提供這些訂閱時,無法使用通道配接器的 qos
選項,因為這種 qos
模式是 MqttSubscription
API 的一部分。
以下 Java DSL 設定範例示範如何在整合流程中使用此通道配接器
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因為其合約僅針對 MQTT v3 協定。 |
請參閱 Mqttv5PahoMessageDrivenChannelAdapter
Javadoc 及其父類別中的更多資訊。
建議將 MqttConnectionOptions#setAutomaticReconnect(boolean) 設定為 true,以讓內部 IMqttAsyncClient 實例處理重新連線。否則,只有手動重新啟動 Mqttv5PahoMessageDrivenChannelAdapter 才能處理重新連線,例如透過在斷線時處理 MqttConnectionFailedEvent 。 |
共用 MQTT 用戶端支援
如果多個整合需要單個 MQTT ClientID,則不能使用多個 MQTT 用戶端實例,因為 MQTT Broker 可能對每個 ClientID 的連線數量有限制(通常,只允許單個連線)。為了讓單個用戶端重複用於不同的通道配接器,可以使用 org.springframework.integration.mqtt.core.ClientManager
元件,並將其傳遞給所需的任何通道配接器。它將管理 MQTT 連線生命週期,並在需要時執行自動重新連線。此外,可以為用戶端管理器提供自訂連線選項和 MqttClientPersistence
,就像目前可以為通道配接器元件完成的那樣。
請注意,同時支援 MQTT v5 和 v3 通道配接器。
以下 Java DSL 設定範例示範如何在整合流程中使用此用戶端管理器
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://127.0.0.1:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}