偵測閒置的非同步消費者

雖然有效率,但非同步消費者的一個問題是偵測它們何時閒置 — 使用者可能希望在一段時間沒有收到訊息時採取某些動作。

從 1.6 版開始,現在可以設定監聽器容器,以便在一段時間沒有訊息傳遞時發布 ListenerContainerIdleEvent。當容器閒置時,每隔 idleEventInterval 毫秒就會發布一個事件。

若要設定此功能,請在容器上設定 idleEventInterval。下列範例顯示如何在 XML 和 Java 中執行此操作 (適用於 SimpleMessageListenerContainerSimpleRabbitListenerContainerFactory)

<rabbit:listener-container connection-factory="connectionFactory"
        ...
        idle-event-interval="60000"
        ...
        >
    <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ...
    container.setIdleEventInterval(60000L);
    ...
    return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setIdleEventInterval(60000L);
    ...
    return factory;
}

在這些情況下,容器閒置時,每分鐘會發布一次事件。

事件消耗

您可以透過實作 ApplicationListener 捕捉閒置事件 — 一般監聽器,或僅限於接收此特定事件的監聽器。您也可以使用 Spring Framework 4.2 中導入的 @EventListener

下列範例將 @RabbitListener@EventListener 組合到單一類別中。您需要理解,應用程式監聽器會取得所有容器的事件,因此如果您想要根據哪個容器閒置來採取特定動作,您可能需要檢查監聽器 ID。您也可以使用 @EventListener condition 來達到此目的。

這些事件有四個屬性

  • source:監聽器容器實例

  • id:監聽器 ID (或容器 Bean 名稱)

  • idleTime:事件發布時容器已閒置的時間

  • queueNames:容器監聽的佇列名稱

下列範例顯示如何使用 @RabbitListener@EventListener 註解建立監聽器

public class Listener {

    @RabbitListener(id="someId", queues="#{queue.name}")
    public String listen(String foo) {
        return foo.toUpperCase();
    }

    @EventListener(condition = "event.listenerId == 'someId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        ...
    }

}
事件監聽器會看到所有容器的事件。因此,在先前的範例中,我們根據監聽器 ID 縮小接收到的事件範圍。
如果您希望使用閒置事件來停止監聽器容器,則不應在呼叫監聽器的執行緒上呼叫 container.stop()。這樣做總是會導致延遲和不必要的記錄訊息。相反地,您應該將事件交給不同的執行緒,然後該執行緒可以停止容器。