1、LTS初步认识

1.1、lts是什么?

LTS(light-task-scheduler)是一个分布式任务调度框架,支持设置节点组搭建多节点集群水平拓展框架架构,提供多种任务类型包括实时任务、定时任务、corn任务的支持, 主要用于系统中的任务调度以及调度信息监控等

1.2、lts能够解决什么问题?

  • 支持分布式,避免单点故障,支持动态扩容,容错重试
  • Spring扩展支持,SpringBoot支持,Spring Quartz Cron任务的无缝接入支持
  • 节点监控、任务执行状态监控、jvm等信息监控
  • 多节点任务单一执行、故障转移

1.3、lts相比于其它任务调度框架的优势?

  • 参考文档LTS业务场景说明.pdf

2、LTS架构说明

2.1、lts节点类型

  • JobClient:主要负责提交任务, 并接收任务执行反馈结果。
  • JobTracker:负责接收并分配任务,任务调度。
  • TaskTracker:负责执行任务,执行完反馈给JobTracker。
  • LTS-Admin:(管理后台)主要负责节点管理,任务队列管理,监控管理等。

2.2、注册中心支持

  • zookeeper
  • redis

2.3、节点通信方式

  • netty

2.4、任务队列和日志存储

  • mysql
  • mongodb

2.5、任务类型

  • 实时任务:提交了之后立即就要执行的任务。
  • 定时任务:在指定时间点执行的任务,譬如 今天3点执行(单次)。
  • Cron任务:CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * ?

2.6、官方架构图

2.7、任务执行流程图

3、LTS简单实战示例

3.1、JobClient提交示例

@Before
public void before(){
    job = new Job();
    job.setTaskId(randomTaskId);
    job.setParam("shardValue", shardValue);
    job.setSubmitNodeGroup(nodeGroup);
    job.setTaskTrackerNodeGroup("taskTracker_node_group");
    job.setNeedFeedback(true);
    // 当任务队列中存在这个任务的时候,是否替换更新
    job.setReplaceOnExist(true);
}

/** 提交实时job */
@Test
public void submitRealTimeJob(){
    response = jobClient.submitJob(job);
}

/** 提交corn任务 */
@Test
public void submitCronJob() {
    // corn表达式, 每5s执行一次
    job.setCronExpression("0/5 * * * * ? ");
    response = jobClient.submitJob(job);
}

/** 提交重复任务 */
@Test
public void submitRepeatJob() {
    // 一共执行5次
    job.setRepeatCount(5);
    // 10s执行一次
    job.setRepeatInterval(10 * 1000L);
    response = jobClient.submitJob(job);
}

/** 提交定时触发任务 */
@Test
public void submitTriggerTimeJob() {
    // 1分钟后执行
    job.setTriggerTime(DateUtils.addMinute(new Date(), 1).getTime());
    Response response = jobClient.submitJob(job);
    System.out.println(response);
}

3.2、TaskTracker job定义说明

@LTS
public class JobApiScheduler {

    public static final Logger logger = Logger.getLogger(JobApiScheduler.class);

    @JobRunnerItem(shardValue = "apiTestJob")
    public Result apiTestJob(JobContext jobContext){
        try {
            logger.info(">>>>>>>>> apiTestJob run...");
            BizLogger bizLogger = jobContext.getBizLogger();
            // 会发送到 LTS (JobTracker上)
            bizLogger.info("run apiTestJob success");
        } catch (Exception e) {
            logger.info(">>>>>>>>> run apiTestJob failed!", e);
            return new Result(Action.EXECUTE_LATER, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "success");
    }
}

3.3、web控制台功能演示

3.4、多节点任务单一执行

  • 启动多个TaskTracker节点, 发起一个任务查看是否只有一个TaskTracker执行

3.5、故障转移

  • 一个TaskTracker节点挂掉后, 重新选举Master节点, 其它任务负载在未故障的节点

3.6、failStore

  • TaskTracker节点挂掉后未能成功接收到JobTracker发送的任务, 当TaskTracker重新上线后恢复执行

3.7、用户自定义参数

@LTS
public class JobParamScheduler {

    public static final Logger logger = Logger.getLogger(JobParamScheduler.class);

    @JobRunnerItem(shardValue = "paramJob")
    public Result paramJob(JobContext jobContext){
        try{
            logger.info(">>>>>>>>> paramJob run...");
            // 获取用户输入参数
            Map<String, String> extParams = jobContext.getJob().getExtParams();
            Integer orgId = Integer.parseInt(extParams.get("orgId").toString());
            logger.info(String.format("开始租户%s相关任务处理..."));
            BizLogger bizLogger = jobContext.getBizLogger();
            // 会发送到 LTS (JobTracker上)
            bizLogger.info("run paramJob success");
        }catch(Exception e) {
            logger.info(">>>>>>>>> run paramJob failed!", e);
            return new Result(Action.EXECUTE_LATER, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "success");
    }
}

4、原理分析, 相关类介绍

4.1、配置常量

