Dev_duri

Thread Pool 본문

CS

Thread Pool

marcel 2023. 8. 21. 14:55

Thread Pool

사용 이유

  1. 프로세스 중 병렬 작업처리가 많아진다
  2. 스레드 개수가 증가되고 그에따른 스레드생성과 스케줄링 발생
  3. CPU가 바빠져 메모리 사용량이 늘어난다.

시스템성능이 저하

갑작스러운 병렬작업의 폭증에 따른 스레드 폭증

스레드 생성과 스레드 종료의 오버헤드를 줄여 성능을 향상

위 같은 상황을 위해 사용하는 것이 스레드 풀이다.

  • 작업처리에 사용되는 스레드를 제한된 개수만큼 미리 정해놓고 작업큐 (Queue)에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다.
  • 작업처리 요청이 폭증되어도 스레드의 전체개수가 늘어나지 않도록 제한해서 하나씩 처리하기 때문에 시스템 성능이 급격히 저하되지 않는다.

 

ExecutorService

  • Executors 는 ExecutorService를 생성하며 다음 매소드를 제공하여 쓰레드 풀 개수 및 종류를 정할 수 있다.

newFixed ThreadPool(int) 인자 개수만큼 고정된 프레드풀을 만든다.

newCachedThredPool() 필요할때 필요한 만큼 쓰레드풀 생성. 이미 생성된 쓰레드를 재활용 할수 있어 성능 용이하다.
newScheculedThreadPool(int) 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 작업이 있다면 스케줄스레드 풀을 활용 할 수 있다.
newSingleThredExecutor() 쓰레드 1개인 ExecutorService를 리턴한다. 싱글 스레드에서 동작하는 작업 처리시 사용한다.
  • Executors로 ExecutorService를 생성했다면, 서비스는 작업을 처리한다.
  • ExecutorService.submit() 또는 ExecutorService.execute() 메소드로 작업을 추가한다.

| execute() : | • 작업 처리 결과를 반환하지 않는다. • 작업처리 도중 예외 발생시 스레드가 종료되고 해당 스레드는 스레드풀에서 제거된다. • 스레드풀은 다른 작업처리를 위해 새로운 스레드를 생성한다. | | --- | --- | | submit(): | • 작업 처리 결과를 반환한다. • 작업처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용된다. • 가급적 스레드의 생성 오버헤더를 줄이기 위해 submit()을 사용하는 것이 좋다. |

 

작업 처리

작업 처리 요청

  • ExecutorService의 작업큐에 Runnable 또는 Callable객체를 넣는 행위를 말한다.
리턴타입  메소드명(매개변수) 설명
void execute(Runnable Command) Runnable 을 작업큐에 저장
작업처리결과(리턴값) 없음
Future<?>
Future<V>
Future<V>
submit(Runnable task)
submit(Runnable task, V result)
submit(Callable task>)
Runnable 또는 Callable 작업큐에 저장리턴된 Future을 통해 작업처리 결과(리턴값)을 얻을 수 있음.

 

리턴값이 있는(callable) 작업

  • Callable 작업의 처리요청은 ExecutorService의 submit()메소드를 호출한다.
  • submit()메소드는 작업 큐에 Callable객체를 저장하고 즉시 Future<T>를 리턴한다.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Example {
    public static void main(String[] args) {
        // 스레드 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2); // 2개 스레드 풀 지정
        
        Callable<String> taskThread = new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 리턴값
                return "Task executed asynchronously";  
            }
        };
        
        // Submit
        Future<String> future = executorService.submit(taskThread);
        
        try {
            // 리턴값 불러오기
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        // 종료
        executorService.shutdown();
    }
}
$ Task executed asynchronously

 

 

리턴값이 없는(Runnable) 작업

  • 스레드가 작업처리를 정상적으로 완료했는지, 처리도중 예외가 발생했는지 확인하기 위해서다.
  • 작업도중 예외발생시 ExecutorException을 발생시킨다.
  • 정상완료 시 Future의 get메소드는 null을 리턴한다.
  • 실제 작업이 정의되지 않았기 때문에 해당 코드를 실행하면 아무 동작이 발생하지 않기 때문
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Example {
    public static void main(String[] args) {
        // 스레드 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2); // 2개 스레드 풀 지정
        
        Runnable taskThread = new Runnable() {
            @Override
            public void run() {
                System.out.println("Task executed asynchronously");
            }
        };
        
        // Submit
        Future<?> future = executorService.submit(taskThread);
        
        // 종료
        executorService.shutdown();
    }
}
$ Task executed asynchronously

 

 

작업완료

  • 작업이 완료된 순서대로 작업 처리 통보한다. ( 시작 순과 무관 )
  • CompletionService 의 처리완료된 작업을 가져오는 poll()과 take()메소드를 사용하여 작업 완료된 것만 통보 받을 수 있다.
import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        // Submit
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            completionService.submit(() -> {
                Thread.sleep(1000); 
                return taskNumber * taskNumber;
            });
        }

        // Results
        for (int i = 0; i < 10; i++) {
            try {
                Future<Integer> completedTask = completionService.take();
                int result = completedTask.get();
                System.out.println("Task " + i + " completed with result: " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }
}
  1. 10개의 작업을 **CompletionService**를 사용하여 비동기적으로 실행
  2. 작업 완료 순서대로 결과를 가져온다.

결과

Task 0 completed with result: 0
Task 1 completed with result: 9
Task 2 completed with result: 1
Task 3 completed with result: 4
Task 4 completed with result: 16
Task 5 completed with result: 25
Task 6 completed with result: 36
Task 7 completed with result: 64
Task 8 completed with result: 49
Task 9 completed with result: 81

 

Connection Pool과의 관계

  • 기본적으로는 서로 독립적이다.
  • 애플리케이션에서 스레드 풀과 커넥션 풀을 함께 사용하는 경우, 둘 간의 리소스 경합을 피하고 성능을 최적화하기 위해 적절한 조정이 필요하다.
  • 스레드 풀이 너무 많은 작업을 동시에 실행하면서 동시에 많은 데이터베이스 연결을 필요로 한다면, 두 풀의 크기를 적절하게 조정하여 성능 문제를 방지할 수 있다.

참조.

https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html

http://blog.gilliard.lol/2018/01/10/Java-in-containers-jdk10.html