本文介绍了调用bodyToMono AFTER exchange()后,block()/blockFirst()/blockLast()发生阻塞错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO会在生成文件时详细说明错误,而不是使用文件本身.这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用.

I am trying to use Webflux to stream a generated file to another location, however, if the generation of the file ran into an error, the api returns success, but with a DTO detailing the errors while generating the file instead of the file itself. This is using a very old and poorly designed api so please excuse the use of post and the api design.

api调用(exchange())的响应是ClientResponse.从这里,我可以使用bodyToMono转换为ByteArrayResource并将其传输到文件,或者,如果在创建文件时出错,那么我也可以使用bodyToMono转换为DTO.但是,我似乎既不执行任何操作,也不依赖于ClientResponse标头的内容.

The response from the api call (exchange()) is a ClientResponse. From here I can either convert to a ByteArrayResource using bodyToMono which can be streamed to a file, or, if there is an error in creating the file, then I can convert to the DTO also using bodyToMono. However, I cannot seem to do either or depending on the contents of the header of ClientResponse.

在运行时我收到由

我认为我的问题是我无法在同一功能链中两次调用block().

I think my issue is that I cannot call block() twice in the same function chain.

我的代码段如下:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

基本上我想根据标头中定义的MediaType来不同地处理ClientResponse.

Basically I want to process the ClientResponse differently based on the MediaType which is defined in the header.

这可能吗?

推荐答案

首先,几件事将帮助您理解解决此用例的代码段.

First, a few things that will help you understand the code snippet solving this use case.

  1. 永远不要在返回反应类型的方法中调用阻塞方法;您将阻塞应用程序的几个线程之一,这对应用程序来说是非常糟糕的
  2. 从Reactor 3.2开始,无论如何,在反应式管道内进行阻塞抛出错误
  3. 如注释中所述,
  4. 调用 subscribe 也不是一个好主意.它或多或少像在单独的线程中以一项任务开始该任务.完成后,您将获得一个回调(可以给 subscribe 方法指定lambdas),但实际上您正在将当前管道与该任务分离.在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端HTTP响应并清理资源.
  5. 如果您不想将整个响应缓冲在内存中,Spring会提供 DataBuffer (认为可以缓冲的ByteBuffer实例).
  6. 如果正在实现的方法本身是阻塞的(例如,返回 void ),例如在测试用例中,则可以调用阻塞.
  1. You should never call a blocking method within a method that returns a reactive type; you will block one of the few threads of your application and it is very bad for the application
  2. Anyway as of Reactor 3.2, blocking within a reactive pipeline throws an error
  3. Calling subscribe, as suggested in the comments, is not a good idea either. It is more or less like starting that job as a task in a separate thread. You'll get a callback when it's done (the subscribe methods can be given lambdas), but you're in fact decoupling your current pipeline with that task. In this case, the client HTTP response could be closed and resources cleaned before you get a chance to read the full response body to write it to a file
  4. If you don't want to buffer the whole response in memory, Spring provides DataBuffer (think ByteBuffer instances that can be pooled).
  5. You can call block if the method you're implementing is itself blocking (returning void for example), for example in a test case.

以下是您可以用来执行此操作的代码段:

Here's a code snippet that you could use to do this:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

如您所见,我们没有在任何地方阻塞,并且处理I/O的方法返回了 Mono< Void> ,这与 done(error)回调,指示何时完成操作以及是否发生错误.

As you can see, we're not blocking anywhere and methods dealing with I/O are returning Mono<Void>, which is the reactive equivalent of a done(error) callback that signals when things are done and if an error happened.

由于我不确定 createErrorFile 方法应该做什么,因此我为 createSpreadsheet 提供了一个示例,该示例仅将主体字节写入文件中.请注意,由于数据缓冲区可能被回收/池化,因此我们需要在完成后释放它们.

Since I'm not sure what the createErrorFile method should do, I've provided a sample for createSpreadsheet that just writes the body bytes to a file. Note that since databuffers might be recycled/pooled, we need to release them once we're done.

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过此实现,您的应用程序将在给定时间在内存中保存几个 DataBuffer 实例(由于性能原因,反应性运算符正在预取值),并将以反应性方式写入字节

With this implementation, your application will hold a few DataBuffer instances in memory at a given time (the reactive operators are prefetching values for performance reasons) and will write bytes as they come in a reactive fashion.

这篇关于调用bodyToMono AFTER exchange()后,block()/blockFirst()/blockLast()发生阻塞错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 18:03