輸入通道適配器:輪詢多個伺服器和目錄

從 5.0.7 版本開始,RotatingServerAdvice 可用;當配置為 poller advice 時,輸入適配器可以輪詢多個伺服器和目錄。正常配置 advice 並將其新增至 poller 的 advice 鏈。DelegatingSessionFactory 用於選擇伺服器,請參閱 Delegating Session Factory 以取得更多資訊。Advice 配置包含 RotationPolicy.KeyDirectory 物件的列表。

範例
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此 advice 將輪詢伺服器 one 上的目錄 foo,直到沒有新檔案存在,然後移動到目錄 bar,然後移動到伺服器 two 上的目錄 baz,依此類推。

可以使用 fair 建構子參數修改此預設行為

fair
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在這種情況下,無論先前的輪詢是否傳回檔案,advice 都會移動到下一個伺服器/目錄。

或者,您可以提供自己的 RotationPolicy 以根據需要重新配置訊息來源

policy
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

custom
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression 屬性(同步器的 localFilenameGeneratorExpression)現在可以包含 #remoteDirectory 變數。這允許從不同目錄檢索的檔案下載到本機的相似目錄

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
使用此 advice 時,請勿在 poller 上設定 TaskExecutor;請參閱 Message Sources 的條件式 Poller 以取得更多資訊。