本文介绍了一旦我的线程中断,我怎么能中断RestTemplate调用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要创建一个库,在其中我将具有同步和异步功能。

I need to make a library in which I will have synchronous and asynchronous feature.


  • executeSynchronous() - 等到我有结果,返回结果。

  • executeAsynchronous() - 返回一个Future如果需要,可以在其他事情完成后立即处理。

  • executeSynchronous() - waits until I have a result, returns the result.
  • executeAsynchronous() - returns a Future immediately which can be processed after other things are done, if needed.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将使用该 DataKey 对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,并在我们将响应作为JSON字符串返回后,我们将通过创建 DataResponse 对象将JSON字符串发送回我们的客户。有些客户会调用 executeSynchronous(),有些可能会调用 executeAsynchronous(),这就是为什么我需要单独提供两种方法在我的图书馆。

The customer will use our library and they will call it by passing DataKey builder object. We will then construct a URL by using that DataKey object and make a HTTP client call to that URL by executing it and after we get the response back as a JSON String, we will send that JSON String back to our customer as it is by creating DataResponse object. Some customer will call executeSynchronous() and some might call executeAsynchronous() so that's why I need to provide two method separately in my library.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我的 DataClient 实现上述客户端界面:

And then I have my DataClient which implements the above Client interface:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this looks right?
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

将执行实际任务的简单类:

Simple class which will perform the actual task:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

当我开始研究这个解决方案时,我不是终止已超时的任务。我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一)。所以我在网上进行了一些研究,我发现我可以通过在将来使用取消来取消已经超时的任务,如下所示 -

When I started working on this solution, I was not terminating the tasks that have timed out. I was reporting the timeout to the client, but the task continues to run in the thread pool (potentially occupying one of my limited 10 threads for a long time). So I did some research online and I found out that I can cancel my tasks those have timed out by using cancel on future as shown below -

future.cancel(true);

但如果我按照上面的解决方案中所示这样做,那么我是否需要关闭线程中断后,其他资源如 RestTemplate ?如果是,那我该怎么做?另外,我们可以打断 RestTemplate 来电吗?因为我一旦任务超时就尝试取消取消我的未来,但我想我的线程没有被打断。

But if I am doing like this as shown in my above solution, then do I need to close any other resources like RestTemplate as soon as thread is interrupted? If yes, then how would I do that? Also, can we interrupt RestTemplate calls? Since I tried calling cancel on my future as soon as the task got timed out but I guess my thread doesn't got interrupted.

我们是否应该始终终止任务已经超时了?如果我们不这样做那么可能会产生什么影响?它会影响我的表现吗?

Should we always be terminating the tasks that have got timed out? If we don't do that then what might be the impact I will have? Will it affect my performance?

我目前的设置是否有更好的解决方案来处理这种情况?

Is there any better solution to deal with this case with my current setup?

推荐答案

有时无法中断线程,尤其是当线程在Socket上执行阻塞操作时。

Sometimes it is not possible to interrupt thread especially when thread performs blocking operations on Socket.

因此,不要在超时时取消任务,而应该在http连接上设置超时。

So instead of cancelling the task when it timeouts, you should rather set timeouts on http connection.

不幸的是timeousts是根据Connection Factory和RestTemplate设置的,因此每个请求必须使用它自己的RestTemplate。

Unfortunately timeousts are set per Connection Factory and RestTemplate, thus each request must use it's own RestTemplate.

您可以为每个任务创建新的RestTemplate,或者使用ThreadLocal或资源重用previusly创建的模板池。

You can create new RestTemplate per task, or reuse previusly created templates using ThreadLocal or resource pooling.

例如,使用Thread local的任务可能如下所示:

For example the task using Thread local might look like below:

    public class Task implements Callable<DataResponse> {

    private DataKey key;

    private ThreadLocal<RestTemplate> restTemplateThreadLocal =
            ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory()));

    public Task(DataKey key) {
        this.key = key;
    }

    private SimpleClientHttpRequestFactory getConnectionFactory(){
        return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory();
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            //it is up to you, how to set connection and read timeouts from provided key.getTimeout
            getConnectionFactory().setConnectTimeout(1000);
            getConnectionFactory().setReadTimeout(key.getTimeout());
            response = restTemplateThreadLocal.get().getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
   }

BTW。
Spring还提供了AsyncRestTemplate,它可以使您的代码更简单。
如果与Netty4ClientHttpRequestFactory一起使用,您可以获得基于NIO的客户端连接。在这种情况下,即使在进行Http连接时,您也应该能够中断任务。

BTW.Spring also provides AsyncRestTemplate, which may make your code simpler.If used with Netty4ClientHttpRequestFactory you can get NIO based client connections. In such case, you should be able to interrupt your tasks even while it makes Http connection.

下面的简短示例。它使用NIO,因此您无需关心超时后是否真的取消了请求。

Short sample below. It uses NIO thus you does not have to care if the request is really cancelled after Timeout.

        URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
        Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
        ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
        System.out.println("entity.get() = " + entity.get());
        asyncRequestFactory.destroy();

这篇关于一旦我的线程中断,我怎么能中断RestTemplate调用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-21 15:08