1. AQS(AbstractQueuedSynchronizer)框架简介

AQS是java.util.concurrent.locks包中的一个抽象类,是实现同步器(如锁和其他多线程同步工具)的一个框架。Doug Lea设计了这个框架,旨在让开发者通过使用其提供的方法来构建线程之间的同步器,而不是直接处理线程本身。

1.1 AQS概念与重要性

AQS利用一个int成员变量表示同步状态,并通过一个FIFO线程等待队列来管理多个线程之间的协调工作,这些线程可能会竞争对同步状态的访问。AQS定义了一系列保护共享资源访问的方法,可以实现独占访问(Exclusive)和共享访问(Shared)两种模式。
AQS非常重要,因为它简化了并发编程的难度,使得开发者能够通过使用简单的回调方法(如tryAcquire、tryRelease等)来实现复杂的并发控制逻辑。许多并发工具类,如ReentrantLock, Semaphores, CountDownLatches和CyclicBarriers等,都是基于AQS构建的。

1.2 AQS在并发控制中的作用

AQS提供内部支持,用于实现大范围的同步器。而同步器的功能归结于:在多线程访问某个资源时,能够确保任一时刻,资源被单线程访问(排他性),或者被多线程按照特定的方式访问(如某些线程获取资源后必须释放才能让其他线程继续获取)。AQS通过其状态变量和等待队列,管理和调度线程,从而完成这一任务。
接下来,我们将讨论由AQS派生的几个主要同步器,并通过代码示例详细说明它们的使用和工作机制。

2. CountDownLatch:并发协调工具

2.1 CountDownLatch简介

CountDownLatch是一种同步工具类,它允许一个或多个线程等待其他线程完成一组操作之后再继续执行。CountDownLatch通过一个计数器实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减一。当计数器值降至零时,意味着所有的线程都完成了任务,等待的线程就可以恢复执行任务。

2.2 CountDownLatch的工作机制

CountDownLatch的工作机制非常简单。创建时需指定一个整数,在CountDownLatch上调用await()方法的线程会被阻塞,直到其他线程调用countDown()方法足够次数,计数器减至零,所有等待的线程才会被释放,并继续执行。

2.3 使用场景与案例

CountDownLatch的一个常见使用场景是控制并发测试,在多个线程准备好之前阻塞测试的主线程,直至所有线程都报告说它们准备完毕,主线程才继续执行。
另一个使用场景是等待服务的一部分初始化。例如,应用程序的启动过程可能会发起多个服务,而主服务需要等待所有其他服务都完成初始化之后才能启动。

2.4 代码示例与解析

import java.util.concurrent.CountDownLatch;

public class ServiceStarter {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(3);
        
        // 启动第一个服务
        new Thread(() -> {
            System.out.println("Starting service 1");
            // 模拟服务启动耗时
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 1 started");
            latch.countDown();
        }).start();

        // 启动第二个服务
        new Thread(() -> {
            System.out.println("Starting service 2");
            // 模拟服务启动耗时
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 2 started");
            latch.countDown();
        }).start();

        // 启动第三个服务
        new Thread(() -> {
            System.out.println("Starting service 3");
            // 模拟服务启动耗时
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 3 started");
            latch.countDown();
        }).start();

        // 主线程等待其他服务启动
        System.out.println("Main thread waiting for services to start.");
        latch.await();
        System.out.println("All services started. Main thread continuing execution.");
    }
}

在这个例子中,主线程创建了一个初始计数为3的CountDownLatch,并启动了三个服务线程。每个服务在启动过程结束后调用countDown(),主线程在调用await()后被阻塞,直到三个服务都已启动。

3. Semaphore:信号量控制

3.1 Semaphore简介

Semaphore,也称为信号量,是用于控制同时访问某个特定资源的操作数量的同步工具,它管理一系列许可(permits)。Semaphore通常用于资源池,如数据库连接池,其中有限数量的资源需要被多个线程共享。
Semaphore的核心方法包括acquire()用于获取许可,如果无可用许可则阻塞线程,以及release()用于释放许可,增加可用许可数量。

