連入通道配接器
以下列表顯示了 AMQP 連入通道配接器的可能設定選項
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
tx-size="" (25)
consumers-per-queue (26)
batch-mode="MESSAGES"/> (27)
<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See xref:amqp/inbound-ack.adoc[Inbound Endpoint Acknowledge Mode].
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://spring-docs.dev.org.tw/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://spring-docs.dev.org.tw/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
容器
請注意,當使用 XML 設定外部容器時,您無法使用 Spring AMQP 命名空間來定義容器。這是因為命名空間至少需要一個
|
即使 Spring Integration JMS 和 AMQP 支援相似,仍然存在重要的差異。 JMS 連入通道配接器在底層使用 JmsDestinationPollingSource ,並需要設定輪詢器。 AMQP 連入通道配接器使用 AbstractMessageListenerContainer 並且是訊息驅動的。在這方面,它更類似於 JMS 訊息驅動通道配接器。 |
從 5.5 版開始,AmqpInboundChannelAdapter
可以使用 org.springframework.amqp.rabbit.retry.MessageRecoverer
策略進行設定,該策略在內部呼叫重試操作時用於 RecoveryCallback
中。有關更多資訊,請參閱 setMessageRecoverer()
JavaDocs。
@Publisher
註解也可以與 @RabbitListener
結合使用
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
預設情況下,@Publisher
AOP 攔截器處理方法呼叫的回傳值。但是,來自 @RabbitListener
方法的回傳值被視為 AMQP 回覆訊息。因此,這種方法不能與 @Publisher
一起使用,因此建議使用 @Payload
註解以及針對方法引數的相應 SpEL 運算式來組合使用。 有關 @Publisher
的更多資訊,請參閱註解驅動設定章節。
當在監聽器容器中使用獨佔或單一活動消費者時,建議您將容器屬性 forceStop 設定為 true 。這將防止在停止容器後,另一個消費者可能在本實例完全停止之前開始消耗訊息的競爭條件。 |
批次訊息
有關批次訊息的更多資訊,請參閱Spring AMQP 文件。
若要使用 Spring Integration 產生批次訊息,只需使用 BatchingRabbitTemplate
設定連出端點。
當接收批次訊息時,預設情況下,監聽器容器會擷取每個片段訊息,並且配接器會為每個片段產生一個 Message<?>
。從 5.2 版開始,如果容器的 deBatchingEnabled
屬性設定為 false
,則解批次化將由配接器執行,並產生單個 Message<List<?>>
,其酬載是片段酬載的列表(如果適用,則在轉換後)。
預設的 BatchingStrategy
是 SimpleBatchingStrategy
,但可以在配接器上覆寫此設定。
當重試操作需要復原時,批次處理必須使用 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 。 |