範例應用程式

Spring AMQP 範例專案包含兩個範例應用程式。第一個是簡單的「Hello World」範例,示範同步和非同步訊息接收。它為理解基本組件提供了絕佳的起點。第二個範例基於股票交易用例,以示範真實世界應用程式中常見的互動類型。在本章中,我們將快速瀏覽每個範例,以便您可以專注於最重要的組件。這些範例均以 Maven 為基礎,因此您應該能夠將它們直接匯入任何支援 Maven 的 IDE(例如 SpringSource Tool Suite)。

「Hello World」範例

「Hello World」範例示範同步和非同步訊息接收。您可以將 spring-rabbit-helloworld 範例匯入 IDE,然後按照以下討論進行操作。

同步範例

src/main/java 目錄中,導覽至 org.springframework.amqp.helloworld 套件。開啟 HelloWorldConfiguration 類別,並注意它在類別層級包含 @Configuration 註解,並注意方法層級的一些 @Bean 註解。這是 Spring 基於 Java 的組態範例。您可以在此處閱讀更多相關資訊。

以下清單顯示如何建立連線工廠

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

組態也包含 RabbitAdmin 的執行個體,預設情況下,它會尋找任何類型為交換器、佇列或綁定的 Bean,然後在 Broker 上宣告它們。實際上,在 HelloWorldConfiguration 中產生的 helloWorldQueue Bean 就是一個範例,因為它是 Queue 的執行個體。

以下清單顯示 helloWorldQueue Bean 定義

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顧 rabbitTemplate Bean 組態,您可以看到它將 helloWorldQueue 的名稱設定為其 queue 屬性(用於接收訊息)和 routingKey 屬性(用於傳送訊息)。

現在我們已經探索了組態,我們可以看看實際使用這些組件的程式碼。首先,從同一個套件中開啟 Producer 類別。它包含一個 main() 方法,用於建立 Spring ApplicationContext

以下清單顯示 main 方法

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在上述範例中,擷取 AmqpTemplate Bean 並用於傳送 Message。由於用戶端程式碼應盡可能依賴介面,因此類型為 AmqpTemplate 而不是 RabbitTemplate。即使在 HelloWorldConfiguration 中建立的 Bean 是 RabbitTemplate 的執行個體,但依賴介面意味著此程式碼更具可攜性(您可以獨立於程式碼變更組態)。由於調用了 convertAndSend() 方法,範本會委派給其 MessageConverter 執行個體。在本例中,它使用預設的 SimpleMessageConverter,但可以為 rabbitTemplate Bean 提供不同的實作,如 HelloWorldConfiguration 中所定義。

現在開啟 Consumer 類別。它實際上共用相同的組態基底類別,這表示它共用 rabbitTemplate Bean。這就是我們將該範本組態為同時具有 routingKey(用於傳送)和 queue(用於接收)的原因。正如我們在 AmqpTemplate 中描述的那樣,您可以改為將 'routingKey' 引數傳遞給 send 方法,並將 'queue' 引數傳遞給 receive 方法。Consumer 程式碼基本上是 Producer 的鏡像,調用 receiveAndConvert() 而不是 convertAndSend()

以下清單顯示 Consumer 的 main 方法

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果您執行 Producer,然後執行 Consumer,您應該會在主控台輸出中看到 Received: Hello World

非同步範例

同步範例 逐步介紹了同步 Hello World 範例。本節介紹一個稍微更進階但功能更強大的選項。只需稍作修改,Hello World 範例即可提供非同步接收的範例,也稱為訊息驅動 POJO。實際上,有一個子套件正好提供了這一點:org.springframework.amqp.samples.helloworld.async

同樣,我們從傳送端開始。開啟 ProducerConfiguration 類別,並注意它建立了 connectionFactoryrabbitTemplate Bean。這次,由於組態專用於訊息傳送端,我們甚至不需要任何佇列定義,並且 RabbitTemplate 僅設定了 'routingKey' 屬性。回想一下,訊息是傳送到交換器而不是直接傳送到佇列。AMQP 預設交換器是沒有名稱的直接交換器。所有佇列都使用其名稱作為路由金鑰繫結到該預設交換器。這就是為什麼我們在這裡只需要提供路由金鑰。

