子元素
當此 Gateway
從 PollableChannel
接收消息時,您必須提供全域預設 Poller
,或為 Job Launching Gateway
提供 Poller
子元素。
-
Java
-
XML
以下範例展示如何在 Java 中提供 poller
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
以下範例展示如何在 XML 中提供 poller
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
使用資訊性消息提供回饋
由於 Spring Batch Job 可能長時間執行,因此提供進度資訊通常至關重要。例如,利益相關者可能希望在部分或全部批次 Job 失敗時收到通知。Spring Batch 透過以下方式支援收集此資訊:
-
主動輪詢
-
事件驅動的監聽器
當非同步啟動 Spring Batch Job 時(例如,透過使用 Job Launching Gateway),會返回 JobExecution
實例。因此,您可以使用 JobExecution.getJobId()
,透過使用 JobExplorer
從 JobRepository
檢索 JobExecution
的更新實例來持續輪詢狀態更新。但是,這被認為是次佳選擇,事件驅動方法是較佳的。
因此,Spring Batch 提供了監聽器,包括三個最常用的監聽器:
-
StepListener
-
ChunkListener
-
JobExecutionListener
在下圖所示的範例中,Spring Batch Job 已配置 StepExecutionListener
。因此,Spring Integration 接收並處理任何步驟之前或之後的事件。例如,您可以使用 Router
檢查接收到的 StepExecution
。根據該檢查的結果,可能會發生各種情況(例如將消息路由到郵件發送通道適配器),以便可以根據某些條件發送電子郵件通知。

以下由兩部分組成的範例展示如何配置監聽器以將消息發送到 Gateway
以處理 StepExecution
事件,並將其輸出記錄到 logging-channel-adapter
。
首先,建立通知整合 Bean。
-
Java
-
XML
以下範例展示如何在 Java 中建立通知整合 Bean
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要將 @IntegrationComponentScan 注解新增到您的配置中。 |
以下範例展示如何在 XML 中建立通知整合 Bean
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
其次,修改您的 Job 以新增步驟級別監聽器。
-
Java
-
XML
以下範例展示如何在 Java 中新增步驟級別監聽器
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importPayments", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.chunk(200, transactionManager)
.listener(notificationExecutionsListener())
// ...
.build();
)
.build();
}
以下範例展示如何在 XML 中新增步驟級別監聽器
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
非同步處理器
非同步處理器可協助您擴展項目的處理。在非同步處理器使用案例中,AsyncItemProcessor
作為分派器,在新執行緒上執行項目的 ItemProcessor
邏輯。項目完成後,Future
會傳遞到 AsynchItemWriter
以進行寫入。
因此,您可以透過使用非同步項目處理來提高效能,基本上讓您實作 fork-join 情境。AsyncItemWriter
收集結果,並在所有結果都可用時立即寫回 Chunk。
-
Java
-
XML
以下範例展示如何在 Java 中配置 AsyncItemProcessor
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
以下範例展示如何在 XML 中配置 AsyncItemProcessor
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
delegate
屬性參考您的 ItemProcessor
Bean,而 taskExecutor
屬性參考您選擇的 TaskExecutor
。
-
Java
-
XML
以下範例展示如何在 Java 中配置 AsyncItemWriter
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
以下範例展示如何在 XML 中配置 AsyncItemWriter
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
同樣,delegate
屬性實際上是對您的 ItemWriter
Bean 的參考。
外部化批次處理執行
到目前為止討論的整合方法建議了 Spring Integration 包裝 Spring Batch(如外殼)的使用案例。但是,Spring Batch 也可以在內部使用 Spring Integration。通過使用這種方法,Spring Batch 用戶可以將項目甚至 Chunk 的處理委派給外部進程。這讓您可以卸載複雜的處理。Spring Batch Integration 為以下項目提供專門支援:
-
遠程 Chunking
-
遠程 Partitioning
遠程 Chunking
下圖顯示了當您將 Spring Batch 與 Spring Integration 一起使用時,遠程 Chunking 的一種運作方式

