Reactor, RxJava, WebFlux에서 사용되는 용어들
Spring WebFlux, Project Reactor 등 Reactive Streams가 사용되는 프로젝트를 개발하면서 구글링을 하면 아래의 용어들을 만나볼 수 있다.
앞으로 구글링할 때 읽다가 단어의 뜻을 몰라 브레이크가 걸리지 않도록 용어의 뜻을 예제와 함께 알아보자
Publisher : 발행자, 생산자, Emitter 데이터를 생산하는 주체
Emit : Publisher가 데이터를 Downstream(아래 참고)으로 방출하는 것
Subscriber : 구독자, 소비자 데이터를 소비하는 주체
Sequence : Publisher가 emit하는 데이터의 연속적인 흐름 stream, pipeline과도 의미가 같다.
Subscribe : Subscriber가 Upstream Sequence를 구독하는 것
Publisher가 Emit하는 요소들이 Reactive Streams Operator들을 지나서 가공되는 흐름을 Sequence 라고 한다.
Subscriber가 Cold Publisher를 Subscribe 하게될 때 Publisher가 요소들을 Emit하기 시작한다.
Subscriber가 Cold Publisher를 Subscribe 하게될 때 Reactive Stream에서 데이터가 흐르기 시작한다.
Hot Publisher는 Subscriber의 유무와 관계없이 데이터를 생산하는 상태이다.
Subscriber가 Hot Publisher를 Subscribe 하는 경우 구독 이후 생산된 요소들만 소비할 수 있다.
예제로 알아보기
@Test
void pubsublearn() {
//@1
Flux<Integer> publisher = Flux.just(1, 2, 3, 4)
.filter(i -> i % 2 == 0);
//@2
BaseSubscriber<Integer> subscriber = new BaseSubscriber<>() {
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
}
};
// @3 구독관계 생성
publisher.subscribe(subscriber);
// @4 주로 사용되는 subscribe 패턴
publisher.subscribe(i -> System.out.println("Received: " + i));
// @5 하나의 연결된 파이프라인으로 표현
Flux.just(1, 2, 3, 4)
.filter(i -> i % 2 == 0)
.subscribe(i -> System.out.println("Received: " + i));
}
@1 : 0~n개의 Integer를 {발행, 생산, emit} 하는 publisher가 정의되어있다.
Flux.just(1, 2, 3, 4) 에서는 1,2,3,4를 방출할 것이고, .filter(i -> i % 2 == 0) 에서는 2,4를 방출하게 될 것이다.
@2 : 어떤 Publisher가 {발행, 생산, emit} 하는 {데이터, 요소} 들을 구독할 subscriber가 정의되어있다.
OnNext로 방출되는 각 요소를 hook 하여 정의한 데이터 소비행위를 한다.
@3 : 1,2 번 과정만 수행한경우 publisher, subscriber가 서로를 모르기때문에 publisher는 데이터를 발행하지 않고, subscriber는 소비할 데이터가 없는 상태이다.
하지만 구독관계를 생성하게되면 publisher가 emit한 2,4 가 소비되는것을 확인할 수 있다.

