Redis 支援
Spring Integration 2.1 引入了對 Redis 的支援:「一個開放原始碼的進階鍵值儲存區」。此支援以基於 Redis 的 MessageStore
以及發佈-訂閱訊息傳遞配接器的形式提供,這些配接器透過 Redis 的 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令獲得支援。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.3.5"
您也需要包含 Redis 用戶端相依性,例如 Lettuce。
若要下載、安裝和執行 Redis,請參閱 Redis 文件。
連線到 Redis
若要開始與 Redis 互動,您首先需要連線到它。Spring Integration 使用另一個 Spring 專案 Spring Data Redis 提供的支援,該專案提供典型的 Spring 建構:ConnectionFactory
和 Template
。這些抽象簡化了與多個 Redis 用戶端 Java API 的整合。目前,Spring Data Redis 支援 Jedis 和 Lettuce。
使用 RedisConnectionFactory
若要連線到 Redis,您可以使用 RedisConnectionFactory
介面的其中一個實作。以下列表顯示了介面定義
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下範例示範如何在 Java 中建立 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下範例示範如何在 Spring 的 XML 設定中建立 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
的實作提供了一組屬性,例如 port 和 host,您可以在需要時設定這些屬性。一旦您擁有 RedisConnectionFactory
的實例,您就可以建立 RedisTemplate
的實例,並使用 RedisConnectionFactory
注入它。
使用 RedisTemplate
與 Spring 中的其他樣板類別(例如 JdbcTemplate
和 JmsTemplate
)一樣,RedisTemplate
是一個協助程式類別,可簡化 Redis 資料存取程式碼。有關 RedisTemplate
及其變體(例如 StringRedisTemplate
)的更多資訊,請參閱 Spring Data Redis 文件。
以下範例示範如何在 Java 中建立 RedisTemplate
的實例
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下範例示範如何在 Spring 的 XML 設定中建立 RedisTemplate
的實例
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 進行訊息傳遞
如 簡介 中所述,Redis 透過其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供對發佈-訂閱訊息傳遞的支援。與 JMS 和 AMQP 一樣,Spring Integration 提供了訊息通道和配接器,用於透過 Redis 發送和接收訊息。
Redis 發佈/訂閱通道
與 JMS 類似,在某些情況下,生產者和消費者都旨在成為同一應用程式的一部分,在同一程序中執行。您可以透過使用一對輸入和輸出通道配接器來完成此操作。但是,與 Spring Integration 的 JMS 支援一樣,有一種更簡單的方法可以解決此用例。您可以建立發佈-訂閱通道,如下列範例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
的行為很像主 Spring Integration 命名空間中的一般 <publish-subscribe-channel/>
元素。它可以被任何端點的 input-channel
和 output-channel
屬性引用。不同之處在於,此通道由 Redis 主題名稱支援:topic-name
屬性指定的 String
值。但是,與 JMS 不同,此主題不必事先建立,甚至不必由 Redis 自動建立。在 Redis 中,主題是簡單的 String
值,充當位址的角色。生產者和消費者可以使用相同的 String
值作為其主題名稱來進行通訊。簡單地訂閱此通道意味著生產端點和消費端點之間可以進行非同步發佈-訂閱訊息傳遞。但是,與透過在簡單的 Spring Integration <channel/>
元素中新增 <queue/>
元素而建立的非同步訊息通道不同,訊息不會儲存在記憶體內佇列中。相反,這些訊息會透過 Redis 傳遞,這讓您可以依靠其對持久性和叢集化的支援,以及與其他非 Java 平台的互操作性。
Redis 輸入通道配接器
Redis 輸入通道配接器 (RedisInboundChannelAdapter
) 將傳入的 Redis 訊息轉換為 Spring 訊息,方式與其他輸入配接器相同。它接收平台特定的訊息(在本例中為 Redis),並使用 MessageConverter
策略將其轉換為 Spring 訊息。以下範例示範如何設定 Redis 輸入通道配接器
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的範例顯示了 Redis 輸入通道配接器的簡單但完整的設定。請注意,前面的設定依賴於熟悉的 Spring 自動探索某些 Bean 的範例。在本例中,redisConnectionFactory
被隱式注入到配接器中。您可以透過使用 connection-factory
屬性來明確指定它。
另請注意,前面的設定使用自訂 MessageConverter
注入配接器。該方法與 JMS 類似,其中 MessageConverter
實例用於在 Redis 訊息和 Spring Integration 訊息 Payload 之間進行轉換。預設值為 SimpleMessageConverter
。
輸入配接器可以訂閱多個主題名稱,因此 topics
屬性中包含逗號分隔的值集。
自 3.0 版起,輸入配接器除了現有的 topics
屬性外,現在還具有 topic-patterns
屬性。此屬性包含逗號分隔的 Redis 主題模式集。有關 Redis 發佈-訂閱的更多資訊,請參閱 Redis Pub/Sub。
輸入配接器可以使用 RedisSerializer
來還原序列化 Redis 訊息的主體。<int-redis:inbound-channel-adapter>
的 serializer
屬性可以設定為空字串,這會導致 RedisSerializer
屬性的值為 null
。在這種情況下,Redis 訊息的原始 byte[]
主體將作為訊息 Payload 提供。
自 5.0 版起,您可以使用 <int-redis:inbound-channel-adapter>
的 task-executor
屬性,為輸入配接器提供 Executor
實例。此外,接收到的 Spring Integration 訊息現在具有 RedisHeaders.MESSAGE_SOURCE
標頭,以指示發佈訊息的來源:主題或模式。您可以將其用於下游路由邏輯。
Redis 輸出通道配接器
Redis 輸出通道配接器將傳出的 Spring Integration 訊息轉換為 Redis 訊息,方式與其他輸出配接器相同。它接收 Spring Integration 訊息,並使用 MessageConverter
策略將其轉換為平台特定的訊息(在本例中為 Redis)。以下範例示範如何設定 Redis 輸出通道配接器
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
此設定與 Redis 輸入通道配接器平行。配接器隱式注入了 RedisConnectionFactory
,後者定義為 Bean 名稱 redisConnectionFactory
。此範例還包含可選(和自訂)的 MessageConverter
(testConverter
Bean)。
自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter>
提供 topic
屬性的替代方案:您可以使用 topic-expression
屬性在執行階段判斷訊息的 Redis 主題。這些屬性是互斥的。
Redis 佇列輸入通道配接器
Spring Integration 3.0 引入了佇列輸入通道配接器,用於從 Redis 列表「彈出」訊息。預設情況下,它使用「右彈出」,但您可以將其設定為改用「左彈出」。配接器是訊息驅動的。它使用內部監聽器執行緒,並且不使用 Poller。
以下列表顯示了 queue-inbound-channel-adapter
的所有可用屬性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 組件 Bean 名稱。如果您未提供 channel 屬性,則會在應用程式上下文中建立並註冊 DirectChannel ,並以此 id 屬性作為 Bean 名稱。在這種情況下,端點本身會以 Bean 名稱 id 加上 .adapter 註冊。(如果 Bean 名稱是 thing1 ,則端點註冊為 thing1.adapter 。) |
2 | 要將來自此端點的 Message 實例發送到其中的 MessageChannel 。 |
3 | 一個 SmartLifecycle 屬性,用於指定此端點是否應在應用程式上下文啟動後自動啟動。預設值為 true 。 |
4 | 一個 SmartLifecycle 屬性,用於指定此端點啟動的階段。預設值為 0 。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設值為 redisConnectionFactory 。 |
6 | Redis 列表的名稱,佇列式 'pop' 操作在此列表上執行以取得 Redis 訊息。 |
7 | 當從端點的監聽任務收到例外狀況時,要將 ErrorMessage 實例發送到其中的 MessageChannel 。預設情況下,底層 MessagePublishingErrorHandler 使用應用程式上下文中的預設 errorChannel 。 |
8 | RedisSerializer Bean 引用。它可以是空字串,這表示「沒有序列化器」。在這種情況下,來自輸入 Redis 訊息的原始 byte[] 作為 Message Payload 發送到 channel 。預設情況下,它是 JdkSerializationRedisSerializer 。 |
9 | 'pop' 操作等待來自佇列的 Redis 訊息的逾時時間(以毫秒為單位)。預設值為 1 秒。 |
10 | 監聽器任務在 'pop' 操作發生例外狀況後,在重新啟動監聽器任務之前應休眠的時間(以毫秒為單位)。 |
11 | 指定此端點是否預期來自 Redis 佇列的資料包含整個 Message 實例。如果此屬性設定為 true ,則 serializer 不能為空字串,因為訊息需要某種形式的反序列化(預設為 JDK 序列化)。其預設值為 false 。 |
12 | 對 Spring TaskExecutor (或標準 JDK 1.5+ Executor )Bean 的引用。它用於底層監聽任務。預設值為 SimpleAsyncTaskExecutor 。 |
13 | 指定此端點應使用「右彈出」(當 true 時)還是「左彈出」(當 false 時)來從 Redis 列表讀取訊息。如果 true ,則 Redis List 在與預設 Redis 佇列輸出通道配接器一起使用時充當 FIFO 佇列。將其設定為 false 以與使用「右推入」寫入列表的軟體一起使用,或實現堆疊式訊息順序。其預設值為 true 。自 4.3 版起。 |
必須為 task-executor 設定多個執行緒才能進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動監聽器任務時,可能會發生死鎖。errorChannel 可用於處理這些錯誤,以避免重新啟動,但最好不要將您的應用程式暴露於可能的死鎖情況。有關可能的 TaskExecutor 實作,請參閱 Spring Framework 參考手冊。 |
Redis 佇列輸出通道配接器
Spring Integration 3.0 引入了佇列輸出通道配接器,用於從 Spring Integration 訊息「推入」到 Redis 列表。預設情況下,它使用「左推入」,但您可以將其設定為改用「右推入」。以下列表顯示了 Redis queue-outbound-channel-adapter
的所有可用屬性
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 組件 Bean 名稱。如果您未提供 channel 屬性,則會在應用程式上下文中建立並註冊 DirectChannel ,並以此 id 屬性作為 Bean 名稱。在這種情況下,端點會以 id 加上 .adapter 的 Bean 名稱註冊。(如果 Bean 名稱是 thing1 ,則端點註冊為 thing1.adapter 。) |
2 | 此端點從中接收 Message 實例的 MessageChannel 。 |
3 | 對 RedisConnectionFactory Bean 的引用。預設值為 redisConnectionFactory 。 |
4 | Redis 列表的名稱,佇列式 'push' 操作在此列表上執行以發送 Redis 訊息。此屬性與 queue-expression 互斥。 |
5 | 用於判斷 Redis 列表名稱的 SpEL Expression 。它在執行階段使用傳入的 Message 作為 #root 變數。此屬性與 queue 互斥。 |
6 | RedisSerializer Bean 引用。預設值為 JdkSerializationRedisSerializer 。但是,對於 String Payload,如果未提供 serializer 引用,則會使用 StringRedisSerializer 。 |
7 | 指定此端點應僅將 Payload 或整個 Message 發送到 Redis 佇列。預設值為 true 。 |
8 | 指定此端點應使用「左推入」(當 true 時)還是「右推入」(當 false 時)來將訊息寫入 Redis 列表。如果 true ,則 Redis 列表在與預設 Redis 佇列輸入通道配接器一起使用時充當 FIFO 佇列。將其設定為 false 以與使用「左彈出」從列表讀取的軟體一起使用,或實現堆疊式訊息順序。其預設值為 true 。自 4.3 版起。 |
Redis 應用程式事件
自 Spring Integration 3.0 起,Redis 模組提供 IntegrationEvent
的實作,而後者又是 org.springframework.context.ApplicationEvent
。RedisExceptionEvent
封裝了來自 Redis 操作的例外狀況(端點作為事件的「來源」)。例如,<int-redis:queue-inbound-channel-adapter/>
在捕獲來自 BoundListOperations.rightPop
操作的例外狀況後會發出這些事件。例外狀況可以是任何通用 org.springframework.data.redis.RedisSystemException
或 org.springframework.data.redis.RedisConnectionFailureException
。使用 <int-event:inbound-channel-adapter/>
處理這些事件對於判斷背景 Redis 任務的問題和採取管理動作可能很有用。
Redis 訊息儲存區
如《企業整合模式》(EIP) 書籍中所述,訊息儲存區 可讓您持久保存訊息。當處理具有緩衝訊息能力(彙集器、重排序器等)的組件時,當可靠性是一個考量因素時,這可能很有用。在 Spring Integration 中,MessageStore
策略也為 聲明檢查 模式提供了基礎,EIP 中也對其進行了描述。
Spring Integration 的 Redis 模組提供了 RedisMessageStore
。以下範例示範如何將其與彙集器一起使用
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的範例是 Bean 設定,它期望 RedisConnectionFactory
作為建構子引數。
預設情況下,RedisMessageStore
使用 Java 序列化來序列化訊息。但是,如果您想使用不同的序列化技術(例如 JSON),您可以透過設定 RedisMessageStore
的 valueSerializer
屬性來提供您自己的序列化器。
從 4.3.10 版開始,Framework 為 Message
實例和 MessageHeaders
實例提供了 Jackson 序列化器和反序列化器實作 — 分別為 MessageJacksonDeserializer
和 MessageHeadersJacksonSerializer
。它們必須使用 ObjectMapper
的 SimpleModule
選項進行設定。此外,您應在 ObjectMapper
上設定 enableDefaultTyping
,為每個序列化的複雜物件新增類型資訊(如果您信任來源)。然後,該類型資訊將在反序列化期間使用。Framework 提供了一個名為 JacksonJsonUtils.messagingAwareMapper()
的實用方法,該方法已提供所有先前提及的屬性和序列化器。此實用方法帶有 trustedPackages
引數,用於限制反序列化的 Java 套件,以避免安全性漏洞。預設受信任套件:java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。若要在 RedisMessageStore
中管理 JSON 序列化,您必須以類似於以下範例的方式設定它
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
從 4.3.12 版開始,RedisMessageStore
支援 prefix
選項,以允許區分同一 Redis 伺服器上的儲存區實例。
Redis 通道訊息儲存區
RedisMessageStore
稍早展示 的組件將每個群組維護為單一鍵值 (群組 ID)。雖然您可以使用它來支援 QueueChannel
以實現持久性,但為此目的提供了一個專用的 RedisChannelMessageStore
(自 4.0 版本起)。此儲存區為每個通道使用 LIST
,在發送訊息時使用 LPUSH
,在接收訊息時使用 RPOP
。預設情況下,此儲存區也使用 JDK 序列化,但您可以修改值序列化程式,如稍早所述。
我們建議使用此儲存區來支援通道,而不是使用通用的 RedisMessageStore
。以下範例定義了一個 Redis 訊息儲存區,並在具有佇列的通道中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用於儲存資料的鍵具有以下形式:<storeBeanName>:<channelId>
(在前面的範例中,為 redisMessageStore:somePersistentQueueChannel
)。
此外,還提供了一個子類別 RedisChannelPriorityMessageStore
。當您將其與 QueueChannel
一起使用時,訊息會以 (FIFO) 優先順序接收。它使用標準的 IntegrationMessageHeaderAccessor.PRIORITY
標頭並支援優先順序值 (0 - 9
)。具有其他優先順序 (以及沒有優先順序的訊息) 的訊息會在任何具有優先順序的訊息之後以 FIFO 順序檢索。
這些儲存區僅實作 BasicMessageGroupStore ,而不實作 MessageGroupStore 。它們僅可用於支援 QueueChannel 等情況。 |
Redis Metadata 儲存區
Spring Integration 3.0 引入了一個新的基於 Redis 的 MetadataStore
(請參閱 Metadata 儲存區) 實作。您可以使用 RedisMetadataStore
來跨應用程式重新啟動維護 MetadataStore
的狀態。您可以將這個新的 MetadataStore
實作與以下配接器一起使用,例如
若要指示這些配接器使用新的 RedisMetadataStore
,請宣告一個名為 metadataStore
的 Spring Bean。Feed 輸入通道配接器和 feed 輸入通道配接器都會自動拾取並使用宣告的 RedisMetadataStore
。以下範例示範如何宣告此類 Bean
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由 RedisProperties
支援。與它的互動使用 BoundHashOperations
,而這又需要整個 Properties
儲存區的 key
。在 MetadataStore
的情況下,此 key
扮演區域的角色,這在分散式環境中非常有用,當多個應用程式使用相同的 Redis 伺服器時。預設情況下,此 key
的值為 MetaData
。
從 4.0 版本開始,此儲存區實作了 ConcurrentMetadataStore
,使其可以在多個應用程式實例之間可靠地共享,其中僅允許一個實例儲存或修改鍵的值。
您無法將 RedisMetadataStore.replace() (例如,在 AbstractPersistentAcceptOnceFileListFilter 中) 與 Redis 叢集一起使用,因為目前不支援用於原子性的 WATCH 命令。 |
Redis 儲存區輸入通道配接器
Redis 儲存區輸入通道配接器是一個輪詢消費者,它從 Redis 集合中讀取資料並將其作為 Message
有效負載發送。以下範例示範如何配置 Redis 儲存區輸入通道配接器
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的範例示範如何使用 store-inbound-channel-adapter
元素配置 Redis 儲存區輸入通道配接器,並為各種屬性提供值,例如
-
key
或key-expression
:正在使用的集合的鍵名稱。 -
collection-type
:此配接器支援的集合類型列舉。支援的集合為LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
:對o.s.data.redis.connection.RedisConnectionFactory
實例的引用。 -
redis-template
:對o.s.data.redis.core.RedisTemplate
實例的引用。 -
所有其他輸入配接器共有的其他屬性 (例如 'channel')。
您不能同時設定 redis-template 和 connection-factory 。 |
預設情況下,配接器使用
|
由於它對 key
具有文字值,因此前面的範例相對簡單且靜態。有時,您可能需要根據某些條件在執行時變更鍵的值。若要這麼做,請改用 key-expression
,其中提供的運算式可以是任何有效的 SpEL 運算式。
此外,您可能希望對從 Redis 集合讀取的已成功處理的資料執行一些後處理。例如,您可能希望在處理值之後移動或移除值。您可以透過使用 Spring Integration 2.2 中新增的事務同步功能來執行此操作。以下範例使用 key-expression
和事務同步
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以透過使用 transactional
元素將您的輪詢器宣告為事務性的。此元素可以引用真實的事務管理器 (例如,如果您的流程的其他部分調用 JDBC)。如果您沒有「真實」的事務,則可以使用 o.s.i.transaction.PseudoTransactionManager
,它是 Spring 的 PlatformTransactionManager
的實作,並且在沒有實際事務時啟用 Redis 配接器的事務同步功能的使用。
這不會使 Redis 活動本身成為事務性的。它允許在成功 (提交) 之前或之後或失敗 (回滾) 之後同步採取的動作。 |
一旦您的輪詢器是事務性的,您就可以在 transactional
元素上設定 o.s.i.transaction.TransactionSynchronizationFactory
的實例。TransactionSynchronizationFactory
建立 TransactionSynchronization
的實例。為了您的方便,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory
,它允許您配置 SpEL 運算式,其執行與事務協調 (同步)。支援用於提交前、提交後和回滾後的運算式,以及通道 (每種事件一個),評估結果 (如果有的話) 會發送到這些通道。對於每個子元素,您可以指定 expression
和 channel
屬性。如果僅存在 channel
屬性,則收到的訊息會作為特定同步案例的一部分發送到那裡。如果僅存在 expression
屬性,且運算式的結果是非空值,則會產生一個訊息,其有效負載為結果,並發送到預設通道 (NullChannel
) 並顯示在記錄中 (在 DEBUG
層級)。如果您希望評估結果轉到特定通道,請新增 channel
屬性。如果運算式的結果為 null 或 void,則不會產生訊息。
RedisStoreMessageSource
新增了一個 store
屬性,其中包含一個綁定到事務 IntegrationResourceHolder
的 RedisStore
實例,可以從 TransactionSynchronizationProcessor
實作中存取。
有關事務同步的更多資訊,請參閱 事務同步。
RedisStore 輸出通道配接器
RedisStore 輸出通道配接器允許您將訊息有效負載寫入 Redis 集合,如下列範例所示
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置是一個 Redis 儲存區輸出通道配接器,透過使用 store-inbound-channel-adapter
元素。它為各種屬性提供值,例如
-
key
或key-expression
:正在使用的集合的鍵名稱。 -
extract-payload-elements
:如果設定為true
(預設值) 且有效負載是「多值」物件 (即Collection
或Map
) 的實例,則會使用 "addAll" 和 "putAll" 語意儲存它。否則,如果設定為false
,則有效負載將儲存為單一條目,而與其類型無關。如果有效負載不是「多值」物件的實例,則會忽略此屬性的值,並且始終將有效負載儲存為單一條目。 -
collection-type
:此配接器支援的Collection
類型列舉。支援的集合為LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
:SpEL 運算式,傳回正在儲存的條目的鍵名稱。它僅在collection-type
為MAP
或PROPERTIES
且 'extract-payload-elements' 為 false 時適用。 -
connection-factory
:對o.s.data.redis.connection.RedisConnectionFactory
實例的引用。 -
redis-template
:對o.s.data.redis.core.RedisTemplate
實例的引用。 -
所有其他輸入配接器共有的其他屬性 (例如 'channel')。
您不能同時設定 redis-template 和 connection-factory 。 |
預設情況下,配接器使用 StringRedisTemplate 。這對鍵、值、雜湊鍵和雜湊值使用 StringRedisSerializer 實例。但是,如果 extract-payload-elements 設定為 false ,則將使用具有鍵和雜湊鍵的 StringRedisSerializer 實例,以及值和雜湊值的 JdkSerializationRedisSerializer 實例的 RedisTemplate 。使用 JDK 序列化程式,重要的是要了解 Java 序列化用於所有值,無論該值是否實際上是集合。如果您需要更多地控制值的序列化,請考慮提供您自己的 RedisTemplate ,而不是依賴這些預設值。 |
由於它對 key
和其他屬性具有文字值,因此前面的範例相對簡單且靜態。有時,您可能需要根據某些條件在執行時動態變更值。若要這麼做,請使用它們的 -expression
等效項 (key-expression
、map-key-expression
等),其中提供的運算式可以是任何有效的 SpEL 運算式。
Redis 輸出命令閘道
Spring Integration 4.0 引入了 Redis 命令閘道,讓您可以使用通用的 RedisConnection#execute
方法執行任何標準 Redis 命令。以下列表顯示了 Redis 輸出閘道的可用屬性
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 此端點從中接收 Message 實例的 MessageChannel 。 |
2 | 此端點在其中發送回覆 Message 實例的 MessageChannel 。 |
3 | 指定此輸出閘道是否必須傳回非空值。預設值為 true 。當 Redis 傳回 null 值時,會拋出 ReplyRequiredException 。 |
4 | 等待直到發送回覆訊息的逾時時間 (以毫秒為單位)。它通常應用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設值為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 對 RedisTemplate Bean 的引用。它與 'connection-factory' 屬性互斥。 |
7 | 對 org.springframework.data.redis.serializer.RedisSerializer 實例的引用。如果需要,它用於將每個命令參數序列化為 byte[]。 |
8 | 傳回命令鍵的 SpEL 運算式。預設值為 redis_command 訊息標頭。它不得評估為 null 。 |
9 | 逗號分隔的 SpEL 運算式,評估為命令參數。與 arguments-strategy 屬性互斥。如果您未提供任何屬性,則 payload 將用作命令參數。參數運算式可以評估為 'null' 以支援可變數量的參數。 |
10 | 一個 boolean 標誌,用於指定在配置 argument-expressions 時,評估的 Redis 命令字串是否可用作 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中運算式評估內容中的 #cmd 變數。否則,將忽略此屬性。 |
11 | 對 o.s.i.redis.outbound.ArgumentsStrategy 實例的引用。它與 argument-expressions 屬性互斥。如果您未提供任何屬性,則 payload 將用作命令參數。 |
您可以使用 <int-redis:outbound-gateway>
作為通用元件來執行任何所需的 Redis 操作。以下範例示範如何從 Redis 原子數字取得遞增值
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
有效負載應具有名稱 redisCounter
,這可以由 org.springframework.data.redis.support.atomic.RedisAtomicInteger
Bean 定義提供。
RedisConnection#execute
方法具有通用 Object
作為其傳回類型。實際結果取決於命令類型。例如,MGET
傳回 List<byte[]>
。有關命令、其參數和結果類型的更多資訊,請參閱 Redis 規格。
Redis 佇列輸出閘道
Spring Integration 引入了 Redis 佇列輸出閘道來執行請求和回覆案例。它將對話 UUID
推送到提供的 queue
,將值及其 UUID
作為其鍵推送到 Redis 列表,並等待來自 Redis 列表的回覆,該列表的鍵為 UUID
加上 .reply
。每個互動都使用不同的 UUID。以下列表顯示了 Redis 輸出閘道的可用屬性
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 此端點從中接收 Message 實例的 MessageChannel 。 |
2 | 此端點在其中發送回覆 Message 實例的 MessageChannel 。 |
3 | 指定此輸出閘道是否必須傳回非空值。此值預設為 false 。否則,當 Redis 傳回 null 值時,會拋出 ReplyRequiredException 。 |
4 | 等待直到發送回覆訊息的逾時時間 (以毫秒為單位)。它通常應用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設值為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 輸出閘道在其中發送對話 UUID 的 Redis 列表名稱。 |
7 | 當註冊多個閘道時,此輸出閘道的順序。 |
8 | RedisSerializer Bean 引用。它可以是空字串,這表示「無序列化程式」。在這種情況下,來自輸入 Redis 訊息的原始 byte[] 會作為 Message 有效負載發送到 channel 。預設情況下,它是 JdkSerializationRedisSerializer 。 |
9 | 指定此端點是否期望來自 Redis 佇列的資料包含整個 Message 實例。如果此屬性設定為 true ,則 serializer 不能為空字串,因為訊息需要某種形式的反序列化 (預設情況下為 JDK 序列化)。 |
Redis 佇列輸入閘道
Spring Integration 4.1 引入了 Redis 佇列輸入閘道來執行請求和回覆案例。它從提供的 queue
中彈出一個對話 UUID
,從 Redis 列表中彈出值及其 UUID
作為其鍵,並將回覆推送到 Redis 列表,該列表的鍵為 UUID
加上 .reply
。以下列表顯示了 Redis 佇列輸入閘道的可用屬性
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | 此端點在其中發送從 Redis 資料建立的 Message 實例的 MessageChannel 。 |
2 | 此端點在其中等待回覆 Message 實例的 MessageChannel 。可選 - replyChannel 標頭仍在使用中。 |
3 | 對 Spring TaskExecutor (或標準 JDK Executor ) Bean 的引用。它用於底層監聽任務。預設值為 SimpleAsyncTaskExecutor 。 |
4 | 等待直到發送回覆訊息的逾時時間 (以毫秒為單位)。它通常應用於基於佇列的有限回覆通道。 |
5 | 對 RedisConnectionFactory Bean 的引用。預設值為 redisConnectionFactory 。它與 'redis-template' 屬性互斥。 |
6 | 對話 UUID 的 Redis 列表名稱。 |
7 | 當註冊多個閘道時,此輸入閘道的順序。 |
8 | RedisSerializer Bean 引用。它可以是空字串,這表示「無序列化程式」。在這種情況下,來自輸入 Redis 訊息的原始 byte[] 會作為 Message 有效負載發送到 channel 。預設值為 JdkSerializationRedisSerializer 。(請注意,在 4.3 之前的版本中,預設值為 StringRedisSerializer 。若要還原該行為,請提供對 StringRedisSerializer 的引用)。 |
9 | 等待直到提取接收訊息的逾時時間 (以毫秒為單位)。它通常應用於基於佇列的有限請求通道。 |
10 | 指定此端點是否期望來自 Redis 佇列的資料包含整個 Message 實例。如果此屬性設定為 true ,則 serializer 不能為空字串,因為訊息需要某種形式的反序列化 (預設情況下為 JDK 序列化)。 |
11 | 監聽器任務在「正確彈出」操作發生例外狀況後應休眠的時間 (以毫秒為單位),然後再重新啟動監聽器任務。 |
必須為 task-executor 設定多個執行緒才能進行處理;否則,當 RedisQueueMessageDrivenEndpoint 嘗試在錯誤後重新啟動監聽器任務時,可能會發生死鎖。errorChannel 可用於處理這些錯誤,以避免重新啟動,但最好不要將您的應用程式暴露於可能的死鎖情況。有關可能的 TaskExecutor 實作,請參閱 Spring Framework 參考手冊。 |
Redis Stream 輸出通道配接器
Spring Integration 5.4 引入了 Reactive Redis Stream 輸出通道配接器,用於將訊息有效負載寫入 Redis Stream。輸出通道配接器使用 ReactiveStreamOperations.add(…)
將 Record
新增到 Stream。以下範例示範如何使用 Java 配置和 Service 類別來實現 Redis Stream 輸出通道配接器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | 使用 ReactiveRedisConnectionFactory 和 Stream 名稱建構 ReactiveRedisStreamMessageHandler 的實例,以新增記錄。另一個建構函式變體基於 SpEL 運算式,用於根據請求訊息評估 Stream 鍵。 |
2 | 設定 RedisSerializationContext ,用於在新增到 Stream 之前序列化記錄鍵和值。 |
3 | 設定 HashMapper ,它提供 Java 類型和 Redis 雜湊/映射之間的合約。 |
4 | 如果為 'true',通道配接器將從請求訊息中提取有效負載,以新增 Stream 記錄。或將整個訊息用作值。預設值為 true 。 |
Redis Stream 輸入通道配接器
Spring Integration 5.4 引入了 Reactive Stream 輸入通道配接器,用於從 Redis Stream 讀取訊息。輸入通道配接器根據自動確認標誌使用 StreamReceiver.receive(…)
或 StreamReceiver.receiveAutoAck()
從 Redis Stream 讀取記錄。以下範例示範如何使用 Java 配置來實現 Redis Stream 輸入通道配接器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | 使用 ReactiveRedisConnectionFactory 和 Stream 鍵建構 ReactiveRedisStreamMessageProducer 的實例,以讀取記錄。 |
2 | 用於使用反應式基礎架構取用 Redis Stream 的 StreamReceiver.StreamReceiverOptions 。 |
3 | SmartLifecycle 屬性,用於指定此端點是否應在應用程式內容啟動後自動啟動。預設值為 true 。如果為 false ,則應手動啟動 RedisStreamMessageProducer messageProducer.start() 。 |
4 | 如果為 false ,則收到的訊息不會自動確認。訊息的確認將延遲到取用訊息的用戶端。預設值為 true 。 |
5 | 如果為 true ,則將建立消費者群組。在建立消費者群組期間,也會建立 Stream (如果尚不存在)。消費者群組追蹤訊息傳遞並區分消費者。預設值為 false 。 |
6 | 設定消費者群組名稱。預設值為定義的 Bean 名稱。 |
7 | 設定消費者名稱。從群組 my-group 以 my-consumer 身分讀取訊息。 |
8 | 要將訊息從此端點發送到其中的訊息通道。 |
9 | 定義讀取訊息的偏移量。預設值為 ReadOffset.latest() 。 |
10 | 如果為 'true',通道配接器將從 Record 中提取有效負載值。否則,整個 Record 將用作有效負載。預設值為 true 。 |
如果 autoAck
設定為 false
,則 Redis Stream 中的 Record
不會由 Redis 驅動程式自動確認,而是將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
標頭新增到訊息中,並將 SimpleAcknowledgment
實例作為值。當基於此類記錄完成訊息的業務邏輯時,目標整合流程有責任調用其 acknowledge()
回調。即使在還原序列化期間發生例外狀況且配置了 errorChannel
時,也需要類似的邏輯。因此,目標錯誤處理常式必須決定是否確認或否定此類失敗的訊息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
之外,ReactiveRedisStreamMessageProducer
還會將這些標頭填入要產生的訊息中:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
和 RedisHeaders.CONSUMER
。
從 5.5 版本開始,您可以在 ReactiveRedisStreamMessageProducer
上明確配置 StreamReceiver.StreamReceiverOptionsBuilder
選項,包括新引入的 onErrorResume
函數,如果 Redis Stream 消費者應在發生還原序列化錯誤時繼續輪詢,則需要此函數。預設函數會將訊息發送到錯誤通道 (如果提供),並可能確認失敗的訊息,如上所述。所有這些 StreamReceiver.StreamReceiverOptionsBuilder
都與外部提供的 StreamReceiver.StreamReceiverOptions
互斥。
Redis 鎖定登錄檔
Spring Integration 4.0 引入了 RedisLockRegistry
。某些元件 (例如,彙總器和重新排序器) 使用從 LockRegistry
實例取得的鎖定,以確保一次只有一個執行緒操作群組。DefaultLockRegistry
在單一元件中執行此功能。您現在可以在這些元件上配置外部鎖定登錄檔。當您將其與共用的 MessageGroupStore
一起使用時,可以使用 RedisLockRegistry
在多個應用程式實例之間提供此功能,以便一次只有一個實例可以操作群組。
當鎖定由本機執行緒釋放時,另一個本機執行緒通常可以立即取得鎖定。如果鎖定是由使用不同登錄檔實例的執行緒釋放的,則可能需要長達 100 毫秒才能取得鎖定。
為了避免「懸掛」鎖定 (當伺服器失敗時),此登錄檔中的鎖定會在預設 60 秒後過期,但您可以在登錄檔上配置此值。鎖定通常保持的時間要短得多。
由於鍵可能會過期,因此嘗試解鎖過期的鎖定會導致拋出例外狀況。但是,受此類鎖定保護的資源可能已遭洩露,因此應將此類例外狀況視為嚴重。您應將到期時間設定為足夠大的值以防止這種情況,但應將其設定得足夠低,以便在伺服器故障後在合理的時間內恢復鎖定。 |
從 5.0 版本開始,RedisLockRegistry
實作了 ExpirableLockRegistry
,它會移除上次取得時間超過 age
且目前未鎖定的鎖定。
從 5.5.6 版本開始,RedisLockRegistry
支援透過 RedisLockRegistry.setCacheCapacity()
自動清除 RedisLockRegistry.locks
中 redisLocks 的快取。有關更多資訊,請參閱其 JavaDocs。
從 5.5.13 版本開始,RedisLockRegistry
公開了 setRedisLockType(RedisLockType)
選項,以確定應以哪種模式發生 Redis 鎖定取得
-
RedisLockType.SPIN_LOCK
- 鎖定透過定期迴圈 (100 毫秒) 檢查是否可以取得鎖定來取得。預設值。 -
RedisLockType.PUB_SUB_LOCK
- 鎖定透過 Redis pub-sub 訂閱取得。
pub-sub 是首選模式 - 用戶端 Redis 伺服器之間的網路閒聊較少,並且效能更高 - 當訂閱收到有關其他程序中解鎖的通知時,鎖定會立即取得。但是,Redis 不支援 Master/Replica 連線中的 pub-sub (例如在 AWS ElastiCache 環境中),因此選擇忙碌旋轉模式作為預設值,以使登錄檔在任何環境中都能運作。