程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

暂无数据

RxJava 1.x:如何在单元测试中模拟背压

发布于2023-02-04 22:49     阅读(589)     评论(0)     点赞(17)     收藏(0)


给定代码:

logItemPublisher
    .buffer(
            loggingProperties.getBufferTimeoutMillis(),
            TimeUnit.MILLISECONDS,
            loggingProperties.getBufferSize(),
            logDispatchScheduler
    )
    .onBackpressureDrop(droppedLogsHandler)
    // persist uses Spring RestOperations
    .flatMap(logs -> persist(logs, url)
            .subscribeOn(logDispatchScheduler)
    )
    .subscribe();

在哪里,

logItemPublisher = ReplaySubject.<AbstractHttpLogItem>createWithSize(4 * loggingProperties.getBufferSize())
                .toSerialized();
logDispatchScheduler = new TestScheduler()

我有一个单元测试:

@Test
public void testLogServiceSlow() {
    loggingProperties.setBufferSize(1);
    // rx.ring-buffer.size property stores the size of any in-memory ring buffers that RxJava uses when an
    // Observable cannot keep up with rate of event emissions.
    // default value is 128 on the JVM; RxJava 2.x makes this configurable in flatMap
    System.setProperty("rx.ring-buffer.size", "2");
    // this is what persist does  
    when(restOperations.postForEntity(anyString(), any(HttpEntity.class), eq(Void.class)))
            .thenAnswer(invocation -> {
                Thread.sleep(500);
                return ResponseEntity.ok().build();
            });

    logServiceClient.persistLogs(logs);
    scheduler.advanceTimeBy(2L, TimeUnit.SECONDS);

    System.clearProperty("rx.ring-buffer.size");
    Mockito.verify(restOperations, times(2))
            .postForEntity(Mockito.eq("http://log:9000/log/service"), logsCaptors.capture(), eq(Void.class));
}

测试失败,因为尽管有Thread.sleep,但没有产生背压。我不明白为什么不;内部环形缓冲区应在 2 个项目后填满,其余项目应丢弃。


解决方案


回答我自己的问题:

int ringBufferSize = Optional.ofNullable(System.getProperty("rx.ring-buffer.size"))
    .map(Integer::parseInt)
    .orElse(128);
logItemPublisher
    .buffer(
            loggingProperties.getBufferTimeoutMillis(),
            TimeUnit.MILLISECONDS,
            loggingProperties.getBufferSize(),
            logDispatchScheduler
    )
    .onBackpressureDrop(droppedLogsHandler)
    .observeOn(logDispatchScheduler, ringBufferSize)
    .flatMap(logs -> persist(logs, url))
    .subscribe();

然后在测试中,直接设置即可ringBufferSize = 2,无需设置系统属性rx.ring-buffer.size



所属网站分类: 技术文章 > 问答

作者:黑洞官方问答小能手

链接:http://www.javaheidong.com/blog/article/641606/84aa72e391c25f496b78/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

17 0
收藏该文
已收藏

评论内容:(最多支持255个字符)