olrlobt

[Java] CompletableFuture로 비동기 프로그래밍 구현하기 본문

Spring/Project

[Java] CompletableFuture로 비동기 프로그래밍 구현하기

olrlobt 2024. 10. 9. 22:18

비동기 프로그래밍

비동기 프로그래밍은 작업을 병렬로 실행하여 CPU의 효율을 극대화하고, 응답 시간을 줄이기 위해 중요한 기법이다. 특히 네트워크 요청, 파일 I/O, 데이터베이스 쿼리와 같이 시간이 오래 걸리는 작업을 처리할 때 유용하다. 비동기 프로그래밍을 사용하면 한 작업이 완료되기를 기다리지 않고 다른 작업을 병행해서 수행할 수 있어 애플리케이션 성능을 향상시킬 수 있다.

 

자바에서는 Thread, Runnable, Future 등을 사용해 비동기 작업을 처리할 수 있지만, 기존 방법들은 다소 복잡하거나 제한적일 수 있다. 

 


Future

Future는 Java 5에서 처음 도입된 인터페이스로, 비동기 작업의 결과를 나타내는 객체이다. 현재 실행 중인 작업이 완료될 때까지 기다리지 않고, 미래의 어느 시점에서 그 작업의 결과를 받을 수 있게 함으로써 비동기 프로그래밍을 구현할 수 있다. Java에서 멀티스레딩이나 병렬 처리를 구현할 때 사용된다.

 

Future, FutureTask 예제

Future는 인터페이스이기 때문에 비동기 작업을 직접 실행할 수 없다. 대신, 작업의 상태나 결과를 추적하는 역할을 한다. 비동기 작업을 실행하려면 Future의 구현체인 FutureTask를 사용해야 하는데, FutureTask는 Runnable과 Future를 모두 구현한 클래스로, 직접 스레드에서 실행할 수 있는 기능을 제공한다.

 

