JDBC 訊息儲存庫

Spring Integration 提供了兩個 JDBC 特定的訊息儲存庫實作。JdbcMessageStore 適用於彙集器和聲明檢查模式。JdbcChannelMessageStore 實作提供更具針對性和可擴展性的實作,專門用於訊息通道。

請注意,您可以使用 JdbcMessageStore 來支援訊息通道,但 JdbcChannelMessageStore 針對該目的進行了最佳化。

從版本 5.0.11、5.1.2 開始,JdbcChannelMessageStore 的索引已最佳化。如果您的儲存庫中有大型訊息群組,您可能希望變更索引。此外,PriorityChannel 的索引已註解掉,因為除非您使用 JDBC 支援的此類通道,否則不需要它。
當使用 OracleChannelMessageStoreQueryProvider 時,必須新增優先順序通道索引,因為它包含在查詢的提示中。

初始化資料庫

在開始使用 JDBC 訊息儲存庫元件之前,您應該使用適當的物件佈建目標資料庫。

Spring Integration 隨附一些可用於初始化資料庫的範例腳本。在 spring-integration-jdbc JAR 檔案中,您可以在 org.springframework.integration.jdbc 套件中找到腳本。它為一系列常見的資料庫平台提供了範例建立和範例捨棄腳本。使用這些腳本的常見方法是在 Spring JDBC 資料來源初始化器中參考它們。請注意,這些腳本作為範例和所需表格和資料行名稱的規格提供。您可能會發現您需要為生產用途增強它們(例如,透過新增索引宣告)。

從版本 6.2 開始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 實作 SmartLifecycle 並在其各自的表格上執行 `SELECT COUNT` 查詢,在 start() 方法中確保所需的表格(根據提供的字首)存在於目標資料庫中。如果所需的表格不存在,則應用程式內容啟動失敗。可以透過 setCheckDatabaseOnStart(false) 停用檢查。

通用 JDBC 訊息儲存庫

JDBC 模組提供了 Spring Integration MessageStore(在聲明檢查模式中很重要)和 MessageGroupStore(在有狀態模式(例如彙集器)中很重要)的實作,並由資料庫支援。這兩個介面都由 JdbcMessageStore 實作,並且支援在 XML 中設定儲存庫實例,如下列範例所示

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定 JdbcTemplate 而不是 DataSource

下列範例顯示了一些其他選用屬性

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的範例中,我們指定了 LobHandler 來處理作為大型物件的訊息(對於 Oracle 而言通常是必要的),以及用於儲存庫產生的查詢中表格名稱的字首。表格名稱字首預設為 INT_

支援訊息通道

如果您打算使用 JDBC 支援訊息通道,我們建議使用 JdbcChannelMessageStore 實作。它僅與訊息通道結合使用。

支援的資料庫

JdbcChannelMessageStore 使用資料庫特定的 SQL 查詢從資料庫擷取訊息。因此,您必須在 JdbcChannelMessageStore 上設定 ChannelMessageStoreQueryProvider 屬性。此 channelMessageStoreQueryProvider 提供您指定的特定資料庫的 SQL 查詢。Spring Integration 提供對下列關係資料庫的支援

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果您的資料庫未列出,您可以實作 ChannelMessageStoreQueryProvider 介面並提供您自己的自訂查詢。

版本 4.0 將 MESSAGE_SEQUENCE 資料行新增至表格,以確保先進先出 (FIFO) 佇列,即使訊息儲存在同一毫秒也是如此。

從版本 6.2 開始,ChannelMessageStoreQueryProvider 公開 isSingleStatementForPoll 旗標,其中 PostgresChannelMessageStoreQueryProvider 傳回 true,並且其輪詢查詢現在基於單一 DELETE…​RETURNING 陳述式。JdbcChannelMessageStore 會諮詢 isSingleStatementForPoll 選項,並且如果僅支援單一輪詢陳述式,則會略過個別的 DELETE 陳述式。

自訂訊息插入

自版本 5.0 以來,透過覆寫 ChannelMessageStorePreparedStatementSetter 類別,您可以為 JdbcChannelMessageStore 中的訊息插入提供自訂實作。您可以使用它來設定不同的資料行或變更表格結構或序列化策略。例如,您可以將其結構儲存為 JSON 字串,而不是預設序列化為 byte[]

下列範例使用 setValues 的預設實作來儲存常見的資料行,並覆寫行為以將訊息有效負載儲存為 varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

一般而言,我們不建議使用關係資料庫進行佇列。相反地,如果可能,請考慮改用 JMS 或 AMQP 支援的通道。如需進一步參考,請參閱下列資源

如果您仍然計劃將資料庫用作佇列,請考慮使用 PostgreSQL 及其通知機制,該機制在後續章節中說明。

