[Java] 병렬 프로그래밍 간단 정리 (Thread, ExecutorService, ParallelStream, Fork/Join)

작성일 : 2023년 03월 23일
  • #Java

Java는 멀티 스레드를 지원하는 언어이기 때문에 잘 사용하려면 이에 대한 이해도 필요하다. Java를 통해 프로그램을 개발한다면 알게 모르게 자주 써왔을 테지만 막상 공부하지 않는다면 깊게 이해하기 힘든 부분이다. 잘 사용한다면 순차적인 접근에 비해 훨씬 빠르고 효율적인 병렬 처리 방법에 대해 간단히 알아보고 사용하는 방법도 많아 간단히 정리하고 넘어가보려 한다.

방법 4가지 소개

아래는 Java에서 병렬 프로그래밍을 하기위한 4가지 방법이다.

  1. Thread

  2. ExecutorService

  3. ParallelStream

  4. Fork/Join

간단 설명

위에서 기술한 4가지 클래스는 병렬 처리를 위해 내부적으로 다수의 스레드를 생성해 처리한다.

이 과정에서 가장 주의해야 할 부분 두 가지만 알아둬도 많은 장애를 피할 수 있다.

동기화 처리와 블로킹을 조심하자


또한 병렬 처리를 위해 대표적으로 두가지를 고려해 볼 수 있다.

  1. 리턴 값이 필요한지

  2. 작업이 완료되길 기다려야 하는지

이제 정리를 해보자.

  1. Thread

    Runnable 객체를 처리한다.

    Runnable 객체는 작업 수행시 리턴 값을 반환하지 않기 때문에 Thread를 통해 병렬 작업 수행 시 리턴값을 받을 수 없다.

    작업을 위해선 start() 메서드를 통해 새로운 스레드를 생성 후 작업을 수행한다.

    run() 메서드는 스레드를 생성하지 않고 작업을 수행하므로 병렬 처리를 위해선 사용하지 않는다.

    작업을 기다리려면 join() 메서드를 사용하거나 CountdownLatch 클래스를 사용한다.

  2. ExecutorService

    스레드 풀을 관리하기 위해 사용되며 Runnable 객체와 Callable 객체를 처리한다.

    작업을 위해서 execute혹은 submit을 사용한다.

    결과 값이 없는 Runnable 객체의 경우 execute()혹은 submit() 메서드를 사용하며 결과 값이 있는 Callable의 경우 submit() 메서드를 사용한다.

    작업을 기다리기 위해 submit() 메서드를 통해 Future 객체를 받아 get() 메서드를 사용해야 하며 결과 값이 없는 Runnable 객체를 사용한 경우 get()은 null을 리턴한다.

    또한 CountdownLatch 객체 혹은 ExecutorServiceshutdown() 메서드를 사용 할 수 있다.

  3. ParallelStream

    Stream으로 변환 가능한 객체의 경우 사용 할 수 있다.

    Stream에서 사용하는 메서드를 사용 할 수 있으며 각 작업의 결과를 기대 할 수 있다.

    작업은 내부에선 병렬로 진행되지만 외부에선 항상 블로킹 처리되어 다음 코드를 진행하기 전 기다린다.

  4. Fork/Join

    내부적으로 필요한 만큼의 스레드로 분기해 작업을 수행한다.

    작업을 실행하기 위해 ForkJoinPool 객체가 필요하다.

    풀의 갯수만큼 분기하며 그 이상으로 작업 스레드의 수가 많아질 경우 비어있는 풀이 작업을 가져와 처리한다.

    결과 값이 필요할 경우 RecursiveTask를 상속/구현해 사용하며 필요 없을 경우 RecursiveAction을 사용한다.

    작업을 기다리기 위해 awaitTermination() 메서드를 사용한다.

예제

사용 하는 방법은 아키텍처 구성과 요구사항에 따라 다양하게 변경 될 수 있으나 간단한 구성 방법만 짚어보고 넘어가도록 한다.


예제 코드의 작업 내용은 0 ~ 100까지 수를 가지고 더하거나 출력하며 리턴 값이 필요 없다면 출력하고 리턴 값이 필요하다면 더한 뒤 값을 리턴 하도록 한다.

// 리턴 값이 필요 없는 경우
for (int i = 0; i < 100; i++) {
	System.out.println(i);
}

// 리턴 값이 필요한 경우
int n = 0;
for (int i = 0; i < 100; i++) {
	n += i;
}
  1. Thread

