Java는 멀티 스레드를 지원하는 언어이기 때문에 잘 사용하려면 이에 대한 이해도 필요하다. Java를 통해 프로그램을 개발한다면 알게 모르게 자주 써왔을 테지만 막상 공부하지 않는다면 깊게 이해하기 힘든 부분이다. 잘 사용한다면 순차적인 접근에 비해 훨씬 빠르고 효율적인 병렬 처리 방법에 대해 간단히 알아보고 사용하는 방법도 많아 간단히 정리하고 넘어가보려 한다.
방법 4가지 소개
아래는 Java에서 병렬 프로그래밍을 하기위한 4가지 방법이다.
Thread
ExecutorService
ParallelStream
Fork/Join
간단 설명
위에서 기술한 4가지 클래스는 병렬 처리를 위해 내부적으로 다수의 스레드를 생성해 처리한다.
이 과정에서 가장 주의해야 할 부분 두 가지만 알아둬도 많은 장애를 피할 수 있다.
동기화 처리와 블로킹을 조심하자
또한 병렬 처리를 위해 대표적으로 두가지를 고려해 볼 수 있다.
리턴 값이 필요한지
작업이 완료되길 기다려야 하는지
이제 정리를 해보자.
Thread
Runnable
객체를 처리한다.Runnable
객체는 작업 수행시 리턴 값을 반환하지 않기 때문에Thread
를 통해 병렬 작업 수행 시 리턴값을 받을 수 없다.작업을 위해선
start()
메서드를 통해 새로운 스레드를 생성 후 작업을 수행한다.run()
메서드는 스레드를 생성하지 않고 작업을 수행하므로 병렬 처리를 위해선 사용하지 않는다.작업을 기다리려면
join()
메서드를 사용하거나CountdownLatch
클래스를 사용한다.ExecutorService
스레드 풀을 관리하기 위해 사용되며
Runnable
객체와Callable
객체를 처리한다.작업을 위해서
execute
혹은submit
을 사용한다.결과 값이 없는
Runnable
객체의 경우execute()
혹은submit()
메서드를 사용하며 결과 값이 있는Callable
의 경우submit()
메서드를 사용한다.작업을 기다리기 위해
submit()
메서드를 통해 Future 객체를 받아get()
메서드를 사용해야 하며 결과 값이 없는Runnable
객체를 사용한 경우get()
은 null을 리턴한다.또한
CountdownLatch
객체 혹은ExecutorService
의shutdown()
메서드를 사용 할 수 있다.ParallelStream
Stream
으로 변환 가능한 객체의 경우 사용 할 수 있다.Stream
에서 사용하는 메서드를 사용 할 수 있으며 각 작업의 결과를 기대 할 수 있다.작업은 내부에선 병렬로 진행되지만 외부에선 항상 블로킹 처리되어 다음 코드를 진행하기 전 기다린다.
Fork/Join
내부적으로 필요한 만큼의 스레드로 분기해 작업을 수행한다.
작업을 실행하기 위해
ForkJoinPool
객체가 필요하다.풀의 갯수만큼 분기하며 그 이상으로 작업 스레드의 수가 많아질 경우 비어있는 풀이 작업을 가져와 처리한다.
결과 값이 필요할 경우
RecursiveTask
를 상속/구현해 사용하며 필요 없을 경우RecursiveAction
을 사용한다.작업을 기다리기 위해
awaitTermination()
메서드를 사용한다.
예제
사용 하는 방법은 아키텍처 구성과 요구사항에 따라 다양하게 변경 될 수 있으나 간단한 구성 방법만 짚어보고 넘어가도록 한다.
예제 코드의 작업 내용은 0 ~ 100까지 수를 가지고 더하거나 출력하며 리턴 값이 필요 없다면 출력하고 리턴 값이 필요하다면 더한 뒤 값을 리턴 하도록 한다.
// 리턴 값이 필요 없는 경우
for (int i = 0; i < 100; i++) {
System.out.println(i);
}
// 리턴 값이 필요한 경우
int n = 0;
for (int i = 0; i < 100; i++) {
n += i;
}
Thread
public class MyRunnable implements Runnable {
private final int s;
public MyRunnable(int s) {
this.s = s;
}
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(i + s);
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
List<Thread> l = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new MyRunnable(i * 20));
l.add(t);
t.run();
}
// 작업 수행이 완료되길 기다려야 한다면 join() 메서드를 사용한다.
for (int i = 0; i < 5; i++) {
Thread t = l.get(i);
t.join();
}
}
}
ExecutorService
public class MyRunnable implements Runnable {
private final int s;
public MyRunnable(int s) {
this.s = s;
}
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(i + s);
}
}
}
public class MyCallable implements Callable {
private final int s;
public MyCallable(int s) {
this.s = s;
}
public Integer call() {
Integer in = new Integer(0);
for (int i = 0; i < 20; i++) {
in += i + s;
}
return in;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService e = Executors.newFixedThreadPool(5);
// 리턴 값이 필요 없는 경우
for (int i = 0; i < 5; i++) {
e.execute(new MyRunnable(i * 20));
}
// 작업을 기다려야 할 경우
//e.shutdown();
// 리턴 값이 필요한 경우
List<Future<Integer>> l = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Future<Integer> f = e.submit(new MyCallable(i * 20));
l.add(f);
}
// 작업이 완료 되길 기다려야 할 경우
int n = 0;
for (int i = 0; i < 5; i++) {
Future<Integer> f = l.get(i);
n += f.get();
}
}
}
ParallelStream
public class Main {
public static void main(String[] args) {
// 항상 작업은 블로킹 된다.
// 리턴 값이 필요 없는 경우
IntStream.range(0, 100).parallel().forEach(System.out::println);
// 리턴 값이 필요한 경우
int n = IntStream.range(0, 100).parallel().reduce(0, Integer::sum);
}
}
Fork/Join
// Fork/Join 방식의 특성으로 위 3개의 결과와 다름을 유의한다.
public class MyRecursiveTask extends RecursiveTask<Integer> {
private final int s;
public MyRecursiveTask(int s) {
this.s = s;
}
protected Integer compute() {
if (s % 2 == 0) {
MyRecursiveTask rt1 = new MyRecursiveTask(s / 2);
MyRecursiveTask rt2 = new MyRecursiveTask(s / 2);
rt1.fork();
rt2.fork();
int r = 0;
r += rt1.join();
r += rt2.join();
return r;
} else {
int r = 0;
for (int i = 0; i < s; i++) {
r++;
}
return r;
}
}
}
public class MyRecursiveAction extends RecursiveAction {
private final int s;
public MyRecursiveAction(int s) {
this.s = s;
}
protected void compute() {
System.out.println(Thread.currentThread().getName());
if (s % 2 == 0) {
MyRecursiveAction ra1 = new MyRecursiveAction(s / 2);
MyRecursiveAction ra2 = new MyRecursiveAction(s / 2);
ra1.fork();
ra2.fork();
} else {
for (int i = 0; i < s; i++) {
System.out.println(i);
}
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ForkJoinPool p = new ForkJoinPool(5);
// 결과 값이 필요한 경우
RecursiveTask<Integer> rt = new MyRecursiveTask(100);
int n = p.invoke(rt);
// 결과 값이 필요 없는 경우
RecursiveAction ra = new MyRecursiveAction(100);
p.invoke(ra);
// 작업이 완료되길 기다려야 하는 경우
// shutdown 메서드의 경우 진행 중인 스레드가 끝나면 종료 되므로 사용에 주의한다.
p.awaitTermination(5, TimeUnit.SECONDS);
}
}