引言

使用Condition实现生产者-消费者模型,并与wait和notify实现的效果相对比。

wait/notify模拟生产者-消费者

面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法能够支持2个生产线程以及10个消费者线程的阻塞调用。

在《Effective Java》一书中提到:wait()方法()绝大多数情况下都是和while一起使用的。这是因为,当wait()执行完成后,会立刻释放当前锁,如果这时其他线程立刻获得锁并对变量进行操作,会出现数据不一致的情况,使用while可以确保数据不会出现不一致的情况。

public class ProducerConsumer1<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10;// 最多10个元素
    private int count = 0;

    public synchronized void put(T t) {
        while (lists.size() == this.MAX) {// 想想为什么用while而不是if
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        lists.add(t);
        count++;
        this.notifyAll();// 通知消费者线程进行消费
    }

    public synchronized T get() {
        T t = null;

        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        t = lists.removeFirst();
        count--;
        this.notifyAll();// 通知生产者进行生产
        return t;
    }

    public static void main(String[] args) {
        ProducerConsumer1<String> c = new ProducerConsumer1<>();
        // 启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(c.get());
                }
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 25; j++) {
                    c.put(Thread.currentThread().getName() + " " + j);
                }
            }).start();
        }
    }
}

Condition模拟生产者-消费者

使用Lock和Condition来实现类似需求时,可以更加精确的指定哪些线程被唤醒,这比notifyAll()效率更高一些。

将上面的程序代码进行改写:

public class ProducerConsumer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10;
    private int count = 0;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public void put(T t) {
        try {
            lock.lock();
            while (lists.size() == MAX) {
                producer.await();
            }

            lists.add(t);

            ++count;
            consumer.signalAll(); // 通知消费者线程进行消费
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get() {
        T t = null;
        try {
            lock.lock();
            while (lists.size() == 0) {
                consumer.await();
            }

            t = lists.removeFirst();
            count--;
            producer.signalAll();// 通知生产者进行生产
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        ProducerConsumer2<String> c = new ProducerConsumer2<>();
        // 启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(c.get());
                }
            }, "c" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 25; j++) {
                    c.put(Thread.currentThread()
                                .getName()
                            + " " + j);
                }
            }).start();
        }
    }
}

鸣谢

马士兵老师高并发编程系列

10-04 21:34