3.2 Semaphore的工作原理

Semaphore内部维护了一定数量的许可。线程通过acquire()方法请求许可,如果Semaphore内的许可数大于0,则允许该线程执行且内部可用许可数减一;如果许可数为0,则阻塞该线程直至有许可可用。当线程使用完资源后,通过调用release()来归还许可,许可数增一。

3.3 使用场景与案例

Semaphore主要用于限流,如网络连接数、数据库连接数的限制等。例如,在数据库连接池中可能会使用Semaphore来限制池中的连接数。
另一个使用场景是实现某些资源的公平访问。在多线程环境中,线程可能需要以有序的方式访问资源,利用Semaphore可以保证没有单一线程会连续无限制地占有资源。

3.4 代码示例与解析

import java.util.concurrent.Semaphore;

public class ResourceAccessControl {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(10); // 假设只允许10个线程同时访问资源

        for (int i = 0; i < 100; i++) {
            final int threadNumber = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 请求资源
                    System.out.println("Thread " + threadNumber + " is accessing the resource.");
                    // 模拟资源访问时长
                    Thread.sleep((long)(Math.random() * 1000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放资源
                }
            }).start();
        }
    }
}

在这个例子中,我们创建了一个带有10个许可的Semaphore,然后启动了100个线程。这些线程在执行资源访问之前尝试获取许可,如果获取成功,则模拟资源访问完成后释放许可,如果许可不足则线程会等待,直到有许可变为可用。

4. CyclicBarrier:循环屏障

4.1 CyclicBarrier简介

CyclicBarrier,字面意思为循环屏障,是一种同步工具,它允许一组线程相互等待,达到一个公共屏障点(Common Barrier Point)后再继续执行。CyclicBarrier在完成一组线程间的相互等待后,可以重置再次使用,这是其“循环”(Cyclic)的由来。

4.2 CyclicBarrier的工作方式

CyclicBarrier的构造函数接受一个整型数值,指定等待的线程数量,当指定数量的线程都执行了await()方法后,这些线程会被释放并允许继续执行。CyclicBarrier也支持一个可选的Runnable命令,在屏障被触发时执行,常用于屏障点后的一些集体操作。

4.3 CyclicBarrier与CountDownLatch的对比

CyclicBarrier与CountDownLatch都用于线程间的协调,但主要区别在于CyclicBarrier可重用,而CountDownLatch不能。CountDownLatch是一次性的,计数值达到零便无法再次使用,而CyclicBarrier在等待的线程释放后可以重置计数,以便下一轮的使用。

4.4 使用场景与案例

CyclicBarrier适用于这样的场景:多个线程必须同时到达预设屏障点才能继续执行,例如,多个玩家必须都准备好后才能开始游戏。
它也常用于多阶段计算的场景中,计算过程被分为多个步骤,而每个步骤的执行依赖于前一个步骤的完成,即分阶段的并行计算。

4.5 代码示例与解析

import java.util.concurrent.CyclicBarrier;

public class ParallelTask {
    // 创建一个新的CyclicBarrier,当4个参与者线程到达时将触发Runnable
    private static final CyclicBarrier barrier = new CyclicBarrier(4, () ->
        System.out.println("All threads reached the barrier point, let's continue!")
    );

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " reached the barrier.");
                    barrier.await(); // 等待其他线程
                    // 以下代码将在所有线程都到达屏障后执行
                    System.out.println(Thread.currentThread().getName() + " crossed the barrier.");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

在这个例子中,我们设置了一个CyclicBarrier,等待四个线程。每个线程在执行到barrier.await()时将会等待,直到所有四个线程都到达了屏障点。达到屏障点之后,首先执行Barrier的Runnable命令,然后所有线程将同时继续执行。

05-03 08:56