Java并发编程实践

一、简介

1 并发编程的概念与特点

并发编程是指通过设计和实现多线程程序来利用计算机资源和提高程序执行效率的编程方法。它的主要特点是多线程之间的竞争和协作关系。

2 进程与线程基本概念

一个进程可以包含多个线程每个线程拥有自己的堆栈空间和程序计数器,共享该进程所拥有的内存和文件句柄等资源。线程是CPU调度的最小单位相对于进程而言线程的创建、切换和销毁的开销要小得多。

3 Java线程模型

Java采用“抢占式调度”方式,线程自主地让出CPU资源的机会很少,大部分情况下是由操作系统进行调度的。Java线程模型中可以通过实现Runnable接口或继承Thread类来创建线程。

4 线程状态的转换及线程调度模型

Java线程状态主要包括新建状态、就绪状态、运行状态、阻塞状态和死亡状态。线程状态的转换主要取决于线程的活动和外部干预。线程调度模型是由Java虚拟机来实现的,调度策略可能因操作系统和硬件环境而异。

二、线程安全性

1 线程安全性的定义和实现方式

线程安全性是指并发编程中,在多个线程访问共享数据时,对数据的访问不会产生意料之外的后果。实现线程安全性需要从数据的状态变化过程进行分析,并采用相应的线程安全策略。

2 synchronized实现线程安全性

synchronized关键字可以用来修饰方法或代码块,保证同一时间只有一个线程访问该方法或代码块。当一个线程访问synchronized修饰的方法或代码块时,该线程会尝试获取锁,其他线程只有等待该线程释放锁后才能继续执行。

public synchronized void setValue(int value) {
    // synchronized修饰的方法
    this.value = value;
}
public void increment() {
    // synchronized修饰的代码块
    synchronized(this) {
        this.count++;
    }
}

3 volatile修饰符实现线程安全性

volatile修饰符可以用来保证共享变量的可见性和禁止指令重排优化。volatile修饰的变量在每次读取时都会从主内存中获取最新值,当变量的值发生改变时,会立即将新值写回主内存中。

public class VolatileExample {
    private volatile int count = 0;
    
    public void increment() {
        this.count++;
    }
}

4 原子类实现线程安全性

Java并发包中提供了多个原子类以实现线程安全性,这些原子类操作都是基于原语CAS(Compare And Swap)实现的。在多线程并发环境下,CAS能够保证共享变量的原子性操作。

public class AtomicExample {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        this.count.incrementAndGet();
    }
}

三、Java并发包中的工具类

1 实现限流的Semaphore类

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 初始化Semaphore,允许两个线程同时执行
        Semaphore semaphore = new Semaphore(2);
        
        // 模拟同时有5个任务需要执行
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    // 申请执行许可
                    semaphore.acquire();
                    
                    // 执行任务
                    System.out.println(Thread.currentThread().getName() + "开始执行任务");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "完成任务");
                    
                    // 释放许可
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

2 实现定时器的Timer类

import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

public class TimerDemo {
    public static void main(String[] args) {
        Timer timer = new Timer();

        // 延迟1秒后,每隔2秒执行一次任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("执行任务,时间:" + new Date());
            }
        }, 1000, 2000);
    }
}

3 实现任务执行的Executor框架

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
    public static void main(String[] args) {
        // 创建固定线程池,最多同时执行5个任务
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "开始执行任务");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "完成任务");
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

4 实现多线程计算的Fork/Join框架

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {
    public static void main(String[] args) {
        int[] array = new int[100_000_000];

        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }

        ForkJoinPool pool = new ForkJoinPool();

        long start = System.currentTimeMillis();
        int result = pool.invoke(new SumTask(array, 0, array.length - 1));
        long end = System.currentTimeMillis();

        System.out.println("结果:" + result);
        System.out.println("耗时:" + (end - start) + "ms");
    }
}

class SumTask extends RecursiveTask<Integer> {
    private int[] array;
    private int start;
    private int end;

    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start < 1000) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            SumTask leftTask = new SumTask(array, start, middle);
            SumTask rightTask = new SumTask(array, middle + 1, end);
            leftTask.fork();
            rightTask.fork();
            return leftTask.join() + rightTask.join();
        }
    }
}

四、 线程间通信

1 线程间通信的基本方式

  • 共享变量:多个线程同时操作同一份数据
  • 管道:通过输入/输出流进行线程通信

2 wait()和notify()/notifyAll()方法解决等待/通知机制

