4 Quartz 集成到 Spring

Spring-quartz 工程
Spring 在 spring-context-support.jar 中直接提供了对 Quartz 的支持
分布式之任务调度学习二-LMLPHP
可以在配置文件中把 JobDetail、Trigger、Scheduler 定义成 Bean。

4.1 定义 Job

<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
	<property name="name" value="my_job_1"/>
	<property name="group" value="my_group"/>
	<property name="jobClass" value="com.gupaoedu.quartz.MyJob1"/>
	<property name="durability" value="true"/>
</bean>

4.2 定义 Trigger

<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
	<property name="name" value="my_trigger_1"/>
	<property name="group" value="my_group"/>
	<property name="jobDetail" ref="myJob1"/>
	<property name="startDelay" value="1000"/>
	<property name="repeatInterval" value="5000"/>
	<property name="repeatCount" value="2"/>
</bean>

4.3 定义 Scheduler

<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
	<property name="triggers">
		<list>
			<ref bean="simpleTrigger"/>
			<ref bean="cronTrigger"/>
		</list>
	</property>
</bean>

既然可以在配置文件配置,当然也可以用@Bean 注解配置。在配置类上加上@Configuration 让 Spring 读取到。

public class QuartzConfig {
	@Bean
	public JobDetail printTimeJobDetail(){
		return JobBuilder.newJob(MyJob1.class)
		.withIdentity("gupaoJob")
		.usingJobData("gupao", "职位更好的你")
		.storeDurably()
		.build();
	}
	@Bean
	public Trigger printTimeJobTrigger() {
		CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");
		return TriggerBuilder.newTrigger()
		.forJob(printTimeJobDetail())
		.withIdentity("quartzTaskService")
		.withSchedule(cronScheduleBuilder)
		.build();
	}
}

5 动态调度的实现

springboot-quartz 工程
传统的 Spring 方式集成,由于任务信息全部配置在 xml 文件中,如果需要操作任务或者修改任务运行频率,只能重新编译、打包、部署、重启,如果有紧急问题需要处理,会浪费很多的时间。
有没有可以动态调度任务的方法?比如停止一个 Job?启动一个 Job?修改 Job 的触发频率?
读取配置文件、写入配置文件、重启 Scheduler 或重启应用明显是不可取的。
对于这种频繁变更并且需要实时生效的配置信息,我们可以放到哪里?
ZK、Redis、DB tables。
并且,我们可以提供一个界面,实现对数据表的轻松操作。

5.1 配置管理

这里我们用最简单的数据库的实现。
问题 1:建一张什么样的表?参考 JobDetail 的属性

CREATE TABLE `sys_job`(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID', `job_name` varchar(512) NOT NULL COMMENT '任务名称', `job_group` varchar(512) NOT NULL COMMENT '任务组名', `job_cron` varchar(512) NOT NULL COMMENT '时间表达式', `job_class_path` varchar(1024) NOT NULL COMMENT '类路径,全类型', `job_data_map` varchar(1024) DEFAULT NULL COMMENT '传递 map 参数', `job_status` int(2) NOT NULL COMMENT '状态:1 启用 0 停用', `job_describe` varchar(1024) DEFAULT NULL COMMENT '任务功能描述', PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

5.2 数据操作与任务调度

操作数据表非常简单,SSM 增删改查。
但是在修改了表的数据之后,怎么让调度器知道呢?
调度器的接口:Scheduler
在我们的需求中,我们需要做的事情:
1、 新增一个任务
2、 删除一个任务
3、 启动、停止一个任务
4、 修改任务的信息(包括调度规律)

5.3 容器启动与 Service 注入

5.3.1 容器启动

因为任务没有定义在 ApplicationContext.xml 中,而是放到了数据库中,Spring Boot 启动时,怎么读取任务信息?
或者,怎么在 Spring 启动完成的时候做一些事情?
创建一个类,实现 CommandLineRunner 接口,实现 run 方法。
从表中查出状态是 1 的任务,然后构建。

5.3.2 Service 类注入到 Job 中

Spring Bean 如何注入到实现了 Job 接口的类中?
例如在 TestTask3 中,需要注入 ISysJobService,查询数据库发送邮件。
如果没有任何配置,注入会报空指针异常。
原因:
因为定时任务 Job 对象的实例化过程是在 Quartz 中进行的,而 Service Bean 是由Spring 容器管理的,Quartz 察觉不到 Service Bean 的存在,所以无法将 Service Bean装配到 Job 对象中。
分析:
Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其实现了 InitializingBean方法,在唯一的方法 afterPropertiesSet()在 Bean 的属性初始化后调用。调度器用 AdaptableJobFactory 对 Job 对象进行实例化。所以,如果我们可以把这个 JobFactory 指定为我们自定义的工厂的话,就可以在 Job 实例化完成之后,把 Job纳入到 Spring 容器中管理。
解决这个问题的步骤:
1、定义一个 AdaptableJobFactory,实现 JobFactory 接口,实现接口定义的newJob 方法,在这里面返回 Job 实例
分布式之任务调度学习二-LMLPHP
2、定义一个 MyJobFactory,继承 AdaptableJobFactory。
使用 Spring 的 AutowireCapableBeanFactory,把 Job 实例注入到容器中。

@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
	private AutowireCapableBeanFactory capableBeanFactory;
		protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
		Object jobInstance = super.createJobInstance(bundle);
		capableBeanFactory.autowireBean(jobInstance);
		return jobInstance;
	}
}

