olrlobt
[Java] 블록킹큐(Blocking Queue)와 딜레이 큐(Delay Queue) 본문
Queue
를 사용해 오면서 구현체를 만들 때 new
를 치면 자동 완성 되는 수많은 구현체들을 보면서도, 아무 관심도 주지 않고 내가 쓰는 구현체만 써 왔었다.
"개발자가 반드시 알아야 할 자바 성능 튜닝 이야기"
를 읽으며, 그리고 북스터디에서 해당 내용을 이슈로 다루며 Queue
에는 어떤 구현체가 있고 어디에 사용되는 지를 알아보게 되었다.
Blocking Queue
Queue에 대한 개념은 생략하고 바로 Blocking Queue
로 넘어가겠다.
먼저 BlockingQueue
는 java.util.concurrent
패키지에 포함된, 구현체가 아닌 인터페이스다.
Blocking Queue
는 동시성 프로그래밍에서 사용되는 스레드 안전한 큐이다.
큐의 기본 작업에 블로킹 연산
을 추가하여, 큐가 가득 찼을 때나 항목을 추가하려는 스레드나,
큐가 비었을 때 항목을 제거하려는 스레드를 대기 상태로 만든다.
블로킹 연산이란?
특정 조건이 충족될 때까지 스레드를 일시 중지시키는 연산으로, 연산이 완료될 때까지 스레드를 대기 상태로 만든다.
Blocking Queue의 특징, 일반 Queue와의 차이
- 스레드 안전:
BlockingQueue
는 내부적으로 동기화되어 있어 여러 스레드에서 동시에 접근해도 안전하다.- 일반 Queue는 동시성을 지원하지 않는 경우도 많다. 예로,
LinkedList
나,PriorityQueue
의 경우 여러 스레드에서 동시에 접근하면 데이터 일관성이 깨질 수 있다.
- 블로킹 연산:
- 큐가 가득 찼을 때의
put
연산과 큐가 비었을 때의take
연산이 블로킹된다. 이러한 연산들은 특정 조건이 충족될 때까지 스레드를 대기(Block)시킨다.
- 큐가 가득 찼을 때의
- 시간제한 있는 연산:
offer(E e, long timeout, TimeUnit unit)
및poll(long timeout, TimeUnit unit)
로 대기 시간을 설정할 수 있다. 지정된 시간 내에 연산이 완료되지 않으면 타임아웃과 함께 실패한다.
BlockingQueue의 구현체
ArrayBlockingQueue
:
고정 크기의 배열을 기반으로 한 구현. 크기가 일단 설정되면 변경할 수 없고, 구현 시 크기를 지정해 주어야 한다.
LinkedBlockingQueue
:
연결 노드를 기반으로 한 구현. 선택적으로 최대 크기를 설정할 수 있다.
PriorityBlockingQueue
:
요소를 우선순위에 따라 저장하는 구현.
SynchronousQueue
:
단 하나의 항목만 저장할 수 있는 블로킹 큐. 이 큐에 항목을 넣으면 다른 스레드가 그 항목을 꺼낼 때까지 현재 스레드는 대기(블록)한다.
Java Code Example
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 큐의 최대 크기는 5
// 프로듀서 스레드
Thread producer = new Thread(() -> {
int value = 0;
while (true) {
try {
queue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000); // 1초에 한 번 PUT
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 컨슈머 스레드
Thread consumer = new Thread(() -> {
while (true) {
try {
int value = queue.take();
System.out.println("Consumed " + value);
Thread.sleep(1500); // 1.5초에 한 번 TAKE
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
위 코드를 보자.
설정
Producer 스레드는 1초당 1회 값을 넣고.
Consumer 스레드 1.5초당 1회 값을 소비한다.
BlockingQueue
의 크기를 5로 설정하고 실행한다.
예상
크기가 5가 다 찰 때까지는 우리가 아는 일반 Queue
와 똑같이 실행될 것이다.
크기가 다 찬 순간부터는 앞서 본 개념대로라면, Consumer
가 실행되어야만 Producer
가 실행될 것이다.
실행 결과
... 생략 ...
Consumed 4
Produced 6
Produced 7
Consumed 5
Produced 8
Consumed 6
Produced 9 // 아직 5까지 안 차서
Produced 10 // Produced가 두 번 실행 된다.
... 중략 ...
Consumed 9 // 일정 시간 이후부터는 size = 5
Produced 14 // Consumed 해야만 Produced 된다.
Consumed 10
Produced 15
Consumed 11
Produced 16
Consumed 12
Produced 17
Consumed 13
Produced 18
앞선 개념처럼 사이즈가 다 찰 때까지, 대기 없이 동작하다가 사이즈가 가득 찬 시점 이후부터는 Consumed
가 실행되어야 Produced
가 실행되는 것을 알 수 있다.
DelayQueue
DelayQueue는 java.util.concurrent패키지에
포함된 동시성 유틸리티 중 하나로 BlockingQueue
의 구현체이다.
요소가 지정된 지연 시간이 지날 때까지 가져올 수 없으며,
이는 스케쥴링 또는 재시도와 같은 연산에서 유용하게 사용될 수 있다.
DelayQueue 특징
DelayQueue
의 요소는Delayed
인터페이스를 구현해야 한다.
Delayed
인터페이스를 구현하지 않았다면 컴파일 에러가 발생한다.
Delayed?
DelayQueue에서 얼마나 Delay 될지는 Delayed 인터페이스의 getDelay() 메서드를 통해서 제공된다.
따라서, Delayed 인터페이스를 구현하지 않으면 컴파일 에러가 발생한다.
- 내부적으로 PriorityQueue를 사용하여, 지연시간에 따라 자동으로 정렬된다.
Java Code Example
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExample {
static class DelayedTask implements Delayed {
private long delayUntil;
private String taskName;
public DelayedTask(String taskName, long delayInMillis) {
this.taskName = taskName;
this.delayUntil = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayUntil - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.delayUntil, ((DelayedTask) o).delayUntil);
}
@Override
public String toString() {
return taskName;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
delayQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행
long startTime = System.currentTimeMillis();
System.out.println("Start: " + startTime);
while (!delayQueue.isEmpty()) {
DelayedTask task = delayQueue.take(); // 지연이 만료될 때까지 블로킹
System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
}
}
}
Start: 1694526619763
Executed Task2 at 2008 // 실행 2초 후 발생
Executed Task1 at 5003 // 실해 5초 후 발생
DelayQueue
는 지연시간에 따라 자동 정렬되기 때문에, 2초의 지연 시간을 갖는 Task 2가 먼저 실행이 되었다.
이후 3초 후에 Task1이 실행된 것을 확인하였다.
또한, DelayQueue
는 BlockingQueue
의 구현체이므로, 아래처럼 바꾸어 주어도 무방하다.
Java에서 Colletion
계열은 인터페이스로 선언을 해 주는 것이 유지보수 관점에서 좋다니,
아래 방법을 더 권장한다.
public static void main(String[] args) throws InterruptedException {
// DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
BlockingQueue<DelayedTask> blockingQueue = new DelayQueue<>();
blockingQueue.add(new DelayedTask("Task1", 5000)); // 5초 후 실행
blockingQueue.add(new DelayedTask("Task2", 2000)); // 2초 후 실행
long startTime = System.currentTimeMillis();
System.out.println("Start: " + startTime);
while (!blockingQueue.isEmpty()) {
DelayedTask task = blockingQueue.take(); // 지연이 만료될 때까지 블로킹
System.out.println("Executed " + task + " at " + (System.currentTimeMillis() - startTime));
}
}
'Java > 자료구조' 카테고리의 다른 글
[JAVA/자료구조] 리스트 List, Collections 클래스와 메서드의 활용 (0) | 2023.01.14 |
---|---|
[JAVA/자료구조] 배열 Array, Arrays 클래스와 메서드의 활용 (1) | 2023.01.13 |