1. Spring Webflux 비동기 처리 흐름의 이해 - Netty 편
2. Spring Webflux 비동기 처리 흐름의 이해 - Reactor 편
이번 편에서는 1편에 이어서 Reactor의 동작을 알아보고, Netty와 Reactor가 Webflux에서 어떻게 연결되어 처리되는지 까지 알아보도록 하겠습니다.
Reactor의 동작
Netty는 저수준 네트워크 I/O를 다루는 반면, Reactor는 개발자에게 편리한 비동기 프로그래밍을 위해 비교적 고수준의 API를 제공하기 때문에 보다 이해가 쉽습니다.
이 글에서는 Reactor의 Reactive Streams 표준, Publisher-Subcriber 구조 등 의 구체적인 이해보다는 예시 코드를 이용해 동작의 흐름을 이해하는데 집중합니다.
Mono, Flux를 비유하여 이해하기
Reactor 에서 핵심이되는 클래스는 Mono와 Flux입니다. 이들은 데이터를 제공하는 객체로 Publisher라고 불립니다.
우리에게 친숙한 Stream과 비교하며 공통점과 차이점을 알아보도록 하겠습니다.
@Test
public void example() {
// Stream
List<Integer> squaredData = Stream.of(1, 2, 3)
.map(this::square)
.collect(Collectors.toList());
// Reactor
List<Integer> numberData = List.of(1, 2, 3);
Disposable disposable = Flux.fromIterable(numberData)
.map(this::square)
.subscribe(System.out::println);
}
private int square(int i) {
int result = i * i;
System.out.println(result);
return i;
}
위 코드는 Stream과 Reactor Publisher를 활용하여 데이터를 제곱 처리하는 간단한 로직입니다. 위 코드를 통해 Stream과 Publisher API 의 공통점, 차이점을 분석해보도록 하겠습니다.
공통점
- 데이터를 선언하고, 중간 연산을 통해 처리하고, 최종 연산을 호출하는 형태
- 최종 연산을 호출할 때 까지 데이터 연산이 실제로 수행되지 않음 (Lazy Evaluation)
차이점
Stream과 Publisher의 최종연산의 성격에서 차이를 보입니다.
- Stream: 일반적으로 데이터 처리 결과를 특정 자료구조로 반환
- Publisher: 최종 연산으로 구독을 수행하고 데이터 처리 결과를 반환받는 대신 구독자가 데이터 스트림에 대한 구독을 취소하는 등의 제어를 위한 Disposable 객체를 반환
이러한 차이점의 원인은 Reactor 비동기 프로그래밍 특성에 있습니다. 비동기 프로그래밍에서는 데이터 처리가 어떤 시점에 끝날지 모르기 때문입니다. 만약 연산 결과를 특정 타입으로 반환받게 된다면 이를 위해 Thread가 Blocking 되게 될 것입니다. 따라서 Publisher는 데이터 처리 결과를 이용한 로직을 콜백으로 등록하여 수행하도록 합니다.
실제로 예시 코드에서도 데이터 처리가 완료 된 뒤 subscribe() 메소드에 System.out.println(data) 를 콜백으로 등록한 것을 확인할 수 있습니다.
Publisher - Subscriber 모델
위에서 언급된 Publisher, Subscriber (발행-구독) 모델에 대한 개념을 조금 더 살펴보겠습니다.
Reactor는 비동기 데이터 흐름 처리를 Publisher-Subscriber 모델로 구현했습니다.
Publisher-Subscriber 이라는 용어때문에 Publish 하는 주체와 Subscribe 하는 주체가 시스템 내외부에 독립적으로 존재하는 듯한 착각을 부를 수 있지만, 실제로는 위에 첨부한 예시 코드 처럼 발행과 구독 모두 개발자가 프로그래밍하는 코드 안에서 일어나는 동작입니다.
또한 위 설명에서 알 수 있듯이 구독이 일어나기 전까지는 아무런 작업이 일어나지 않습니다. Publisher는 데이터를 생성하고 연산을 수행하는 일련의 작업을 정의해 둘 뿐입니다. 이 작업은 subscribe() 를 호출해서 구독자가 생성될 때 트리거되어 데이터 생성 및 연산을 처리하고 처리 결과를 구독자에게 전달합니다.
데이터 생성과 소비
Publisher를 이용해 데이터를 생성하는 방법은 크게 즉시 평가와 지연 평가로 나눌 수 있습니다.
Sink를 사용해서 프로그래밍 방식으로 데이터 생성 흐름을 제어하거나 실시간 데이터 스트리밍하는 방식도 있으나 이 주제는 따로 다뤄보도록 하겠습니다.
즉시 평가 (Eager Evaluation)
즉시 평가는 일반적인 프로그래밍에서 사용하는 방식과 동일하게, 데이터를 즉시 계산하고 메모리에 할당하는 것 입니다.
// 연산이 복잡하지 않으며 I/O 바운드 작업이 일어나지 않는 작업
List<Integer> numberData = List.of(1, 2, 3);
// 메모리에 있는 값을 즉시 사용
Flux.fromIterable(numberData)
.subscribe();
즉시 평가는 위 코드 예시 처럼 이미 준비된 데이터 소스를 사용하거나, 가벼운 데이터 소스를 사용할 때 활용합니다.
또는, 구독을 여러번 해도 일관된 데이터를 제공해야 할 때도 활용할 수 있습니다.
즉, CPU 작업이 크지 않고 파일 읽기/쓰기, 네트워크 통신 등의 Blocking 작업이 없는 경우에 적합합니다.
사용할 수 있는 API는 Mono.just(), Flux.just(), Flux.fromIterable() 이 있습니다.
지연 평가 (Lazy Evaluation)
지연 평가는 구독이 일어나는 순간에 데이터를 준비합니다.
@Test
public void example2() {
Mono.fromCallable(() -> readFile("example.txt"))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
public List<String> readFile(String filePath) throws IOException {
List<String> fileContents = new ArrayList<>();
FileReader fileReader = new FileReader(filePath);
StringBuilder content = new StringBuilder();
//read file ...
return content.toString();
}
지연 평가는 즉시 평가와 반대로 I/O 작업이 있거나, CPU 부하 작업이 있을 때 적합합니다. 또는 여러번의 구독이 일어날 때, 실시간 데이터를 제공해야 한다면 지연 평가를 사용해야 합니다. 위 코드 예시에서는 Disk I/O가 발생하는 readFile() 메소드를 지연평가 기능으로 수행하였습니다.
만약, readFile() 메소드를 직접 호출한다면 파일을 읽는 동안 Thread가 Blocking 될 것입니다. 그리고 Webflux 환경에서 Blocking 된 Thread가 EventLoop Thread 였다면 애플리케이션 성능에도 큰 악영향을 줄 것입니다. 그렇기 때문에 구독할 때 파일 읽기를 수행할 수 있도록 callback 방식으로 데이터를 생성합니다.
Reactor는 주요 작업을 처리하는 Thread를 차단하지 않고 효율적인 비동기 작업을 수행할 수 있도록 합니다. 이를 돕는 Scheduler라는 객체가 있으며, 위 코드에서도 찾아볼 수 있습니다.
Scheduler는 작업 유형별로 알맞은 실행 정책 혹은 적절한 Thread를 선택할 수 있도록 돕는 역할을 하며, 다음과 같은 실행 컨텍스트를 선택할 수 있습니다.
- Schedulers.immediate(): 현재 스레드에서 작업 실행 (명시하지 않으면 기본적으로 immediate() 와 같이 동작합니다.)
- Schedulers.boundedElastic(): I/O 작업에 적합한 스레드 풀
- Schedulers.single(): 단일 재사용 가능한 스레드에서 작업 실행
- Schedulers.parallel(): 고정된 크기의 워커 스레드 풀에서 작업 실행 (CPU 코어 수에 맞게 최적화 되어 있음)
- Schedulers.fromExecutorService(): 커스텀 ExecutorService를 기반으로 스케줄러 생성
Webflux에서는 높은 CPU 부하 작업을 하는 것을 지양해야 하고, 주로 서버 간 통신 (Network I/O) 가 많이 발생하기 때문에 boundedElastic()의 활용도가 높습니다.
Schedulers가 제공하는 컨텍스트는 subscribesOn(), publishOn() 메소드로 설정할 수 있습니다.
비동기 작업의 작업 연결
Publisher는 생성한 데이터 소스로부터 추가적인 작업을 하기 위해 작업 연결을 위한 API를 제공합니다.
Webflux로 웹 서버 개발을 하게 되면 주로 외부 웹 서버, Database Server, Redis Server 와 통신합니다. 이러한 통신을 위해 WebClient, R2dbc, ReactiveRedis 등의 Reactive 라이브러리를 사용하고 이들은 비동기 통신을 이미 구현하여 제공합니다. 따라서 고수준에서 개발하는 입장에서는 비동기 통신을 구현을 통해 데이터 소스를 생성하는 작업보다는 제공되는 데이터 소스들을 잘 연결하고 활용하는 작업을 주로 하게 됩니다.
이 글에서는 다양한 연산자를 다루기 보다는 비동기 작업 연결에 대한 간단한 예시 코드를 통한 설명을 제공합니다.
사전 환경
예시 코드를 제공하기 위해 RDBMS에서 일대다 매핑 관계를 가지는 Post, Comment 스키마를 가정합니다. 또한 R2DBC 를 사용해서 데이터베이스와 통신하는 상황을 가정합니다. R2DBC에 대한 배경지식이 없더라도 Spring data JPA를 사용해보셨다면 무리 없이 이해할 수 있을 것 입니다.
@Table("post")
public class Post {
@Id
@GeneratedValue
private Long id;
private String content;
}
@Table("comment")
public class Comment {
@Id
@GeneratedValue
private Long id;
private Long postId;
private String content;
}
이에 대한 Mapping Class로 위와 같은 객체를 정의합니다. JPA의 Entity와 같이 데이터베이스와 매핑되는 객체라고 이해해주시면 됩니다. 대신 R2dbc는 ORM이 아니기 때문에 Order와 OrderItem간에 객체로 의존하지 않고, RDBMS 테이블과 같이 PK - FK 관계를 가집니다.
A 비동기 작업의 결과를 이용해서 B 비동기 작업을 처리
대표적인 비동기 작업 예시입니다. 여기서는 게시글 목록을 검색하고, 각 게시물에 대한 Id로 각각의 댓글 갯수를 검색하는 시나리오를 가정합니다.
interface PostRepository extends R2dbcRepository<Post, Long> {
// 검색 조건을 이용해 조건에 부합하는 게시글 목록을 조회합니다.
Flux<Post> findAllBySomethingConditions(Conditions conditions);
}
interface CommentRepository extends R2dbcRepository<Comment, Long> {
// 게시글의 아이디로 댓글 갯수를 조회합니다.
Mono<Long> countByPostId(Long postId);
}
이를 위해 위와 같은 데이터 인터페이스를 정의했습니다.
public Flux<PostDto> get(Conditions conditions) {
return postRepository.findAllByConditions(conditions)
.flatMap(post -> commentRepository.countByPostId(post.getId())
.map(commentCount -> PostDto.from(post, commentCount)));
}
(위 코드는 예시일 뿐이므로 N+1과 같은 문제는 고려하지 않습니다.)
이전 작업의 결과를 사용해서 다음 비동기작업을 수행하기 위해서는 flatMap() API를 사용할 수 있습니다. 위 코드에서는 게시글 목록을 조회하고, 각각의 게시글 Id로 게시글 댓글 갯수를 조회하여 게시글과 댓글 갯수 정보를 합쳐서 PostDto를 생성합니다.
좀 더 자세한 흐름을 다음과 같습니다.
- R2DBC는 findAllByConditions() 를 통한 쿼리 작업을 비동기로 처리할 수 있도록 하는 Publisher A를 반환합니다. R2DBC는 비동기 작업을 위해 boundedElastic() 스케쥴러(비동기 작업 스레드 풀)를 사용합니다.
- countByPostId() 작업 또한 비동기로 처리할 수 있는 Publisher B를 반환합니다. (여기서 Publisher B는 여러개의 인스턴스가 될 수 있습니다.)
- flatMap()에는 게시물 검색 결과로부터 생성된 데이터 소스를 통해 처리할 새로운 비동기 작업을 등록합니다. flatMap() 내부적으로는 다음 비동기 작업으로 반환되는 Mono<PostDto> 를 구독합니다.
- 모든 데이터 소스에 대해 작업이 완료되면 flatMap()이 이들을 다시 하나의 Flux<PostDto>로 합칩니다 (평면화 라는 용어가 주로 사용됨).

코드 예시 작업이 구독된다면 위 그림과 같이 동작합니다.
중요한 부분은 게시글 검색 결과에 대한 데이터는 순차적으로 전달되지만, flatMap()에서 처리하는 댓글 갯수 검색 작업은 비동기로 병렬적으로 처리되기 때문에 순서가 보장되지 않는다는 것 입니다.
만약 게시물 검색 기능에 정렬 쿼리가 포함되어 있었다면 원치 않는 결과가 나올수 있을 것 입니다.
flatMap(), flatMapSequential(), concatMap()
Reactor는 비동기 작업에 대한 순서를 보장하기 위해 flatMapSequential(), concatMap() 등의 api 도 제공합니다.
flatMapSequential()은 flatMap() 그림과 같이 비동기 작업(댓글 조회)을 병렬적으로 처리하지만, 이를 다시 병합할때 원본 데이터의 순서대로 병합해줍니다. 즉, 빠르게 수행하면서 순서를 보장할 수 있습니다.
concatMap()은 원본 데이터의 순서대로 비동기 작업을 처리(게시물1번에 대한 댓글 갯수 조회 -> 게시물2에 대한 댓글 갯수 조회 -> .. ) 합니다. 이는 작업 결과 뿐만 아니라 작업 순서까지 엄격하게 지켜져야하는 트랜잭션 처리 등에 사용할 수 있습니다.
flatMap() vs map()
flatMap()은 Function<T, Mono<R>> 타입을 매개변수로 받습니다. 즉, 특정 타입을 전달받아 새로운 Publisher를 반환하는 콜백을 등록해야합니다. 위 예시에서 사용했듯이, 이전 작업의 결과로 새로운 비동기 작업(네트워크, 디스크 I/O등) 을 해야할 때 주로 사용합니다.
map()은 Function<T, R> 타입을 매개변수로 받습니다. 이는 Stream의 map과 비슷한 동작을하며, 비동기 작업 호출이 아닌 단순 변환작업에 주로 사용합니다.
이외에도 정말 많은 종류의 연산자가 있지만, flatMap() 과 같은 기본적인 비동기 작업 흐름을 이해한다면 다른 연산자도 쉽게 이해하고 사용할 수 있을 것 입니다.
Webflux에서 Netty + Reactor 전체 흐름
이제 이전 편에서 다룬 Netty와 Reactor의 작업 흐름을 전체적으로 그려보도록 하겠습니다.
위 그림은 다음 상황을 나타냅니다.
- X축은 타임라인, Y축은 서로 다른 스레드를 의미합니다.
- 하나의 Boss Thread와 하나의 Worker Thread만 있는 환경을 가정합니다.
그리고 다음과 같이 작업을 처리합니다.
- 여러 클라이언트가 게시글 목록 조회 API를 호출합니다.
- Boss Thread는 요청 수락만 처리하고 Worker Thread가 요청이 준비되면 요청을 컨트롤러로 전달합니다.
- Worker Thread에서 단순 작업을 처리하다가, I/O 작업이 발생하면 예약해둔 스케쥴러로 전환되어 (그림에서는 Bounded Elastic) 요청을 처리합니다.
- Boss Thread와 Worker Thread는 각각 작업을 다른 Thread에게 위임했으므로 다음 요청을 처리할 수 있습니다.
이런 구조로, Webflux는 다수의 요청을 적은 쓰레드로 처리할 수 있습니다.
참고
Webflux에서는 Controller가 반환한 Publisher (Mono, Flux)를 구독하고 클라이언트에게 결과를 응답합니다.
따라서 비동기 작업이 제대로 수행되기 위해서는 모든 비동기 작업이 체인되어 결과적으로 하나의 Publisher를 반환해야 합니다. 내부적으로 체인되지 않은 Publisher가 있다면 메소드를 호출 했더라도(ex-데이터 저장) 실제로는 구독되지 않아 작업이 수행되지 않습니다.
또한, Webflux에서 결과를 구독해서 클라이언트에게 응답하기 때문에 일반적으로는 직접 subscribe() 를 호출하지 않아도 됩니다.
정리하면 컨트롤러의 매개변수는 일반 데이터 타입으로 전달 받으면 되고, 반환 값은 리액티브 타입으로 반환하면 됩니다.
참고자료
'Spring' 카테고리의 다른 글
| Spring Webflux 비동기 처리 흐름의 이해 - 1. Netty 편 (0) | 2025.03.24 |
|---|