Skip to content
0

Reactor - 单元测试

测试

无论你是编写了一个简单的 Reactor 操作链,还是开发了自定义的操作符,对它进行 自动化的测试总是一个好主意。

Reactor 内置一些专门用于测试的元素,放在一个专门的 artifact 里: reactor-test。 你可以在 on Githubreactor-core 库里找到这个项目。

如果要用它来进行测试,添加 scope 为 test 的依赖。

reactor-test 用 Maven 配置 <dependencies>

xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    // 1
</dependency>
  1. 如果你使用了 BOM,你不需要指定 <version>

reactor-test 用 Gradle 配置 dependencies

dependencies {
   testCompile 'io.projectreactor:reactor-test'
}

reactor-test 的两个主要用途:

  • 使用 StepVerifier 一步一步地测试一个给定场景的序列
  • 使用 TestPublisher 生成数据来测试下游的操作符(包括你自己的 operator)

使用 StepVerifier 来测试

最常见的测试 Reactor 序列的场景就是定义一个 FluxMono,然后在订阅它的时候测试它的行为。

当你的测试关注于每一次的事件的时候,就非常容易转化为使用 StepVerifier 的测试场景: 下一个期望的事件是什么?你是否期望使用 Flux 来发出一个特别的值?或者接下来 300ms 什么都不做?所有这些都可以使用 StepVerifier API 来表示。

例如,你可能会使用如下的工具方法来包装一个 Flux

java
public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}

要测试它的话,你需要校验如下内容:

我希望这个 Flux 先发出 foo,然后发出 bar,然后 生成一个内容为 boom 的错误。 最后订阅并校验它们。

使用 StepVerifier API 来表示以上的验证过程:

java
@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("foo", "bar"); // 1

  StepVerifier.create( // 2
    appendBoomError(source)) // 3
    .expectNext("foo") // 4
    .expectNext("bar")
    .expectErrorMessage("boom") // 5
    .verify(); // 6
}
  1. 由于被测试方法需要一个 Flux,定义一个简单的 Flux 用于测试
  2. 创建一个 StepVerifier 构造器来包装和校验一个 Flux
  3. 传进来需要测试的 Flux(即待测方法的返回结果)
  4. 第一个我们期望的信号是 onNext,它的值为 foo
  5. 最后我们期望的是一个终止信号 onError,异常内容应该为 boom
  6. 不要忘了使用 verify() 触发测试

API 是一个构造器,通过传入一个要测试的序列来创建一个 StepVerifier。从而你可以:

  • 表示你 期望 发生的下一个信号。如果收到其他信号(或者信号与期望不匹配),整个测试就会 失败(AssertionError)。例如你可能会用到 expectNext(T...)expectNextCount(long)
  • 消费 下一个信号。当你想要跳过部分序列或者当你想对信号内容进行自定义的 assertion 的时候会用到它(比如要校验是否有一个 onNext 信号,并校验对应发出的元素是否是一个 size 为 5 的 List)。你可能会用到 consumeNextWith(Consumer<T>)
  • 更多样的操作 比如暂停或运行一段代码。比如,你想对测试状态或内容进行调整或处理, 你可能会用到 thenAwait(Duration)then(Runnable)

