본문 바로가기

공부/컴퓨터

[Java/기초] BlockingQueue를 이용해 쉽게 Producer/Consumer 패턴 만들기

반응형

여전히 책을 읽고 있다. 정확하게 말하면 요즘에는 거의 못 읽고 있다. ㅠ_ㅠ

  자바 병렬 프로그래밍 - 멀티코어를 100% 활용하는  더그 리 외 지음, 강철구 옮김

아무튼 이 책에는 BlockingQueue에 대한 설명이 잠시 나온다.
이를 이용하면 Producer/Consumer 패턴을 만들기 쉽다고 나와서 직접 코드를 짜 본다.

synchronized block를 이용한 생산자/소비자 패턴은 아래와 같다.
( 간단하게 짜 본거라.. 뭐.. 잘못 되었을 수도 있다. 책임 못짐. ㅎㅎ )

  1. import java.util.ArrayList;  
  2. import java.util.Random;  
  3.  
  4.  
  5. public class PlainProsumer {  
  6.     private static ArrayList<Integer> queue = new ArrayList<Integer>();  
  7.       
  8.     public static void main(String[] args) {  
  9.         Consumer c1 = new Consumer("1", queue); c1.start();  
  10.         Consumer c2 = new Consumer("2", queue); c2.start();  
  11.         Consumer c3 = new Consumer("3", queue); c3.start();  
  12.           
  13.         Producer p1 = new Producer(queue);  p1.start();  
  14.     }  
  15.       
  16.     // 생산자. - 무언가를 열심히 만들어 낸다.  
  17.     static class Producer extends Thread {  
  18.         // INDEX  
  19.         private volatile static int i = 1;  
  20.           
  21.         private ArrayList<Integer> queue;  
  22.           
  23.         public Producer(ArrayList<Integer> queue) {  
  24.             this.queue = queue;  
  25.         }  
  26.           
  27.         public void run() {  
  28.             // 0.5초씩 기다렸다가 데이터를 하나씩 넣어 주자.  
  29.             while(true) {  
  30.                 try {  
  31.                     Thread.sleep(new Random().nextInt(1000));  
  32.                 } catch (InterruptedException e) {  
  33.                     e.printStackTrace();  
  34.                 }  
  35.  
  36.                 synchronized (queue) {  
  37.                     // 데이터를 집어 넣고 나면, 데이터가 들어 갔다고 notify 시켜 줘야 한다.  
  38.                     // 그래야 소비자들 중에서 wait하고 있는 놈들을 깨울 수 있다.  
  39.                     queue.add(i++);  
  40.                     queue.notify();  
  41.                 }  
  42.             }  
  43.         }  
  44.     }  
  45.       
  46.     // 소비자.. 생산해 낸 것을 열심히 사용하자.  
  47.     static class Consumer extends Thread {  
  48.         private ArrayList<Integer> queue;  
  49.         private String name;  
  50.         public Consumer(String name, ArrayList<Integer> queue) {  
  51.             this.name = name;  
  52.             this.queue = queue;  
  53.         }  
  54.           
  55.         public void run() {  
  56.             while ( true ) {  
  57.                 synchronized (queue) {  
  58.                     try {  
  59.                         // 데이터가 들어 있지 않고 비었다면 데이터가 올때까지 기다리자.   
  60.                         if ( queue.isEmpty() ) {  
  61.                                 queue.wait();  
  62.                         }  
  63.                           
  64.                         // 생산자에서 데이터를 집어 넣고 notify해 줘서 wait를 벗어나 아래의 코드가 수행된다.  
  65.                         Integer index = queue.remove(0);  
  66.                         System.err.println("Consumer : " + name + "\tCount : " + index);  
  67.                           
  68.                     } catch (InterruptedException e) {  
  69.                         e.printStackTrace();  
  70.                     }  
  71.                 }  
  72.             }  
  73.         }  
  74.     }  
  75. }  

위의 코드를 확인해 보면 알 수 있다시피, queue를 사용할때 synchronized block를 사용하여 queue에 대한 권한을 획득한 뒤에, notify 및 wait를 해 주어야 한다. 이렇게 하면 괜히 코드가 복잡해 지고 synchronized block를 사용하게 되므로 하나의 block를 더 만들어 주어야 해서 코드에 점차 { } 가 많아져서 코드가 보기 어렵게 된다.