일반적으로 Future를 사용하기 위해서는 ExecutorService를 통해 비동기 작업을 처리해야 한다. 하지만 이 예제에서는 ExecutorService를 사용하지 않고, 스레드를 직접 생성하여 비동기 작업을 실행하는 방식을 사용했다.

 

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {

        Callable<String> task = () -> { // Callable 작업 생성
            Thread.sleep(2000); // 2초 동안 작업 수행
            return "작업 완료!";
        };

        FutureTask<String> futureTask = new FutureTask<>(task); // FutureTask 생성

        Thread thread = new Thread(futureTask); // Thread를 통해 FutureTask 실행
        thread.start();
        System.out.println("작업을 제출하고 다른 작업을 수행 중입니다...");

        try {
            while (!futureTask.isDone()) { // 비동기 작업이 완료될때까지 기다린다
                System.out.println("작업이 아직 완료되지 않았습니다...");
                Thread.sleep(500); // 0.5초 간격으로 작업 상태 확인
            }
            String result = futureTask.get();  // FutureTask에서 결과를 가져옴 // 작업이 완료될 때까지 블로킹
            System.out.println("FutureTask 결과: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

 

간단하게 2초가 소요되는 다른 작업을 한다고 가정하고 비동기 프로그래밍이 정상적으로 작동하는지 확인했다. Callable 작업을 FutureTask를 통해 비동기적으로 실행하고, 그 상태와 결과를 추적하는 식으로 작동한다.

 

작업을 제출하고 다른 작업을 수행 중입니다...
작업이 아직 완료되지 않았습니다...
작업이 아직 완료되지 않았습니다...
작업이 아직 완료되지 않았습니다...
작업이 아직 완료되지 않았습니다...
FutureTask 결과: 작업 완료!

Process finished with exit code 0

 

출력 결과에서 확인할 수 있듯이, 비동기 작업이 진행되는 동안 while문에서 로그가 정상적으로 출력되고, 작업이 완료되면 그 결과를 출력하게 된다.

 

 

Future의 한계

  • 콜백 기능 부족: Future는 비동기 작업이 완료되면 그 결과를 받을 수 있지만, 작업이 완료되었을 때 자동으로 실행되는 콜백 처리가 없다. 즉, 작업 완료 후 추가 작업을 지정할 수 없다.
  • 예외 처리 제한적: Future는 비동기 작업 도중 발생한 예외를 처리하는 데 있어 다소 제한적이다. 예외가 발생한 경우에도 get()을 호출해야만 해당 예외를 확인할 수 있다.
  • 작업 병합 및 흐름 제어 부족: Future는 여러 작업을 조합하여 처리하는 데 있어 유연성이 부족하다. 후속 작업을 연결하거나 복합적인 비동기 작업을 다루기 어렵다.

 

이러한 한계를 보완하기 위해, Java 8에서는 CompletableFuture가 도입되었다. CompletableFuture는 콜백 처리, 예외 처리, 작업 병합 등의 기능을 제공하여, 더 유연하고 강력한 비동기 프로그래밍을 지원한다.

 


CompletableFuture

CompletableFuture는 Java 8에서 도입된 클래스로, 비동기 프로그래밍을 보다 유연하게 처리할 수 있게 해 준다.

Future의 기능을 확장하여, 비동기 작업의 결과뿐만 아니라, 콜백 작업을 지정할 수 있고, 여러 비동기 작업을 결합하거나 체인으로 연결할 수 있는 다양한 기능을 제공한다. 

 

supplyAsync(), runAsync()  : 비동기 작업 실행

supplyAsync()와 runAsync()를 사용해 비동기 작업을 실행할 수 있다.

supplyAsync()는 값을 반환하는 작업을, runAsync()는 반환 값이 없는 작업을 처리할 때 사용된다.

 

// supplyAsync()는 반환 값이 없다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
    try {
        System.out.println("비동기 작업 시작 .. ");
        Thread.sleep(2000); // 2초 동안 데이터 처리 시뮬레이션
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "비동기 작업 완료 !";
});

// runAsync()는 반환 값이 없다.
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("비동기 작업 시작...");
        Thread.sleep(2000);  // 2초 동안 작업 수행
        System.out.println("비동기 작업 완료!");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

 

 

 

join(), get()  : 비동기 작업 처리

join()과 get()은 둘 다 CompletableFuture에서 비동기 작업이 완료될 때까지 기다리는 역할을 한다.

둘의 차이는 예외 처리의 필요성인데, get() 메서드는 InterruptedException과 ExecutionException을 발생시킬 수 있다. 반면 join()은 예외를 처리하지 않고, 런타임 예외로 던진다. 따라서 서비스에 따라 적절한 메서드를 선택하는 것이 중요하다.

 

// get()은 예외 처리 필요
try {
    String result = future.get(); // 작업이 완료될 때까지 기다림
    System.out.println("비동기 작업 결과: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
        
// join()은 예외 처리 불필요       
String result = future.join(); // 작업이 완료될 때까지 기다림
System.out.println("비동기 작업 결과: " + result);

 

 

runAsync()를 사용하여 반환 값이 없는 경우에도, join()과 get()을 이용하여 반환 값 없이 처리하면 된다.

future.join();  // 작업이 끝날 때까지 대기
System.out.println("모든 작업이 완료되었습니다.");

 

 

 

 

thenApply(), thenAccept(), thenRun() : 콜백 처리

CompletableFuture는 다양한 콜백 메서드를 제공하며, 이를 통해 작업이 완료된 후 추가 작업을 처리할 수 있다. 주요 메서드는 다음과 같으며 차이가 있으니 상황에 맞게 잘 사용하자.

  1. thenApply(): 비동기 결과 값을 변환해서 다른 값으로 반환하는 메서드
  2. thenAccept(): 비동기 결과를 반환하지 않고, 단순히 처리하는 메서드
  3. thenRun(): 작업의 결과 값을 사용하지 않고, 그저 후속 작업만 실행하는 메서드

thenApply(), thenAccept(), thenRun()는 이전 작업이 완료된 후 같은 스레드에서 즉시 실행된다. 이 콜백 작업 역시 비동기적으로 작업되기를 원하면 thenApplyAsync(), thenAcceptAsync(), thenRunAsync()를 사용하여 다른 스레드에서 비동기적으로 실행할 수 있다.

 

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);  // 2초 지연
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "작업 완료!";
        });

        future.thenAccept(result -> { // 작업 결과를 받아 출력
            System.out.println("비동기 작업 결과: " + result);
        });


    // Thread.sleep(3000);  // 비동기 작업이 끝날 때까지 대기해야 결과 확인 가능
    }
}

 

 

 

 

exceptionally(), handle(), whenComplete() : 예외처리