对于终止事件,相应的期望方法(expectComplete()expectError(),及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了。最后你只能对 StepVerifier 进行一些额外的配置并 触发校验(通常调用 verify() 及其变体方法)。

从 StepVerifier 内部来看,它订阅了待测试的 FluxMono,然后将序列中的每个信号与测试 场景的期望进行比对。如果匹配的话,测试成功。如果有不匹配的情况,则抛出 AssertionError 异常。

请记住是 verify() 触发了校验过程。这个 API 还有一些结合了 verify() 与期望的终止信号 的方法:verifyComplete()verifyError()verifyErrorMessage(String) 等。

注意,如果有一个传入 lambda 的期望方法抛出了 AssertionError,会被报告为测试失败。 这可用于自定义 assertion。

默认情况下,verify() 方法(及同源的 verifyThenAssertThatverifyComplete()等) 没有超时的概念。它可能会永远阻塞住。你可以使用 StepVerifier.setDefaultTimeout(Duration) 来设置一个全局的超时时间,或使用 verify(Duration) 指定。

操控时间

StepVerifier 可以用来测试基于时间的操作符,从而避免测试的长时间运行。可以使用构造器 StepVerifier.withVirtualTime 达到这一点。

示例如下:

java
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... 继续追加期望方法

虚拟时间(virtual time) 的功能会在 Reactor 的调度器(Scheduler)工厂方法中插入一个自定义的 调度器。这些基于时间的操作符通常默认使用 Schedulers.parallel() 调度器。(虚拟时间的) 技巧在于使用一个 VirtualTimeScheduler 来代替默认调度器。然而一个重要的前提就是,只有在初始化 虚拟时间调度器之后的操作符才会起作用。

为了提高 StepVerifier 正常起作用的概率,它一般不接收一个简单的 Flux 作为输入,而是接收 一个 Supplier,从而可以在配置好订阅者 之后 「懒创建」待测试的 flux。

要注意的是,Supplier<Publisher<T>> 可用于「懒创建」,否则不能保证虚拟时间 能完全起作用。尤其要避免提前实例化 Flux,要在 Supplier 中用 lambda 创建并返回 Flux 变量。

有两种处理时间的期望方法,无论是否配置虚拟时间都是可用的:

  • thenAwait(Duration) 暂停校验步骤(允许信号延迟发出)
  • expectNoEvent(Duration) 同样让序列持续一定的时间,期间如果有 任何 信号发出则测试失败

两个方法都会基于给定的持续时间暂停线程的执行,如果是在虚拟时间模式下就相应地使用虚拟时间。

expectNoEvent 将订阅(subscription)也认作一个事件。假设你用它作为第一步,如果检测 到有订阅信号,也会失败。这时候可以使用 expectSubscription().expectNoEvent(duration) 来代替。

为了快速校验前边提到的 Mono.delay,我们可以这样完成代码:

java
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription()
    .expectNoEvent(Duration.ofDays(1)) // 2
    .expectNext(0) // 3
    .verifyComplete(); // 4
  1. 期待一天内没有信号发生
  2. 然后期待一个 next 信号为 0
  3. 然后期待完成(同时触发校验)

我们也可以使用 thenAwait(Duration.ofDays(1)),但是 expectNoEvent 的好处是 能够验证在此之前不会发生什么。

注意 verify() 返回一个 Duration,这是整个测试的 真实时间

虚拟时间并非银弹。请记住 所有的 调度器都会被替换为 VirtualTimeScheduler。 有些时候你可以锁定校验过程,因为虚拟时钟在遇到第一个期望校验之前并不会开始,所以对于 「无数据「的期望校验也必须能够运行在虚拟时间模式下。在无限序列中,虚拟时间模式的发挥 空间也很有限,因为它可能导致线程(序列的发出和校验的运行都在这个线程上)卡住。

使用 StepVerifier 进行后校验

当配置完你测试场景的最后的期望方法后,你可以使用 verifyThenAssertThat() 来代替 verify() 触发执行后的校验。

verifyThenAssertThat() 返回一个 StepVerifier.Assertions 对象,你可以用它来校验 整个测试场景成功刚结束后的一些状态(它也会调用 verify())。典型应用就是校验有多少 元素被操作符丢弃(参考 使用全局的 Hooks)。

测试 Context

更多关于 Context 的内容请参考 增加一个 Context 到响应式序列

StepVerifier 有一些期望方法可以用来测试 Context

  • expectAccessibleContext: 返回一个 ContextExpectations 对象,你可以用它来在 Context 上配置期望校验。一定记住要调用 then() 来返回到对序列的期望校验上来
  • expectNoAccessibleContext: 是对「没有Context」的校验。通常用于 被测试的 Publisher 并不是一个响应式的,或没有任何操作符能够传递 Context (比如一个 generatePublisher

此外,还可以用 StepVerifierOptions 方法传入一个测试用的初始 ContextStepVerifier, 从而可以创建一个校验(verifier)。

这些特性通过下边的代码演示:

java
StepVerifier.create(Mono.just(1).map(i -> i + 10),
                                StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) // 1
                            .expectAccessibleContext() // 2
                            .contains("foo", "bar") // 3
                            .then() // 4
                            .expectNext(11)
                            .verifyComplete(); // 5
  1. 使用 StepVerifierOptions 创建 StepVerifier 并传入初始 Context
  2. 开始对 Context 进行校验,这里只是确保 Context 正常传播了
  3. Context 进行校验的例子:比如验证是否包含一个 "foo" - "bar" 键值对
  4. 使用 then() 切换回对序列的校验
  5. 不要忘了用 verify() 触发整个校验过程

TestPublisher 手动发出元素

对于更多高级的测试,如果能够完全掌控源发出的数据就会方便很多,因为这样就可以在测试的 时候更加有的放矢地发出想测的数据。

另一种情况就是你实现了自己的操作符,然后想校验它的行为——尤其是在源不稳定的时候——是否符合响应式流规范。

reactor-test 提供了 TestPublisher 类来应对这两种需求。这个类本质上是一个 Publisher<T>, 你可以通过可编程的方式来用它发出各种信号:

  • next(T) 以及 next(T, T...) 发出 1-n 个 onNext 信号
  • emit(T...) 起同样作用,并且会执行 complete()
  • complete() 会发出终止信号 onComplete
  • error(Throwable) 会发出终止信号 onError

使用 create 工厂方法就可以得到一个正常的 TestPublisher。而使用 createNonCompliant 工厂方法可以创建一个「不正常」的 TestPublisher。后者需要传入由 TestPublisher.Violation 枚举指定的一组选项,这些选项可用于告诉 publisher 忽略哪些问题。枚举值有:

  • REQUEST_OVERFLOW: 允许 next 在请求不足的时候也可以调用,而不会触发 IllegalStateException
  • ALLOW_NULL: 允许 next 能够发出一个 null 值而不会触发 NullPointerException
  • CLEANUP_ON_TERMINATE: 可以重复多次发出终止信号,包括 complete()error()emit()

最后,TestPublisher 还可以用不同的 assert* 来跟踪其内部的订阅状态。

使用转换方法 flux()mono() 可以将其作为 FluxMono 来使用。

PublisherProbe 检查执行路径

当构建复杂的操作链时,可能会有多个子序列,从而导致多个执行路径。

多数时候,这些子序列会生成一个足够明确的 onNext 信号,你可以通过检查最终结果 来判断它是否执行。

考虑下边这个方法,它构建了一条操作链,并使用 switchIfEmpty 方法在源为空的情况下, 替换成另一个源。

java
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}

很容易就可以测试出 switchIfEmpty 的哪一个逻辑分支被使用了,如下:

java
@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}