更進一步來說,您還可以通過使用 ChunkMessageChannelItemWriter
(由 Spring Batch Integration 提供)來外部化 Chunk 處理,它會發送項目並收集結果。一旦發送,Spring Batch 將繼續讀取和分組項目的過程,而無需等待結果。相反,ChunkMessageChannelItemWriter
負責收集結果並將它們整合回 Spring Batch 流程。
借助 Spring Integration,您可以完全控制進程的並行性(例如,通過使用 QueueChannel
而不是 DirectChannel
)。此外,通過依賴 Spring Integration 豐富的通道適配器集合(例如 JMS 和 AMQP),您可以將批次 Job 的 Chunk 分發到外部系統進行處理。
-
Java
-
XML
具有要遠程 Chunking 的步驟的 Job 可能具有類似於以下 Java 的配置:
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
具有要遠程 Chunking 的步驟的 Job 可能具有類似於以下 XML 的配置:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
ItemReader
參考指向您要在管理器上用於讀取資料的 Bean。ItemWriter
參考指向特殊的 ItemWriter
(稱為 ChunkMessageChannelItemWriter
),如前所述。處理器(如果有的話)從管理器配置中省略,因為它是在 Worker 上配置的。在實作您的使用案例時,您應該檢查任何其他組件屬性,例如節流限制等等。
-
Java
-
XML
以下 Java 配置提供了基本管理器設定
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://127.0.0.1:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
以下 XML 配置提供了基本管理器設定
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
先前的配置為我們提供了許多 Bean。我們透過使用 ActiveMQ 以及 Spring Integration 提供的輸入和輸出 JMS 適配器來配置我們的消息中間件。如所示,我們的 itemWriter
Bean(由我們的 Job 步驟參考)使用 ChunkMessageChannelItemWriter
通過配置的中間件寫入 Chunk。
現在我們可以繼續 Worker 配置,如下例所示
-
Java
-
XML
以下範例展示 Java 中的 Worker 配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://127.0.0.1:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
以下範例展示 XML 中的 Worker 配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
大多數配置項目在管理器配置中看起來應該很熟悉。Worker 不需要訪問 Spring Batch JobRepository
,也不需要訪問實際的 Job 配置文件。主要感興趣的 Bean 是 chunkProcessorChunkHandler
。ChunkProcessorChunkHandler
的 chunkProcessor
屬性採用配置的 SimpleChunkProcessor
,您可以在其中提供對您的 ItemWriter
(以及可選的 ItemProcessor
)的參考,這些 ItemWriter
(和 ItemProcessor
)將在 Worker 從管理器接收 Chunk 時在 Worker 上運行。
有關更多資訊,請參閱「可擴展性」章節中關於 遠程 Chunking 的章節。
從 4.1 版開始,Spring Batch Integration 引入了 @EnableBatchIntegration
注解,可用於簡化遠程 Chunking 設定。此注解提供了兩個 Bean,您可以在應用程式環境定義中自動裝配它們:
-
RemoteChunkingManagerStepBuilderFactory
:配置管理器步驟 -
RemoteChunkingWorkerBuilder
:配置遠程 Worker 整合流程
這些 API 負責配置許多組件,如下圖所示

在管理器端,RemoteChunkingManagerStepBuilderFactory
讓您通過宣告配置管理器步驟:
-
Item Reader 用於讀取項目並將它們發送到 Worker
-
輸出通道(「傳出請求」)用於將請求發送到 Worker
-
輸入通道(「傳入回覆」)用於接收來自 Worker 的回覆
您無需顯式配置 ChunkMessageChannelItemWriter
和 MessagingTemplate
。(如果您找到這樣做的理由,仍然可以顯式配置它們)。
在 Worker 端,RemoteChunkingWorkerBuilder
讓您配置 Worker 以:
-
監聽管理器在輸入通道(「傳入請求」)上發送的請求
-
對於每個具有配置的
ItemProcessor
和ItemWriter
的請求,調用ChunkProcessorChunkHandler
的handleChunk
方法 -
在輸出通道(「傳出回覆」)上向管理器發送回覆
您無需顯式配置 SimpleChunkProcessor
和 ChunkProcessorChunkHandler
。(如果您找到這樣做的理由,可以仍然顯式配置它們)。
以下範例展示如何使用這些 API
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在此處找到遠程 Chunking Job 的完整範例。
遠程 Partitioning
下圖顯示了典型的遠程 Partitioning 情況

