本文介绍了RxJS并行队列与并发工作者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我要下载10,000个文件.我可以轻松地建立这10,000个文件的队列(如果可以做的更好,我们很乐意为您提供建议),

Let's say I want to I download 10,000 files. I can easily build a queue of those 10,000 files (happy to take advice if any of this can be done better),

import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
};

现在我有一个Rx.JS数组,它是根据表示队列的Promise创建的.现在针对我想要的行为,我要发布

Now I have an array of Rx.JS observable I've created from promises that represent my queue. Now for the behavior of what I want, I want to issue

  • 对服务器的三个并发请求
  • 请求完成后,我想触发一个新请求.

我可以针对此问题创建解决方案,但是要考虑 Rxjs队列,我从没用过,我想知道最正确的Rxjs方法是什么.

I can create a solution to this problem, but in light of things like the Rxjs queue, which I've never used I'm wondering what the right-most Rxjs way to do this is.

推荐答案

听起来您想要一个等效的forkJoin,它支持呼叫者指定的最大并发订阅数.

It sounds like you want an equivalent of forkJoin that supports a caller-specified maximum number of concurrent subscriptions.

可以使用mergeMap重新实现forkJoin并公开concurrent参数像这样:

It's possible to re-implement forkJoin using mergeMap and to expose the concurrent parameter, like this:

import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}

这篇关于RxJS并行队列与并发工作者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-29 06:35