@4 : 모든 Subscriber를 2번과 같이 정의해야한다면 편리하지도 않고 stream을 흘러가는대로 이해할 수 없게된다. 하지만 subscribe에 전달되는 매개변수인 CoreSubscriber 대신 함수형 인터페이스인 Consumer 를 활용할 수 있다.
BaseSubscriber 에서 hookOnNext소비되는 각각의 요소에 취할 functional method인 void accept(Object) 를 람다식으로 전달하면 됨
@5 : sequence가 눈에 잘 띄도록 publisher → subscriber 를 연결해놓은 코드, {sequence, stream, pipeline} 등 세세하게는 뜻이 다를 수 있어도 비슷한 의미로 표현된다.
Downstream : 현재 Operator 를 기준으로 하위 Operator 및 method 체인
Upstream : 현재 Operator 를 기준으로 상위 Operator 및 method 체인
어떤 operator를 기준으로 publisher방향에 위치한 operator들을 말할 땐 upstream
어떤 operator를 기준으로 consumer(subscriber) 방향에 위치한 operator들을 말할 땐 downstream이라고 부른다.
upstream downstream
publisher/past operators <--------- operator -----------> consumer/further operators
아래 주석메세지를 보면 Downstream 과 Upstream 의 사용 용례를 알 수 있다.
Flux.just(1, 2, 3, 4, 5, 100, 1000)
// 위의 just는 아래 filter의 Upstream이다.
.filter(i -> i >= 100)
// 아래 map, subscribe는 위 filter의 Downstream이다.
.map(String::valueOf)
.subscribe(System.out::println);
Dispose : Suscriber가 Sequence의 구독을 해지 하는 것
@Test
void dispose() throws InterruptedException {
// @1 도매상의 펜 재고, 초록색 펜이 5번째에 있음에 유의
List<Pen> penList = new ArrayList<>();
penList.add(new Pen("검정"));
penList.add(new Pen("불량"));
penList.add(new Pen("빨강"));
penList.add(new Pen("불량"));
penList.add(new Pen("초록"));
// @2 penList에서 매 초 Pen 1개 씩 emit되어 불량인지 검사하는 도매상의 판매 과정
Flux<Pen> penWholesaler = Flux.fromIterable(penList).delayElements(Duration.ofSeconds(1))
.filter(Pen::checkQuality);
// @3 소매상의 도매상(Publisher)으로부터 생성되는 펜들(Sequence)을 구입(Subscribe)하여 판매(소비)하는 계약 성립
Disposable contract = penWholesaler.subscribe(
pen -> System.out.printf("도매상에게서 %s을(를) 100원에 샀다.%n손님에게 %s을(를) 500원에 팔았다.%n", pen, pen));
// @4 4초 후 계약을 취소(subscribe 후 dispose)했으므로 4초간 pub-sub stream에서 데이터가 흐를 것,
// 따라서 5번째 펜 재고인 초록색 펜은 결과로 출력되지 않는다.
Thread.sleep(4000);
contract.dispose(); // @5
}
테스트 실행 결과

@3 에서 subscribe하여 생긴 pub-sub관계를 Disposable 형식의 객체로 받아올 수 있다.
언제든지 @5 처럼 dispose()를 호출하여 pub-sub 관계를 해제할 수 있다는 의미로 이해하면 좋을 것 같다.
[참고] Pen class 구현
public static class Pen {
String color;
public Pen(String color) {
this.color = color;
}
public boolean checkQuality() {
return !Objects.equals("불량", this.color);
}
@Override
public String toString() {
return this.color + "색 펜";
}
}
Reactor, RxJava, WebFlux에서 사용되는 용어들
Spring WebFlux, Project Reactor 등 Reactive Streams가 사용되는 프로젝트를 개발하면서 구글링을 하면 아래의 용어들을 만나볼 수 있다.
앞으로 구글링할 때 읽다가 단어의 뜻을 몰라 브레이크가 걸리지 않도록 용어의 뜻을 예제와 함께 알아보자
Publisher : 발행자, 생산자, Emitter 데이터를 생산하는 주체
Emit : Publisher가 데이터를 Downstream(아래 참고)으로 방출하는 것
Subscriber : 구독자, 소비자 데이터를 소비하는 주체
Sequence : Publisher가 emit하는 데이터의 연속적인 흐름 stream, pipeline과도 의미가 같다.
Subscribe : Subscriber가 Upstream Sequence를 구독하는 것
Publisher가 Emit하는 요소들이 Reactive Streams Operator들을 지나서 가공되는 흐름을 Sequence 라고 한다.
Subscriber가 Cold Publisher를 Subscribe 하게될 때 Publisher가 요소들을 Emit하기 시작한다.
Subscriber가 Cold Publisher를 Subscribe 하게될 때 Reactive Stream에서 데이터가 흐르기 시작한다.
Hot Publisher는 Subscriber의 유무와 관계없이 데이터를 생산하는 상태이다.
Subscriber가 Hot Publisher를 Subscribe 하는 경우 구독 이후 생산된 요소들만 소비할 수 있다.
예제로 알아보기
@Test
void pubsublearn() {
//@1
Flux<Integer> publisher = Flux.just(1, 2, 3, 4)
.filter(i -> i % 2 == 0);
//@2
BaseSubscriber<Integer> subscriber = new BaseSubscriber<>() {
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
}
};
// @3 구독관계 생성
publisher.subscribe(subscriber);
// @4 주로 사용되는 subscribe 패턴
publisher.subscribe(i -> System.out.println("Received: " + i));
// @5 하나의 연결된 파이프라인으로 표현
Flux.just(1, 2, 3, 4)
.filter(i -> i % 2 == 0)
.subscribe(i -> System.out.println("Received: " + i));
}
@1 : 0~n개의 Integer를 {발행, 생산, emit} 하는 publisher가 정의되어있다.
Flux.just(1, 2, 3, 4) 에서는 1,2,3,4를 방출할 것이고, .filter(i -> i % 2 == 0) 에서는 2,4를 방출하게 될 것이다.
@2 : 어떤 Publisher가 {발행, 생산, emit} 하는 {데이터, 요소} 들을 구독할 subscriber가 정의되어있다.
OnNext로 방출되는 각 요소를 hook 하여 정의한 데이터 소비행위를 한다.
@3 : 1,2 번 과정만 수행한경우 publisher, subscriber가 서로를 모르기때문에 publisher는 데이터를 발행하지 않고, subscriber는 소비할 데이터가 없는 상태이다.
하지만 구독관계를 생성하게되면 publisher가 emit한 2,4 가 소비되는것을 확인할 수 있다.

