一、前言


二、用法

1、引入

如下图所示,把下载的源码拷贝进来即可,核心代码放到java目录里面,测试代码放到test目录里面。

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


2、编写worker

1)、二十四节气worker

package com.example.async.worker;

import cn.hutool.http.HttpUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 二十四节气worker
 * </p>
 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 18:01
 */
@Slf4j
public class TwentyFourWorker implements IWorker<String, String>, ICallback<String, String> {

   public static final String APPKEY = "xxxxxx";// 你的appkey
   public static final String URL = "https://api.jisuapi.com/jieqi/query";

   @Override
   public void begin() {
      // System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
   }

   @Override
   public void result(boolean success, String param, WorkResult<String> workResult) {
      if (success) {
         System.out.println("callback twentyFourWorker success--" + SystemClock.now() + "----" + workResult.getResult()
               + "-threadName:" +Thread.currentThread().getName());
      } else {
         System.err.println("callback twentyFourWorker failure--" + SystemClock.now() + "----"  + workResult.getResult()
               + "-threadName:" +Thread.currentThread().getName());
      }
   }

   /**
    * 查询二十四节气
    */
   @Override
   public String action(String object, Map<String, WorkerWrapper> allWrappers) {
      String url = URL + "?appkey=" + APPKEY;
      String result = HttpUtil.get(url);

      // 模拟时长
      try {
         TimeUnit.SECONDS.sleep(5);
      } catch (Exception e) {
         log.error("[二十四节气]>>>> 异常: {}", e.getMessage(), e);
      }

      return result;
   }

   @Override
   public String defaultValue() {
      return "twentyFourWorker";
   }
}

2)、星座worker

package com.example.async.worker;

import cn.hutool.http.HttpUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 星座worker
 * </p>
 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 18:01
 */
@Slf4j
public class ConstellationWorker implements IWorker<String, String>, ICallback<String, String> {

   public static final String APPKEY = "xxxxxx";// 你的appkey
   public static final String URL = "https://api.jisuapi.com/astro/all";

   @Override
   public void begin() {
      // System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
   }

   @Override
   public void result(boolean success, String param, WorkResult<String> workResult) {
      if (success) {
         System.out.println("callback constellationWorker success--" + SystemClock.now() + "----" + workResult.getResult()
               + "-threadName:" +Thread.currentThread().getName());
      } else {
         System.err.println("callback constellationWorker failure--" + SystemClock.now() + "----"  + workResult.getResult()
               + "-threadName:" +Thread.currentThread().getName());
      }
   }

   /**
    * 查询星座
    */
   @Override
   public String action(String object, Map<String, WorkerWrapper> allWrappers) {
      String url = URL + "?appkey=" + APPKEY;
      String result = HttpUtil.get(url);

      // 模拟异常
      //    int i = 1/0;

      // 模拟时长
      try {
         TimeUnit.SECONDS.sleep(5);
      } catch (Exception e) {
         log.error("[星座]>>>> 异常: {}", e.getMessage(), e);
      }

      return result;
   }

   @Override
   public String defaultValue() {
      return "constellationWorker";
   }
}

3、异步编排

package com.example.async.service;

import com.example.async.worker.ConstellationWorker;
import com.example.async.worker.TwentyFourWorker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * <p>
 * AsyncTools服务
 * </p>
 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 17:56
 */
@Service
@Slf4j
public class AsyncToolService {

   /**
    * 异步返回结果
    *     ---- 方式:AsyncTool并行处理
    *
    * @return 结果
    */
   public Map<String, Object> queryAsync() throws ExecutionException, InterruptedException {
      // 声明worker
      TwentyFourWorker twentyFourWorker = new TwentyFourWorker();
      ConstellationWorker constellationWorker = new ConstellationWorker();

      // 构建二十四节气worker
      WorkerWrapper<String, String> twentyFourWrapper =  new WorkerWrapper.Builder<String, String>()
            .worker(twentyFourWorker)
            .callback(twentyFourWorker)
            .param("0")
            .build();

      // 构建星座worker
      WorkerWrapper<String, String> constellationWrapper =  new WorkerWrapper.Builder<String, String>()
            .worker(constellationWorker)
            .callback(constellationWorker)
            .param("1")
            .build();

      // 开始工作,这里设定超时时间10s,测试时可以设短一点看效果。
      Async.beginWork(10000, twentyFourWrapper, constellationWrapper);

      // 打印当前线程数
      log.debug("----------------- 当前线程数 ----------------");
      log.debug(Async.getThreadCount());

      // 打印结果
      log.debug("----------------- 二十四节气 ----------------");
      log.debug("结果: {}", twentyFourWrapper.getWorkResult());
      log.debug("----------------- 星座 ----------------");
      log.debug("结果: {}", constellationWrapper.getWorkResult());

      // 返回
      Map<String, Object> map = new HashMap<>();
      map.put("twentyFour", twentyFourWrapper.getWorkResult());
      map.put("constellation", constellationWrapper.getWorkResult());

      // 关闭(spring web类应用不用关闭,否则第二次执行会报线程池异常。)
      // Async.shutDown();

      return map;
   }
}

4、测试效果

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


三、常用编排

AsyncTool的QuickStart.md已经做了简洁的说明:https://gitee.com/jd-platform-opensource/asyncTool/blob/master/QuickStart.md

1)、任务并行

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


2)、串行+并行

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


3)、依赖其他任务的结果

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


四、避坑经验

1、勿关闭线程池

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


2、自定义线程池

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP

【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。-LMLPHP


五、示例代码



原创文章纯手打,觉得有点帮助就请点个推荐吧。


不定期分享工作经验和趣事,喜欢的话就请关注一下吧。


喜欢就点一下推荐吧~~
04-28 09:57