• 有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

    所以该模型实现了生产者和消费者之间的解藕异步

    比如说你和你女朋友打电话,就得等她接了电话你们才能说话,这是同步。

    但是如果你跟她发微信,并不需要等她回复,她也不需要立刻回复,而是等她有空了再回,这就是异步。

    但是呢,生产者和消费者之间也不能完全没有联系的。

    所以它们之间还需要有协作,最经典的就是使用 Object 类里自带的 wait()notify() 或者 notifyAll() 的消息通知机制。

    上述描述中的等着,其实就是用 wait() 来实现的;

    通知,就是 notify() 或者 notifyAll()

    那么基于这种消息通知机制,我们还能够平衡生产者和消费者之间的速度差异

    如果生产者的生产速度很慢,但是消费者消费的很快,就像是我们每月工资就发两次,但是每天都要花钱,也就是 1:15.

    那么我们就需要调整生产者(发工资)为 15 个线程,消费者保持 1 个线程,这样是不是很爽~

    总结下该模型的三大优点:
    解藕,异步,平衡速度差异。

    wait()/notify()

    接下来我们需要重点看下这个通知机制。

    wait()notify() 都是 Java 中的 Object 类自带的方法,可以用来实现线程间的通信。

    上一节讲的 11 个 APIs 里我也提到了它,我们这里再展开讲一下。

    wait() 方法是用来让当前线程等待,直到有别的线程调用 notify() 将它唤醒,或者我们可以设定一个时间让它自动苏醒。

    调用该方法之前,线程必须要获得该对象的对象监视器锁,也就是只能用在加锁的方法下。

    而调用该方法之后,当前线程会释放锁。(提示:这里很重要,也是下文代码中用 while 而非 if 的原因。)

    notify() 方法只能通知一个线程,如果多个线程在等待,那就唤醒任意一个。

    notifyAll() 方法是可以唤醒所有等待线程,然后加入同步队列。

    这里我们用到了 2 个队列:

    这里需要注意,从等待状态线程无法直接进入 Q2,而是要先重新加入同步队列,再次等待拿锁,拿到了锁才能进去 Q2;一旦出了 Q2,锁就丢了。

    Q2 里,其实只有一个线程,因为这里我们必须要加锁才能进行操作。

    实现

    这里我首先建了一个简单的 Product 类,用来表示生产和消费的产品,大家可以自行添加更多的 fields

    public class Product  {
        private String name;

        public Product(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

    主函数里我设定了两类线程,并且这里选择用普通的 ArrayDeque 来实现 Queue,更简单的方式是直接用 Java 中的 BlockingQueue 来实现。

    BlockingQueue 是阻塞队列,它有一系列的方法可以让线程实现自动阻塞,常用的 BlockingQueue 有很多,后面会单独出一篇文章来讲。

    这里为了更好的理解并发协同的这个过程,我们先自己处理。

    public class Test {
        public static void main(String[] args) {
            Queue<Product> queue = new ArrayDeque<>();

            for (int i = 0; i < 100; i++) {
                new Thread(new Producer(queue, 100)).start();
                new Thread(new Consumer(queue, 100)).start();
            }
        }
    }

    然后就是 ProducerConsumer 了。

    public class Producer implements Runnable{
        private Queue<Product> queue;
        private int maxCapacity;

        public Producer(Queue queue, int maxCapacity) {
            this.queue = queue;
            this.maxCapacity = maxCapacity;
        }

        @Override
        public void run() {
            synchronized (queue) {
                while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
                    try {
                        System.out.println("生产者" + Thread.currentThread().getName() + "等待中... Queue 已达到最大容量,无法生产");
                        wait();
                        System.out.println("生产者" + Thread.currentThread().getName() + "退出等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (queue.size() == 0) { //队列里的产品从无到有,需要通知在等待的消费者
                    queue.notifyAll();
                }
                Random random = new Random();
                Integer i = random.nextInt();
                queue.offer(new Product("产品"  + i.toString()));
                System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + i.toString());
            }
        }
    }

    其实它的主逻辑很简单,我这里为了方便演示加了很多打印语句才显得有点复杂。

    我们把主要逻辑拎出来看:

     public void run() {
            synchronized (queue) {
                while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解释
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (queue.size() == 0) {
                    queue.notifyAll();
                }
                queue.offer(new Product("产品"  + i.toString()));
            }
        }
    }

    这里有 3 块内容,再对照这个过程来看:

    这里有个问题,为什么只能用 while 而不是 if

    其实在这一小段,生产者线程经历了几个过程:

    那么为什么可能又满了呢?

    因为线程没有一直拿着锁,在被唤醒之后,到拿到锁之间的这段时间里,有可能其他的生产者线程先拿到了锁进行了生产,所以队列又经历了一个从不满到满的过程。

    总结:在使用线程的等待通知机制时,一般都要在 while 循环中调用 wait() 方法。

    消费者线程是完全对称的,我们来看代码。

    public class Consumer implements Runnable{
        private Queue<Product> queue;
        private int maxCapacity;

        public Consumer(Queue queue, int maxCapacity) {
            this.queue = queue;
            this.maxCapacity = maxCapacity;
        }

        @Override
        public void run() {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    try {
                        System.out.println("消费者" + Thread.currentThread().getName() + "等待中... Queue 已缺货,无法消费");
                        wait();
                        System.out.println("消费者" + Thread.currentThread().getName() + "退出等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (queue.size() == maxCapacity) {
                    queue.notifyAll();
                }

                Product product = queue.poll();
                System.out.println("消费者" + Thread.currentThread().getName() + "消费了:" + product.getName());
            }
        }
    }

    结果如下:

    小结

    生产者 - 消费者问题是面试中经常会遇到的题目,本文首先讲了该模型的三大优点:解藕,异步,平衡速度差异,然后讲解了等待/通知的消息机制以及在该模型中的应用,最后进行了代码实现。

    文中所有代码已经放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE

    这个 Github 汇总了我所有的文章和资料,之后也会一直更新和维护,还希望大家帮忙点个 Star,你们的支持和认可,就是我创作的最大动力,我们下篇文章见!

    我是小齐,纽约程序媛,终生学习者,每天晚上 9 点,云自习室里不见不散!

    09-14 12:08