一、背景

接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢?

二、方案设计

在 Spring 中,AsyncContext 是用于支持异步处理的一个重要的特性。它允许我们在 servlet 请求处理过程中,将长时间运行的操作放在一个单独的线程中执行,而不会阻塞其他请求的处理。

AsyncContext 在以下两种情况下特别有用:

  1. 长时间运行的操作:当我们需要执行一些耗时的操作,例如网络请求、数据库查询或其他 I/O 操作时,通过将这些操作放在一个新的线程中,可以避免阻塞 servlet 容器中的线程,提高应用的并发性能。

  2. 推送异步响应:有时候,我们可能需要推送异步产生的响应,而不是等到所有操作都完成后再下发响应。通过 AsyncContext,我们可以在任何时间点上触发异步响应,将结果返回给客户端。

使用 AsyncContext 的步骤如下:

  1. 在 servlet 中启用异步模式:在 servlet 中,通过调用 startAsync() 方法,可以获取到当前请求的 AsyncContext 对象,从而启用异步处理模式。
HttpServletRequest request = ...;
AsyncContext asyncContext = request.startAsync();
  1. 指定异步任务:通过调用 AsyncContext 对象的 start() 方法,在新的线程中执行需要异步处理的任务。
asyncContext.start(() -> {
    // 异步任务逻辑
});
  1. 提交响应:在异步任务完成后,可以调用 AsyncContext 对象的 complete() 方法,以表示异步操作完成。
asyncContext.complete();

需要注意的是,我们在使用 AsyncContext 时需要特别注意线程安全。由于异步任务在单独的线程中执行,所以可能存在并发问题。因此,在编写异步任务逻辑时,需要注意线程安全性,使用合适的同步措施。

另外,AsyncContext 也支持超时设置、错误处理、事件监听等功能,这些可以通过相应的方法和回调进行配置。可以根据具体的需求使用这些功能来优化异步处理的逻辑。

总结来说,Spring 的 AsyncContext 提供了方便的异步处理机制,可以提高应用的并发性能,并支持推送异步响应,使得应用更具有响应性和可伸缩性。

三、代码1

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.*;

@RestController
@RequestMapping("/api/byai/transform")
@Slf4j
public class AsyncTestController {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();


//    private static boolean result = false;

    @PostMapping("/async")
    public void async(HttpServletRequest request, HttpServletResponse response) {
        // 创建AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        // 设置处理超时时间8s
        asyncContext.setTimeout(8000L);
        // asyncContext监听
        JdAsyncTestListener asyncListener = new JdAsyncTestListener(redisTemplate,asyncContext);
        asyncContext.addListener(asyncListener);
        // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
        asyncContext.start(asyncListener);
    }

    // 模拟业务处理完成
    @PostMapping("/set")
    public ResultModel notify(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
        return ResultModel.success();
    }

    @PostMapping("/get")
    public ResultModel get(String key) {
        String s = redisTemplate.opsForValue().get(key);
        return ResultModel.success(s);
    }

    @PostMapping("/del")
    public ResultModel del(String key) {
        redisTemplate.delete(key);
        return ResultModel.success();
    }
}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.*;

@RestController
@RequestMapping("/api/test")
@Slf4j
public class AsyncTestController {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();


//    private static boolean result = false;

    @PostMapping("/async")
    public void async(HttpServletRequest request, HttpServletResponse response) {
        // 创建AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        // 设置处理超时时间8s
        asyncContext.setTimeout(8000L);
        // asyncContext监听
        JdAsyncTestListener asyncListener = new JdAsyncTestListener(redisTemplate,asyncContext);
        asyncContext.addListener(asyncListener);
        // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
        asyncContext.start(asyncListener);
    }

    // 模拟业务处理完成
    @PostMapping("/set")
    public ResultModel notify(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
        return ResultModel.success();
    }

    @PostMapping("/get")
    public ResultModel get(String key) {
        String s = redisTemplate.opsForValue().get(key);
        return ResultModel.success(s);
    }

    @PostMapping("/del")
    public ResultModel del(String key) {
        redisTemplate.delete(key);
        return ResultModel.success();
    }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import java.io.IOException;


@Slf4j
public class JdAsyncTestListener implements AsyncListener,Runnable {
    boolean isComplete;

    private RedisTemplate<String, String> redisTemplate;
    private AsyncContext asyncContext;
    public JdAsyncTestListener(RedisTemplate<String, String> redisTemplate, AsyncContext asyncContext) {
        this.redisTemplate = redisTemplate;
        this.asyncContext = asyncContext;
    }

    @Override
    public void run() {
        try {
            while(true){
                if(isComplete){
                    log.info("已经退出");
                    break;
                }
                boolean b = redisTemplate.opsForValue().get(1) != null;
                log.info("获取标志位:"+b);
                Thread.sleep(300);
                if (b) {
                    asyncContext.getResponse().getWriter().print(1);
                    asyncContext.complete();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onComplete(AsyncEvent asyncEvent) throws IOException {
        log.info("结束了");
        isComplete = true;

    }

    @Override
    public void onTimeout(AsyncEvent asyncEvent) throws IOException {
        log.info("超时了");
        isComplete = true;
    }

    @Override
    public void onError(AsyncEvent asyncEvent) throws IOException {

    }

    @Override
    public void onStartAsync(AsyncEvent asyncEvent) throws IOException {

    }
}

四、代码二

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@Validated
@RestController
@RequestMapping("/api/test")
@Slf4j
public class TestController {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();

    private static boolean result = false;

    private final boolean isTimeout = false;


    /**
     * 消息
     *
     * @return
     */
    @PostMapping("/test")
    public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
        // 创建AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        String jdCustomerId = jdLongPollRequest.getJdCustomerId();
        // 设置处理超时时间8s
        asyncContext.setTimeout(8000L);
        // asyncContext监听
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent asyncEvent) throws IOException {
                log.info("onComplete={}", asyncEvent);
            }

            @Override
            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                log.info("onTimeout={}", asyncEvent);
                ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
                map.put("code", "500");                asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
                asyncContext.complete();
            }

            @Override
            public void onError(AsyncEvent asyncEvent) throws IOException {
                log.info("onError={}", asyncEvent);
            }

            @Override
            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                log.info("onStartAsync={}", asyncEvent);
            }
        });
        // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
        timeoutChecker.scheduleAtFixedRate(() -> {
            try {
                String redisKey = getcustomerProcessRes(customerId);
                String redisValue = redisTemplate.opsForValue().get(redisKey);
                result = StringUtils.isNotBlank(redisValue);
                if (result) {
                    //todo 长轮询查询数据库。通过customerId查询
                    send(customerId, redisValue);
                    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
                    map.put("code", "200");
                    map.put("msg", redisValue);
                    asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
                    asyncContext.complete();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, 0, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * 发送消息
     */
    private void send(String customerId, String content) {
        
    }
  
}
09-05 11:41