我尝试基于反应堆(Reactor + WebFlux)创建一些基本的Spring 5应用程序。
我的下一个目标是实现能够执行以下操作的反应式存储库:
保存书。
查找所有书籍。
我的存储库需要涵盖以下情形:
方案A:
没有人订阅FindAll
有人保存一本书(id = 1)
Client1订阅FindAll
书籍(id = 1)被推送到Client1(Client1保持订阅状态,流未完成!)
有人保存一本书(id = 2)
书籍(id = 2)被推送到Client1(Client1保持订阅状态,流未完成!)
因此,IMO这种情况是冷热概念的混合。在任何人订阅之前,我们会收集某人要保存在我们存储库中某些缓冲区中的数据(让我们说正常列表)。对于将要订阅FindAll的所有订阅者,我们需要推送缓冲列表(在订阅之前收集的列表),并且不要完成流以允许以后推送集合更新。
我能够做到这一点,但我仍在思考是否有任何更简单的方法可以做到这一点?在Reactor项目中也许有一种解决方案已经涵盖了这种情况?
我的实现:
public class InMemoryBookRepository {
private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();
@Override
public void save(Book book) {
bookMap.put(book.getId(), book);
fluxSink.next(book);
}
@Override
public Flux<Book> findAll() {
//without fromIterable I cannot push books that where saved before someone subscribed
return Flux.fromIterable(bookMap.values())
.concatWith(hotFlux)
//Unfortunately this solution produces duplicates so we need to filter them
.distinct();
}
}
Ofc,我不能只使用Cold Publisher,因为流将在发布收集的Books之后完成。出于同样的原因,我不能使用Hot one,因为我会错过在某人订阅之前生成的元素。
旁注:在我的代码中,我的地图没有任何清理机制,因此它有时会产生异常,但这暂时不重要。
最佳答案
如此简单...我不知道为什么错过了这个漂亮的运算符:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--
因此,基本上删除整个代码的List/Map
部分,并使用replay()
代替publish()
简化示例:
UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect();
hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));
fluxSink.next("one");
hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));
fluxSink.next("two");
用
publish()
输出:1st subscriber: one
1st subscriber: two
2nd subscriber: two
用
replay()
输出:1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two
关于java - react 库,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49295745/