我是RX-Java的新手,我正在尝试设计一种API,其流程如下所述:

            Make REST call A to load data
                        |
                        |
        data not found? |  data found
    ------------------------------------
    |                                  |
    |                                  |
    |                                  |
Make REST Call B                  Load DB Data 1
    |                                  |
    |                                  |
    |                              _________________________
    |                             |       Parallel         |
    |                             |                        |
    |                             |                        |
    |                  (condition using DB data 1)   (condition using DB data 1)
    |                      Load REST Data C                Load DB Data 2
    |                             |                        |
    |                             |________________________|
    |                                         |
    |                                         |
Build Response                            Build Response

假设数据库方法和服务调用返回Observable,需要使用rx运算符对上面的框架流程进行一些说明吗?

我将在下面分享阻塞性伪代码:
Response = REST_Call_1(); // on error throw Exception

if (isResponseValid(response)) { // returns Boolean
    if (responseUnderReview(response)) { // validation func
        throw Exception;
    } else {
        //db_data_1 and db_data_2 can be parallel
        db_data_1 = Load_DB_Data_1();

        // Load data_3 and data_2 based on db_data_1
        if (is_data_3_required(db_data_1)) {
            data_3 = REST_call_2();
        }
        if (is_data_2_required(db_data_1)) {
            db_data_2 = REST_call_2();
        }
        buildResponse(db_data_1, db_data_2, data_3, Response);
    }
} else {
    Response = REST_Call_3(); // on error throw Exception
    buildResponse(response);
}

我正在寻找一种完整的非阻塞异步方法。

最佳答案

塔索斯的答案对我来说似乎很合理,我看不到其他任何问题,但

// dataSource.getItemDetails(activityId returns List<ItemDetail> and is a blocking call
// So, I want to run it on a separate IO thread.
return Observable.from(dataSource.getItemDetails(activityId)).observeOn(Schedulers.io());

如果是这样,可以按以下步骤将阻塞的单元素调用转换为线程外调用:
Observable.fromCallable(() -> yourBlockingCall())
.subscribeOn(Schedulers.io())
.flatMapIterable(v -> v)
...

要么
Observable.defer(() -> Observable.from(yourBlockingCall()))
.subscribeOn(Schedulers.io())
...

编辑:基于该图,我将设置以下流程:
serviceCallA()
.flatMap(a -> {
    if (dataFound(a)) {
        return dbCall1()
           .flatMap(db1 -> {
               Observable o1 = shouldCallServiceC(db1)
                    ? serviceCallC() : just(placeholderC);
               Observable o2 = shouldCallDB2(db1)
                    ? dbCall2() ? just(placeHolderDb2);

               return zip(o1, o2, (c, d) -> createResult(c, d));
           });
    }
    return serviceCallB()
        .map(c -> mapToResultType(c));
});

08-06 04:27