본문 바로가기
  • Code Smell
Framework

[Spring Boot Batch] Chunk Multi Thread 처리 시 Thread 분배 문제 (1 reader multiple writer)

by HSooo 2023. 2. 23.

Spring Batch Multi Thread 처리 시 Thread 를 최대한 이용한다 (1 reader & multiple writer)

일반적으로 Thread-safe 한 PagingItemReader를 사용하면서, ItemProcessor, ItemWriter를 사용하며 Multi-Thread 처리를 할때 다음과 같이 생각하게 됩니다.

ThreadPoolTaskExecutor 에서 꺼낸 1개의 worker 가 Reader-Processor-Writer 처리를 다 할 것이다.

즉, ThreadPoolTaskExecutor 에서 생성 된 1개의 worker 가 CHUNK SIZE 만큼 DB를 읽어 현재 worker 가 process, write 를 같이 할 것이라고 생각합니다.

하지만 다른 결과가 나옵니다.

아래처럼 예제를 구성하겠습니다.

현재 DB에는 읽을 아이템이 60개 입니다.

DB와 연결 후 mysql에 batch 메타 테이블과 예제 데이터를 넣어놨습니다.

얘는 따로 예제를 추가 하지 않겠습니다.

Reader 구성

JpaPagingItemReader 도 있지만, 제 의도대로 정확히 돌리기 위해 AbstractPagingItemReader 를 직접 구현하여 돌려보겠습니다.

아래 구성은 AbstractPagingItemReaderChunkOrientedTasklet 에서 process 하기 전에 한방에 리스트를 넘기기 위함입니다.

Processor 구성은 필수가 아니니 건너 뛰고, Writer 구성을 하겠습니다.

Writer 구성

들고 온 item이 몇개인지 찍는 로직입니다.

Multi-Thread를 돌려줄 TaskExecutor 구성입니다.

ThreadPoolTaskExecutor

그리고 Step 과 Job 구성입니다.

Listener도 하나 달아줍시다. ThreadPoolTaskExecutor를 종료해 줍니다.

Job, Step 구성

CHUNK_SIZE, PAGE_SIZE, WORKER_SIZE

  • CHUNK_SIZE, PAGE_SIZE = 100 으로 commit-interval 이 됩니다.
  • WORKER_SIZE = ThreadPoolTaskExecutor core,max size 로 10입니다.
private static final int CHUNK_SIZE = 100;

private static final int PAGE_SIZE = 100;

private static final int WORKER_SIZE = 10;

결과를 보니 이렇습니다.

worker-3이 돌아서 가져온 item 60 개를, 자기 자신이 29개, 다른 worker에 31개를 나눠 주었습니다.

그럼 아래처럼 세팅 된 걸까요?

worker-3 -> 1,2,3,4 ... 29
worker-4 -> 30,31,32 ... 60

실제로 ItemWriter에서 id만 추출해서 돌려보겠습니다.

변형 된 ItemWriter

자기 자신한테 할당 된 entity의 id 값만 모아 comma를 붙이고 하나의 문자열로 계속 append 해서 한번에 출력하는 로직입니다.

재 실행으로 인한 worker 번호가 바뀌긴 했지만, 결국 id를 확인 하면

나 하나 쟤 하나 이런식으로 분배 된 걸 볼 수 있습니다.

왜 그런걸까?

ChunkTaskletStepChunkOrientedTasklet 에 실행 됩니다.

TaskletStep 과 ChunkOrientedTasklet

@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

    @SuppressWarnings("unchecked")
    Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
    if (inputs == null) {
        inputs = chunkProvider.provide(contribution);
        if (buffering) {
            chunkContext.setAttribute(INPUTS_KEY, inputs);
        }
    }

    chunkProcessor.process(contribution, inputs);
    chunkProvider.postProcess(contribution, inputs);
    ...
}

chunkContext 에 의해 item 을 가져오고 chunkProcessor에 의해 process, write 됩니다.

이 때 PagingItemReader 가 list를 한방에 넘겨도, doRead() 에 의해 item 단위로 분해되는데, 이때 ThreadPoolTaskExecutor 로 분배 됩니다.

또한 ThreadPoolTaskExecutor 가 아무리 많다 하더라도, ChunkOrientedTaskletsemaphore 에 의해 동시에 처리 할 숫자가 결정 되는데, 이 semaphore 는 chunkContext 가 Connection 을 얻었는지에 대해 RUNNING / MONITOR / WAIT 상태가 결정되는 것으로 보입니다.

즉 Connection 을 얻고 난 뒤 라는 건 chunkProcessor가 제 일을 할 수 있을 때, ThreadPoolTaskExecutor 에 의해 호출되어 item을 받는 듯 합니다.

따라서, Connection Pool Resource와 Thread Worker 만 충분하다면, 하나의 Chunk가 단일 워커의 Reader-Processor-Writer 구조가 아닌,하나의 Reader가 CHUNK_SIZE 이하 혹은 그만큼 가져온 itemList 를 여러 Worker 의 Processor-Writer 구조로 처리 될 수 있는 것 입니다.

따라서 1 Reader Multiple Writer 같은 키워드의 구현 방법은 필요가 없게 됩니다.

아래는 같은 60개 item 의 숫자를 좀 더 많은 방식으로 분배하는 방식입니다.

Connection 숫자를 Worker Thread 가 충분히 가져 갈 수 있도록 사이즈를 크게 추가한 상태입니다.

로그를 보면 Worker-3 에 의해 가져온 item 60 개를 나머지 Worker 들이 분배하여 들고 있는 걸 볼 수 있습니다.

예제 코드 Github

Github 소스

댓글