另一方面,當瓶頸不是項目處理而是相關的 I/O 時,遠程 Partitioning 非常有用。通過遠程 Partitioning,您可以將工作發送到執行完整 Spring Batch 步驟的 Worker。因此,每個 Worker 都有自己的 ItemReader
、ItemProcessor
和 ItemWriter
。為此,Spring Batch Integration 提供了 MessageChannelPartitionHandler
。
PartitionHandler
介面的此實作使用 MessageChannel
實例向遠程 Worker 發送指令並接收其回應。這提供了對用於與遠程 Worker 通信的傳輸方式(例如 JMS 和 AMQP)的良好抽象。
「可擴展性」章節中關於 遠程分割 的章節概述了配置遠程 Partitioning 所需的概念和組件,並顯示了使用預設 TaskExecutorPartitionHandler
在單獨的本地執行緒中進行分割的範例。對於遠程 Partitioning 到多個 JVM,需要兩個額外組件:
-
遠程處理架構或網格環境
-
支援所需遠程處理架構或網格環境的
PartitionHandler
實作
與遠程 Chunking 類似,您可以使用 JMS 作為「遠程處理架構」。在這種情況下,使用 MessageChannelPartitionHandler
實例作為 PartitionHandler
實作,如前所述。
-
Java
-
XML
以下範例假設現有的分割 Job,並著重於 Java 中的 MessageChannelPartitionHandler
和 JMS 配置
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
以下範例假設現有的分割 Job,並著重於 XML 中的 MessageChannelPartitionHandler
和 JMS 配置
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
您還必須確保分割 handler
屬性映射到 partitionHandler
Bean。
-
Java
-
XML
以下範例將分割 handler
屬性映射到 Java 中的 partitionHandler
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
以下範例將分割 handler
屬性映射到 XML 中的 partitionHandler
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
您可以在此處找到遠程 Partitioning Job 的完整範例。
您可以使用 @EnableBatchIntegration
注解來簡化遠程 Partitioning 設定。此注解提供了兩個對遠程 Partitioning 有用的 Bean:
-
RemotePartitioningManagerStepBuilderFactory
:配置管理器步驟 -
RemotePartitioningWorkerStepBuilderFactory
:配置 Worker 步驟
這些 API 負責配置許多組件,如下圖所示


在管理器端,RemotePartitioningManagerStepBuilderFactory
讓您通過宣告配置管理器步驟:
-
用於分割資料的
Partitioner
-
輸出通道(「傳出請求」),用於將請求發送到 Worker
-
輸入通道(「傳入回覆」),用於接收來自 Worker 的回覆(當配置回覆聚合時)
-
輪詢間隔和超時參數(當配置 Job 儲存庫輪詢時)
您無需顯式配置 MessageChannelPartitionHandler
和 MessagingTemplate
。(如果您找到這樣做的理由,可以仍然顯式配置它們)。
在 Worker 端,RemotePartitioningWorkerStepBuilderFactory
讓您配置 Worker 以:
-
監聽管理器在輸入通道(「傳入請求」)上發送的請求
-
對於每個請求調用
StepExecutionRequestHandler
的handle
方法 -
在輸出通道(「傳出回覆」)上向管理器發送回覆
您無需顯式配置 StepExecutionRequestHandler
。(如果您找到這樣做的理由,可以顯式配置它)。
以下範例展示如何使用這些 API
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}