public class WaitNotifyDemo {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();

        new Thread(() -> {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + "开始等待");
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "等待结束");
            }
        }).start();

        Thread.sleep(1000);

        new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "开始唤醒");
                lock.notify();
                System.out.println(Thread.currentThread().getName() + "唤醒完成");
            }
        }).start();
    }
}

3 使用Lock对象的Condition类

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "开始等待");
                condition.await();
                System.out.println(Thread.currentThread().getName() + "等待结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.unlock();
        }).start();

        Thread.sleep(1000);

        new Thread(() -> {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "开始唤醒");
            condition.signal();
            System.out.println(Thread.currentThread().getName() + "唤醒完成");
            lock.unlock();
        }).start();
    }
}

五、Java内存模型

1 Java内存模型的概念及作用

Java 内存模型(Java Memory Model, JMM)是一种抽象的概念,定义了 Java 虚拟机(JVM) 在执行多线程程序时,各个线程之间共享变量的访问规则。JMM 的主要目的是定义程序中各个变量值读写时的具体行为。通过定义 synchronized、volatile 等关键字来保证线程对共享变量的操作的有序性、可见性和原子性等特性,从而避免了多线程程序中出现的各种数据不一致等并发问题。

2 Java内存模型之间的关系

在 Java 内存模型中每一个线程都有自己的本地内存,在本地内存中保存了该线程所使用的共享变量的副本。当线程访问共享变量时,它首先从主内存中获取该共享变量的最新版本,然后将该变量的值复制到本地内存中,并对该值进行操作。最后线程将操作结果同步回主内存,从而保证其它线程可见该共享变量的值已经被修改。

补充说明:

  • 主内存:所有线程共享的内存区
  • 本地内存:每个线程私有的内存区,存储该线程所使用的共享变量的副本

3 Java内存模型实现线程的可见性

在多线程场景下线程间并不总是能感知到最新的共享变量值。原因是有可能在线程的本地内存中保存了过期的共享变量副本,而未能及时从主内存中获取该变量的最新值。这种情形属于线程间的可见性问题。

Java 内存模型定义了 volatile 关键字用来解决可见性问题。对一个变量添加volatile关键字,就是告诉 JVM在读取该变量时,必须从主内存中读取,而不是从本地内存中读取,保证了该变量的最新值能被及时的更新到所有线程的本地内存中。

六、同步器

1 同步器的重要性及分类

同步器在多线程编程中起着至关重要的作用,它可以保证多个线程之间的数据同步,防止数据 Corruption。另外同步器还能够控制例如:程序执行顺序、死锁等操作。

在Java中,同步器可以分为以下两类:

  • 内置同步器:synchronized、volatile
  • 并发包同步器:ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock 等

2 实现同步器的核心类AQS

在并发包中AbstractQueuedSynchronizer (AQS) 这个类是同步器的核心,它可以用来实现自定义的同步器。AQS 通过一个等待队列来管理阻塞线程,有两个内部静态类用来描述 结点 (Node) 及等待队列。

  • Node:描述了等待队列中节点的数据结构
  • WaitQueue:描述了等待队列的数据结构

AQS 还提供了一些主要方法:

  • getState():获取同步状态
  • setState(int newState):设置同步状态
  • compareAndSetState(int expect, int update):原子修改同步状态,可用于实现锁定与解锁操作
  • acquire(int arg):获取同步状态
  • acquireShared(int arg):获取共享同步状态
  • tryAcquire(int acquires):尝试获取同步状态,成功返回 true,失败返回 false
  • tryAcquireShared(int acquires):尝试获取共享同步状态,成功返回大于等于 0 的数,失败返回 -1
  • release(int releases):释放同步状态,唤醒等待队列中的阻塞线程
  • releaseShared(int releases):释放共享同步状态,唤醒等待队列中的阻塞线程

七、java并发编程中的常见问题及解决方案

1 状态同步问题及解决方案

Java的多线程并发编程中,一个共享资源被多个线程访问时可能会导致状态不同步的问题。比如两个线程对同一个变量进行累加操作,但由于时间上的先后顺序和线程调度的执行先后顺序的差异,最终的计算结果可能是不确定的。为了避免这种情况,需要采用同步机制来进行线程之间的协调

  • 解决方案:使用synchronized关键字或Lock接口等锁机制控制多个线程对共享资源的访问顺序。

示例代码:

