我尝试基于反应堆(Reactor + WebFlux)创建一些基本的Spring 5应用程序。

我的下一个目标是实现能够执行以下操作的反应式存储库:


保存书。
查找所有书籍。


我的存储库需要涵盖以下情形:

方案A:


没有人订阅FindAll
有人保存一本书(id = 1)
Client1订阅FindAll
书籍(id = 1)被推送到Client1(Client1保持订阅状态,流未完成!)
有人保存一本书(id = 2)
书籍(id = 2)被推送到Client1(Client1保持订阅状态,流未完成!)


因此,IMO这种情况是冷热概念的混合。在任何人订阅之前,我们会收集某人要保存在我们存储库中某些缓冲区中的数据(让我们说正常列表)。对于将要订阅FindAll的所有订阅者,我们需要推送缓冲列表(在订阅之前收集的列表),并且不要完成流以允许以后推送集合更新。

我能够做到这一点,但我仍在思考是否有任何更简单的方法可以做到这一点?在Reactor项目中也许有一种解决方案已经涵盖了这种情况?

我的实现:

public class InMemoryBookRepository {

private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();

@Override
public void save(Book book) {
    bookMap.put(book.getId(), book);
    fluxSink.next(book);
}

@Override
public Flux<Book> findAll() {
    //without fromIterable I cannot push books that where saved before someone subscribed
    return Flux.fromIterable(bookMap.values())
            .concatWith(hotFlux)
            //Unfortunately this solution produces duplicates so we need to filter them
            .distinct();
}
}


Ofc,我不能只使用Cold Publisher,因为流将在发布收集的Books之后完成。出于同样的原因,我不能使用Hot one,因为我会错过在某人订阅之前生成的元素。

旁注:在我的代码中,我的地图没有任何清理机制,因此它有时会产生异常,但这暂时不重要。

最佳答案

如此简单...我不知道为什么错过了这个漂亮的运算符:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--

因此,基本上删除整个代码的List/Map部分,并使用replay()代替publish()

简化示例:

UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect();

hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));

fluxSink.next("one");

hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));

fluxSink.next("two");


publish()输出:

1st subscriber: one
1st subscriber: two
2nd subscriber: two


replay()输出:

1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two

关于java - react 库,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49295745/

10-09 19:25