我是RX-Java的新手,我正在尝试设计一种API,其流程如下所述:
Make REST call A to load data
|
|
data not found? | data found
------------------------------------
| |
| |
| |
Make REST Call B Load DB Data 1
| |
| |
| _________________________
| | Parallel |
| | |
| | |
| (condition using DB data 1) (condition using DB data 1)
| Load REST Data C Load DB Data 2
| | |
| |________________________|
| |
| |
Build Response Build Response
假设数据库方法和服务调用返回Observable,需要使用rx运算符对上面的框架流程进行一些说明吗?
我将在下面分享阻塞性伪代码:
Response = REST_Call_1(); // on error throw Exception
if (isResponseValid(response)) { // returns Boolean
if (responseUnderReview(response)) { // validation func
throw Exception;
} else {
//db_data_1 and db_data_2 can be parallel
db_data_1 = Load_DB_Data_1();
// Load data_3 and data_2 based on db_data_1
if (is_data_3_required(db_data_1)) {
data_3 = REST_call_2();
}
if (is_data_2_required(db_data_1)) {
db_data_2 = REST_call_2();
}
buildResponse(db_data_1, db_data_2, data_3, Response);
}
} else {
Response = REST_Call_3(); // on error throw Exception
buildResponse(response);
}
我正在寻找一种完整的非阻塞异步方法。
最佳答案
塔索斯的答案对我来说似乎很合理,我看不到其他任何问题,但
// dataSource.getItemDetails(activityId returns List<ItemDetail> and is a blocking call
// So, I want to run it on a separate IO thread.
return Observable.from(dataSource.getItemDetails(activityId)).observeOn(Schedulers.io());
如果是这样,可以按以下步骤将阻塞的单元素调用转换为线程外调用:
Observable.fromCallable(() -> yourBlockingCall())
.subscribeOn(Schedulers.io())
.flatMapIterable(v -> v)
...
要么
Observable.defer(() -> Observable.from(yourBlockingCall()))
.subscribeOn(Schedulers.io())
...
编辑:基于该图,我将设置以下流程:
serviceCallA()
.flatMap(a -> {
if (dataFound(a)) {
return dbCall1()
.flatMap(db1 -> {
Observable o1 = shouldCallServiceC(db1)
? serviceCallC() : just(placeholderC);
Observable o2 = shouldCallDB2(db1)
? dbCall2() ? just(placeHolderDb2);
return zip(o1, o2, (c, d) -> createResult(c, d));
});
}
return serviceCallB()
.map(c -> mapToResultType(c));
});