MQTT 支援

Spring Integration 提供輸入和輸出通道配接器,以支援訊息佇列遙測傳輸 (MQTT) 協定。

您需要將此依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.5"

目前的實作使用 Eclipse Paho MQTT Client 程式庫。

XML 設定和本章的大部分內容是關於 MQTT v3.1 協定支援和相關的 Paho Client。請參閱 MQTT v5 支援 段落以了解相關的協定支援。

兩個配接器的設定都是透過使用 DefaultMqttPahoClientFactory 來完成。有關設定選項的更多資訊,請參閱 Paho 文件。

我們建議設定一個 MqttConnectOptions 物件,並將其注入到工廠中,而不是在工廠本身上設定(已棄用的)選項。

輸入 (訊息驅動) 通道配接器

輸入通道配接器由 MqttPahoMessageDrivenChannelAdapter 實作。為了方便起見,您可以使用命名空間來設定它。最小的設定可能如下所示

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表顯示了可用的屬性

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 用戶端 ID。
2 Broker URL。
3 以此配接器接收訊息的主題的逗號分隔列表。
4 QoS 值的逗號分隔列表。它可以是應用於所有主題的單個值,也可以是每個主題的值(在這種情況下,列表的長度必須相同)。
5 MqttMessageConverter(可選)。預設情況下,預設的 DefaultPahoMessageConverter 會產生具有 String Payload 的訊息,以及以下標頭
  • mqtt_topic:接收訊息的主題

  • mqtt_duplicate:如果訊息是重複的,則為 true

  • mqtt_qos:服務品質。您可以將 DefaultPahoMessageConverter 設定為在 Payload 中傳回原始 byte[],方法是將其宣告為 <bean/> 並將 payloadAsBytes 屬性設定為 true

6 用戶端工廠。
7 send() 超時。它僅在通道可能阻塞時適用(例如,目前已滿的有界 QueueChannel)。
8 錯誤通道。下游異常會以 ErrorMessage 的形式傳送到此通道(如果已提供)。Payload 是 MessagingException,其中包含失敗的訊息和原因。
9 恢復間隔。它控制配接器在失敗後嘗試重新連線的間隔。預設值為 10000ms(十秒)。
10 確認模式;設定為 true 以進行手動確認。
從 4.1 版開始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactoryserverURIs 屬性中提供伺服器 URI。這樣做可以實現,例如,連線到高可用性 (HA) 叢集。

從 4.2.2 版開始,當配接器成功訂閱主題時,會發布 MqttSubscribedEvent。當連線或訂閱失敗時,會發布 MqttConnectionFailedEvent 事件。這些事件可以由實作 ApplicationListener 的 Bean 接收。

此外,一個名為 recoveryInterval 的新屬性控制配接器在失敗後嘗試重新連線的間隔。預設值為 10000ms(十秒)。

在 4.2.3 版之前,當配接器停止時,用戶端始終取消訂閱。這是錯誤的,因為如果用戶端 QOS 大於 0,我們需要保持訂閱處於活動狀態,以便在配接器停止時到達的訊息在下次啟動時傳遞。這也需要將用戶端工廠上的 cleanSession 屬性設定為 false。預設值為 true

從 4.2.3 版開始,如果 cleanSession 屬性為 false,則配接器不會取消訂閱(預設情況下)。

此行為可以透過設定工廠上的 consumerCloseAction 屬性來覆寫。它可以具有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。後者(預設值)僅在 cleanSession 屬性為 true 時取消訂閱。

若要還原為 4.2.3 之前的行為,請使用 UNSUBSCRIBE_ALWAYS