3、指定 Scheduler 的 JobFactory 为自定义的 JobFactory。

scheduler.setJobFactory(myJobFactory);

6 Quartz 集群部署

springboot-quartz 工程

6.1 为什么需要集群?

1、防止单点故障,减少对业务的影响
2、减少节点的压力,例如在 10 点要触发 1000 个任务,如果有 10 个节点,则每个节点之需要执行 100 个任务

6.2 集群需要解决的问题?

1、任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行相同的操作,引起数据混乱。比如跑批,绝对不能执行多次。
2、任务漏跑,假如任务是平均分配的,本来应该在某个节点上执行的任务,因为节点故障,一直没有得到执行。
3、水平集群需要注意时间同步问题
4、Quartz 使用的是随机的负载均衡算法,不能指定节点执行
所以必须要有一种共享数据或者通信的机制。在分布式系统的不同节点中,我们可以采用什么样的方式,实现数据共享?
两两通信,或者基于分布式的服务,实现数据共享。
例如:ZK、Redis、DB。
在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了。
同样的问题:建什么表?哪些字段?依旧使用系统自带的 11 张表

6.3 集群配置与验证

quartz.properties 配置。
四个配置:集群实例 ID、集群开关、数据库持久化、数据源信息
注意先清空 quartz 所有表、改端口、两个任务频率改成一样
验证 1:先后启动 2 个节点,任务是否重跑
验证 2:停掉一个节点,任务是否漏

7 Quartz 调度原理

问题:
1、Job 没有继承 Thread 和实现 Runnable,是怎么被调用的?通过反射还是什么?
2、任务是什么时候被调度的?是谁在监视任务还是监视 Trigger?
3、任务是怎么被调用的?谁执行了任务?
4、任务本身有状态吗?还是触发器有状态?
看源码的入口

Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

7.1 获取调度器实例

7.1.1 读取配置文件
public Scheduler getScheduler() throws SchedulerException {
	if (cfg == null) {
		// 读取 quartz.properties 配置文件
		initialize();
	}
	// 这个类是一个 HashMap,用来基于调度器的名称保证调度器的唯一性
	SchedulerRepository schedRep = SchedulerRepository.getInstance();
	Scheduler sched = schedRep.lookup(getSchedulerName());
	// 如果调度器已经存在了
	if (sched != null) {
		// 调度器关闭了,移除
		if (sched.isShutdown()) {
			schedRep.remove(getSchedulerName());
		} else {
			// 返回调度器
			return sched;
		}
	}
	// 调度器不存在,初始化
	sched = instantiate();
	return sched;
}

instantiate()方法中做了初始化的所有工作:

// 存储任务信息的 JobStore
JobStore js = null;
// 创建线程池,默认是 SimpleThreadPool
ThreadPool tp = null;
// 创建调度器
QuartzScheduler qs = null;
// 连接数据库的连接管理器
DBConnectionManager dbMgr = null;
// 自动生成 ID
// 创建线程执行器,默认为 DefaultThreadExecutor
ThreadExecutor threadExecutor;
01-04 11:50