Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
Tags
- PrestoDB
- PostgreSQL
- kafka ui
- ui for kafka
- Kafka
- MongoDB
- logstash
- fluentd
- elasticearch
- elasticsearch
- SSL
- Python
- kibana
- kafka connect
- naverdevelopers
- pyspark
- MariaDB
Archives
- Today
- Total
Dev_duri
Thread Pool 본문
Thread Pool
사용 이유
- 프로세스 중 병렬 작업처리가 많아진다
- 스레드 개수가 증가되고 그에따른 스레드생성과 스케줄링 발생
- CPU가 바빠져 메모리 사용량이 늘어난다.
시스템성능이 저하
갑작스러운 병렬작업의 폭증에 따른 스레드 폭증
스레드 생성과 스레드 종료의 오버헤드를 줄여 성능을 향상
위 같은 상황을 위해 사용하는 것이 스레드 풀이다.
- 작업처리에 사용되는 스레드를 제한된 개수만큼 미리 정해놓고 작업큐 (Queue)에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다.
- 작업처리 요청이 폭증되어도 스레드의 전체개수가 늘어나지 않도록 제한해서 하나씩 처리하기 때문에 시스템 성능이 급격히 저하되지 않는다.
ExecutorService
- Executors 는 ExecutorService를 생성하며 다음 매소드를 제공하여 쓰레드 풀 개수 및 종류를 정할 수 있다.
newFixed ThreadPool(int) 인자 개수만큼 고정된 프레드풀을 만든다.
newCachedThredPool() | 필요할때 필요한 만큼 쓰레드풀 생성. 이미 생성된 쓰레드를 재활용 할수 있어 성능 용이하다. |
newScheculedThreadPool(int) | 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 작업이 있다면 스케줄스레드 풀을 활용 할 수 있다. |
newSingleThredExecutor() | 쓰레드 1개인 ExecutorService를 리턴한다. 싱글 스레드에서 동작하는 작업 처리시 사용한다. |
- Executors로 ExecutorService를 생성했다면, 서비스는 작업을 처리한다.
- ExecutorService.submit() 또는 ExecutorService.execute() 메소드로 작업을 추가한다.
| execute() : | • 작업 처리 결과를 반환하지 않는다. • 작업처리 도중 예외 발생시 스레드가 종료되고 해당 스레드는 스레드풀에서 제거된다. • 스레드풀은 다른 작업처리를 위해 새로운 스레드를 생성한다. | | --- | --- | | submit(): | • 작업 처리 결과를 반환한다. • 작업처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용된다. • 가급적 스레드의 생성 오버헤더를 줄이기 위해 submit()을 사용하는 것이 좋다. |
작업 처리
작업 처리 요청
- ExecutorService의 작업큐에 Runnable 또는 Callable객체를 넣는 행위를 말한다.
리턴타입 | 메소드명(매개변수) | 설명 |
void | execute(Runnable Command) | Runnable 을 작업큐에 저장 작업처리결과(리턴값) 없음 |
Future<?> Future<V> Future<V> |
submit(Runnable task) submit(Runnable task, V result) submit(Callable task>) |
Runnable 또는 Callable 작업큐에 저장리턴된 Future을 통해 작업처리 결과(리턴값)을 얻을 수 있음. |
리턴값이 있는(callable) 작업
- Callable 작업의 처리요청은 ExecutorService의 submit()메소드를 호출한다.
- submit()메소드는 작업 큐에 Callable객체를 저장하고 즉시 Future<T>를 리턴한다.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Example {
public static void main(String[] args) {
// 스레드 생성
ExecutorService executorService = Executors.newFixedThreadPool(2); // 2개 스레드 풀 지정
Callable<String> taskThread = new Callable<String>() {
@Override
public String call() throws Exception {
// 리턴값
return "Task executed asynchronously";
}
};
// Submit
Future<String> future = executorService.submit(taskThread);
try {
// 리턴값 불러오기
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
// 종료
executorService.shutdown();
}
}
$ Task executed asynchronously
리턴값이 없는(Runnable) 작업
- 스레드가 작업처리를 정상적으로 완료했는지, 처리도중 예외가 발생했는지 확인하기 위해서다.
- 작업도중 예외발생시 ExecutorException을 발생시킨다.
- 정상완료 시 Future의 get메소드는 null을 리턴한다.
- 실제 작업이 정의되지 않았기 때문에 해당 코드를 실행하면 아무 동작이 발생하지 않기 때문
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Example {
public static void main(String[] args) {
// 스레드 생성
ExecutorService executorService = Executors.newFixedThreadPool(2); // 2개 스레드 풀 지정
Runnable taskThread = new Runnable() {
@Override
public void run() {
System.out.println("Task executed asynchronously");
}
};
// Submit
Future<?> future = executorService.submit(taskThread);
// 종료
executorService.shutdown();
}
}
$ Task executed asynchronously
작업완료
- 작업이 완료된 순서대로 작업 처리 통보한다. ( 시작 순과 무관 )
- CompletionService 의 처리완료된 작업을 가져오는 poll()과 take()메소드를 사용하여 작업 완료된 것만 통보 받을 수 있다.
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
// Submit
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
completionService.submit(() -> {
Thread.sleep(1000);
return taskNumber * taskNumber;
});
}
// Results
for (int i = 0; i < 10; i++) {
try {
Future<Integer> completedTask = completionService.take();
int result = completedTask.get();
System.out.println("Task " + i + " completed with result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
- 10개의 작업을 **CompletionService**를 사용하여 비동기적으로 실행
- 작업 완료 순서대로 결과를 가져온다.
결과
Task 0 completed with result: 0
Task 1 completed with result: 9
Task 2 completed with result: 1
Task 3 completed with result: 4
Task 4 completed with result: 16
Task 5 completed with result: 25
Task 6 completed with result: 36
Task 7 completed with result: 64
Task 8 completed with result: 49
Task 9 completed with result: 81
Connection Pool과의 관계
- 기본적으로는 서로 독립적이다.
- 애플리케이션에서 스레드 풀과 커넥션 풀을 함께 사용하는 경우, 둘 간의 리소스 경합을 피하고 성능을 최적화하기 위해 적절한 조정이 필요하다.
- 스레드 풀이 너무 많은 작업을 동시에 실행하면서 동시에 많은 데이터베이스 연결을 필요로 한다면, 두 풀의 크기를 적절하게 조정하여 성능 문제를 방지할 수 있다.
참조.
https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html
http://blog.gilliard.lol/2018/01/10/Java-in-containers-jdk10.html