하지만 BlockingQueue를 사용하면 synchronized block를 사용하지 않고도 똑같은 구현을 할 수 있다.

  1. import java.util.Random;  
  2. import java.util.concurrent.ArrayBlockingQueue;  
  3. import java.util.concurrent.BlockingQueue;  
  4.  
  5.  
  6. public class BlockingProsumer {  
  7.     private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);  
  8.       
  9.     public static void main(String[] args) {  
  10.         Consumer c1 = new Consumer("1", queue); c1.start();  
  11.         Consumer c2 = new Consumer("2", queue); c2.start();  
  12.         Consumer c3 = new Consumer("3", queue); c3.start();  
  13.           
  14.         Producer p1 = new Producer(queue);  p1.start();  
  15.     }  
  16.       
  17.     // 생산자. - 무언가를 열심히 만들어 낸다.  
  18.     static class Producer extends Thread {  
  19.         // INDEX  
  20.         private volatile static int i = 1;  
  21.           
  22.         private BlockingQueue<Integer> queue;  
  23.           
  24.         public Producer(BlockingQueue<Integer> queue) {  
  25.             this.queue = queue;  
  26.         }  
  27.           
  28.         public void run() {  
  29.             // 임의의 시간마다 데이터를 넣어 준다.  
  30.             while(true) {  
  31.                 try {  
  32.                     Thread.sleep(new Random().nextInt(500));  
  33.                     // 수정사항 - offer에서 put으로 변경
                       
    // 데이터를 넣고 나면 알아서 notify시켜 준다.
                        queue.put(i++);
                    } catch (InterruptedException e) {  
  34.                     e.printStackTrace();  
  35.                 }  
  36.   
  37.             }  
  38.         }  
  39.     }  
  40.       
  41.       
  42.     // 소비자.. 생산해 낸 것을 열심히 사용하자.  
  43.     static class Consumer extends Thread {  
  44.         private BlockingQueue<Integer> queue;  
  45.         private String name;  
  46.         public Consumer(String name, BlockingQueue<Integer> queue) {  
  47.             this.name = name;  
  48.             this.queue = queue;  
  49.         }  
  50.           
  51.         public void run() {  
  52.             while ( true ) {  
  53.                 try {  
  54.                     // queue에 data가 없으면 알아서 wait하고 있다.  
  55.                     Integer index = queue.take();  
  56.                     System.err.println("Consumer : " + name + "\tIndex : " + index);  
  57.                 } catch (InterruptedException e) {  
  58.                     e.printStackTrace();  
  59.                 }  
  60.             }  
  61.         }  
  62.     }  
  63.       
  64. }  

보다시피 BlockingQueue는 자기가 알아서 wait 상태로 들어 가고 notify를 하게 된다.
이러한 BlockingQueue의 기능을 이용하면 생산자 소비자 패턴을 좀 더 쉽게 만들 수 있다.

BlockingQueue는 대략 아래와 같은 기능을 가지고 있다.

1. queue에 data를 넣을때 가득 차 있으면, queue에 빈칸이 생길때까지 대기
boolean put(E o) throws InterruptedException;
boolean offer(E o)

2. queue에 data를 넣을때 가득 차 있으면, queue에 빈칸이 생길때까지 시간을 두고 대기
boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException;

3. queue에 data가 없을 경우, 데이터가 들어 올때까지 대기
E take() throws InterruptedException;

4. queue에 data가 없을 경우, 데이터가 들어 올때까지 시간을 두고 대기
E poll(long timeout, TimeUnit unit) throws InterruptedException;

사실은 BlockingQueue를 사용해서 생산자/소비자 패턴을 만드는 예제는 이미 BlockingQueue의 API문서에 소개 되고 있다 ^^ ( 즉, 나는 이미 있는 예제를 만든다고 삽질한거다. ㅎㅎ )
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html


그리고 아래의 링크를 따라 가면 적당한 예제 및 사용법을 볼 수 있다. ( 한글임 )
Core Java Technologies Tech Tips - QUEUE와 DELAYED 프로세싱
http://kr.sun.com/developers/techtips/c2004_1019.html#1
반응형