@4 : 모든 Subscriber를 2번과 같이 정의해야한다면 편리하지도 않고 stream을 흘러가는대로 이해할 수 없게된다. 하지만 subscribe에 전달되는 매개변수인 CoreSubscriber 대신 함수형 인터페이스인 Consumer 를 활용할 수 있다.
BaseSubscriber 에서 hookOnNext소비되는 각각의 요소에 취할 functional method인 void accept(Object) 를 람다식으로 전달하면 됨
@5 : sequence가 눈에 잘 띄도록 publisher → subscriber 를 연결해놓은 코드, {sequence, stream, pipeline} 등 세세하게는 뜻이 다를 수 있어도 비슷한 의미로 표현된다.
Downstream : 현재 Operator 를 기준으로 하위 Operator 및 method 체인
Upstream : 현재 Operator 를 기준으로 상위 Operator 및 method 체인
어떤 operator를 기준으로 publisher방향에 위치한 operator들을 말할 땐 upstream
어떤 operator를 기준으로 consumer(subscriber) 방향에 위치한 operator들을 말할 땐 downstream이라고 부른다.
upstream downstream
publisher/past operators <--------- operator -----------> consumer/further operators
아래 주석메세지를 보면 Downstream 과 Upstream 의 사용 용례를 알 수 있다.
Flux.just(1, 2, 3, 4, 5, 100, 1000)
// 위의 just는 아래 filter의 Upstream이다.
.filter(i -> i >= 100)
// 아래 map, subscribe는 위 filter의 Downstream이다.
.map(String::valueOf)
.subscribe(System.out::println);
Dispose : Suscriber가 Sequence의 구독을 해지 하는 것
@Test
void dispose() throws InterruptedException {
// @1 도매상의 펜 재고, 초록색 펜이 5번째에 있음에 유의
List<Pen> penList = new ArrayList<>();
penList.add(new Pen("검정"));
penList.add(new Pen("불량"));
penList.add(new Pen("빨강"));
penList.add(new Pen("불량"));
penList.add(new Pen("초록"));
// @2 penList에서 매 초 Pen 1개 씩 emit되어 불량인지 검사하는 도매상의 판매 과정
Flux<Pen> penWholesaler = Flux.fromIterable(penList).delayElements(Duration.ofSeconds(1))
.filter(Pen::checkQuality);
// @3 소매상의 도매상(Publisher)으로부터 생성되는 펜들(Sequence)을 구입(Subscribe)하여 판매(소비)하는 계약 성립
Disposable contract = penWholesaler.subscribe(
pen -> System.out.printf("도매상에게서 %s을(를) 100원에 샀다.%n손님에게 %s을(를) 500원에 팔았다.%n", pen, pen));
// @4 4초 후 계약을 취소(subscribe 후 dispose)했으므로 4초간 pub-sub stream에서 데이터가 흐를 것,
// 따라서 5번째 펜 재고인 초록색 펜은 결과로 출력되지 않는다.
Thread.sleep(4000);
contract.dispose(); // @5
}
테스트 실행 결과

@3 에서 subscribe하여 생긴 pub-sub관계를 Disposable 형식의 객체로 받아올 수 있다.
언제든지 @5 처럼 dispose()를 호출하여 pub-sub 관계를 해제할 수 있다는 의미로 이해하면 좋을 것 같다.
[참고] Pen class 구현
public static class Pen {
String color;
public Pen(String color) {
this.color = color;
}
public boolean checkQuality() {
return !Objects.equals("불량", this.color);
}
@Override
public String toString() {
return this.color + "색 펜";
}
}