以下清單顯示 rabbitTemplate 定義

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由於此範例示範非同步訊息接收,因此產生端旨在持續傳送訊息(如果它是像同步版本那樣的每個執行個體訊息模型,則它作為訊息驅動消費者的事實就不會那麼明顯)。負責持續傳送訊息的組件定義為 ProducerConfiguration 中的內部類別。它被組態為每三秒執行一次。

以下清單顯示該組件

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

您不需要了解所有詳細資訊,因為真正的重點應該放在接收端(我們將在後面介紹)。但是,如果您還不熟悉 Spring 工作排程支援,您可以在此處了解更多資訊。簡而言之,ProducerConfiguration 中的 postProcessor Bean 會向排程器註冊工作。

現在我們可以轉向接收端。為了強調訊息驅動 POJO 行為,我們從對訊息做出反應的組件開始。該類別稱為 HelloWorldHandler,如下清單所示

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

該類別是一個 POJO。它不擴充任何基底類別,不實作任何介面,甚至不包含任何匯入。它透過 Spring AMQP MessageListenerAdapter「調整」為 MessageListener 介面。然後,您可以在 SimpleMessageListenerContainer 上組態該配接器。對於此範例,容器是在 ConsumerConfiguration 類別中建立的。您可以在那裡看到包裝在配接器中的 POJO。

以下清單顯示如何定義 listenerContainer

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer 是一個 Spring 生命周期組件,預設情況下會自動啟動。如果您查看 Consumer 類別,您可以看到其 main() 方法僅包含一行引導程式碼來建立 ApplicationContext。Producer 的 main() 方法也是一行引導程式碼,因為其方法使用 @Scheduled 註解的組件也會自動啟動。您可以按任何順序啟動 ProducerConsumer,您應該會看到每三秒傳送和接收訊息。

股票交易

股票交易範例示範了比Hello World 範例更進階的訊息傳遞情境。但是,組態非常相似,只是稍微複雜一些。由於我們詳細介紹了 Hello World 組態,因此在這裡,我們重點介紹此範例的不同之處。有一個伺服器將市場資料(股票報價)推送到主題交換器。然後,用戶端可以透過使用路由模式(例如 app.stock.quotes.nasdaq.*)繫結佇列來訂閱市場資料饋送。此示範的另一個主要功能是由用戶端啟動並由伺服器處理的請求-回覆「股票交易」互動。這涉及一個私有 replyTo 佇列,該佇列由用戶端在訂單請求訊息本身中傳送。

伺服器的核心組態位於 org.springframework.amqp.rabbit.stocks.config.server 套件中的 RabbitServerConfiguration 類別中。它擴充了 AbstractStockAppRabbitConfiguration。這是在伺服器和用戶端之間共用的資源定義的位置,包括市場資料主題交換器(其名稱為 'app.stock.marketdata')和伺服器為股票交易公開的佇列(其名稱為 'app.stock.request')。在該共用組態檔案中,您還可以看到在 RabbitTemplate 上組態了 Jackson2JsonMessageConverter

伺服器特定的組態包含兩件事。首先,它在 RabbitTemplate 上組態市場資料交換器,以便它不需要在每次調用以傳送 Message 時都提供該交換器名稱。它在基底組態類別中定義的抽象回呼方法中執行此操作。以下清單顯示該方法

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,宣告股票請求佇列。在這種情況下,它不需要任何明確的綁定,因為它使用自己的名稱作為路由金鑰綁定到預設的無名稱交換器。如前所述,AMQP 規範定義了該行為。以下清單顯示 stockRequestQueue Bean 的定義

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