並行輪詢

輪詢訊息通道時,您可以選擇使用 TaskExecutor 參考設定相關聯的 Poller

但是請記住,如果您使用 JDBC 支援的訊息通道,並且計劃使用多個執行緒以交易方式輪詢通道和後續的訊息儲存庫,則應確保您使用支援多版本並行控制 (MVCC) 的關係資料庫。否則,鎖定可能會成為問題,並且在使用多個執行緒時,效能可能無法如預期實現。例如,Apache Derby 在這方面存在問題。

為了實現更好的 JDBC 佇列輸送量,並避免不同執行緒可能從佇列輪詢相同 Message 時出現問題,當使用不支援 MVCC 的資料庫時,重要的是將 JdbcChannelMessageStoreusingIdCache 屬性設定為 true。下列範例顯示如何執行此操作

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

優先順序通道

從版本 4.0 開始,JdbcChannelMessageStore 實作 PriorityCapableChannelMessageStore 並提供 priorityEnabled 選項,使其可以用作 priority-queue 實例的 message-store 參考。為此,INT_CHANNEL_MESSAGE 表格具有 MESSAGE_PRIORITY 資料行,用於儲存 PRIORITY 訊息標頭的值。此外,新的 MESSAGE_SEQUENCE 資料行讓我們能夠實現穩健的先進先出 (FIFO) 輪詢機制,即使在同一毫秒內儲存多個具有相同優先順序的訊息也是如此。訊息是從資料庫中輪詢(選取)的,並依 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 排序。

我們不建議對優先順序和非優先順序佇列通道使用相同的 JdbcChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存庫,並且佇列通道未保留正確的 FIFO 佇列語意。但是,相同的 INT_CHANNEL_MESSAGE 表格(甚至 region)可以用於兩種 JdbcChannelMessageStore 類型。若要設定該情境,您可以從另一個訊息儲存庫 Bean 擴充一個訊息儲存庫 Bean,如下列範例所示
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

分割訊息儲存庫

常見的做法是將 JdbcMessageStore 用作同一應用程式中應用程式群組或節點的全域儲存庫。為了提供一些針對名稱衝突的保護,並提供對資料庫中繼資料設定的控制,訊息儲存庫允許以兩種方式分割表格。一種方法是透過變更字首(如先前所述)來使用個別的表格名稱。另一種方法是為分割單一表格中的資料指定 region 名稱。第二種方法的一個重要用例是當 MessageStore 管理支援 Spring Integration 訊息通道的持久性佇列時。持久性通道的訊息資料在儲存庫中以通道名稱為鍵。因此,如果通道名稱不是全域唯一的,則通道可能會擷取不適用於它們的資料。為了避免這種危險,您可以使用訊息儲存庫 region 來為具有相同邏輯名稱的不同實體通道保持資料分離。

PostgreSQL:接收推送通知

PostgreSQL 提供了用於接收資料庫表格操作時的推送通知的監聽和通知框架。Spring Integration 利用此機制(從版本 6.0 開始),以允許在將新訊息新增至 JdbcChannelMessageStore 時接收推送通知。當使用此功能時,必須定義資料庫觸發程序,該觸發程序可以在 Spring Integration 的 JDBC 模組中包含的 schema-postgresql.sql 檔案的註解部分找到。

推送通知透過 PostgresChannelMessageTableSubscriber 類別接收,該類別允許其訂閱者在任何給定 regiongroupId 的新訊息到達時接收回呼。即使訊息附加在不同的 JVM 上,但附加到相同的資料庫,也會收到這些通知。PostgresSubscribableChannel 實作使用 PostgresChannelMessageTableSubscriber.Subscription 合約,以從儲存庫中提取訊息,以回應上述 PostgresChannelMessageTableSubscriber 通知。

例如,可以按如下方式接收 some group 的推送通知

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

交易支援

從版本 6.0.5 開始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 將在交易中通知訂閱者。訂閱者中的例外狀況將導致交易回滾,並且訊息將放回訊息儲存庫中。預設情況下,交易支援未啟動。

重試

從版本 6.0.5 開始,可以透過將 RetryTemplate 提供給 PostgresSubscribableChannel 來指定重試原則。預設情況下,不執行重試。

任何作用中的 PostgresChannelMessageTableSubscriber 都會在其作用中的生命週期內佔用獨佔 JDBC Connection。因此,重要的是此連線不是來自共用 DataSource。此類連線共用通常預期發出的連線會在預先定義的逾時視窗內關閉。

對於這種對獨佔連線的需求,也建議 JVM 僅執行單一 PostgresChannelMessageTableSubscriber,該訂閱者可用於註冊任何數量的訂閱。