AMQP

進階消息佇列協定 (AMQP) 是一種平台中立的線路級協定,適用於面向消息的中介軟體。Spring AMQP 專案將核心 Spring 概念應用於基於 AMQP 的消息傳遞解決方案的開發。Spring Boot 提供了若干便利功能,可透過 RabbitMQ 使用 AMQP,包括 spring-boot-starter-amqp 啟動器。

RabbitMQ 支援

RabbitMQ 是一種輕量、可靠、可擴展且可移植的消息代理程式,基於 AMQP 協定。Spring 使用 RabbitMQ 通過 AMQP 協定進行通訊。

RabbitMQ 組態由 spring.rabbitmq.* 中的外部組態屬性控制。例如,您可能會在 application.properties 中宣告以下章節

  • 屬性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用 addresses 屬性組態相同的連線

  • 屬性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以這種方式指定位址時,將忽略 hostport 屬性。如果位址使用 amqps 協定,則會自動啟用 SSL 支援。

請參閱 RabbitProperties 以取得更多受支援的基於屬性的組態選項。若要組態 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的較低層級詳細資訊,請定義 ConnectionFactoryCustomizer bean。

如果內容中存在 ConnectionNameStrategy bean,它將自動用於命名由自動組態的 CachingConnectionFactory 建立的連線。

若要對 RabbitTemplate 進行應用程式範圍的附加自訂,請使用 RabbitTemplateCustomizer bean。

請參閱 Understanding AMQP, the protocol used by RabbitMQ 以取得更多詳細資訊。

發送消息

Spring 的 AmqpTemplateAmqpAdmin 是自動組態的,您可以將它們直接自動裝配到您自己的 bean 中,如下例所示

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter bean,則會自動將其關聯到自動組態的 AmqpTemplate

如有必要,任何定義為 bean 的 org.springframework.amqp.core.Queue 都會自動用於在 RabbitMQ 實例上宣告對應的佇列。

若要重試操作,您可以在 AmqpTemplate 上啟用重試 (例如,在代理程式連線遺失的情況下)

  • 屬性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

預設情況下,重試處於停用狀態。您也可以透過宣告 RabbitRetryTemplateCustomizer bean 以程式設計方式自訂 RetryTemplate

如果您需要建立更多 RabbitTemplate 實例,或者您想要覆寫預設值,Spring Boot 提供了 RabbitTemplateConfigurer bean,您可以使用它來初始化 RabbitTemplate,使其具有與自動組態使用的工廠相同的設定。

發送消息到串流

若要將消息發送到特定串流,請指定串流的名稱,如下例所示

  • 屬性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定義了 MessageConverterStreamMessageConverterProducerCustomizer bean,則會自動將其關聯到自動組態的 RabbitStreamTemplate

如果您需要建立更多 RabbitStreamTemplate 實例,或者您想要覆寫預設值,Spring Boot 提供了 RabbitStreamTemplateConfigurer bean,您可以使用它來初始化 RabbitStreamTemplate,使其具有與自動組態使用的工廠相同的設定。

接收消息

當 Rabbit 基礎架構存在時,可以使用 @RabbitListener 註解任何 bean 以建立監聽器端點。如果未定義 RabbitListenerContainerFactory,則會自動組態預設的 SimpleRabbitListenerContainerFactory,並且您可以使用 spring.rabbitmq.listener.type 屬性切換到直接容器。如果定義了 MessageConverterMessageRecoverer bean,則會自動將其與預設工廠關聯。

以下範例元件在 someQueue 佇列上建立監聽器端點

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
請參閱 @EnableRabbit 以取得更多詳細資訊。

如果您需要建立更多 RabbitListenerContainerFactory 實例,或者您想要覆寫預設值,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,您可以使用它們來初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,使其具有與自動組態使用的工廠相同的設定。

您選擇哪種容器類型並不重要。這兩個 bean 由自動組態公開。

例如,以下組態類別公開了另一個使用特定 MessageConverter 的工廠

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然後,您可以在任何 @RabbitListener 註解的方法中使用該工廠,如下所示

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

您可以啟用重試來處理監聽器拋出例外狀況的情況。預設情況下,使用 RejectAndDontRequeueRecoverer,但您可以定義自己的 MessageRecoverer。當重試耗盡時,消息會被拒絕,並且如果代理程式組態為這樣做,則會被丟棄或路由到死信交換。預設情況下,重試處於停用狀態。您也可以透過宣告 RabbitRetryTemplateCustomizer bean 以程式設計方式自訂 RetryTemplate

預設情況下,如果重試處於停用狀態且監聽器拋出例外狀況,則會無限期地重試傳遞。您可以透過兩種方式修改此行為:將 defaultRequeueRejected 屬性設定為 false,以便不嘗試重新傳遞,或拋出 AmqpRejectAndDontRequeueException 以發出消息應被拒絕的訊號。後者是在啟用重試且達到最大傳遞嘗試次數時使用的機制。