AMQP
進階消息佇列協定 (AMQP) 是一種平台中立的線路級協定,適用於面向消息的中介軟體。Spring AMQP 專案將核心 Spring 概念應用於基於 AMQP 的消息傳遞解決方案的開發。Spring Boot 提供了若干便利功能,可透過 RabbitMQ 使用 AMQP,包括 spring-boot-starter-amqp
啟動器。
RabbitMQ 支援
RabbitMQ 是一種輕量、可靠、可擴展且可移植的消息代理程式,基於 AMQP 協定。Spring 使用 RabbitMQ 通過 AMQP 協定進行通訊。
RabbitMQ 組態由 spring.rabbitmq.*
中的外部組態屬性控制。例如,您可能會在 application.properties
中宣告以下章節
-
屬性
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用 addresses
屬性組態相同的連線
-
屬性
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以這種方式指定位址時,將忽略 host 和 port 屬性。如果位址使用 amqps 協定,則會自動啟用 SSL 支援。 |
請參閱 RabbitProperties
以取得更多受支援的基於屬性的組態選項。若要組態 Spring AMQP 使用的 RabbitMQ ConnectionFactory
的較低層級詳細資訊,請定義 ConnectionFactoryCustomizer
bean。
如果內容中存在 ConnectionNameStrategy
bean,它將自動用於命名由自動組態的 CachingConnectionFactory
建立的連線。
若要對 RabbitTemplate
進行應用程式範圍的附加自訂,請使用 RabbitTemplateCustomizer
bean。
請參閱 Understanding AMQP, the protocol used by RabbitMQ 以取得更多詳細資訊。 |
發送消息
Spring 的 AmqpTemplate
和 AmqpAdmin
是自動組態的,您可以將它們直接自動裝配到您自己的 bean 中,如下例所示
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter bean,則會自動將其關聯到自動組態的 AmqpTemplate 。 |
如有必要,任何定義為 bean 的 org.springframework.amqp.core.Queue
都會自動用於在 RabbitMQ 實例上宣告對應的佇列。
若要重試操作,您可以在 AmqpTemplate
上啟用重試 (例如,在代理程式連線遺失的情況下)
-
屬性
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
預設情況下,重試處於停用狀態。您也可以透過宣告 RabbitRetryTemplateCustomizer
bean 以程式設計方式自訂 RetryTemplate
。
如果您需要建立更多 RabbitTemplate
實例,或者您想要覆寫預設值,Spring Boot 提供了 RabbitTemplateConfigurer
bean,您可以使用它來初始化 RabbitTemplate
,使其具有與自動組態使用的工廠相同的設定。
發送消息到串流
若要將消息發送到特定串流,請指定串流的名稱,如下例所示
-
屬性
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定義了 MessageConverter
、StreamMessageConverter
或 ProducerCustomizer
bean,則會自動將其關聯到自動組態的 RabbitStreamTemplate
。
如果您需要建立更多 RabbitStreamTemplate
實例,或者您想要覆寫預設值,Spring Boot 提供了 RabbitStreamTemplateConfigurer
bean,您可以使用它來初始化 RabbitStreamTemplate
,使其具有與自動組態使用的工廠相同的設定。
接收消息
當 Rabbit 基礎架構存在時,可以使用 @RabbitListener
註解任何 bean 以建立監聽器端點。如果未定義 RabbitListenerContainerFactory
,則會自動組態預設的 SimpleRabbitListenerContainerFactory
,並且您可以使用 spring.rabbitmq.listener.type
屬性切換到直接容器。如果定義了 MessageConverter
或 MessageRecoverer
bean,則會自動將其與預設工廠關聯。
以下範例元件在 someQueue
佇列上建立監聽器端點
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
請參閱 @EnableRabbit 以取得更多詳細資訊。 |
如果您需要建立更多 RabbitListenerContainerFactory
實例,或者您想要覆寫預設值,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurer
和 DirectRabbitListenerContainerFactoryConfigurer
,您可以使用它們來初始化 SimpleRabbitListenerContainerFactory
和 DirectRabbitListenerContainerFactory
,使其具有與自動組態使用的工廠相同的設定。
您選擇哪種容器類型並不重要。這兩個 bean 由自動組態公開。 |
例如,以下組態類別公開了另一個使用特定 MessageConverter
的工廠
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然後,您可以在任何 @RabbitListener
註解的方法中使用該工廠,如下所示
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以啟用重試來處理監聽器拋出例外狀況的情況。預設情況下,使用 RejectAndDontRequeueRecoverer
,但您可以定義自己的 MessageRecoverer
。當重試耗盡時,消息會被拒絕,並且如果代理程式組態為這樣做,則會被丟棄或路由到死信交換。預設情況下,重試處於停用狀態。您也可以透過宣告 RabbitRetryTemplateCustomizer
bean 以程式設計方式自訂 RetryTemplate
。
預設情況下,如果重試處於停用狀態且監聽器拋出例外狀況,則會無限期地重試傳遞。您可以透過兩種方式修改此行為:將 defaultRequeueRejected 屬性設定為 false ,以便不嘗試重新傳遞,或拋出 AmqpRejectAndDontRequeueException 以發出消息應被拒絕的訊號。後者是在啟用重試且達到最大傳遞嘗試次數時使用的機制。 |