  • com.github.ltsopensource.core.constant.ExtConfig

4.2、mysql相关

  • com.github.ltsopensource.monitor.access.mysql.MysqlAbstractJdbcAccess
  • com.github.ltsopensource.monitor.access.mysql.MysqlJobClientMAccess
  • com.github.ltsopensource.monitor.access.mysql.MysqlJobTrackerMAccess
  • com.github.ltsopensource.monitor.access.mysql.MysqlTaskTrackerMAccess

4.3、任务分发

  • com.github.ltsopensource.spring.tasktracker.JobDispatcher

4.4、任务处理器

  • com.github.ltsopensource.jobtracker.support.JobReceiver

4.5、负载均衡

  • com.github.ltsopensource.core.loadbalance.LoadBalance
  • com.github.ltsopensource.core.loadbalance.AbstractLoadBalance

4.6、任务拉取

  • com.github.ltsopensource.tasktracker.support.JobPullMachine

4.7、任务推送

  • com.github.ltsopensource.jobtracker.processor.JobPullProcessor

5、任务执行结果

LTS框架提供四种执行结果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,并对每种结果采取相应的处理机制,譬如重试。

  • EXECUTE_SUCCESS: 执行成功,这种情况,直接反馈客户端(如果任务被设置了要反馈给客户端)。
  • EXECUTE_FAILED:执行失败,这种情况,直接反馈给客户端,不进行重试。
  • EXECUTE_LATER:稍后执行(需要重试),这种情况,不反馈客户端,重试策略采用30s的策略,默认最大重试次数为10次,用户可以通过参数设置修改这些参数。
  • EXECUTE_EXCEPTION:执行异常, 这中情况也会重试(重试策略,同上)

6、任务调度方案比较

主要根据LTS支持的几种任务(实时任务、定时任务、Cron任务,Repeat任务)和其他一些开源框架在应用场景上做比较

这种场景下,当任务量比较小的时候,单机都可以完成的时候.自己采用线程池或者去轮训数据库取任务的方式(或者其他方式)就可以解决 · 但如果是任务执行时间比较长或者任务量比较大,单机不足以满足需求的时候,就需要自己去做分布式的功能,还有很重要的是,怎么做容错,怎么保证同一个任务只被一个节点执行,怎么解决执行失败异常的情形等等,你就需要自己去做很多事情,头可能就大了。这时候 LTS 就派上用场了.因为这些问题 LTS 都帮你解决了,你只需关注你的业务逻辑,而不用为上面的这些事情而烦恼。当然这时候有人可能会想到如果用 MQ 去解决,利用 MQ 的异步去解耦,也同时可以实现分布还有容错等。当然有时候是可以的,为什么说是可以的呢,因为 LTS 的架构也和 MQ 的类似, JobClient 相当于 MQ 的 Producer , JobTracker 相当于 MQ 的 Broker , TaskTracker 相当于 MQ 的 Consumer ,经过我这么一说,是不是觉得貌似是很像哈。但是我又为什么说是有时候是可以的呢,而不是一定是可以的呢,因为如果你同一个任务(消息)提交 MQ 两次. MQ 队列中有两条同样的任务消息,那么当你这个任务不能有两个节点同时执行的时候(同时执行一个任务可能出现各种问题) , MQ 就不能满足了,因为他不知道你这两条消息是同一个任务,它会把这两条消息可能会发给两个不同的节点同时执行(或者同一个节点的不同线程去执行),这时候你就需要自己去做一些事情去保证同一个任务不能同时被两个线程(或节点)去执行问题,这时候头又大了,那么 LTS 又派上用场了,以为 LTS 就可以保证这一点。说到任务调度.很多人一下就想到了 QuartZ ,对于这种实时任务的情况. QuartZ 是不太适合的,它也不能很好的解决故障转移(譬如执行中的节点突然停掉了, QuartZ 不能将这个执行中的任务立马分配给其他节点执行,最多设置了 QuartZ 的可恢复性,在停掉的节点重启之后重新执行该任务.但如果这个节点再也不启动起来了呢?那就只能呵呵了)等问题,这类场景 QuartZ 就不做比较了。有些人可能问,说了这么多,你倒是举个例子呀。嗯,我举几个例子: 1 .发短信验证码,这种可以用 MQ 去实现,也可以单机去实现(如果你量不大的话),当然 LTS 也是可以的.如果你量非常非常大的话,建议还是用性能比较好的 MQ 代替 2 .实时的给在线用户算数据,触发者是用户自己(自己手动点),但是算任务的只能同时由一个线程去执行,这是就可以用 LTS 了

某个时间点触发这种场景下,和实时任务相比,只有一个不同,就是要指定一个时间点去执行,可能是 1 个小时之后,可能是 1 天之后.也可能是 1 天,小时之后。有些人可能用轮训的业务数据库的方式去解决,轮训业务数据库有什么问题呢.当然你数据量很小我就不说了。如果你数据量还比较大的情况下,轮训数据库,势必会影响业务查询,如果有其他业务查询的话。还有就是对于分布式的支持不是很好,还有当表中存在多种不同执行(延迟)时间的任务,这个轮训频率就比较关键了,太短,影响性能,太长,影响业务,执行不及时.导致任务执行延迟太久,等等。这时候如果用MQ ,虽然有些 MQ 支持延迟队列 (RabbitMQ , RocketMQ 等).但他们都是指定的一些特定的 Level 级别延迟,但是不支持任意时间精度.譬如, 1 min , 5 min . 10 min 等等,但如果是 7 分钟,或者 20 天之后呢。如果 MQ 支持任意时间精度,那么它的性能就只能呵呵了,这种情况 MQ 就排除了,但是如果 MQ 的这些特定的 Level 刚好满足你的需求,那么选 MQ 也是可以的。再说说 Quartz吧, Quartz 可以支持定时任务.支持某个时间点触发,也支持集群,它在架构上是分布式的,没有负责几种管理的节点。 Quartz 是通过数据库行级锁的方式实现多线程之间任务争用的问题。行锁有嘟些特点呢,开销大,加锁慢,会出现死锁,并发度相比表级锁,页级锁高一点。但是在任务量比较大的时候,并发度较大的时候,行级锁就显得比较吃力了,而且很容易发生死锁。那么 LTS 是怎么解决并发性的问题的呢, LTS 采用预加载和乐观锁的方式,批量的将部分要执行的任务预加载到内存中,所以在取任务的时候只需要两步: 1 .从内存中取到一个任务,当然内存中保证同一个线程拿到同一个任务是很容易的也是很高效的, 2 .将拿到的这个任务对应的数据库记录锁住,那么这里采用乐观锁, CAS 的方式去修改记录(如果任务己经被别的节点拿走了,那么重新执行 1 , 2 步,这种己经被别的节点拿走的情况,主要是在多个 JobTracker 的情形下,单个 JobTracker 不会出现这种情况,但是在多个 JobTracker 下,内存中的预加载数据采用不同步长的方式来减小两个 JobTracker 内存中数据重复的概率,很好的解决了这个问题,这里稍微提下 》 ,所以这个时候LTS相对于QuartZ 的优势一下就体现出来了。还有就是上面说的 Quartz 对故障转移做的不是很好。还有就是当 QuartZ 对应的 MySQL 数据库挂了,这时候问题就来了客户端提交的任务提交不成功了,那么有人会想将这些数据保存在内存中,等 MySOL 重启起来了再重试提交,那么如果你当前节点也挂了呢,你内存中的数据就会全部丢失了.所以这时候你需要自己额外的去做一些失败任务本地持久化的工作.不过如果你用LTS的话, LTS 支持Failstore ,任务提交失败了,自动帮你本地持久化,然后待 JobTracker 可用的时候重试,不管你是 JobTracker 挂了,还是 JobTracker 对应的数据库挂了,都是 ok 的。举个例子吧,在一个小时之后给某些用户发短信,或者当用户点击退款操作之后,从点击退货的这个时间点开始, n 天后将这个退款关闭

这种场景下,和定时任务相比,不一样的地方,就只有.这个是会重复执行的,相当于重复执行的定时任务。所以这种场景下的对比,可以继续考照定时任务的对比。 LTS在这种场景下提供的特性有,提供统一的后台监控和后台管理。当某次定时任务执行失败,会执行重试操作,并提供执行日志

模块说明

  • 开发中主要需要进行mogu-lts-taskTracker模块代码进行更新, 其它模块基本不需要进行任何改动 

开发步骤

I、编写Java代码

@LTS
public class SampleJob {

    @JobRunnerItem(shardValue = "sampleJob")
    public Result sampleJob(JobContext jobContext){
        try {
            BizLogger bizLogger = jobContext.getBizLogger();
            // 会发送到 LTS (JobTracker上)
            bizLogger.info(jobContext.getJob().getExtParams().get("shardValue") + " execute completed!");
        } catch (Exception e) {
            return new Result(Action.EXECUTE_LATER, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "success");
    }
}
  • @LTS用来标注该类为一个job类, 只有添加了该注解, 在这个类下面的job方法才会被扫描到
  • @JobRunnerItem用来指定一个job方法, 需要通过该注解的shardValue属性来指定该job方法的ID, 后续添加任务会用到, 需保证唯一性
  • 通过com.github.ltsopensource.tasktracker.runner.JobContext可以获取添加任务时指定的任务参数, 也可以获取BizLogger对象, 用来打印日志, 只有通过该类打印的日志才会被反馈到jobTracker去, 并被记录到job日志信息去
  • 方法返回信息请参考如上写法
01-08 00:56