public class SynchronizedTest {
    private int count = 0;
    private Object lock = new Object();
    public void addCount() {
        synchronized (lock) { // 对count进行加锁
            count++;
        }
    }
}

2 死锁及避免死锁的策略

多个线程竞争多个资源时,若某个线程获取了部分资源而另一个线程占用了另一部分资源,两个线程将无法继续进行下去,进入死锁状态。这是多线程并发编程中的一种常见问题。

  • 解决方案:避免死锁通常采用以下策略:
    • 避免使用嵌套锁:尽量使用一种锁机制来控制线程访问共享资源,避免嵌套使用不同的锁机制。
    • 加锁顺序:对共享资源进行访问时,应保证所有线程都按照相同的顺序获取锁,避免多个线程以不同的顺序获取锁导致死锁。
    • 超时机制:在获取锁时增加超时等待机制,若线程不能在指定时间内获取到锁,则释放已经获取到的锁,避免死锁。
    • 死锁检测:通过程序检测死锁状态并纠正,避免死锁。

示例代码:

public class DeadLockTest {
    private Object lock1 = new Object();
    private Object lock2 = new Object();
    public void method1() {
        synchronized (lock1) {
            synchronized (lock2) { // 锁的顺序和method2方法中的相反
                System.out.println("method1");
            }
        }
    }
    public void method2() {
        synchronized (lock2) {
            synchronized (lock1) { // 锁的顺序和method1方法中的相反
                System.out.println("method2");
            }
        }
    }
}

3 线程间协作问题及实现方案

Java多线程编程中,线程之间需要进行通讯和协作才能完成一些复杂的任务,如多个线程合作完成一个生产者-消费者模型。

  • 解决方案:使用wait(), notify()和notifyAll()等方法实现线程之间的协作。

示例代码:

public class ProducerConsumerTest {
    private Queue<String> queue = new LinkedList<>();
    private final int MAX_SIZE = 10;
    public void produce() throws InterruptedException {
        synchronized (queue) {
            while (queue.size() == MAX_SIZE) { // 队列已满,则等待消费者取走数据
                queue.wait();
            }
            queue.offer("data"); // 向队列中添加数据
            queue.notifyAll(); // 唤醒所有正在等待队列的线程(消费者)
        }
    }
    public void consume() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) { // 队列为空,则等待生产者生成数据
                queue.wait();
            }
            queue.poll(); // 从队列中取出数据
            queue.notifyAll(); // 唤醒所有正在等待队列的线程(生产者)
        }
    }
}

4 并发性并行陷阱及解决方案

在写并发程序时常会遇到一些并发性和并行性的问题,比如线程异步执行,代码无法保证原子性,读取操作的结果不可预知等等。

  • 解决方案:以下是一些并发性、并行性陷阱的解决方案:
    • 原子性问题:使用Atomic包提供的原子变量或synchronized关键字等方式实现对原子性操作的控制。
    • 内存可见性问题:通过使用synchronized或volatile关键字等控制数据的写入和读取顺序来确保程序中不同线程之间的可见性。
    • 死锁问题:使用死锁避免策略,如避免使用嵌套锁、锁的顺序、超时机制等。
    • 竞态条件问题:通过使用synchronized、Lock接口或乐观锁等方式控制多个线程对共享资源的访问顺序,避免在竞争态势下导致计算结果错误的问题。

八、Java并发模型与并发设计

1 并发模型的分类

Java的并发模型主要分为两种:共享内存模型和消息传递模型。

  • 共享内存模型:所有线程共享同一个内存地址空间,实现线程间的通信和协作。Java的并发模型属于共享内存模型。
  • 消息传递模型:通过发送和接收消息来进行线程之间的通信和协作。

2 基于并发模型的并发设计

并发模型可以用来帮助设计并发程序中的各个阶段,其中最常见的是单生产者-多消费者和多生产者-单消费者。

  • 单生产者-多消费者:在单一的线程中生成产品,然后由多个线程处理这些产品。
  • 多生产者-单消费者:多个线程同时生成产品,然后由单一的线程来处理这些产品。

3 使用线程池进行并发编程设计

使用线程池可以有效减少线程创建和销毁的开销,并且可以根据任务变化动态调节线程数量,提高整个应用程序的性能。

示例代码:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10); // 创建线程池
        for (int i = 0; i < 100; i++) { // 执行100个任务
            pool.execute(() -> {
                // 执行具体任务
            });
        }
        pool.shutdown(); // 关闭线程池
    }
}
05-30 15:06