CompletableFuture에서 예외 처리는 비동기 작업 도중 발생할 수 있는 예외를 처리하기 위한 방법을 제공한다. 주로 exceptionally(), handle(), whenComplete() 같은 메서드를 사용하여 예외를 처리할 수 있다.

 

 

exceptionally()는 예외가 발생한 경우 기본 값을 반환하거나 예외 처리 로직을 수행하는 데 사용된다.

import java.util.concurrent.CompletableFuture;

public class ExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Something went wrong!");
            }
            return "Success!";
        }).exceptionally(ex -> {
            System.out.println("Exception occurred: " + ex.getMessage());
            return "Default Value";
        });

        // 결과 출력
        System.out.println(future.join());  // "Default Value"
    }
}

 

 

handle()는 정상적으로 완료되든 예외가 발생하든 상관없이 결과를 처리할 수 있다. 이 메서드는 두 개의 인자를 받으며, 하나는 결과 값이고, 다른 하나는 예외이다.

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Something went wrong!");
            }
            return "Success!";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Exception occurred: " + ex.getMessage());
                return "Handled Error";
            }
            return result;
        });

        // 결과 출력
        System.out.println(future.join());  // "Handled Error"
    }
}

 

 

이 외에도 whenComplete()는 비동기 작업이 완료된 후 결과와 예외를 처리할 수 있다. 이 메서드는 결과를 반환하지 않고, 단순히 완료 후 후속 작업을 수행하는 데 사용하고 handle()과 달리 반환 값을 변경할 수는 없지만, 결과나 예외에 대해 후처리 할 수 있다.

또한, completeExceptionally()는 특정 시점에서 의도적으로 예외를 발생시키고 싶은 경우에 사용된다.

이처럼 다양한 예외처리 메서드를 제공하기 때문에, 서비스와 의도에 맞게 선택해서 사용하는 것이 중요하다.

 

 

 

thenCompose(), thenCombine(), allOf(), anyOf() : 작업 결합

 

thenCompose()는 두 개의 비동기 작업을 순차적으로 연결할 때 사용된다. 첫 번째 비동기 작업이 완료된 후 그 결과를 받아 또 다른 비동기 작업을 실행할 때 유용하다. 예를 들어, 첫 번째 작업의 결과에 따라 두 번째 비동기 작업이 실행되는 경우에 사용할 수 있다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));

// 결과 출력
System.out.println(future.join());  // 출력: "Hello World"

 

이렇게만 생각해 보면 thenApplyAsync()와 차이가 별로 없어 보인다. 실제로도 거의 차이는 없고, 목적과 반환 값에서만 약간의 차이가 있다. thenApplyAsync()의 경우에는  결과 값을 받아 변환하는 것에 목적을 두고, thenCompose()의 경우에는 결과를 받아 새로운 비동기 작업을 실행하는 것에 목적을 둔다. 또한, thenApplyAsync()의 경우에는 변환된 값(<T>)이 반환값이 되는 반면, thenCompose()는 새로운 비동기 작업인 (CompletableFuture <T>) 형식으로 반환된다.

 

 

 

thenCombine()은 두 개의 독립적인 비동기 작업을 동시에 실행하고, 두 작업이 완료된 후 그 결과를 조합하여 새로운 결과를 반환하는 데 사용된다. 두 작업이 동시에 수행되지만 결과는 서로 결합된다.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);

// 결과 출력
System.out.println(combinedFuture.join());  // 출력: "Hello World"

 

 

CompletableFuture.allOf()는 여러 개의 비동기 작업이 모두 완료될 때까지 기다리는 메서드이다. 여러 비동기 작업이 완료되면, 각각의 결과를 처리할 수 있어서 주로 여러 작업이 모두 완료되어야 할 때 유용하게 사용된다.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3");

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

// 모든 작업이 완료된 후 결과 수집
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
    return List.of(future1, future2, future3).stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
});

// 결과 출력
allResults.get().forEach(System.out::println);  // 출력: "Task 1", "Task 2", "Task 3"

 

 

CompletableFuture.anyOf()는 여러 개의 비동기 작업 중 가장 먼저 완료된 작업의 결과를 반환하는 메서드이다. 여러 작업 중 하나라도 빨리 완료되면 그 작업의 결과를 바로 사용할 수 있을 때 유용하다.

 

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(300); } catch (InterruptedException e) {}
    return "Task 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(100); } catch (InterruptedException e) {}
    return "Task 2";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