但是如果例子中的方法返回的是一个 Mono<Void> 呢?它等待源发送结束,执行一个额外的任务, 然后就结束了。如果源是空的,则执行另一个备用的类似于 Runnable 的任务,如下:

java
private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) // 1
            .switchIfEmpty(doWhenEmpty); // 2
}
  1. then() 方法会忽略 command,它只关心是否结束
  2. 两个都是空序列,这个时候如何区分(哪边执行了)呢?

为了验证执行路径是经过了 doWhenEmpty 的,你需要编写额外的代码,比如你需要一个这样的 Mono<Void>

  • 能够捕获到它被订阅的事实
  • 以上事实需要在整个执行结束 之后 再进行验证

在 3.1 版本以前,你需要为每一种状态维护一个 AtomicBoolean 变量,然后在相应的 doOn* 回调中观察它的值。这需要添加不少的额外代码。好在,版本 3.1.0 之后可以使用 PublisherProbe 来做, 如下:

java
@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); // 1

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) // 2
                .verifyComplete();

    probe.assertWasSubscribed(); // 3
    probe.assertWasRequested(); // 4
    probe.assertWasNotCancelled(); // 5
}
  1. 创建一个探针(probe),它会转化为一个空序列
  2. 在需要使用 Mono<Void> 的位置调用 probe.mono() 来替换为探针
  3. 序列结束之后,你可以用这个探针来判断序列是如何使用的,你可以检查是它从哪(条路径)被订阅的
  4. 对于请求也是一样的
  5. 以及是否被取消了

你也可以在使用 Flux<T> 的位置通过调用 .flux() 方法来放置探针。如果你既需要用探针检查执行路径 还需要它能够发出数据,你可以用 PublisherProbe.of(Publisher) 方法包装一个 Publisher<T> 来搞定

最近更新