從 5.0 版開始,topicqosretained 屬性對應到 .RECEIVED_…​ 標頭(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免意外傳播到(預設情況下)使用 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 標頭的輸出訊息。

在執行階段新增和移除主題

從 4.1 版開始,您可以透過程式設計方式變更配接器訂閱的主題。Spring Integration 提供 addTopic()removeTopic() 方法。新增主題時,您可以選擇性地指定 QoS(預設值:1)。您也可以透過將適當的訊息傳送到具有適當 Payload 的 <control-bus/> 來修改主題,例如:"myMqttAdapter.addTopic('foo', 1)"

停止和啟動配接器對主題列表沒有影響(它不會還原為設定中的原始設定)。變更不會超出應用程式內容的生命週期。新的應用程式內容會還原為設定的設定。

當配接器停止(或與 Broker 斷線)時變更主題,會在下次建立連線時生效。

手動確認

從 5.3 版開始,您可以將 manualAcks 屬性設定為 true。通常用於非同步確認傳遞。當設定為 true 時,標頭 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 會新增到訊息中,其值為 SimpleAcknowledgment。您必須調用 acknowledge() 方法才能完成傳遞。有關更多資訊,請參閱 IMqttClient setManualAcks()messageArrivedComplete() 的 Javadoc。為了方便起見,提供了標頭存取器

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

5.2.11 版開始,當訊息轉換器從 MqttMessage 轉換中擲回異常或傳回 null 時,MqttPahoMessageDrivenChannelAdapter 會將 ErrorMessage 傳送到 errorChannel(如果已提供)。否則,將此轉換錯誤重新擲回 MQTT 用戶端回呼中。

使用 Java 設定進行設定

以下 Spring Boot 應用程式示範如何使用 Java 設定來設定輸入配接器

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 進行設定

以下 Spring Boot 應用程式提供如何使用 Java DSL 設定輸入配接器的範例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

輸出通道配接器

輸出通道配接器由 MqttPahoMessageHandler 實作,它封裝在 ConsumerEndpoint 中。為了方便起見,您可以使用命名空間來設定它。

從 4.1 版開始,配接器支援非同步傳送操作,避免阻塞直到傳遞得到確認。您可以發出應用程式事件,以使應用程式能夠在需要時確認傳遞。

以下列表顯示了輸出通道配接器可用的屬性

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 用戶端 ID。
2 Broker URL。
3 MqttMessageConverter(可選)。預設的 DefaultPahoMessageConverter 識別以下標頭
  • mqtt_topic:訊息將傳送到的主題

  • mqtt_retained:如果訊息要保留,則為 true

  • mqtt_qos:服務品質

4 用戶端工廠。
5 預設服務品質。如果未找到 mqtt_qos 標頭或 qos-expression 傳回 null,則使用它。如果您提供自訂 converter,則不使用它。
6 用於評估以確定 qos 的運算式。預設值為 headers[mqtt_qos]
7 保留標誌的預設值。如果未找到 mqtt_retained 標頭,則使用它。如果提供了自訂 converter,則不使用它。
8 用於評估以確定保留布林值的運算式。預設值為 headers[mqtt_retained]
9 訊息傳送到的預設主題(如果未找到 mqtt_topic 標頭,則使用此主題)。
10 用於評估以確定目的地主題的運算式。預設值為 headers['mqtt_topic']
11 true 時,調用者不會阻塞。相反,它會等待傳遞確認後再傳送訊息。預設值為 false(傳送會阻塞,直到傳遞得到確認)。
12 asyncasync-events 都為 true 時,會發出 MqttMessageSentEvent(請參閱 事件)。它包含訊息、主題、用戶端程式庫產生的 messageIdclientIdclientInstance(每次用戶端連線時都會遞增)。當用戶端程式庫確認傳遞時,會發出 MqttMessageDeliveredEvent。它包含 messageIdclientIdclientInstance,使傳遞能夠與 send() 相關聯。任何 ApplicationListener 或事件輸入通道配接器都可以接收這些事件。請注意,MqttMessageDeliveredEvent 可能會在 MqttMessageSentEvent 之前收到。預設值為 false
從 4.1 版開始,可以省略 URL。相反,伺服器 URI 可以在 DefaultMqttPahoClientFactoryserverURIs 屬性中提供。這可以實現,例如,連線到高可用性 (HA) 叢集。

使用 Java 設定進行設定

以下 Spring Boot 應用程式示範如何使用 Java 設定來設定輸出配接器

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 進行設定

以下 Spring Boot 應用程式提供如何使用 Java DSL 設定輸出配接器的範例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

某些應用程式事件由配接器發布。

  • MqttConnectionFailedEvent - 如果我們連線失敗或連線隨後遺失,則由兩個配接器發布。對於 MQTT v5 Paho 用戶端,當伺服器執行正常斷線時,也會發出此事件,在這種情況下,遺失連線的 causenull

  • MqttMessageSentEvent - 當訊息已傳送時,由輸出配接器發布(如果在非同步模式下執行)。

  • MqttMessageDeliveredEvent - 當用戶端指示訊息已傳遞時,由輸出配接器發布(如果在非同步模式下執行)。

  • MqttSubscribedEvent - 在訂閱主題後,由輸入配接器發布。

這些事件可以由 ApplicationListener<MqttIntegrationEvent>@EventListener 方法接收。

若要確定事件的來源,請使用以下方法;您可以檢查 Bean 名稱和/或連線選項(以存取伺服器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支援

從 5.5.5 版開始,spring-integration-mqtt 模組為 MQTT v5 協定提供通道配接器實作。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一個 optional 依賴項,因此必須在目標專案中明確包含。

由於 MQTT v5 協定在 MQTT 訊息中支援額外的任意屬性,因此引入了 MqttHeaderMapper 實作,以便在發布和接收操作時對應到/從標頭。預設情況下(透過 * 模式),它會對應所有收到的 PUBLISH 框架屬性(包括使用者屬性)。在輸出端,它會對應 PUBLISH 框架的此標頭子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 協定的輸出通道配接器以 Mqttv5PahoMessageHandler 的形式存在。它需要 clientId 和 MQTT Broker URL 或 MqttConnectionOptions 參考。它支援 MqttClientPersistence 選項,可以是 async,並且在這種情況下可以發出 MqttIntegrationEvent 物件(請參閱 asyncEvents 選項)。如果請求訊息 Payload 是 org.eclipse.paho.mqttv5.common.MqttMessage,則會透過內部 IMqttAsyncClient 按原樣發布。如果 Payload 是 byte[],則按原樣用於要發布的目標 MqttMessage Payload。如果 Payload 是 String,則會轉換為 byte[] 以進行發布。其餘用例委託給提供的 MessageConverter,它是應用程式內容中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter Bean。以下 Java DSL 設定範例示範如何在整合流程中使用此通道配接器

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageHandler 一起使用,因為其合約僅針對 MQTT v3 協定。

如果在啟動或執行階段連線失敗,Mqttv5PahoMessageHandler 會嘗試在產生到此處理器的下一個訊息上重新連線。如果此手動重新連線失敗,則連線異常會擲回給調用者。在這種情況下,會套用標準 Spring Integration 錯誤處理程序,包括請求處理器 Advice,例如重試或斷路器。

請參閱 Mqttv5PahoMessageHandler Javadoc 及其父類別中的更多資訊。

MQTT v5 協定的輸入通道配接器以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要 clientId 和 MQTT Broker URL 或 MqttConnectionOptions 參考,以及要訂閱和使用的主題。它支援 MqttClientPersistence 選項,預設情況下為記憶體內。可以設定預期的 payloadType(預設情況下為 byte[]),並將其傳播到提供的 SmartMessageConverter,以便從收到的 MqttMessagebyte[] 進行轉換。如果設定了 manualAck 選項,則會將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 標頭新增到要產生的訊息中,作為 SimpleAcknowledgment 的實例。HeaderMapper<MqttProperties> 用於將 PUBLISH 框架屬性(包括使用者屬性)對應到目標訊息標頭。標準 MqttMessage 屬性,例如 qosiddupretained,加上接收到的主題,始終會對應到標頭。有關更多資訊,請參閱 MqttHeaders

從 6.3 版開始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基於 MqttSubscription 的建構函式,用於細微的設定,而不是純粹的主題名稱。當提供這些訂閱時,無法使用通道配接器的 qos 選項,因為這種 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 設定範例示範如何在整合流程中使用此通道配接器

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能與 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因為其合約僅針對 MQTT v3 協定。

請參閱 Mqttv5PahoMessageDrivenChannelAdapter Javadoc 及其父類別中的更多資訊。

建議將 MqttConnectionOptions#setAutomaticReconnect(boolean) 設定為 true,以讓內部 IMqttAsyncClient 實例處理重新連線。否則,只有手動重新啟動 Mqttv5PahoMessageDrivenChannelAdapter 才能處理重新連線,例如透過在斷線時處理 MqttConnectionFailedEvent

共用 MQTT 用戶端支援

如果多個整合需要單個 MQTT ClientID,則不能使用多個 MQTT 用戶端實例,因為 MQTT Broker 可能對每個 ClientID 的連線數量有限制(通常,只允許單個連線)。為了讓單個用戶端重複用於不同的通道配接器,可以使用 org.springframework.integration.mqtt.core.ClientManager 元件,並將其傳遞給所需的任何通道配接器。它將管理 MQTT 連線生命週期,並在需要時執行自動重新連線。此外,可以為用戶端管理器提供自訂連線選項和 MqttClientPersistence,就像目前可以為通道配接器元件完成的那樣。

請注意,同時支援 MQTT v5 和 v3 通道配接器。

以下 Java DSL 設定範例示範如何在整合流程中使用此用戶端管理器

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://127.0.0.1:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}