【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP

【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP

一、什么是定时器?

1、概念和应用

场景:在进行网络编程时候,经常会遇到网络连不上的情况,此时就可以使用定时器,到一段时间后就取消连接网络,达到及时"止损"的效果。
功能:指定一段时间后执行一段代码。

2、使用标准库的定时器

import java.util.Timer;
import java.util.TimerTask;

public class Main {
    public static void main(String[] args) {
        System.out.println("开始计时:");
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("任务执行");
            }
        },3000);
    }
}
  • 在我们上述代码中,timer是定义的一个定时器,定时器主要的方法就是安排任务执行(schedule方法)
    我们在schedule方法中传入两个参数:任务,时间段
    任务:在上述代码中,这个任务我们使用TimerTask来指定,其实这就是一个Runnable的任务,因为TimerTask是一个抽象类,实现了Runnable接口,只是把这个任务包装了一下,改了个名。创建线程时继承Runnable重写run使用匿名内部类时指定的任务是一样的。
    时间段:我们在TimerTask中写的就是任务的具体执行,当程序从计时开始,过了我们传入的这个时间段后,在TimerTask中的任务代码就会执行。
  • 此外,timer.schedule()可以多次执行,即可以指定多个任务。

二、自定义实现定时器

1、分析和写代码思考

我们想要写一个定时器,首先我们得明白,这个定时器具体执行哪些功能?
1> 一个定时器注册N个任务,N个任务会按照指定的顺序执行。
处理: 需要一个数据结构来存储这N个任务
2> 注册好的任务,到了指定时间时,要能够执行。
处理:可以使用一个线程来扫描数据结构,哪个任务到时间了,就执行

       那么问题来了,用什么数据结构?数组?链表?我们想一想,这个时间段是有大小的,时间段较小的会先执行,那此时就可以使用一个有序的数据结构存储,数组,链表可以,但是呢,如果中途插入数据,数组还得移动数据插入,链表呢得去查找该插入到哪个位置,都很麻烦,且效率较低。所以我们这里使用了一个优先级队列(堆),我们使用一个小堆,这样每次出队的元素就是时间段最小的任务,即最先执行的任务。除此之外,我们的这个堆要实现阻塞功能,因为当队列中任务满时,要阻塞入队的线程,队空时是相似的道理。

       这个计时器我们写到这个阻塞式优先级队列时,我们需要指定一个参数类型,按照上面的分析,我们知道这个参数是一个任务,因为在标准库中schedule方法提供了两个参数,而我们也需要两个参数,一个是描述任务的Runnable,一个是时间段,所以我们把这个任务又单独封装成一个类。
到了这里我们的代码如下:

class MyTask{
    // 描述任务
    private Runnable runnable;
    // 毫秒时间戳描述任务开始到执行的时间段
    private long time;
    public MyTask(Runnable runnable, Long time){
        this.runnable = runnable;
        this.time = time;
    }
}

class MyTimer{
    // 扫描线程
    private Thread t = null;
    // 阻塞的优先级队列存储任务
    BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
}

       封装了任务后,接下来,我们来实现schedule方法,就是往阻塞的优先级队列中添加任务。尤其要注意时间的换算。

	public void schedule(Runnable runnable, Long after) throws InterruptedException {
        // 注意:把时间换成当前时间点 + 时间段的形式 存入MyTask中,这个时间就是等会要执行的时间段
        MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
        queue.put(myTask);
    }

       上述操作完成后,我们就把任务包装成一个单独的类,然后提供了schedule方法,用来安排任务放入阻塞式优先级队列。那接下来我们要操作的,就是从队列中取元素,看看当前任务执行时间是否已经到了,也就是实现扫描线程。但是要注意:扫描线程写完后,一定要记得start()
       在扫描线程中,我们把它写在构造方法中,为了在对象创建时就启动线程,以便在后续使用时能够保证线程正常运行。如果不在构造方法中启动线程,在使用时需要手动调用start方法来启动线程,容易忘记或者出现线程启动时机不对等的问题,导致线程无法正常运行。
       我们可以取出阻塞式优先级队列队首元素时间属性和当前的时间比较,如果当前时间小,那就得把任务元素重新塞回优先级队列中(由于阻塞队列是带有阻塞功能的,所以只有put和take两个方法,而不提供拷贝队首元素但不出队的操作,即peek()操作)。把时间未到的任务塞回优先级队列后,队列重新插入元素,然后向上调整,最终该任务又回到队首的位置。而如果当前时间大于等于队首的任务的时间,那就执行任务。
