ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스프링 인 액션] Chapter 10 - 리액터 개요 :: 리액티브 오퍼레이션 적용하기
    개발서적읽기/Spring in Action 제 5판 2020. 10. 4. 21:32


    Flux와 Mono는 리액터가 제공하는 가장 핵심적인 구성 요소(리액티브 타임)다. 그리고 Flux와


    Mono가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는


    파이프라인을 생성한다. Flux와 Mono에는 500개 이상의 오퍼레이션이 있으며, 각 오퍼레이션은 


    다음과 같이 분류될 수 있다.


    • 생성 오퍼레이션

    • 조합 오퍼레이션

    • 변환 오퍼레이션

    • 로직 오퍼레이션


    500개 이상의 오퍼레이션 모두를 살펴보면 좋겠지만, 분량이 많아서 그렇게 하기는 어렵다.


    따라서 여기서는 가장 유용한 몇 가지 오퍼레이션을 선정하였다. 우선, 생성 오퍼레이션부터 


    시작해 보자.




    ■리액티브 타입 생성하기


    스프링에서 리액티브 타입을 사용할 때는 Repository나 Service로부터 Flux나 Mono가 


    제공되므로 우리의 리액티브 타입을 생성할 필요가 없다. 그러나 데이터를 발행하는 새로운


    리액티브 발행자(Publisher)를 생성해야 할 때가 있다.


    객체로부터 생성하기


    Flux나 Mono로 생성하려는 하나 이상의 객체가 있다면 Flux나 Mono의 just() static 


    메서드를 사용하여 리액티브 타입을 생성할 수 있다. 예를 들어, 다음의 테스트 메서드는


    다섯 개의 String 객체로부터 Flux를 생성한다.

    @Test
    public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("fruit1","fruit2","fruit3","fruit4","fruit5")
    }

    이 경우 Flux는 생성되지만 구독자(Publisher)가 없다. 구독자가 없이는 데이터가 전달되지


    않을 것이다. 구독자를 추가할 때는 Flux의 subscrtibe()를 호출한다.

    @Test
    public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("fruit1","fruit2","fruit3","fruit4","fruit5");
    fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f));
    }

    여기서 subscribe()에 지정된 람다는 실제로는 java.util.Consumer이며, 이것은 리액티브 


    스트림의 Subscriber 객체를 생성하기 위해 사용된다. subscribe()를 호출하는 즉시 데이터가


    전달되기 시작한다. 이 예에는 중간에 다른 오퍼레이션이 없으므로 데이터는 곧바로 


    Flux로부터 Subscriber로 전달된다.


    이처럼 Flux나 Mono의 항목들을 콘솔로 출력하면 리액티브 타입이 실제 작동하는 것을 


    파악하는데 좋다. 그러나 리액터의 StepVerifier를 사용하는 것이 Flux나 Mono를 


    테스트하는 더 좋은 방법이다. Flux나 Mono가 지정되면 StepVerifier는 해당 리액티브 


    타입을 구독한 다음에 스트림을 통해 전달되는 데이터에 대해 assertion을 적용한다.


    그리고 해당 스트림이 기대한 대로 완전하게 작동하는지 검사한다. 예를 들어


    조금 전에 예로 든 fruitFlux를 통해 구독 데이터를 검사하기 위해 다음과 같이 테스트를


    작성할 수 있다.

    StepVerifier.create(fruitFlux)
    .expectNext("fruit1")
    .expectNext("fruit2")
    .expectNext("fruit3")
    .expectNext("fruit4")
    .expectNext("fruit5")
    .verifyComplete();

    이번 장의 나머지 예에서는 리액터의 가장 유용한 오퍼레이션들을 파악하기 위해 


    StepVerifier를 사용해서 테스트를 작성할 것이다.


    컬렉션으로부터 생성하기


    또한 Flux는 배열, Iterable, Java Stream 등으로 부터 생성될 수도 있다.


    배열로부터 Flux를 생성하려면 static 메서드인 fromArray()를 호출한다.

    String[] fruits = new String[]{"fruit1", "fruit2", "fruit3", "fruit4", "fruit5"};

    Flux<String> fruitFlux = Flux.fromArray(fruits);

    StepVerifier.create(fruitFlux)
    .expectNext("fruit1")
    .expectNext("fruit2")
    .expectNext("fruit3")
    .expectNext("fruit4")
    .expectNext("fruit5")
    .verifyComplete();

    java.util.List, java.util.Set, java.lang.Iterable의 구현 컬렉션으로부터 Flux를 생성해야 한다면


    해당 컬렉션을 fromIterable()에 인자로 전달하면 된다.


    Java Stream 객체를 소스로 사용해야 한다면 fromStream()을 호출한다.


    Flux 데이터 생성하기


    때로는 데이터 없이 매번 새 값으로 증가하는 숫자를 방출하는 카운터 역할의 Flux만 


    필요한 경우가 있다. 이처럼 카운터 Flux를 생성할 때는 static 메서드 range()를 사용한다.

    Flux<Integer> intervalFlux = Flux.range(1,5);

    StepVerifier.create(intervalFlux)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .expectNext(4)
    .expectNext(5)
    .verifyComplete();

    range()와 유사한 또 다른 Flux 생성 메서드로 interval()이 있다. interval()도 range() 처럼


    증가값을 방출하는 Flux를 생성한다. 하지만 시작 값과 종료 값 대신 값이 방출되는 시간 


    간격이나 주기를 지정한다. 

    Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);

    StepVerifier.create(intervalFlux)
    .expectNext(0L)
    .expectNext(1L)
    .expectNext(2L)
    .expectNext(3L)
    .expectNext(4L)
    .verifyComplete();

    interval()에는 최대값이 지정되지 않으므로 무한정 실행된다.




    ■리액티브 타입 조합하기


    두 개의 리액티브 타입을 결합해야 하거나 Flux를 두 개 이상의 리액티브 타입으로 분할해야


    하는 경우가 있을 수 있다. 여기서는 리액터의 Flux나 Mono를 결합하거나 분할하는


    오퍼레이션을 알아본다.


    리액티브 타입 결합하기


    두 개의 Flux 스트림이 있는데 이것을 하나의 Flux로 생성해야 한다고 해보자. 


    이 때는 mergeWith() 오퍼레이션을 사용하면 된다.

    @org.junit.Test
    public void mergeFluxes() {
    Flux<String> characterFlux = Flux
    .just("Garfield", "kojak", "Barbossa")
    .delayElements(Duration.ofMillis(500));
    Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples")
    .delaySubscription(Duration.ofMillis(250))
    .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

    StepVerifier.create(mergedFlux)
    .expectNext("Garfield")
    .expectNext("Lasagna")
    .expectNext("kojak")
    .expectNext("Lollipops")
    .expectNext("Barbossa")
    .expectNext("Apples")
    .verifyComplete();
    }

    일반적으로 Flux는 가능한 빨리 데이터를 방출한다. 따라서 생성되는 Flux 스트림 두 개 


    모두에 delayElements() 오퍼레이션을 사용해서 조금 느리게 방출되도록 설정했다.


    또한 foodFlux가 characterFlux 다음에 스트리밍을 시작하도록 foodFlux에 


    delaySubscribption() 오퍼레이션을 적용하여 250밀리초가 지난 후에 구독 및 데이터를 


    방출하도록 설정했다.


    두 Flux가 결합되면 하나의 Flux가 새로 생성된다. 그리고 mergedFlux를 StepVerifier가


    구독할 때는 데이터의 흐름이 시작되면서 두 개의 소스 Flux 스트림을 번갈아 구독하게


    된다. 


    mergedFlux로부터 방출되는 항목의 순서는 두 개의 소스 Flux로부터 방출되는 시간에 


    맞춰 결정된다. 여기서는 두 Flux 객체 모두 일정한 속도로 방출되게 설정되었으므로 


    두 Flux의 값은 번갈아 mergedFlux에 끼워진다. 만약 어느 한 쪽 Flux의 지연 시간이 


    변경된다면 한 Flux의 값이 두 번씩 방출되는 것을 볼 수도 있다.


    mergeWith()는 소스 Flux들의 값이 완벽하게 번갈아 방출되게 보장할 수 없으므로 


    필요하다면 zip() 오퍼레이션을 대신 사용할 수 있다. 이 오퍼레이션은 각 Flux 소스로부터


    한 항목씩 번갈아 가져와 새로운 Flux를 생성한다.

    @org.junit.Test
    public void zipFluxes() {
    Flux<String> characterFlux = Flux
    .just("Garfield", "kojak", "Barbossa");
    Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples");

    Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(zippedFlux)
    .expectNextMatches(p ->
    p.getT1().equals("Garfield") && p.getT2().equals("Lasagna")
    )
    .expectNextMatches(p ->
    p.getT1().equals("kojak") && p.getT2().equals("Lollipops")
    )
    .expectNextMatches(p ->
    p.getT1().equals("Barbossa") && p.getT2().equals("Apples")
    )
    .verifyComplete();
    }

    mergeWith()와 다르게 zip() 오퍼레이션은 정적인 생성 오퍼레이션이다. 따라서 여기서 


    생성되는 Flux는 characterFlux와 foodFlux를 완벽하게 조합한다. zippedFlux로부터 


    방출되는 각 항목은 Tuple2이며 각 소스 Flux가 순서대로 방출하는 항목을 포함한다.


    만일 Tuple2가 아닌 다른 타입을 사용하고 싶다면, 아래와 같이 우리가 원하는 객체를 


    생성하는 함수를 아래와 같이 zip()에 제공하면 된다.

    @org.junit.Test
    public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux
    .just("Garfield", "kojak", "Barbossa");
    Flux<String> foodFlux = Flux
    .just("Lasagna", "Lollipops", "Apples");

    Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);

    StepVerifier.create(zippedFlux)
    .expectNext("Garfield eats lasagna")
    .expectNext("kojak eats Lollipops")
    .expectNext("Barbossa eats Apples")
    .verifyComplete();
    }

    먼저 값을 방출하는 리액티브 타입 선택하기


    두 개의 Flux 객체가 있는데, 이것을 결합하는 대신 먼저 값을 방출하는 소스 Flux의 값을


    발행하는 새로운 Flux를 생성하고 싶다고 해보자. first() 오퍼레이션은 두 Flux 객체 중 먼저 


    값을 방출하는 Flux의 값을 선택해서 이 값을 발행한다.

    @org.junit.Test
    public void firstFLux() {
    Flux<String> slowFlux = Flux
    .just("tortoise", "snail", "sloth")
    .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux
    .just("hare", "cheetah", "squirrel");

    Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);

    StepVerifier.create(firstFlux)
    .expectNext("hare")
    .expectNext("cheetah")
    .expectNext("squirrel")
    .verifyComplete();
    }

    slowFlux는 100밀리초가 경과한 후에 구독 신청과 발행을 시작하므로, 새로 생성되는 


    firstFlux는 slowFlux를 무시하고 fastFlux의 값만 발행하게 된다.




    ■리액티브 스트림의 변환과 필터링


    리액티브 타입으로부터 데이터 필터링하기


    Flux로부터 데이터가 전달될 때, 데이터를 필터링하는 가장 기본적인 방법은 맨 앞부터 


    원하는 개수의 항목을 무시하는 것이다. 이때 skip() 오퍼레이션을 사용한다.


    skip() 오퍼레이션은 소스 Flux의 항목에서 지정된 수만큼 건너뛴 후 나머지 항목을 방출하는


    새로운 Flux를 생성한다.

    @org.junit.Test
    public void skipAFew() {
    Flux<String> skipFlux = Flux.just(
    "one", "two", "skip a few", "ninety nine", "one hundred")
    .skip(3);

    StepVerifier.create(skipFlux)
    .expectNext("ninety nine", "one hundred")
    .verifyComplete();
    }

    특정 수의 항목을 건너뛰는 대신, 일정 시간이 경과할 때까지 처음의 여러 항목을 


    건너뛰어야 하는 경우가 있다. 이런 형태의 skip() 오퍼레이션은 지정된 시간이 경과할 


    때까지 기다렸다가 소스 Flux의 항목을 방출하는 Flux를 생성한다. 아래의 테스트 


    메서드에서는 skip()을 사용해서 4초 동안 기다렸다가 값을 방출하는 결과 Flux를 생성한다.


    여기서는 delayElements()를 사용하여 항목 간에 1초 동안 지연되는 Flux로부터


    결과 Flux(skipFlux)가 생성되었으므로 마지막 두 개의 항목만이 방출된다.

    @org.junit.Test
    public void skipAFewSeconds() {
    Flux<String> skipFlux = Flux.just(
    "one", "two", "skip a few", "ninety nine", "one hundred")
    .delayElements(Duration.ofSeconds(1))
    .skip(Duration.ofSeconds(4));

    StepVerifier.create(skipFlux)
    .expectNext("ninety nine", "one hundred")
    .verifyComplete();
    }

    skip() 오퍼레이션의 반대 기능이 필요할 때는 take()를 고려할 수 있다.

    @org.junit.Test
    public void take() {
    Flux<String> nationalParkFlux = Flux.just(
    "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
    .take(3);

    StepVerifier.create(nationalParkFlux)
    .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
    .verifyComplete();
    }

    skip()처럼, take()도 항목 수가 아닌 경과 시간을 기준으로 하는 형태로도 사용할 수 있다.


    skip()과 take() 오퍼레이션은 카운트나 경과 시간을 필터 조건으로 하는 일종의 필터 


    오퍼레이션이라고 생각할 수 있다. 그러나 Flux 데이터를 더 범용적으로 필터링 할 때는


    filter() 오퍼레이션이 더 적합할 수 있다. 

    @org.junit.Test
    public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
    "Yellowstone", "yosemite", "Grand Canyon", "Zion", "Grand Teton"
    ).filter(np -> !np.contains(" "));

    StepVerifier.create(nationalParkFlux)
    .expectNext("Yellowstone", "yosemite", "Zion").
    verifyComplete();
    }

    고유한 문자열 값들만 방출시키고 싶을 경우 distinct() 오퍼레이션을 사용한다.

    @org.junit.Test
    public void distinct() {
    Flux<String> animalFlux = Flux.just("dog", "cat", "bird", "dog")
    .distinct();

    StepVerifier.create(animalFlux)
    .expectNext("dog", "cat", "bird")
    .verifyComplete();
    }

    리액티브 데이터 매핑하기


    리액터의 타입은 발행된 항목을 다른 형태나 타입으로 매핑하는 기능을 하는 map()과 


    flatMap() 오퍼레이션을 제공한다. 아래 테스트 메서드에서는 문자열 값을 전달하는 소스 


    Flux가 새로운 데이터를 발행하는 Flux로 변환된다.

    @org.junit.Test
    public void map() {
    Flux<SplitedString> splitedStringFlux = Flux.just("A AA", "B BB", "C CC")
    .map(str -> {
    String[] split = str.split("\\s");
    return new SplitedString(split[0], split[1]);
    });

    StepVerifier.create(splitedStringFlux)
    .expectNext(new SplitedString("A", "AA"))
    .expectNext(new SplitedString("B", "BB"))
    .expectNext(new SplitedString("C", "CC"))
    .verifyComplete();
    }

    map()의 중요한 특징은, 각 항목이 소스 Flux로부터 발행될 때 동기적으로 매핑이 


    수행된다는 것이다. 따라서 비동기적으로 매핑을 수행하고 싶다면 flatMap() 오퍼레이션을


    사용해야 한다. flatMap()에서는 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나


    Flux들의 결과는 하나의 새로운 Flux가 된다. flatMap()을 subscribeOn()과 함께 사용하면


    리액터 타입의 변환을 비동기적으로 수행할 수 있다.

    @org.junit.Test
    public void flatMap() {
    Flux<SplitedString> splitedStringFlux = Flux.just("A AA", "B BB", "C CC")
    .flatMap(str -> Mono.just(str).map(
    p -> {
    String[] split = p.split("\\s");
    return new SplitedString(split[0], split[1]);
    }).subscribeOn(Schedulers.parallel())
    );

    List<SplitedString> list = Arrays.asList(
    new SplitedString("A", "AA"),
    new SplitedString("B", "BB"),
    new SplitedString("C", "CC")
    );

    StepVerifier.create(splitedStringFlux)
    .expectNextMatches(p -> list.contains(p))
    .expectNextMatches(p -> list.contains(p))
    .expectNextMatches(p -> list.contains(p))
    .verifyComplete();
    }

    subscribeOn()을 호출해서 각 구독이 병렬 스레드로 수행되도록 했다. 즉, 다수의 String 


    입력 객체들의 map() 오퍼레이션이 비동기적으로 병행 수행되도록 했다.


    subscribeOn()의 이름은 subscribe()와 유사하지만, 두 오퍼레이션은 매우 다르다. 


    subscribe()는 이름이 동사형이면서 리액티브 플로우를 구독 요청하고 실제로 구독하는 


    반면, subscribeOn()은 이름이 더 서술적이면서 구독이 동시적으로 처리된다. 리액터는


    어떤 특정 동시성 모델도 강요하지 않으며, 우리가 사용하기 원하는 동시성 모델을


    subscribeOn()의 인자로 지정할 수 있다. 위 코드에서는 CPU 코어의 개수가 그 크기가 되는 


    고정된 크기의 스레드로 실행되는 parallel()을 사용하였다. 아래는 Schedulers가 지원하는


    동시성 모델들이다.


     Schedulers 메서드

     개요 

     .immediate() 

     현재 스레드에서 구독을 실행한다. 

     .single() 

     단일의 재사용 가능한 스레드에서 구독을 실행한다. 모든 호출자에 대해 동일한 스레드를 재사용한다.

     .newSingle() 

     매 호출마다 전용 스레드에서 구독을 실행한다.

     .elastic()

     무한하고 신축성 있는 풀에서 가져온 작업 스레드에서 구독을 실행한다. 필요 시 새로운 작업 스레드가 생성되며, 유휴 스레드는 제거된다. 

     .parallel()

     고정된 크기의 풀에서 가져온 작업 스레드에서 구독을 실행하며, CPU 코어의 개수가 그 크기가 된다.


    flatMap()이나 subscribeOn()을 사용할 때의 장점은 다수의 병행 스레드에 작업을 분할하여


    스트림의 처리량을 증가시킬 수 있다는 것이다. 그러나 작업이 병행으로 수행되므로 어떤


    작업이 먼저 끝날지 모장이 안 되어 결과 Flux에서 방출되는 항목의 순서를 알 방법이 


    없다는 것이 단점이기도 한다.


    리액티브 스트림의 데이터 버퍼링하기


    Flux를 통해 전달되는 데이터를 처리하는 동안 데이터 스트림을 작은 덩어리로 분할해야


    하는 경우가 생길 수 있다. 이때 buffer() 오퍼레이션을 사용한다.

    @org.junit.Test
    public void buffer() {
    Flux<String> fruitFlux = Flux.just(
    "apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier
    .create(bufferedFlux)
    .expectNext(Arrays.asList("apple", "orange", "banana"))
    .expectNext(Arrays.asList("kiwi", "strawberry"))
    .verifyComplete();
    }

    위 경우 String 요소의 Flux는 List 컬렉션을 포함하는 새로운 Flux로 버퍼링한다. 따라서 


    5개의 String 값을 방출하는 원래의 Flux는 두 개의 컬렉션을 방출하는 Flux로 변환된다.


    이처럼 리액티브 Flux로부터 리액티브가 아닌 List 컬렉션으로 버퍼링되는 값은 비생산적인


    것처럼 보인다. 그러나 buffer()를 flatMap()과 같이 사용하면 각 List 컬렉션을 병행으로


    처리할 수 있다.

    Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x -> Flux.fromIterable(x)
    .map(String::toUpperCase)
    .subscribeOn(Schedulers.parallel())
    .log()
    ).subscribe();

    여기서는 5개의 값으로 된 Flux를 새로운 Flux로 버퍼링하지만, 이 Flux는 여전히 List 


    컬렉션을 포함한다. 그러나 그다음에 List 컬렉션의 Flux에 flatMap()을 적용한다. 이 경우


    flatMap()에서는 각 List 버퍼를 가져와서 해당 List의 요소로부터 새로운 Flux를 생성하고


    map() 오퍼레이션을 적용한다. 따라서 버퍼링된 각 List는 별도의 스레드에서 병행으로 


    계속 처리될 수 있다. 이 동작들을 확인하기 위해 log() 오퍼레이션을 사용했다.


    log() 오퍼레이션은 모든 리액티브 스트림 이벤트를 로깅하므로 실제 어떻게 되는지 파악할


    수 있다.


    만일 어떤 이유로든 Flux가 방출하는 모든 항목을 List로 모을 필요가 있다면 인자를 


    전달하지 않고 buffer를 호출하면 된다.

    Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer();

    이 경우 소스 Flux가 발행한 모든 항목을 포함하는 List를 방출하는 새로운 Flux가 생성된다.


    한편 collectionList() 오퍼레이션을 사용해도 같은 결과를 얻을 수 있다. collectionList()는


    List를 발행하는 Flux 대신 Mono를 생성한다.

    @org.junit.Test
    public void collectionList() {
    Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");

    Mono<List<String>> fruitLIstMono = fruitFlux.collectList();

    StepVerifier.create(fruitLIstMono)
    .expectNext(Arrays.asList("apple", "orange", "banana", "kiwi", "strawberry"))
    .verifyComplete();
    }

    Flux가 방출하는 항목들을 모으는 훨씬 더 흥미로운 방법으로 collectMap()이 있다.


    collectMap() 오퍼레이션은 Map을 포함하는 Mono를 생성한다. 해당 Map에는 지정된 


    함수로 키를 생성한다.

    @org.junit.Test
    public void collectMap() {
    Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Map<Character, String>> animalMapMono = animalFlux.collectMap(a -> a.charAt(0));

    StepVerifier.create(animalMapMono)
    .expectNextMatches(map -> map.size() == 3 &&
    map.get('a').equals("aardvark") &&
    map.get('e').equals("eagle") &&
    map.get('k').equals("kangaroo")
    ).verifyComplete();
    }




    ■리액티브 타입에 로직 오퍼레이션 수행하기


    Mono나 Flux가 방행한 항목이 어떤 조건과 일치하는지'만' 알아야 하는 경우엔, all()이나


    any() 오퍼레이션을 사용한다.

    @org.junit.Test
    public void all() {
    Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
    StepVerifier.create(hasAMono)
    .expectNext(true)
    .verifyComplete();

    Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
    StepVerifier.create(hasAMono)
    .expectNext(false)
    .verifyComplete();
    }
    @org.junit.Test
    public void any() {
    Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
    Mono<Boolean> hasTMono = animalFlux.any(a -> a.contains("t"));
    StepVerifier.create(hasTMono)
    .expectNext(true)
    .verifyComplete();

    Mono<Boolean> hasZMono = animalFlux.all(a -> a.contains("z"));
    StepVerifier.create(hasZMono)
    .expectNext(false)
    .verifyComplete();
    }



    댓글

Designed by Tistory.