JDBC 訊息儲存庫
Spring Integration 提供了兩個 JDBC 特定的訊息儲存庫實作。JdbcMessageStore
適用於彙集器和聲明檢查模式。JdbcChannelMessageStore
實作提供更具針對性和可擴展性的實作,專門用於訊息通道。
請注意,您可以使用 JdbcMessageStore
來支援訊息通道,但 JdbcChannelMessageStore
針對該目的進行了最佳化。
從版本 5.0.11、5.1.2 開始,JdbcChannelMessageStore 的索引已最佳化。如果您的儲存庫中有大型訊息群組,您可能希望變更索引。此外,PriorityChannel 的索引已註解掉,因為除非您使用 JDBC 支援的此類通道,否則不需要它。 |
當使用 OracleChannelMessageStoreQueryProvider 時,必須新增優先順序通道索引,因為它包含在查詢的提示中。 |
初始化資料庫
在開始使用 JDBC 訊息儲存庫元件之前,您應該使用適當的物件佈建目標資料庫。
Spring Integration 隨附一些可用於初始化資料庫的範例腳本。在 spring-integration-jdbc
JAR 檔案中,您可以在 org.springframework.integration.jdbc
套件中找到腳本。它為一系列常見的資料庫平台提供了範例建立和範例捨棄腳本。使用這些腳本的常見方法是在 Spring JDBC 資料來源初始化器中參考它們。請注意,這些腳本作為範例和所需表格和資料行名稱的規格提供。您可能會發現您需要為生產用途增強它們(例如,透過新增索引宣告)。
從版本 6.2 開始,JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
和 DefaultLockRepository
實作 SmartLifecycle
並在其各自的表格上執行 `SELECT COUNT` 查詢,在 start()
方法中確保所需的表格(根據提供的字首)存在於目標資料庫中。如果所需的表格不存在,則應用程式內容啟動失敗。可以透過 setCheckDatabaseOnStart(false)
停用檢查。
通用 JDBC 訊息儲存庫
JDBC 模組提供了 Spring Integration MessageStore
(在聲明檢查模式中很重要)和 MessageGroupStore
(在有狀態模式(例如彙集器)中很重要)的實作,並由資料庫支援。這兩個介面都由 JdbcMessageStore
實作,並且支援在 XML 中設定儲存庫實例,如下列範例所示
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 JdbcTemplate
而不是 DataSource
。
下列範例顯示了一些其他選用屬性
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的範例中,我們指定了 LobHandler
來處理作為大型物件的訊息(對於 Oracle 而言通常是必要的),以及用於儲存庫產生的查詢中表格名稱的字首。表格名稱字首預設為 INT_
。
支援訊息通道
如果您打算使用 JDBC 支援訊息通道,我們建議使用 JdbcChannelMessageStore
實作。它僅與訊息通道結合使用。
支援的資料庫
JdbcChannelMessageStore
使用資料庫特定的 SQL 查詢從資料庫擷取訊息。因此,您必須在 JdbcChannelMessageStore
上設定 ChannelMessageStoreQueryProvider
屬性。此 channelMessageStoreQueryProvider
提供您指定的特定資料庫的 SQL 查詢。Spring Integration 提供對下列關係資料庫的支援
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果您的資料庫未列出,您可以實作 ChannelMessageStoreQueryProvider
介面並提供您自己的自訂查詢。
版本 4.0 將 MESSAGE_SEQUENCE
資料行新增至表格,以確保先進先出 (FIFO) 佇列,即使訊息儲存在同一毫秒也是如此。
從版本 6.2 開始,ChannelMessageStoreQueryProvider
公開 isSingleStatementForPoll
旗標,其中 PostgresChannelMessageStoreQueryProvider
傳回 true
,並且其輪詢查詢現在基於單一 DELETE…RETURNING
陳述式。JdbcChannelMessageStore
會諮詢 isSingleStatementForPoll
選項,並且如果僅支援單一輪詢陳述式,則會略過個別的 DELETE
陳述式。
自訂訊息插入
自版本 5.0 以來,透過覆寫 ChannelMessageStorePreparedStatementSetter
類別,您可以為 JdbcChannelMessageStore
中的訊息插入提供自訂實作。您可以使用它來設定不同的資料行或變更表格結構或序列化策略。例如,您可以將其結構儲存為 JSON 字串,而不是預設序列化為 byte[]
。
下列範例使用 setValues
的預設實作來儲存常見的資料行,並覆寫行為以將訊息有效負載儲存為 varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
一般而言,我們不建議使用關係資料庫進行佇列。相反地,如果可能,請考慮改用 JMS 或 AMQP 支援的通道。如需進一步參考,請參閱下列資源 如果您仍然計劃將資料庫用作佇列,請考慮使用 PostgreSQL 及其通知機制,該機制在後續章節中說明。 |
並行輪詢
輪詢訊息通道時,您可以選擇使用 TaskExecutor
參考設定相關聯的 Poller
。
但是請記住,如果您使用 JDBC 支援的訊息通道,並且計劃使用多個執行緒以交易方式輪詢通道和後續的訊息儲存庫,則應確保您使用支援多版本並行控制 (MVCC) 的關係資料庫。否則,鎖定可能會成為問題,並且在使用多個執行緒時,效能可能無法如預期實現。例如,Apache Derby 在這方面存在問題。 為了實現更好的 JDBC 佇列輸送量,並避免不同執行緒可能從佇列輪詢相同
|
優先順序通道
從版本 4.0 開始,JdbcChannelMessageStore
實作 PriorityCapableChannelMessageStore
並提供 priorityEnabled
選項,使其可以用作 priority-queue
實例的 message-store
參考。為此,INT_CHANNEL_MESSAGE
表格具有 MESSAGE_PRIORITY
資料行,用於儲存 PRIORITY
訊息標頭的值。此外,新的 MESSAGE_SEQUENCE
資料行讓我們能夠實現穩健的先進先出 (FIFO) 輪詢機制,即使在同一毫秒內儲存多個具有相同優先順序的訊息也是如此。訊息是從資料庫中輪詢(選取)的,並依 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
排序。
我們不建議對優先順序和非優先順序佇列通道使用相同的 JdbcChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存庫,並且佇列通道未保留正確的 FIFO 佇列語意。但是,相同的 INT_CHANNEL_MESSAGE 表格(甚至 region )可以用於兩種 JdbcChannelMessageStore 類型。若要設定該情境,您可以從另一個訊息儲存庫 Bean 擴充一個訊息儲存庫 Bean,如下列範例所示 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分割訊息儲存庫
常見的做法是將 JdbcMessageStore
用作同一應用程式中應用程式群組或節點的全域儲存庫。為了提供一些針對名稱衝突的保護,並提供對資料庫中繼資料設定的控制,訊息儲存庫允許以兩種方式分割表格。一種方法是透過變更字首(如先前所述)來使用個別的表格名稱。另一種方法是為分割單一表格中的資料指定 region
名稱。第二種方法的一個重要用例是當 MessageStore
管理支援 Spring Integration 訊息通道的持久性佇列時。持久性通道的訊息資料在儲存庫中以通道名稱為鍵。因此,如果通道名稱不是全域唯一的,則通道可能會擷取不適用於它們的資料。為了避免這種危險,您可以使用訊息儲存庫 region
來為具有相同邏輯名稱的不同實體通道保持資料分離。
PostgreSQL:接收推送通知
PostgreSQL 提供了用於接收資料庫表格操作時的推送通知的監聽和通知框架。Spring Integration 利用此機制(從版本 6.0 開始),以允許在將新訊息新增至 JdbcChannelMessageStore
時接收推送通知。當使用此功能時,必須定義資料庫觸發程序,該觸發程序可以在 Spring Integration 的 JDBC 模組中包含的 schema-postgresql.sql
檔案的註解部分找到。
推送通知透過 PostgresChannelMessageTableSubscriber
類別接收,該類別允許其訂閱者在任何給定 region
和 groupId
的新訊息到達時接收回呼。即使訊息附加在不同的 JVM 上,但附加到相同的資料庫,也會收到這些通知。PostgresSubscribableChannel
實作使用 PostgresChannelMessageTableSubscriber.Subscription
合約,以從儲存庫中提取訊息,以回應上述 PostgresChannelMessageTableSubscriber
通知。
例如,可以按如下方式接收 some group
的推送通知
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
交易支援
從版本 6.0.5 開始,在 PostgresSubscribableChannel
上指定 PlatformTransactionManager
將在交易中通知訂閱者。訂閱者中的例外狀況將導致交易回滾,並且訊息將放回訊息儲存庫中。預設情況下,交易支援未啟動。
重試
從版本 6.0.5 開始,可以透過將 RetryTemplate
提供給 PostgresSubscribableChannel
來指定重試原則。預設情況下,不執行重試。
任何作用中的 對於這種對獨佔連線的需求,也建議 JVM 僅執行單一 |