本文介绍了如何使用fromWebSocket Subject缓冲流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

此(!)完美地描述了所需的结果:

This RxJava buffer example (with marble chart!) describes the desired result perfectly:

编辑:已审核 ,我的问题似乎与使用主题而非直接可观察相关。

having reviewed How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?, my issue appears related to using a Subject as opposed to straight Observable.

使用套接字流生成窗口关闭事件(如下所示)导致打开2个套接字并且没有事件流出:

Using the socket stream to generate window close events (as follows) results in 2 sockets opened and no events streaming out:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
    .subscribe(function(e) { console.log(e, 'socket messages');});


推荐答案

总结调查结果问题:


  • Rx.DOM.fromWebSocket 返回 Rx.subject 包裹websocket。该主题由一个观察者和一个可观察者(通过新的Rx.Subject(观察者,可观察的)组成。根据我的理解,观察者允许通过其写入套接字 onNext 方法,而observable允许从套接字读取。

  • 你总是看到主题是热门来源,但显然在这里只意味着观察者会立即将其值推送到主题,然后将其推送到套接字。在正常情况下( new Rx.Subject()),默认观察者和observable是这样的,observable监听观察者,因此默认的observable很热。但是,observable是一个冷源,然后任何订阅都会重新执行回调创建另一个websocket。因此创建了两个套接字。

  • 这种情况不会发生在例如 Rx.dom.fromEvent ,因为创建的(冷)可观察量是共享的(通过发布()。refCount())。

  • 因此在这里做同样的事情,可以解决重复问题。这意味着在这种特殊情况下,在代码中使用 ws = Rx.DOM.fromWebSocket(wsURI,null,wsOpenObserver,wsCloseObserver).share(); share publish()的别名.refCount()

  • 我不得不怀疑是否应该将 Rx.DOM.fromWebSocket 的行为报告为错误

  • Rx.DOM.fromWebSocket returns a Rx.subject which wraps around the websocket. That subject is made from one observer and one observable (via new Rx.Subject(observer, observable). From what I understood, that observer allows to write to the socket via its onNext method, while the observable allows to read from the socket.
  • you always read that subjects are hot sources, but apparently here that only means that the observer will immediately push its value to the subject which here pushes it to the socket. In normal cases(new Rx.Subject()), the default observer and observable are so that the observable listens to the observer, hence the default observable is hot. Here however, the observable is a cold source and then any subscription will reexecute the callback creating another websocket. Hence the creation of two sockets.
  • this does not happen for instance with Rx.dom.fromEvent because the created (cold) observable is shared (via publish().refCount()).
  • thus by doing the same here, the duplication issue can be solved. That means in this particular case, use in your code ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();, share being an alias for publish().refCount().
  • I have to wonder whether that behaviour of Rx.DOM.fromWebSocket should be reported as a bug

两种方法的代码:




  • https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/dom/websocket.js
  • https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/events/fromevent.js

这篇关于如何使用fromWebSocket Subject缓冲流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-29 16:47