現在您已經看到了伺服器 AMQP 資源的組態,請導覽至 src/test/java 目錄下的 org.springframework.amqp.rabbit.stocks 套件。在那裡,您可以看到實際的 Server 類別,它提供了 main() 方法。它基於 server-bootstrap.xml 組態檔建立 ApplicationContext。在那裡,您可以看到發佈虛擬市場資料的排程工作。該組態依賴於 Spring 的 task 命名空間支援。引導程式碼組態檔案還匯入了一些其他檔案。最有趣的一個是 server-messaging.xml,它直接位於 src/main/resources 下。在那裡,您可以看到負責處理股票交易請求的 messageListenerContainer Bean。最後,查看在 server-handlers.xml(也在 'src/main/resources' 中)中定義的 serverHandler Bean。該 Bean 是 ServerHandler 類別的執行個體,並且是可以傳送回覆訊息的訊息驅動 POJO 的良好範例。請注意,它本身並未耦合到架構或任何 AMQP 概念。它接受 TradeRequest 並回傳 TradeResponse。以下清單顯示 handleMessage 方法的定義

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

現在我們已經看到了伺服器的最重要組態和程式碼,我們可以轉向用戶端。最好的起點可能是 org.springframework.amqp.rabbit.stocks.config.client 套件中的 RabbitClientConfiguration。請注意,它宣告了兩個沒有提供明確名稱的佇列。以下清單顯示了兩個佇列的 Bean 定義

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

這些是私有佇列,並且會自動產生唯一名稱。用戶端使用第一個產生的佇列來綁定到伺服器公開的市場資料交換器。回想一下,在 AMQP 中,消費者與佇列互動,而產生者與交換器互動。佇列到交換器的「綁定」是告訴 Broker 將訊息從給定交換器傳遞(或路由)到佇列的方式。由於市場資料交換器是主題交換器,因此可以使用路由模式來表示綁定。RabbitClientConfiguration 使用 Binding 物件執行此操作,並且該物件是使用 BindingBuilder Fluent API 產生的。以下清單顯示 Binding

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

請注意,實際值已外部化到屬性檔案(src/main/resources 下的 client.properties)中,並且我們使用 Spring 的 @Value 註解來注入該值。這通常是一個好主意。否則,該值將被硬編碼在類別中,並且在不重新編譯的情況下無法修改。在這種情況下,更容易在變更用於綁定的路由模式時執行多個版本的用戶端。我們現在可以嘗試一下。

首先執行 org.springframework.amqp.rabbit.stocks.Server,然後執行 org.springframework.amqp.rabbit.stocks.Client。您應該會看到 NASDAQ 股票的虛擬報價,因為 client.properties 中與 'stocks.quote.pattern' 金鑰關聯的目前值是 'app.stock.quotes.nasdaq.'。現在,在保持現有 ServerClient 執行的同時,將該屬性值變更為 'app.stock.quotes.nyse.' 並啟動第二個 Client 執行個體。您應該會看到第一個用戶端仍然接收 NASDAQ 報價,而第二個用戶端接收 NYSE 報價。您可以改為變更模式以取得所有股票甚至個別股票代碼。

我們探索的最後一個功能是從用戶端的角度來看的請求-回覆互動。回想一下,我們已經看到了接受 TradeRequest 物件並回傳 TradeResponse 物件的 ServerHandlerClient 端上的對應程式碼是 org.springframework.amqp.rabbit.stocks.gateway 套件中的 RabbitStockServiceGateway。它委派給 RabbitTemplate 以傳送訊息。以下清單顯示 send 方法

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

請注意,在傳送訊息之前,它會設定 replyTo 位址。它提供了由 traderJoeQueue Bean 定義產生的佇列(如前所示)。以下清單顯示 StockServiceGateway 類別本身的 @Bean 定義

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果您不再執行伺服器和用戶端,請立即啟動它們。嘗試傳送格式為 '100 TCKR' 的請求。在模擬請求「處理」的短暫人工延遲之後,您應該會在用戶端上看到確認訊息出現。

從非 Spring 應用程式接收 JSON

Spring 應用程式在傳送 JSON 時,會將 TypeId 標頭設定為完整類別名稱,以協助接收應用程式將 JSON 轉換回 Java 物件。

spring-rabbit-json 範例探索了從非 Spring 應用程式轉換 JSON 的幾種技術。