// 가장 먼저 완료된 작업의 결과 출력
System.out.println(anyOfFuture.join());  // 출력: "Task 2"

 

 

 


CompletableFuture 병렬 처리 활용하여 개인프로젝트 개선

 

처음 CompletableFuture를 알아보게 된것은 개인 프로젝트 성능 개선의 목적이 컸다. 이 전 포스팅부터 개인프로젝트 개선을 진행하면서 성능적인 부분을 많이 개선하려고 집중하고 있는데, 특히 오래 걸리는 비즈니스 로직이 여러 단계에 거쳐 직렬적으로 연결되어 있었다.

 

이 부분을 병렬적으로 처리하면, 한 SVG태그에 여러 부분을 동시에 그리게 되니까 성능이 많이 개선될 것으로 예상했다.

 

SVG이미지를 생성하는데 많은 개선을 했지만, 평균 700ms의 시간이 소요됐다.
SVG이미지를 생성하는데 많은 개선을 했지만, 평균 700ms의 시간이 소요됐다.

 

먼저 개선에 앞서 기존 API 호출 시간을 Profiler를 통해 측정했다.

SVG 태그를 만드는 createSvgImageBox()가 평균적으로 700ms의 시간이 소요되었고, 여기에 블로그를 크롤링하는 과정까지 합치면 API 호출 시간이 약 1초 이상 걸리게 된다.

 

drawBackground(svgGenerator, postingBase);
drawThumbnail(posting, svgGenerator, postingBase);
drawText(posting, svgGenerator, postingBase);
drawFooter(posting, svgGenerator, postingBase);
drawAuthorImg(posting, svgGenerator, postingBase);
drawAuthorText(posting, svgGenerator, postingBase);
drawWatermark(posting, svgGenerator, postingBase);
drawStroke(svgGenerator, postingBase);

 

기존의 로직은 위와 같이 작성되어 있었다.

배경 그리기, 썸네일 그리기, 제목, 내용, 날짜, 작성자 정보 등 모든 작업을 단계적으로 처리하는 로직이었다.

 

그리고 이 과정을 CompletableFuture를 이용하여 병렬처리하였다.

최종적으로 완성된 코드는 아래와 같다.

CompletableFuture<Void> backgroundTask = CompletableFuture.runAsync(() -> drawBackground(finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> thumbnailTask = CompletableFuture.runAsync(() -> drawThumbnail(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> textTask = backgroundTask.thenRunAsync(() -> drawText(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> footerTask = textTask.thenRunAsync(() -> drawFooter(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> authorImgTask = backgroundTask.thenRunAsync(() -> drawAuthorImg(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> authorTextTask = footerTask.thenRunAsync(() -> drawAuthorText(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> watermarkTask = CompletableFuture.runAsync(() -> drawWatermark(posting, finalSvgGenerator, finalPostingBase));
CompletableFuture<Void> strokeTask = CompletableFuture.allOf(thumbnailTask, authorTextTask)
        .thenRunAsync(() -> drawStroke(finalSvgGenerator, finalPostingBase));

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
        backgroundTask, thumbnailTask, textTask, footerTask, authorImgTask, authorTextTask, watermarkTask, strokeTask);

allTasks.get();

 

여기서 중요한 것은 allOf() 메서드를 이용하여 병렬적으로 한 번에 처리하고 있는 것으로 보이지만, 사실 자세히 보면 엄연히 순서가 존재한다.

 

예를 들어, 백그라운드 작업이 완료된 후에 텍스트를 처리해 주고, 그 후에 Footer나 테두리 작업을 진행하는 순차적인 방식으로 코드를 작성했다. 이렇게 한 이유는 내가 사용한 svgGenerator객체의 설정 값이 공유되기 때문이었는데, 배경을 칠할 때 Color값을 White를 주었다면, 병렬 처리 과정에서 Text를 그려줄 때도 White가 설정되는 오류가 발생할 수 있었다.

 

따라서, 순서가 필요한 작업은 의도적으로 순차적으로 연결하여 오류를 줄이며 성능을 끌어올렸다.

 

비동기 프로그래밍을 도입하여 331ms까지 성능을 끌어올렸다
비동기 프로그래밍을 도입하여 331ms까지 성능을 끌어올렸다

 

그 결과, createSvgImageBox()의 호출시간이 664ms -> 331ms로 약 50.15% 개선되었다

 

 

 

Comments