일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 |
- spring boot tomcat
- batch
- spring-webmvc #코드읽기
- Spring Batch
- IntelliJ
- tomcat
- ApplicationPidFileWriter
- static inner class
- Spring
- spring-webmvc
- docker
- Spring Data REST
- spring boot
- Data REST
- spring bean
- spring batch 코드
- JPA
- Spring Data JPA
- 코드 리뷰
- 세미나
- JUnit
- 톰캣
- spring camp
- spring pid
- JPA mapping
- SuperTypeToken
- spring-mvc
- ORM
- spring jpa
- REST API
- Today
- 3
- Total
- 916,325
woniper
Spring Batch의 동작 코드 #Job 생성과 실행 본문
누구나 아는 Spring Batch 기본 개념에서 Spring Batch Domain에 대해 간단히 알아보았다. 이번 글에선 기본 개념에 정리한 여러 클래스가 서로 어떤 의존성을 가지며, 어떻게 동작하는지 코드를 살펴볼 것이다. 모든 코드를 다 볼 수 없기에 이번 글은 Job이 어떻게 생성되고 실행되는지 살펴본다.
Sample
@Configuration
public class SimpleConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("simple-job")
.start(step())
.build();
}
@Bean
public Step step() {
return stepBuilderFactory.get("simple-step")
.<String, StringWrapper>chunk(10)
.reader(itemReader())
.processor(itemProcess())
.writer(itemWriter())
.build();
}
private ItemReader<String> itemReader() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("test" + i);
}
return new ListItemReader(list);
}
private ItemProcessor<String, StringWrapper> itemProcess() {
return StringWrapper::new;
}
private ItemWriter<StringWrapper> itemWriter() {
return System.out::println;
}
private class StringWrapper {
private String value;
StringWrapper(String value) {
this.value = value;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return String.format("i'm %s", getValue());
}
}
}
위 예제는 아주 단순한 batch다.
- Job은 하나의 Step을 갖고 있으며,
- Step의 ItemReader는 ArrayList에 100개의
String value
를 담고 있다. (읽기) - ItemProcessor는 ItemReader에서 반환된 String List를
StringWrapper
클래스로 wrapping 한다. (가공) - ItemWriter는 ItemProcessor를 통해 StringWrapper로 반환된 List를
System.out.println
으로 로그를 찍는다. (쓰기)
실행
@Configuration
@EnableBatchProcessing
public class JobRunnerConfiguration {
@Bean
public JobLauncherTestUtils utils() throws Exception {
return new JobLauncherTestUtils();
}
}
Batch를 실행하기 위한 JobLauncherTestUtils
를 Bean으로 등록
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SimpleConfiguration.class, JobRunnerConfiguration.class})
public class SimpleConfigurationTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void testLaunchJob() throws Exception {
jobLauncherTestUtils.launchJob();
}
}
SimpleConfiguration, JobRunnerConfiguration
을@Contextconfiguration
을 이용해 테스트에 필요한 Config Bean으로 등록- Bean으로 등록된
JobLauncherTestUtils
를 주입(@Autowired) 받아 Batch Job(SimpleConfiguration)을 실행 - 이때 JobLauncherTestUtils은
SimpleJobLauncher
를 이용해 Batch Job을 실행 - SimpleConfiguration 예제의
JobBuilderFactory, StepBuilderFactory
의 자세한 설명은 생략한다. Job과 Step을 생성하는 객체라고 생각하면 된다.
SimpleJobLauncher
최대한 간단하게 diagram을 그리려 노력했다. 위 예제를 기준으로 Spring Batch가 내부적으로 어떻게 동작하는지 살펴보자.
일단 예제에서 JobLauncherTestUtils가 SimpleJobLauncher
를 통해 Job을 실행한다고 설명했다. 실제로 어떻게 실행하는지 코드를 보자.
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
Assert.notNull(job, "The Job must not be null.");
Assert.notNull(jobParameters, "The JobParameters must not be null.");
final JobExecution jobExecution;
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (lastExecution != null) {
// 재실행 가능한 Job 인지 체크
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
/*
* validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
* retrieve the previous execution and check
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning() || status == BatchStatus.STOPPING) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
} else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}
job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
job.execute(jobExecution);
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+ "] and the following status: [" + jobExecution.getStatus() + "]");
}
catch (Throwable t) {
logger.info("Job: [" + job
+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
+ "]", t);
rethrow(t);
}
}
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
catch (TaskRejectedException e) {
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
jobRepository.update(jobExecution);
}
return jobExecution;
}
- JobExecution 반환
- JobRepository로 JobExecution 조회 및 생성
run 메소드는 Job 객체와 JobParamter 객체를 받아 JobRepository
를 이용해 JobExecution을 조회(getLastJobExecution) 및 생성(createJobExecution)한다.
SimpleJobRepository.createJobExecution
@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
Assert.notNull(jobName, "Job name must not be null.");
Assert.notNull(jobParameters, "JobParameters must not be null.");
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
ExecutionContext executionContext;
if (jobInstance != null) {
List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
for (JobExecution execution : executions) {
if (execution.isRunning() || execution.isStopping()) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ jobInstance);
}
BatchStatus status = execution.getStatus();
if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException("Cannot restart job from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
throw new JobInstanceAlreadyCompleteException(
"A job instance already exists and is complete for parameters=" + jobParameters
+ ". If you want to run this job again, change the parameters.");
}
}
executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
}
else {
jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
executionContext = new ExecutionContext();
}
JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
jobExecution.setExecutionContext(executionContext);
jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
jobExecutionDao.saveJobExecution(jobExecution);
ecDao.saveExecutionContext(jobExecution);
return jobExecution;
}
SimpleJobRepository는 JobRepository interface의 구현체.
- JobInstance를 조회
- JobInstance가 null이 아니라면, 실행 가능한 Job인지 체크 후
JobExecution
을 조회(ecDao.getExecutionContext) - JobInstance가 null이라면, JobInstance와
ExecutionContext
를생성
- 마지막으로 JobExecution을
저장
이 코드에서 몇가지 객체가 눈에 들어온다.
JobExecution
앞 글에서 JobExecution에 대해 설명했다. Job이 한번 실행될 때 생성되는 객체다. 이 객체는 Job이 실행되는 데 위해 필요한 아래와 같은 객체를 담고 있다.
- JobParamter : Job을 실행하기 위해 필요한 paramter
- JobInstance : JobExecution을 조회하기 위한 id, name
- Collection
: Job이 포함하고 있는 실행 가능한 StepExecution List - Job 실행 생성, 시작, 종료, 수정 시간
- 그 외 여러 객체
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
private final String jobConfigurationName;
JobExecution에 멤버 변수로 선언된 객체들.
ExecutionContext
ExecutionContext
객체는 Job이 실행되는 동안 필요한 데이터를 메모리(Map)에 저장하고 관리하는 객체다. 실제로 이 객체를 살펴보면 Map을 통해 데이터를 저장, 조회한다.
ExecutionContext의 생명 주기는 Job이 실행되는 동안 사용된다.
job.execute(jobExecution);
다시 SimpleJobLauncher
코드로 돌아가 보자. job.execute 메소드가 바로 Job을 실행하는 부분이다. JobRepository를 통해 생성된 JobExecution
을 argument로 넘긴다. 즉, JobExecution은 Job을 실행하는 데 필요한 객체다.
AbstractJob.execute
@Override
public final void execute(JobExecution execution) {
if (logger.isDebugEnabled()) {
logger.debug("Job execution starting: " + execution);
}
// 1. ThreadLocal에 현재 실행될 Job 등록
JobSynchronizationManager.register(execution);
try {
// 2. 실행 가능한 Job인지 JobParameter 검증
jobParametersValidator.validate(execution.getJobParameters());
if (execution.getStatus() != BatchStatus.STOPPING) {
// 3. 시작 시간 등록
execution.setStartTime(new Date());
// 4. Batch 상태를 시작으로 변경
updateStatus(execution, BatchStatus.STARTED);
// 5. JobExecutionListener.beforeJob 실행 (전 처리)
listener.beforeJob(execution);
try {
// 6. job 구현체 실행
doExecute(execution);
if (logger.isDebugEnabled()) {
logger.debug("Job execution complete: " + execution);
}
} catch (RepeatException e) {
throw e.getCause();
}
} else {
// The job was already stopped before we even got this far. Deal
// with it in the same way as any other interruption.
execution.setStatus(BatchStatus.STOPPED);
execution.setExitStatus(ExitStatus.COMPLETED);
if (logger.isDebugEnabled()) {
logger.debug("Job execution was stopped: " + execution);
}
}
} catch (JobInterruptedException e) {
logger.info("Encountered interruption executing job: "
+ e.getMessage());
if (logger.isDebugEnabled()) {
logger.debug("Full exception", e);
}
execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
execution.addFailureException(e);
} catch (Throwable t) {
logger.error("Encountered fatal error executing job", t);
execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
execution.setStatus(BatchStatus.FAILED);
execution.addFailureException(t);
} finally {
try {
if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
&& execution.getStepExecutions().isEmpty()) {
ExitStatus exitStatus = execution.getExitStatus();
ExitStatus newExitStatus =
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
execution.setExitStatus(exitStatus.and(newExitStatus));
}
execution.setEndTime(new Date());
try {
// 7. JobExecutionListener.afterJob 실행 (후 처리)
listener.afterJob(execution);
} catch (Exception e) {
logger.error("Exception encountered in afterStep callback", e);
}
jobRepository.update(execution);
} finally {
JobSynchronizationManager.release();
}
}
}
코드가 길고 복잡하지만, 주석을 보면 그렇게 복잡한 로직은 아니다.
5. JobExecutionListener.beforeJob 실행
Job이 실행되기 전처리, 후처리 가능한 JobExecutionListener가 있다. 이 정도만 알고 넘어가자.
6. job 구현체 실행
abstract protected void doExecute(JobExecution execution) throws JobExecutionException;
AbstractJob은 이름과 같이 추상 클래스다. AbstractJob.doExecute
메소드는 추상 메소드다. 이를 상속받아 구현된 Job 객체가 doExecution을 구현하고 있을 것이다. 그럼 이 예제에서 AbstractJob을 구현한 구현 객체는 무엇일까?
SimpleJob.doExecute
@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {
StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution);
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
//
// Terminate the job if a step fails
//
break;
}
}
//
// Update the job status to be the same as the last step
//
if (stepExecution != null) {
if (logger.isDebugEnabled()) {
logger.debug("Upgrading JobExecution status: " + stepExecution);
}
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
}
SimpleJob
이 바로 AbstractJob을 구현한 구현체다. doExecute 메소드는 Step List를 실행한다.
Step이 실행되는 과정은 다음 포스팅에.
'Spring' 카테고리의 다른 글
Spring Data JPA 같은 이름, 다른 type인 2개의 @Entity인 경우 주의 사항 (0) | 2018.05.24 |
---|---|
Spring Batch의 동작 코드 #Step 생성과 실행 (2) | 2018.05.08 |
누구나 아는 Spring Batch 기본 개념 (2) | 2018.04.30 |
Spring Data REST #3 내부 동작 (2) | 2017.06.17 |
Spring Data REST #2 동작 원리 (0) | 2017.05.09 |
- Tag
- batch, Spring Batch, spring batch 코드, 코드 리뷰