olrlobt

[Java] 블록킹큐(Blocking Queue)와 딜레이 큐(Delay Queue) 본문

Java/자료구조

[Java] 블록킹큐(Blocking Queue)와 딜레이 큐(Delay Queue)

olrlobt 2023. 9. 13. 08:48

Queue를 사용해 오면서 구현체를 만들 때 new를 치면 자동 완성 되는 수많은 구현체들을 보면서도, 아무 관심도 주지 않고 내가 쓰는 구현체만 써 왔었다.

 "개발자가 반드시 알아야 할 자바 성능 튜닝 이야기"

를 읽으며, 그리고 북스터디에서 해당 내용을 이슈로 다루며 Queue에는 어떤 구현체가 있고 어디에 사용되는 지를 알아보게 되었다.


Blocking Queue

Queue에 대한 개념은 생략하고 바로 Blocking Queue로 넘어가겠다.

먼저 BlockingQueuejava.util.concurrent 패키지에 포함된, 구현체가 아닌 인터페이스다.

 

img

Blocking Queue는 동시성 프로그래밍에서 사용되는 스레드 안전한 큐이다.
큐의 기본 작업에 블로킹 연산을 추가하여, 큐가 가득 찼을 때나 항목을 추가하려는 스레드나,
큐가 비었을 때 항목을 제거하려는 스레드를 대기 상태로 만든다.

 

블로킹 연산이란?

특정 조건이 충족될 때까지 스레드를 일시 중지시키는 연산으로, 연산이 완료될 때까지 스레드를 대기 상태로 만든다.

 

 

 

Blocking Queue의 특징, 일반 Queue와의 차이

  1. 스레드 안전:
    • BlockingQueue는 내부적으로 동기화되어 있어 여러 스레드에서 동시에 접근해도 안전하다.
    • 일반 Queue는 동시성을 지원하지 않는 경우도 많다. 예로, LinkedList나, PriorityQueue의 경우 여러 스레드에서 동시에 접근하면 데이터 일관성이 깨질 수 있다.
  2. 블로킹 연산:
    • 큐가 가득 찼을 때의 put 연산과 큐가 비었을 때의 take 연산이 블로킹된다. 이러한 연산들은 특정 조건이 충족될 때까지 스레드를 대기(Block)시킨다.
  3. 시간제한 있는 연산:
  • offer(E e, long timeout, TimeUnit unit)poll(long timeout, TimeUnit unit)로 대기 시간을 설정할 수 있다. 지정된 시간 내에 연산이 완료되지 않으면 타임아웃과 함께 실패한다.



 

BlockingQueue의 구현체

  1. ArrayBlockingQueue:

고정 크기의 배열을 기반으로 한 구현. 크기가 일단 설정되면 변경할 수 없고, 구현 시 크기를 지정해 주어야 한다.

  1. LinkedBlockingQueue:

연결 노드를 기반으로 한 구현. 선택적으로 최대 크기를 설정할 수 있다.

  1. PriorityBlockingQueue:

요소를 우선순위에 따라 저장하는 구현.

  1. SynchronousQueue:

단 하나의 항목만 저장할 수 있는 블로킹 큐. 이 큐에 항목을 넣으면 다른 스레드가 그 항목을 꺼낼 때까지 현재 스레드는 대기(블록)한다.



Java Code Example

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);  // 큐의 최대 크기는 5

        // 프로듀서 스레드
        Thread producer = new Thread(() -> {
            int value = 0;
            while (true) {
                try {
                    queue.put(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);  // 1초에 한 번 PUT
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 컨슈머 스레드
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    int value = queue.take();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1500);  // 1.5초에 한 번 TAKE
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

위 코드를 보자.

설정

Producer 스레드는 1초당 1회 값을 넣고.
Consumer 스레드 1.5초당 1회 값을 소비한다.

BlockingQueue의 크기를 5로 설정하고 실행한다.

예상

크기가 5가 다 찰 때까지는 우리가 아는 일반 Queue와 똑같이 실행될 것이다.
크기가 다 찬 순간부터는 앞서 본 개념대로라면, Consumer 가 실행되어야만 Producer가 실행될 것이다.

실행 결과

... 생략 ...
Consumed 4
Produced 6
Produced 7
Consumed 5
Produced 8
Consumed 6
Produced 9  // 아직 5까지 안 차서
Produced 10  // Produced가 두 번 실행 된다.

... 중략 ...
Consumed 9  // 일정 시간 이후부터는 size = 5
Produced 14 // Consumed 해야만 Produced 된다.
Consumed 10
Produced 15
Consumed 11
Produced 16
Consumed 12
Produced 17
Consumed 13
Produced 18

앞선 개념처럼 사이즈가 다 찰 때까지, 대기 없이 동작하다가 사이즈가 가득 찬 시점 이후부터는 Consumed가 실행되어야 Produced가 실행되는 것을 알 수 있다.



 

 

 


DelayQueue

DelayQueue는 java.util.concurrent패키지에 포함된 동시성 유틸리티 중 하나로 BlockingQueue의 구현체이다.
요소가 지정된 지연 시간이 지날 때까지 가져올 수 없으며,
이는 스케쥴링 또는 재시도와 같은 연산에서 유용하게 사용될 수 있다.



DelayQueue 특징

  1. DelayQueue의 요소는 Delayed 인터페이스를 구현해야 한다.

Delayed 인터페이스를 구현하지 않았다면 컴파일 에러가 발생한다.

 

Delayed?

DelayQueue에서 얼마나 Delay 될지는 Delayed 인터페이스의 getDelay() 메서드를 통해서 제공된다.
따라서, Delayed 인터페이스를 구현하지 않으면 컴파일 에러가 발생한다.

 

 

  1. 내부적으로 PriorityQueue를 사용하여, 지연시간에 따라 자동으로 정렬된다.

img_1



Java Code Example

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

    static class DelayedTask implements Delayed {
        private long delayUntil;
        private String taskName;

        public DelayedTask(String taskName, long delayInMillis) {
            this.taskName = taskName;
            this.delayUntil = System.currentTimeMillis() + delayInMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayUntil - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.delayUntil, ((DelayedTask) o).delayUntil);
        }

        @Override
        public String toString() {
            return taskName;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        delayQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
        delayQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행

        long startTime = System.currentTimeMillis();
        System.out.println("Start: " + startTime);

        while (!delayQueue.isEmpty()) {
            DelayedTask task = delayQueue.take(); // 지연이 만료될 때까지 블로킹
            System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
        }
    }
}
Start: 1694526619763
Executed Task2 at 2008 // 실행 2초 후 발생
Executed Task1 at 5003 // 실해 5초 후 발생



DelayQueue는 지연시간에 따라 자동 정렬되기 때문에, 2초의 지연 시간을 갖는 Task 2가 먼저 실행이 되었다.
이후 3초 후에 Task1이 실행된 것을 확인하였다.

또한, DelayQueueBlockingQueue의 구현체이므로, 아래처럼 바꾸어 주어도 무방하다.
Java에서 Colletion 계열은 인터페이스로 선언을 해 주는 것이 유지보수 관점에서 좋다니,
아래 방법을 더 권장한다.

public static void main(String[] args) throws InterruptedException {
//        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        BlockingQueue<DelayedTask> blockingQueue = new DelayQueue<>();
        blockingQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
        blockingQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행

        long startTime = System.currentTimeMillis();
        System.out.println("Start: " + startTime);

        while (!blockingQueue.isEmpty()) {
            DelayedTask task = blockingQueue.take(); // 지연이 만료될 때까지 블로킹
            System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
        }
    }
Comments