扫描线程的代码如下:

	public MyTimer(){
        // 创建扫描线程并执行
        Thread t = new Thread(()->{
            while (true){
                // 取出队首元素,和当前时间比较
                // 如果队首元素的时间大于当前时间,就把任务塞回队列
                // 如果队首元素的时间等于当前时间,那就执行当前任务
                try {
                    MyTask task = queue.take();
                    long curTime = System.currentTimeMillis();
                    if(curTime < task.getTime()){
                        queue.put(task);
                    }else {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

       从上面的分析中,我们也可知道,需要两个方法:取到任务的getTime方法,还有执行当前任务的方法:我们把这两个方法都放在MyTask类中,因为我们把任务的元素runable和time都封装成private了。

class MyTask{
    // 描述任务
    private Runnable runnable;
    // 毫秒时间戳描述任务开始到执行的时间段
    private long time;
    public MyTask(Runnable runnable, Long time){
        this.runnable = runnable;
        this.time = time;
    }
    // 获取任务时间
    public long getTime(){
        return this.time;
    }
    // 运行任务代码(点进runnable.run()这个方法发现是一个抽象方法,其实就是执行我们在schedule方法中往队列里放时描述的任务)
    public void run(){
        runnable.run();
    }
}

那接下来我们继续来看看扫描线程,因为上述代码中还存在问题!
问题1:优先级队列还没有指定比较的方式,因为我们传进队列的是MyTask,它本身没有大小,无法比较,所以我们需要让它可比较,第一种就是实现Comparable,重写compareTo,要注意的是泛型传参。第二种就是写一个比较器实现Comparator或者直接传入优先级队列并初始化比较器。(代码等会一起展示)

问题2:我们现在的任务元素,存在反复从队列出,又塞回队列的情况,这就造成"忙等",即占用CPU资源,但又没干什么事。见下图
【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP
那么怎么打破这种情况?
       我们可以让扫描线程wait,等到队首元素执行的时间到了,我们再把它唤醒,这样行吗?很显然,不行,因为如果中途我们要插入任务时,那样,此时线程任然在WAITING,中途插入的任务就被错过了。所以我们得考虑到中途插入的问题。怎么实现呢?
处理方式:取出队首元素后如果当前时间小于队首任务的执行时间,那就把任务塞回队列后wait(这里的wait使用指定最大时间的方法),等schedule中有任务插入队列时,再使用notify唤醒。这样就可以保证两点:1》如果中途不插入,扫描线程按照最大时间等待,那就刚好到队首元素要执行时。2》如果中途有任务插入队列,那就会在插入任务后将扫描线程唤醒,扫描当前的队首元素,再进行判断。
修改后代码实现如下:
【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP

你以为到这里就结束了吗?接下来还有一个非常隐蔽的问题!!!
       下图中位置时,或者说还没执行wait,但是CPU从扫描线程切走了,然后进行了schedule的任务入队操作。
【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP

那么,因为扫描线程中,线程没有wait,所以此时CPU切到schedule入队任务时,通知操作this.notify()是空打一炮的情况,不会有什么唤醒。
那么CPU切回到扫描线程时,线程才会进入wait操作,然后就有可能错过刚插入的任务。那么我们想一个极端但很可能会出现的情况:在扫描线程中,如果刚开始取出队首任务执行时间是12:00,if判断那里curTime是10:00,而新插入的任务的时间是11:00。那因为入队列任务时notify空打了一炮,就会错过新加入的任务。
【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP

       因此,我们要确保消除这种情况。怎么消除呢?我们好好想想,就会发现,上面的情况产生的万恶之源,其实是扫面线程执行take后,CPU被切走,就是说,这两个操作不是原子的,所以导致这样的结构。所以保证原子性,就是加锁,让它变成原子的操作。具体操作可以把this.wait()外面的锁范围扩大。
【Java EE】-多线程编程(七) 多线程案例之定时器-LMLPHP
完成上述所有操作后即可得到完整的代码,如下:

2、正确完整代码

  • 最后还要注意一点:this对象是否改变和线程的写法有关。如果扫描线程使用lambda表达式,this始终是同一个,那可以对this加锁;但是如果使用匿名内部类的方式,打印this发现,this不一样,不能synchronized(this),而是应该给一个成员变量privaite Object locker = new Object();对这个对象加锁,确保加锁对象是同一个。
    下面的代码就是使用lambda表达式。
    代码如下:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// 任务类
class MyTask implements Comparable<MyTask>{
    // 描述任务
    private Runnable runnable;
    // 毫秒时间戳描述任务开始到执行的时间段
    private long time;

    public MyTask(Runnable runnable, Long time){
        this.runnable = runnable;
        this.time = time;
    }

    // 获取任务时间
    public long getTime(){
        return this.time;
    }
    // 运行任务代码(点进runnable.run()这个方法发现是一个抽象方法,其实就是执行我们在schedule方法中往队列里放时描述的任务)
    public void run(){
        runnable.run();
    }

    @Override
    public int compareTo(MyTask o) {
        return (int) (this.time - o.time);
    }
}

// 自定义定时器
class MyTimer{
    // 扫描线程
    private Thread t = null;
    // 阻塞的优先级队列存储任务
    BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();


    public MyTimer(){
        // 真正的创建扫描线程并执行
        t = new Thread(()->{  //lambda表达式
            while (true){
                // 注意:取出队首元素,和当前时间比较,如果队首元素的时间大于当前时间,就把任务塞回队列;如果队首元素的时间等于当前时间,那就执行当前任务
                try {
                    synchronized (this) {
                        MyTask task = queue.take();
                        long curTime = System.currentTimeMillis();
                        if (curTime < task.getTime()) {
                            queue.put(task);
                            // 在put之后进行wait
                            this.wait(task.getTime() - curTime);
                        } else {
                            task.run();
                        }
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 注意:扫描线程写完后,一定要记得start()
        t.start();
    }

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        // 注意:把时间换成当前时间点 + 时间段的形式 存入MyTask中,这个时间就是等会要执行的时间段
        MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
        queue.put(myTask);
        synchronized (this){
            this.notify();
        }
    }
}

// Main测试类
public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("开始计时:");
        MyTimer timer = new MyTimer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("任务执行1");

            }
        },3000);
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("任务执行2");

            }
        },2000);
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("任务执行3");

            }
        },1000);
    }
}

结果:

开始计时:
任务执行3
任务执行2
任务执行1
// 进程未结束,因为扫描线程还在持续运行,它是前台线程,阻止进程结束
04-08 03:28