public class MyRunnable implements Runnable {
	private final int s;
	public MyRunnable(int s) {
		this.s = s;
	}
	
	public void run() {
		for (int i = 0; i < 20; i++) {
			System.out.println(i + s);
		}
	}
}

public class Main {
	public static void main(String[] args) throws InterruptedException {
		List<Thread> l = new ArrayList<>();
		for (int i = 0; i < 5; i++) {
			Thread t = new Thread(new MyRunnable(i * 20));
			l.add(t);
			t.run();
		}
		
		// 작업 수행이 완료되길 기다려야 한다면 join() 메서드를 사용한다.
		for (int i = 0; i < 5; i++) {
			Thread t = l.get(i);
			t.join();
		}
    }
}
  1. ExecutorService

public class MyRunnable implements Runnable {
	private final int s;
	public MyRunnable(int s) {
		this.s = s;
	}
	
	public void run() {
		for (int i = 0; i < 20; i++) {
			System.out.println(i + s);
		}
	}
}

public class MyCallable implements Callable {
	private final int s;
	
	public MyCallable(int s) {
		this.s = s;
	}
	
	public Integer call() {
		Integer in = new Integer(0);
		for (int i = 0; i < 20; i++) {
			in += i + s;
		}
		return in;
	}
}

public class Main {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService e = Executors.newFixedThreadPool(5);
		// 리턴 값이 필요 없는 경우
		for (int i = 0; i < 5; i++) {
			e.execute(new MyRunnable(i * 20));
		}
		// 작업을 기다려야 할 경우
		//e.shutdown();
		
        // 리턴 값이 필요한 경우
		List<Future<Integer>> l = new ArrayList<>();
		for (int i = 0; i < 5; i++) {
			Future<Integer> f = e.submit(new MyCallable(i * 20));
			l.add(f);
		}
		
        // 작업이 완료 되길 기다려야 할 경우
		int n = 0;
		for (int i = 0; i < 5; i++) {
			Future<Integer> f = l.get(i);
			n += f.get();
		}
    }
}
  1. ParallelStream

public class Main {
	public static void main(String[] args) {
        // 항상 작업은 블로킹 된다.
        // 리턴 값이 필요 없는 경우
		IntStream.range(0, 100).parallel().forEach(System.out::println);
        // 리턴 값이 필요한 경우
		int n = IntStream.range(0, 100).parallel().reduce(0, Integer::sum);
    }
}
  1. Fork/Join

// Fork/Join 방식의 특성으로 위 3개의 결과와 다름을 유의한다.
public class MyRecursiveTask extends RecursiveTask<Integer> {
	private final int s;
	
	public MyRecursiveTask(int s) {
		this.s = s;
	}
	
	protected Integer compute() {
		if (s % 2 == 0) {
			 
			MyRecursiveTask rt1 = new MyRecursiveTask(s / 2);
			MyRecursiveTask rt2 = new MyRecursiveTask(s / 2);
			
			rt1.fork();
			rt2.fork();
			
			int r = 0;
			
			r += rt1.join();
			r += rt2.join();
			
			return r;
		} else {
			int r = 0;
			for (int i = 0; i < s; i++) {
				r++;
			}
			return r;
		}
	}
}

public class MyRecursiveAction extends RecursiveAction {
	private final int s;
	
	public MyRecursiveAction(int s) {
		this.s = s;
	}
	
	protected void compute() {
		System.out.println(Thread.currentThread().getName());
		if (s % 2 == 0) {
			MyRecursiveAction ra1 = new MyRecursiveAction(s / 2);
			MyRecursiveAction ra2 = new MyRecursiveAction(s / 2);
			
			ra1.fork();
			ra2.fork();
		} else {
			for (int i = 0; i < s; i++) {
				System.out.println(i);
			}
		}
	}
}

public class Main {
	public static void main(String[] args) throws InterruptedException {
		ForkJoinPool p = new ForkJoinPool(5);
        // 결과 값이 필요한 경우
		RecursiveTask<Integer> rt = new MyRecursiveTask(100);
		int n = p.invoke(rt);
		
        // 결과 값이 필요 없는 경우
		RecursiveAction ra = new MyRecursiveAction(100);
		p.invoke(ra);
		
        // 작업이 완료되길 기다려야 하는 경우
        // shutdown 메서드의 경우 진행 중인 스레드가 끝나면 종료 되므로 사용에 주의한다.
		p.awaitTermination(5, TimeUnit.SECONDS);
    }
}