Skip to content

并发编程

字数: 0 字 时长: 0 分钟

第 1 章 概念解释

1.1 进程与线程

  1. 进程
  • 程序由指令和数据组成,但这些指令要运行、数据要读写,就必须将指令加载至 CPU、数据加载至内存,在指令运行过程中还需要用到磁盘、网络等设备,进程就是用来加载指令、管理内存、管理 IO 的
  • 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程
  • 进程就可以视为程序的一个实例,大部分程序可以同时运行多个实例进程(比如记事本、浏览器等),也有的程序只能启动一个实例进程(例如微信、网易云音乐等)
  1. 线程
  • 一个进程之内可以分为一到多个线程
  • 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
  • Java 中,线程作为最小的调度单位,进程作为资源分配的最小单位,在 Windows 中进程是不活动的,只是作为线程的容器
  1. 二者对比
  • 进程基本上是相互独立的,而线程存在于进程内,是进程的一个子集
  • 进程拥有共享的资源,如内存空间等,供其内部的线程共享
  • 进程间通信较为复杂
    • 同一台计算机的进程通信称为 IPC
    • 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
  • 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
  • 线程更轻量,线程上下文切换成本一般要比进程上下文切换低

1.2 并发与并行

  1. 并发

单核 CPU 下,线程实际还是串行执行的,操作系统中有一个组件叫做任务调度器,将 CPU 的时间片(Windows 下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 CPU 在线程间时间片很短,切换的非常快,人类感觉是同时运行的,一般会将这种线程轮流使用 CPU 的做法称为并发

2. 并行

多核 CPU 下,每个核都可以调度运行线程,这时候线程可以是并行的

  1. 二者对比
  • 并发是同一时间应对多件事情的能力
  • 并行是同一时间动手做多件事情的能力

第 2 章 线程

2.1 创建和运行线程

  1. 直接使用 Thread
java
//创建线程对象
Thread t = new Thread() {
	public void run() {
		//要执行的任务
		
	}
};

//启动线程
t.start();

例如:

java
//构造方法的参数是给线程指定名字
Thread t1 = new Thread("t1") {
	@Override
	//run 方法内实现了要执行的任务
	public void run() {
		log.debug("hello");
	}
};
t1.start();

结果:

  1. 使用 Runnable 配合 Thread

把线程和任务(要执行的代码)分开:

  • Thread 代表线程
  • Runnable 代表可运行的任务(线程要执行的代码)
java
Runnable runnable = new Runnable() {
	public void run() {
		//要执行的任务
		
	}
};
//创建线程对象
Thread t = new Thread(runnable);
//启动线程
t.start();

例如:

java
//创建任务对象
Runnable task2 = new Runnable() {
	@Override
	public void run() {
		log.debug("hello");
	}
};

//参数 1 是任务对象,参数 2 是线程名字
Thread t2 = new Thread(task2, "t2");
t2.start();

Java 8 以后可以使用 Lambda 精简代码

java
//创建任务对象
Runnable task2 = () -> log.debug("hello");

//参数 1 是任务对象,参数 2 是线程名字
Thread t2 = new Thread(task2, "t2");
t2.start();
  1. FutureTask 配置 Thread

FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况

java
//创建任务对象
FutureTask<Integer> task3 = new FutureTask<>(() -> {
	log.debug("hello");
	return 100;
});

//参数 1 是任务对象,参数 2 是线程名字
new Thread(task3, "t3").start();

//主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get();
log.debug("结果是: {}", result);

结果:

2.2 线程运行原理

2.2.1 栈与栈帧

JVM 中由堆、栈、方法区组成,其中栈内存是给线程用的,每个线程启动后,虚拟机就会为其分配一块栈内存

  • 每个栈由多个栈帧组成,对应每次方法调用时所占用的内存
  • 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法

2.2.2 线程上下文切换

因为以下一些原因导致 CPU 不再执行当前的线程,转而执行另一个线程的代码:

  • 线程的 CPU 时间片用完
  • 垃圾回收
  • 有更高优先级的线程需要运行
  • 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法

当线程发生上下文切换时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应概念就是程序计数器,它的作用是记住下一条 JVM 指令的执行地址,是线程私有的

  • 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
  • 线程上下文切换频繁发生会影响性能

2.3 线程常用方法

2.3.1 start() 方法:

  • 启动一个新线程,在新的线程运行 run 方法中的代码
  • start 方法只是让线程进入就绪,里面的代码不一定立刻运行(因为 CPU 的时间片还没分给它),每个线程对象的 start 方法只能调用一次,如果调用了多次会出现异常

2.3.2 run() 方法:

  • 新线程启动后会调用的方法
  • 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作,但可以创建 Thread 的子类对象来覆盖默认行为

2.3.3 join() 方法:

等待线程运行结束

有下面这段代码的执行,打印 r 是什么?

java
static int r = 0;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
    }

    private static void test1() throws InterruptedException {
        log.debug("开始");
        Thread t1 = new Thread(() -> {
            log.debug("开始");
            try {
                sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.debug("结束");
            r = 10;
        });
        t1.start();
        log.debug("r 结果为: {}", r);
        log.debug("结束");
    }

结果 r 为 0

分析:

  • 因为主线程和线程 t1 是并行执行的,t1 线程需要 1 秒后才能算出 r = 10
  • 而主线程一开始就要打印 r 的结果,所以只能打印出 r = 0

解决方法:

  • 用 sleep 让主线程先休眠,然后等待 t1 线程算完 r = 10?这样行不行?不行,因为 t1 线程在多少时间后才能执行完是不确定的,那么主线程的休眠时间就是无法确定的
  • 可以使用 join 方法,让主线程等待 t1 线程执行完后再执行
java
static int r = 0;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
    }

    private static void test1() throws InterruptedException {
        log.debug("开始");
        Thread t1 = new Thread(() -> {
            log.debug("开始");
            try {
                sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.debug("结束");
            r = 10;
        });
        t1.start();
        //让主线程等待 t1 执行完后再执行之后的代码
        t1.join();
        log.debug("r 结果为: {}", r);
        log.debug("结束");
    }

结果:

2.3.3.1 join() 方法详解
2.3.3.1.1 join() 方法的核心作用

join() 方法的核心作用就是让当前执行线程(通常是主线程)等待调用了 join() 方法的线程执行完毕后再继续执行

简单来说,就是插队或同步,它提供了一种简单的线程间同步机制,确保一个线程必须等待另一个线程完成其任务

2.3.3.1.2 为什么需要 join()

想象一个场景:你(主线程)启动了三个工人(子线程 T1, T2, T3)去分别完成三项准备工作(比如泡茶、准备点心、布置桌椅),然后你自己才能开始派对(主线程继续后续工作)。

如果没有 join(),会发生什么? 你一声令下“开始干活!”(thread.start()),三个工人立刻跑去干活了,但你不会等他们,你会立刻宣布“派对开始!”。这时,茶还没泡好,点心也没准备,派对显然无法正常进行。

join() 的作用就是让你(主线程)等待。 你可以对每个工人说:“你(T1),干完活来我这里报到一下(t1.join()),等你报到了我再继续。” 这样,你就能确保所有准备工作都完成后,再开始派对。

2.3.3.1.3 如何使用 join()

join() 方法来自 Thread 类,它有几个重载版本:

  • join()
    • 最常用,无限期等待,直到目标线程执行完毕
  • join(long millis)
    • 等待目标线程执行完毕,但最多等待 millis 毫秒
  • join(long millis, int nanos)
    • 等待目标线程执行完毕,但最多等待 millis 毫秒 + nanos 纳秒

代码示例:

java
public class JoinExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建并启动第一个线程
        Thread t1 = new Thread(() -> {
            try {
                Thread.sleep(2000); // 模拟耗时任务,睡眠2秒
                System.out.println("线程1执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 创建并启动第二个线程
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(3000); // 模拟耗时任务,睡眠3秒
                System.out.println("线程2执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long start = System.currentTimeMillis();

        t1.start(); // 启动线程1
        t2.start(); // 启动线程2

        System.out.println("主线程等待子线程执行完毕...");

        // 主线程在此等待,直到t1和t2都执行完成
        t1.join(); // 主线程阻塞,等待t1结束
        t2.join(); // 主线程阻塞,等待t2结束

        long end = System.currentTimeMillis();

        System.out.println("所有子线程都已执行完毕!");
        System.out.println("总共耗时: " + (end - start) + "ms"); // 输出大约是3000ms,因为t1和t2是并行执行的
    }
}

输出结果:

关键点:

  • t1 和 t2 是同时启动、并行执行的。总耗时由较慢的线程 t2(3秒)决定,而不是 2+3=5 秒。
  • 主线程执行到 t1.join() 时会阻塞,不再继续向下执行。
  • 只有当 t1 执行完毕(死亡),t1.join() 才会返回,主线程才能继续执行 t2.join()。
  • 同样,主线程在 t2.join() 处阻塞,直到 t2 死亡。
  • 最后,主线程打印最终信息。
2.3.3.1.4 join() 的底层原理

join() 本质上是基于 Object 类的 wait() 和 notify() 机制实现的

可以理解为,在 Thread 类的实现中:

  • 当一个线程 t 启动后,系统会将其视为一个监视器对象(Monitor)
  • 当主线程调用 t.join() 时,主线程会获取线程 t 的对象锁,然后调用 t.wait(),这意味着主线程会释放 CPU 资源,并在 t 对象的等待集中等待
  • 当线程 t 的 run() 方法执行完毕,JVM 底层会自动调用 t.notifyAll() 方法来唤醒所有在 t 对象上等待的线程(也就是调用了 t.join() 的主线程)
  • 主线程被唤醒后,从 wait() 方法返回,join() 方法也就执行完毕了

简化版源码:

java
public final synchronized void join(long millis) throws InterruptedException {
    ...
    while (isAlive()) { // 如果目标线程还活着
        wait(0); // 就让当前线程(调用join的线程)无限等待
    }
    ...
}

注意这个方法是 synchronized 的,它锁住的就是当前线程实例,[[到底是锁 t 还是主线程?]]

2.3.3.1.5 注意事项
  1. 异常处理

join 方法会抛出 InterruptedException,这意味着在等待过程中,如果等待的线程被其它线程中断(调用 interrupt()),join 方法会立刻抛出异常并返回,这是一种优雅的取消等待机制,必须处理这个受检异常

  1. join() 与 synchronized 的区别

synchronized 关键字用于保护共享资源,实现互斥访问,防止数据竞争

join 方法用于协调线程执行的顺序,实现同步,一个线程等待另一个线程结束,它们的目的是完全不同的

2.3.4 interrupt() 方法

2.3.4.1 概念

每个 Java 线程内部都有一个布尔类型的标志位,用于表示其中断状态

  • false(默认值):表示线程未被中断,或者更准确的说,没有中断请求
  • true:表示线程已被中断,即有其它线程向该线程发出了中断请求

这个状态本身并不会强制停止一个正在运行的线程,它只是一个友好的信号或请求,需要被中断的线程自己来检查并决定如何响应

2.3.4.2 相关方法
  1. interrupt()
  • 这是设置中断状态的方法
  • 当一个线程调用 A.interrupt() 时,它的作用是将线程 A 的中断状态设置为 true,即打上中断标记
  • 此外,如果线程 A 正因调用 Object.wait()、Thread.join()、Thread.sleep() 等方法而处于阻塞状态,那么它会立即抛出 InterruptedException,并且在抛出异常后,中断状态会被立即清除(重置为 false)
  1. isInterrupted()
  • 这是检查中断状态的方法
  • 它只检查当前线程的中断状态是 true 还是 false,检查后不会改变中断状态的原值
  • 通常用于线程在自己的业务逻辑中主动检查是否有中断请求
  1. interrupted()
  • 这也是检查中断状态的方法,但它是静态的,且有副作用
  • 它检查当前正在执行的线程的中断状态
  • 关键区别在于:调用它之后,它会清除中断状态,即如果当前线程的中断状态为 true,调用 interrupted() 会返回 true,但同时会将中断状态重置为 false
2.3.4.3 true 和 false 代表的场景
  1. 中断状态为 true 时代表:
  • 协作式中断请求已发出:另一个线程通过调用 yourThread.interrupt() 方法,请求该线程中断其当前操作。
  • 线程应该准备结束:这是一种通信机制,告诉目标线程“请你现在停下来”。但具体是否停止、何时停止、如何清理,由目标线程的代码决定。
  • 可能影响某些阻塞的 I/O 操作:如果线程阻塞在 InterruptibleChannel 的 I/O 操作上,通道会被关闭,线程会收到 ClosedByInterruptException,并且中断状态会被设置。
  1. 中断状态为 false 时代表:
  • 初始状态:线程刚被创建,尚未收到任何中断请求。
  • 请求已被处理:线程已经通过 catch (InterruptedException e) 或调用 Thread.interrupted() 的方式检测到了中断,并且已经清除了状态(这是处理中断的常规操作)。
  • 从未被中断:线程一直在正常运行,没有收到任何中断信号。
2.3.4.4 场景
  1. 如果被打断线程正在 sleep、wait、join 等
  • 行为:会导致被打断的线程立即抛出 InterruptedException 异常,并且中断状态会被清除(重置为 false)

  • 为什么这样设计?

    • 当线程在 sleep, wait, join 时,它处于一种非运行的阻塞状态。它自己无法主动去检查中断标志位。为了让它能立即响应中断请求,Java 选择了最直接的方式:让这些阻塞方法抛出一个异常,从而立刻中断阻塞状态,将执行权交还给 catch 块。
    • 抛出异常后,中断状态被清除,这是一种“状态复位”。意味着:“中断信号已经以异常的形式送达并处理了,现在将这个标志位清零”。

代码示例:

java
private static void test1() throws InterruptedException {
	Thread t1 = new Thread(() -> {
		sleep(1);
	}, "t1");
	t1.start();

	sleep(0.5);
	t1.interrupt();
	log.debug("打断状态: {}", t1.isInterrupted());
}

  1. 如果打断的正在运行的线程
  • 行为:则会设置中断标记(将中断状态设置为 true)。

  • 为什么这样设计?

    • 一个正在正常运行的线程,拥有CPU时间片,它可以主动地、在合适的时候去检查自己的中断状态。所以,interrupt() 方法不需要做任何额外操作,只需默默设置好标志位即可。如何处理这个信号,完全由目标线程的代码决定。
  • 比喻:

    • 你(目标线程)正在专心写代码(运行状态)。我(另一个线程)想让你停下来,于是给你发了一条微信“该开会了”(调用 interrupt() 设置标志位)。你看到了消息(isInterrupted() 返回 true),但你可以选择立即保存代码去开会,也可以选择把这行代码写完再去。

代码示例:

java
private static void test2() throws InterruptedException {
	Thread t2 = new Thread(() -> {
		while (true) {
			Thread current = Thread.currentThread();
			boolean interrupted = current.isInterrupted();
			if (interrupted) {
				log.debug("打断状态: {}", interrupted);
				break;
			}
		}
	}, "t2");

	t2.start();
	sleep(0.5);
	t2.interrupt();
}

  1. park 的线程被打断
  • 行为:也会设置打断标记。
  • LockSupport.park() 是另一种让线程阻塞的方法(synchronized 和 ReentrantLock 的底层会用到它)。
  • 它的行为比较特殊:
    • 如果线程因调用 park() 而被阻塞,此时调用它的 interrupt() 方法,它会立即被唤醒(解除阻塞),并且,它的中断状态会被设置为 true。
    • 重要特性:LockSupport.park() 方法不会抛出 InterruptedException!它只会默默地解除阻塞。
  • 这意味着什么?下一次该线程再调用 LockSupport.park() 时,会因为中断状态已经是 true 而立刻失效,不会再次阻塞。这可以防止线程刚被中断唤醒,又不小心立刻再次 park 自己。

代码示例:

java
private static void test3() throws InterruptedException {
	Thread t1 = new Thread(() -> {
		log.debug("park...");
		LockSupport.park();
		log.debug("unpark...");
		log.debug("打断状态: {}", Thread,currentThread().isInterrupted());
	}, "t1");
	t1.start();

	sleep(0.5);
	t1.interrupt();
}

如果打断标记已经是 true,则 park 会失效

java
private static void test4() {
	Thread t1 = new Thread(() -> {
		for (int i = 0; i < 5; i++) {
			log.debug("park...");
			LockSupport.park();
			log.debug("打断状态: {}", Thread.currentThread().isInterrupted());
		}
	});
	t1.start();

	sleep(1);
	t1.interrupt();
}

2.3.5 sleep(long n) 方法

  1. 调用 sleep(long n) 方法让当前执行的线程休眠 n 毫秒,休眠时让出 CPU 时间片给其它线程
  2. 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
  3. 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出异常
  4. 睡眠结束后的线程未必会立刻得到执行
  5. 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性

2.3.6 yield() 方法

  1. 提示线程调度器让出当前线程对 CPU 的使用,主要是为了测试和调试
  2. 调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
  3. 具体的实现依赖于操作系统的任务调度器

2.4 线程基本使用

2.4.1 变同步为异步

以调用方角度来讲,如果:

  • 需要等待结果返回才能继续运行就是同步
  • 不需要等待结果返回就能继续运行就是异步

多线程可以让方法执行变为异步的(即不需要一直等着),比如读取磁盘文件时,假设读取操作花费了 5 秒钟,如果没有线程调度机制,这 5 秒 CPU 什么都做不了,其它代码都得暂停

比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程

2.4.2 提高效率

充分利用多核 CPU 的优势,提高运行效率,有下面的场景,执行 3 个计算,最后将计算结果汇总

  • 如果是串行执行,那么总共花费的时间是:10+11+9+1 = 31ms
  • 如果是四核 CPU,各个核心分别使用线程 1 执行计算 1,线程 2 执行计算 2,线程 3 执行计算 3,那么 3 个线程是并行的,花费时间只取决于最长的那个线程运行的时间,即 11ms,最后加上汇总时间只会花费 12ms
  • 注意需要在多核 CPU 下才能提高效率,单核仍然是轮流执行

2.4.3 结论

  • 单核 CPU 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用 CPU,不至于一个线程总占用 CPU,别的线程没法干活
  • 多核 CPU 可以并行跑多个线程,但能否提高程序运行效率还是要分情况:
    • 有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率,但不是所有计算任务都能拆分
    • 也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
  • IO 操作不占用 CPU,只是我们一般拷贝文件使用的是阻塞 IO,这时相当于线程虽然不用 CPU,但需要一直等待 IO 结束,没能充分利用线程,所以才有后面的非阻塞 IO 和异步 IO 优化

2.5 模式之两阶段终止

在一个线程 T1 中如何优雅终止线程 T2,这里的优雅指的是给 T2 一个料理后事的机会

使用 Interrupt:

java
class TPTInterrupt {
	private Thread thread;
	
	public void start() {
		thread = new Thread(() -> {
			while(true) {
				Thread current = Thread.currentThread();
				if (current.isInterrupted()) {
					log.debug("料理后事");
					break;
				}
				try {
					Thread.sleep(1000);
					log.debug("将结果保存");
				} catch(InterruptedException e) {
					current.interrupt(); //这里重新将 current 的打断标记设置为 true 是为了考虑到线程休眠时被打断的情况,因为当此线程正在休眠时,Interrupt 的状态就是 true,当外部调用 interrupt 方法想要打断休眠时的线程时会将 Interrupt 的状态改为 false 并抛出异常,这时就可以在这里被捕获,然后再将状态改为 true 就可以进入料理后事的代码,然后线程料理后事后就 break
				}
				//执行监控操作
			}
		}, "监控线程");
		thread.start();
	}
	
	public void stop() {
		thread.interrupt();
	}
}

调用:

TPTInterrupt t = new TPTInterrupt();
t.start();
Thread.sleep(3500);
log.debug("stop");
t.stop();

结果:

2.6 用户线程与守护线程

守护线程是一种特殊的线程,它的作用是为其它线程提供服务,Java 中的线程分为两类,一种是守护线程,另外一种是用户线程,JVM 启动时会调用 main 方法,main 方法所在的线程就是一个用户线程,在 JVM 内部,同时还启动了很多守护线程,比如垃圾回收线程

守护线程和用户线程的区别是当最后一个非守护线程结束时,JVM 会正常退出,不管当前是否存在守护线程,也就是说守护线程是否结束并不影响 JVM 退出,换而言之,只要有一个用户线程还没结束,正常情况下 JVM 就不会退出

2.7 线程 5 种状态

2.7.1 从操作系统层面

  • 初始状态:仅是在语言层面创建了线程对象,还未与操作系统线程关联
  • 可运行状态(就绪状态):指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
  • 运行状态:指获取了 CPU 时间片运行中的状态
    • 当 CPU 时间片用完,会从运行状态转换至可运行状态,会导致线程的上下文切换
  • 阻塞状态:
    • 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入阻塞状态
    • 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至可运行状态
    • 与可运行状态的区别是,对阻塞状态的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
  • 终止状态:表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

2.7.2 从 Java API 层面

根据 Thread.State 枚举,分为六种状态:

  • new:线程刚被创建,但是还没有调用 start() 方法
  • Runnable:当调用了 start() 方法之后,注意,Java API 层面的 Runnable 状态涵盖了操作系统层面的可运行状态、运行状态和阻塞状态(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行的)
  • Blocked、Waiting、Timed_Waiting 都是 Java API 层面对阻塞状态的细分
  • Terminated:当线程代码运行结束

第 3 章 管程

3.1 什么是管程

3.1.1 基本理解

管程(Monitor),直译为监视器,是一种用于实现多线程同步的机制,它的核心思想是将共享资源极其对资源的操作(即临界区)封装起来,并提供一个互斥的、安全的访问接口

可以把它想象成一个带锁的房间:

  • 房间(Monitor):里面包含了一些共享的数据和操作这些数据的方法。
  • 锁(Lock):房间每次只允许一个线程进入。线程进入前必须拿到这把锁。
  • 等待队列(Wait Set):如果一个进入房间的线程发现某些条件不满足(例如,队列已空,无法取数据),它可以暂时释放锁并进入这个等待区,让其他线程进来工作。
  • 入口队列(Entry Set):多个想进入房间的线程在门口排队。

管程的目标是让开发者无需关心复杂的底层同步细节(如信号量操作),只需关注业务逻辑,就能写出线程安全的代码。

结论:管程就是通过锁(Lock)和条件变量(Condition)来管理对共享资源的访问和线程间协作的一套机制

  • 可以简单的认为 synchronized + wait()/notify() 就是一套完整的管程
  • 而 ReentrantLock + Condition 是一套功能更丰富、更灵活的管程

3.1.2 Java 中管程的实现

Java 语言在设计之初就内置了管程的概念,主要通过两种方式实现:

  • synchronized 关键字:这是最直观、最常用的管程实现
  • java.util.concurrent.locks.Lock 接口及其实现(如 ReentrantLock):这是一个更灵活、功能更强大的管程实现
3.1.2.1 基于 synchronized 的管程

在 Java 中,每一个对象都自带一个内置锁(Intrinsic Lock)和一个内置条件队列(Intrinsic Condition Queue),当你使用 synchronized 修饰一个方法或代码块时,你就在使用这个内置的管程

  • 锁:就是对象的内置锁(也叫监视器锁)
  • 入口队列(EntrySet):所有竞争这把锁的线程都在这里排队
  • 等待队列(Wait Set):通过 Object.wait()Object.notify()Object.notifyAll() 方法操作的条件队列

示例代码:

java
public class SynchronizedMonitorExample {
    private final Object lock = new Object(); // 任何一个对象都可以作为管程的锁
    private int sharedData = 0;

    // 进入此方法即表示进入了管程(获得了lock对象的锁)
    public void performAction() {
        synchronized (lock) { // 尝试进入“房间”,获取锁
            // 临界区开始
            while (/* 条件不满足 */) {
                try {
                    lock.wait(); // 条件不满足,释放锁,当前线程进入lock的等待队列
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // 操作共享资源
            sharedData++;
            // 操作完成,通知其他等待线程(比如通知等待队列中的线程条件可能已改变)
            lock.notifyAll(); // 或 lock.notify();
        } // 临界区结束,自动释放锁
    }
}
3.1.2.2 基于 ReentrantLock 的管程

java.util.concurrent.locks 包提供了更强大的管程实现,ReentrantLock 作为锁,Condition 对象作为条件队列,提供了比 synchronized 更精细的控制能力

  • 灵活性:一个锁可以关联多个 Condition 条件队列,而 synchronized 只有一个,这在生产者-消费者模型中非常有用,可以分别让生产者和消费者在不同的条件上等待
  • 可中断、可超时、非阻塞的获取锁能力
  • 有公平锁选项

代码示例:生产者-消费者模型

java
import java.util.concurrent.locks.*;

public class ExplicitMonitorExample {
    private final Lock lock = new ReentrantLock();
    // 为锁创建两个条件队列:”非满“和”非空“
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new LinkedList<>();
    private int capacity = 10;

    public void produce(int value) throws InterruptedException {
        lock.lock(); // 显式获取锁,进入管程
        try {
            while (queue.size() == capacity) {
                // 队列已满,生产者在notFull条件上等待
                notFull.await();
            }
            queue.add(value);
            // 生产了一个,队列肯定非空了,唤醒一个在notEmpty条件上等待的消费者
            notEmpty.signal();
        } finally {
            lock.unlock(); // 必须在finally块中显式释放锁
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // 队列已空,消费者在notEmpty条件上等待
                notEmpty.await();
            }
            int value = queue.poll();
            // 消费了一个,队列肯定没满,唤醒一个在notFull条件上等待的生产者
            notFull.signal();
            return value;
        } finally {
            lock.unlock();
        }
    }
}

3.1.3 MESA 管程模型

Java 采用的管程模型主要是 MESA 模型,理解这个模型有助于理解 wait() 和 notify() 的行为

  • 核心特点:当某个条件变量被通知(signal 或 notify)时,正在等待的线程并不会立即执行,它只是从等待队列移到了入口队列,它需要重新竞争锁,成功获取锁之后才能从当初 wait() 的地方继续执行
  • 重要编程范式:正因为被唤醒后不会立即执行,条件检查必须放在 while 循环中,而不是 if 语句中
java
// 正确做法 (MESA模型)
while (conditionIsNotMet) {
    lock.wait();
}

// 错误做法 (可能导致虚假唤醒或条件再次变化)
if (conditionIsNotMet) {
    lock.wait();
}

这是因为从被通知到重新获取锁的这段时间里,条件可能又被其它线程改变了,while 循环会进行重新检查,确保条件真正满足后才继续执行

3.2 同步 vs 异步和互斥 vs 同步

3.2.1 同步 vs 异步

这组概念关注的是任务执行的模式和消息通信的机制,描述的是调用者是否等待结果。

同步 (Synchronous)

  • 含义:调用者发起一个调用后,必须等待这个调用执行完毕并返回结果后,才能继续执行后续代码。任务执行是顺序的、阻塞的。
  • 比喻:就像你去奶茶店点单,你必须在柜台前等着,直到店员做完奶茶递给你,你才能离开去做下一件事。

代码示例:

java
// 同步方法调用
int result = calculateExpensiveValue(); // 调用这个方法后,线程就在这里阻塞等待
System.out.println("Result: " + result); // 必须等上一行执行完才能执行

异步 (Asynchronous)

  • 含义:调用者发起一个调用后,无需等待这个调用执行完毕,可以立刻继续执行后续代码。被调用的任务通常在另一个线程中执行,执行完成后通过回调、通知等方式告知调用者。
  • 比喻:就像你在餐厅点餐,点完餐后服务员给你一个号牌,你就可以回到座位上玩手机(执行后续任务),等餐做好了(任务完成),会通过广播叫号(回调通知)让你去取。

代码示例(使用 CompletableFuture):

java
// 异步调用
CompletableFuture.supplyAsync(() -> calculateExpensiveValue())
               .thenAccept(result -> System.out.println("Result: " + result)); // 回调函数

System.out.println("I'm free to do other things!"); // 这行代码会立刻执行,不会阻塞

3.2.2 互斥 vs 同步

这组概念关注的是多线程环境下对共享资源访问的协调与控制,是解决并发问题的核心手段。这里的同步和第一组的含义完全不同。

互斥 (Mutex - Mutual Exclusion)

  • 含义:保证同一时间只有一个线程可以访问共享资源。它是一种规则,一种约束,目的是解决“数据竞争”问题。它不关心线程之间的执行顺序,只关心“独占访问”。
  • 核心:“你我不能同时用”。
  • 实现工具:锁(Lock),例如 synchronized 关键字、ReentrantLock、Mutex(在其他语言中常见)。
  • 比喻:厕所的隔间。一个人(线程)进去后会把门锁上(获取锁),其他人(线程)必须在外面等待(阻塞),直到里面的人出来(释放锁),下一个人才能进去。
  • 要解决的问题:避免多个线程同时写、或同时读写一个变量导致的数据错乱。

同步 (Synchronization)

  • 含义:在互斥的基础上,协调线程执行的先后顺序。它比互斥的要求更高,不仅要求不能同时做,还要求按照某种特定的顺序来做
  • 核心:“你做完,通知我一下,我再来做”。
  • 实现工具:等待/通知机制(Wait/Notify),条件变量(Condition),信号量(Semaphore),计数器(CountDownLatch)等。
  • 比喻:餐厅厨师和服务员的协作。厨师(线程A)做好的菜(共享资源)必须放在出餐台(缓冲区)。服务员(线程B)不能直接冲进厨房拿菜(互斥),必须等待厨师叫一声“菜好了”(通知),才能过来取走(执行后续操作)。这个过程就是同步。
  • 要解决的问题:解决“线程协作”问题。例如经典的生产者-消费者模型。

互斥同步下的生产者-消费者模型

java
import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerExample {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int CAPACITY = 5;
    private final Object lock = new Object(); // 用于互斥的锁

    // 生产者方法
    public void produce(int value) throws InterruptedException {
        synchronized (lock) { // 1. 获取锁(实现互斥,防止和消费者同时访问队列)
            while (queue.size() == CAPACITY) {
                // 2. 条件不满足:队列满了,生产者等待(实现同步:等待消费者消费)
                lock.wait(); // 释放锁并等待
            }
            queue.add(value); // 3. 生产数据
            System.out.println("Produced: " + value);
            lock.notifyAll(); // 4. 通知可能正在等待的消费者(实现同步:有数据了,可以来消费了)
        } // 5. 自动释放锁
    }

    // 消费者方法
    public void consume() throws InterruptedException {
        synchronized (lock) { // 获取互斥锁
            while (queue.isEmpty()) {
                lock.wait(); // 条件不满足:队列空了,同步等待生产者
            }
            int value = queue.poll(); // 消费数据
            System.out.println("Consumed: " + value);
            lock.notifyAll(); // 通知可能正在等待的生产者
        }
    }
}

这个例子中:

  • synchronized 关键字实现了互斥,保证了生产者和消费者不会同时修改队列
  • wait() 和 notifyAll() 方法实现了同步,协调了队列满时生产者等消费者和队列空时消费者等生产者的执行顺序

3.3 共享带来的问题

有个故事:

  • 老王(操作系统)有一个功能强大的算盘(CPU),现在想把它租出去,赚一点外快

  • 小南、小女(线程)来使用这个算盘来进行一些计算,并按照时间给老王支付费用
  • 但小南不能一天 24 小时使用算盘,他经常要小憩一会(sleep),又或是去吃饭上厕所(阻塞 IO 操作),有时还需要一根烟,没烟时思路全无(wait)这些情况统称为阻塞

  • 在这些时候,算盘没利用起来(不能收钱了),老王觉得有点不划算
  • 另外,小女也想用用算盘,如果总是小南占着算盘,让小女觉得不公平
  • 于是,老王灵机一动,想了个办法:让他们每人用一会,轮流使用算盘
  • 这样,当小南阻塞的时候,算盘可以分给小女使用,不会浪费,反之亦然
  • 最近执行的计算比较复杂,需要存储一些中间结果,而学生们的脑容量(工作内存)不够,所以老王申请了一个笔记本(主存),把一些中间结果先记在本上
  • 计算流程是这样的

  • 但是由于分时系统,有一天还是发生了事故
  • 小南刚读取了初始值 0 做了个 +1 运算,还没来得及写回结果
  • 老王说:小南你的时间到了,该别人了,记住结果走吧;于是小南念叨着:结果是 1,结果是 1;不甘心的到一边待着去了(上下文切换)
  • 老王说:小女,该你了;小女看到了笔记本上还写着 0,做了一个 -1 运算,将结果 -1 写入笔记本
  • 这时小女的时间也用完了,老王又叫醒了小南:小南,把你上次的题目算完吧;小南将他脑海中的结果 1 写入了笔记本

  • 小南和小女都觉得自己没做错,但笔记本里的结果是 1 而不是 0

故事结束

在 Java 中,两个线程对初始值为 0 的静态变量一个做自增,一个做自减,各做 5000 次,结果是 0 吗?

java
static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter++;
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter--;
            }
        }, "t2");

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        log.debug("{}", counter);
    }

输出结果可能是正数、负数、零,为什么?因为 Java 中对静态变量的自增、自减并不是原子操作:

例如对于 i++ 而言(i 为静态变量),实际会产生如下的 JVM 字节码指令:

而对应 i-- 也是类似:

而 Java 的内存模型如下,完成静态变量的自增、自减需要在主存和工作内存中进行数据交换:

如果是单线程执行以上代码是顺序执行(不会交错)没有问题:

但在多线程下代码可能会交错运行:

3.3.1 临界区

  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源
    • 多个线程读共享资源其实没有问题
    • 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
  • 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区

例如下面代码中的临界区:

  • 竞态条件:多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

3.4 使用 Synchronized 解决问题

为了避免临界区的竞态条件发生,有多种手段可以达到目的:

  • 阻塞式的解决方案:synchronized、Lock
  • 非阻塞式的解决方案:原子变量

这里先使用阻塞式的解决方案 synchronized 来解决问题,即俗称的对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个对象锁时就会阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换

语法:

java
synchronized(对象) {
	//临界区
}

解决自增和自减问题:

java
static int counter = 0;  
static final Object room = new Object();  
  
public static void main(String[] args) throws InterruptedException {  
    Thread t1 = new Thread(() -> {  
        for (int i = 0; i < 5000; i++) {  
            synchronized (room) {  
                counter++;  
            }  
        }  
    }, "t1");  
  
    Thread t2 = new Thread(() -> {  
        for (int i = 0; i < 5000; i++) {  
            synchronized (room) {  
                counter--;  
            }  
        }  
    }, "t2");  
  
    t1.start();  
    t2.start();  
    t1.join();  
    t2.join();  
    log.debug("{}", counter);  
}

类比:

  • synchronized(对象) 中的对象,可以想象为一个房间(room),有唯一入口(门),房间只能一次进入一人进行计算,线程 t1、t2 想象成两个人
  • 当线程 t1 执行到 synchronized(room) 时就好比 t1 进入了这个房间,并锁住了门拿走了钥匙,在门内执行 count++ 代码
  • 这时候如果 t2 也运行到了 synchronized(room) 时,它发现门被锁住了,只能在门外等待,发生了上下文切换,阻塞住了
  • 这中间即使 t1 的 CPU 时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去),这时门还是锁住的,t1 仍拿着钥匙,t2 线程还在阻塞状态进不来,只有下次轮到 t1 自己再次获得时间片时才能开门进入
  • 当 t1 执行完 synchronized{} 块内的代码,这时候才会从 obj 房间出来并解开门上的锁,唤醒 t2 线程把钥匙给他,t2 线程这时才可以进入 obj 房间,锁着了门拿上钥匙,执行它的 count-- 代码

面向对象改进:

把需要保护的共享变量放入一个类

java
class Room {
    int value = 0;

    public void increment() {
        synchronized (this) {
            value++;
        }
    }

    public void decrement() {
        synchronized (this) {
            value--;
        }
    }

    public int get() {
        synchronized (this) {
            return value;
        }
    }
}

@Slf4j
public class Test1 {
    
    public static void main(String[] args) throws InterruptedException {
        Room room = new Room();
        Thread t1 = new Thread(() -> {
            for (int j = 0; j < 5000; j++) {
                room.increment();
            }
        }, "t1");
        
        Thread t2 = new Thread(() -> {
            for (int j = 0; j < 5000; j++) {
                room.decrement();
            }
        }, "t2");
        
        t1.start();
        t2.start();
        
        t1.join();
        t2.join();
        log.debug("count: {}", room.get());
    }
}

3.4.1 线程八锁

方法上的 synchronized:

java
class Test {
	public synchronized void test() {
	
	}
}

等价于:

class Test {
	public void test() {
		synchronized(this) {
		
		}
	}
}
java
class Test {
	public synchronized static void test() {
	
	}
}

等价于

class Test {
	public static void test() {
		synchronized(Test.class) {
		
		}
	}
}

考察 synchronized 锁住的是哪个对象:

情况 1:12 或 21

分析:这里 synchronized 锁的是普通方法,相当于锁的是 this 对象,这里是 new 了一个 n1 对象,然后同时调用 n1.a() 和 n1.b() 因为 this 对象表示谁调用这个方法,this 对象就是谁,因为都是 n1 对象调用方法,所以 this 就是 n1,所以就是锁的 n1 对象,所以两者拿的是同一把锁,那么就看谁先抢到锁了

情况 2:1s 后 12 或 2 1s 后 1

分析:这里锁的对象的分析和情况 1 相同

情况 3:3 1s 后 12 或 23 1s 1 或 32 1s 1

分析:这里调用 a 方法和 b 方法是同一把锁(和情况 1 一致),这里调用 c 方法时是没有锁的,所以这里是 a 方法和 b 方法去抢同一把锁,然后 c 没有锁,不用抢锁

情况 4:2 1s 后 1

分析:这里 synchronized 锁的是 this,所以要看哪个对象调用了方法,这里是 n1 调用 a 方法,所以 a 方法的锁锁的是 n1 对象;这里是 n2 调用 b 方法,所以 b 方法的锁锁的是 n2 对象。两者是不同的锁,所以不存在抢锁

情况 5:2 1s 后 1

分析:这里 a 方法是静态方法,所以锁的是 this.class;而 b 方法是普通方法,所以锁的是 this。这里不是同一把锁,所以不需要抢锁

情况 6:1s 后 12,或 2 1s 后 1

分析:这里都是静态方法,所以锁的都是 this.class,因为 n1 属于这个类,所以是同一把锁,所以要抢锁,就看谁先抢到了

情况 7:2 1s 后 1

分析:这里 a 方法是 this.class 锁,b 方法是 this 锁,所以不是同一把锁,所以不用抢锁

情况 8:1s 后 12,或 2 1s 后 1

分析:这里 a 方法和 b 方法都是静态方法,所以锁的是 this.class,所以锁的是 Number 这个类,n1、n2 都是这个类,所以是同一把锁,就要去抢锁

3.5 变量的线程安全分析

成员变量和静态变量是否线程安全?

  • 如果它们没有被共享,则线程安全
  • 如果它们被共享了,根据它们的状态是否能够改变,又分为两种情况:
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全

局部变量是否线程安全?

  • 局部变量是线程安全的
  • 但局部变量引用的对象则未必:
    • 如果该对象没有逃离方法的作用范围,它是线程安全的
    • 如果该对象逃离方法的作用范围,需要考虑线程安全
  1. 成员变量线程安全分析:
java
class ThreadUnsafe {
	ArrayList<String> list = new ArrayList<>();
	public void method1(int loopNumber) {
		for (int i = 0; i < loopNumber; i++) {
			//临界区,会产生竞态条件
			method2();
			
			method3();
		}
	}
	
	private void method2() {
		list.add("1");
	}
	
	private void method3() {
		list.remove(0);
	}
}

执行:

java
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args) {
	ThreadUnsafe test = new ThreadUnsafe();
	for (int i = 0; i < THREAD_NUMBER; i++) {
		new Thread(() -> {
			test.method1(LOOP_NUMBER);
		}, "Thread" + i).start();
	}
}

报错:

分析:无论哪个线程中的 method2 引用的都是同一个对象中的 list 成员变量,method3 与 method2 分析相同

将 list 修改为局部变量:

java
class ThreadUnsafe {
	public void method1(int loopNumber) {
		ArrayList<String> list = new ArrayList<>();
		for (int i = 0; i < loopNumber; i++) {
			//临界区,会产生竞态条件
			method2(list);
			
			method3(list);
		}
	}
	
	private void method2(ArrayList<String> list) {
		list.add("1");
	}
	
	private void method3(ArrayList<String> list) {
		list.remove(0);
	}
}

这样就不会出现问题:

分析:

  • list 是局部变量,每个线程调用时会创建其不同的实例,没有共享
  • 而 method2 的参数是从 method1 中传递过来的,与 method1 中引用同一个对象
  • method3 的参数分析与 method2 相同

  1. 局部变量线程安全分析:
java
public static void test1() {
	int i = 10;
	i++;
}

每个线程调用 test1() 方法时局部变量 i 会在每个线程的栈帧内存中被创建多份,因此不存在共享

3.6 常见的线程安全类

String、Integer、StringBuffer、Random、Vector、HashTable、JUC 包下的类都是线程安全的类

这里说它们是线程安全的是指多个线程调用它们同一个实例的某个方法时,是线程安全的,也可以理解为:

  • 它们的每个方法是原子的
  • 但注意它们多个方法的组合不是原子的

组合:

分析下面的代码是否是线程安全的?

不是线程安全的:

不可变类线程安全性分析:

  • String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的
  • 那么 String 有 replace、substring 等方法可以改变值,那么这些方法如何保证线程安全的呢?
    • 因为它们都是在原来的值上做修改,所以可以保证线程安全

3.7 Monitor

3.7.1 对象头

以 32 位虚拟机为例:

普通对象:

数组对象:

其中 Mark Word 的结构为:

64 位虚拟机的 Mark Word 的结构为:

3.7.2 Monitor 原理

每个 Java 对象都可以关联一个 Monitor 对象,如果使用 Synchronized 给对象上锁(重量级锁)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针

Monitor 结构如下:

  • 刚开始 Monitor 中的 Owner 为 null
  • 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor 中只能有一个 Owner
  • 在 Thread-2 上锁的过程中,如果 Thread-3、Thread-4、Thread-5 也来执行 synchronized(obj),就会进入 EntryList 被 Blocked
  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争是非公平的
  • 图中 WaitSet 中的 Thread-0、Thread-1 是之前获得过锁,但条件不满足进入 Waiting 状态的线程,后面学习 wait-notify 时会分析
  • synchronized 必须是进入同一个对象的 Monitor 才有上述的效果
  • 不加 synchronized 的对象不会关联监视器,不遵从以上规则

3.7.3 对象头与 Monitor

每个 Java 对象都可以关联一个 Monitor 对象,锁的信息就存放在 Java 对象头中

对象头包含两类信息:

  • Mark Word:存储对象自身的运行时数据,如哈希码、GC 分代年龄、锁状态标志、持有锁的线程 ID、偏向时间戳等
  • Klass Pointer:指向对象的元数据(它的类)的指针

3.7.4 synchronized 原理

浅析synchronized锁升级的原理与实现 - 小新成长之路 - 博客园

深入解析Java中synchronized:从原理到锁升级及历史演进-腾讯云开发者社区-腾讯云

【全网最细系列】synchronized锁详解,偏向锁与锁膨胀全流程-阿里云开发者社区

对应的字节码为:

注意方法级别的 synchronized 不会在字节码指令中有所体现

3.7.3.1 synchronized 锁流程分析
  1. 轻量级锁

轻量级锁的使用场景:如果一个对象虽然有多线程要加锁,但加锁时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化

轻量级锁对使用者是透明的,即语法仍然是 synchronized

假设有两个方法同步块,利用同一个对象加锁:

java
static final Object obj = new Object();
public static void method1() {
	synchronized(obj) {
		//同步块 A
		method2();
	}
}
public static void method2() {
	synchronized(obj) {
		//同步块 B
	}
}

流程分析:

  • 当有一个线程进来时,创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定的对象的 Mark Word

  • 让锁记录中的 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

  • 如果 cas 替换成功,对象头中存储了锁记录地址和状态 00(00 表示轻量级锁,01 表示无锁),表示由该线程给对象加锁:

  • 如果 cas 失败,有两种情况:
    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是自己执行又执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数,重入锁就是指一个线程已经获取了该对象的锁,但是又要获取该对象的锁,那么就叫做重入锁

  • 当退出 synchronized 代码块(解锁时),如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一

  • 当退出 synchronized 代码块(解锁时)的锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头
    • 解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
  1. 锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁

  • 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁,所以这里 Thread-1 会加锁失败

  • 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
    • 即为 Object 对象申请 Monitor 锁(重量级锁),让 Object 指向重量级锁地址
    • 然后自己进入 Monitor 的 EntryList Blocked
    • 这时 Object 的 Moniter 地址指向 Monitor 锁并改为 10(重量级锁)

  • 当 Thread-0 退出同步块解锁时,使用 CAS 将 Mark Word 的值恢复给对象头时失败,这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中的 Blocked 的线程
  1. 自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞

自旋的好处:自旋会使竞争的线程开始自旋,不会立马进入阻塞状态,这样就减少了上下文切换

自旋重试成功的情况:

自旋重试失败的情况:

  • 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势
  • 在 Java6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋
  • Java7 之后不能控制是否开启自旋功能
  1. 偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作

Java6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS,以后只要不发生竞争,这个对象就归该线程所有

例如:

java
static final Object obj = new Object();
public static void m1() {
	synchronized(obj) {
		m2();
	}
}

public static void m2() {
	synchronized(obj) {
		m3();
	}
}

public static void m3() {
	synchronized(obj) {
		
	}
}

以下情况偏向锁会被撤销:

  1. 调用对象 hashCode

调用了对象的 hashCode,但偏向锁的对象 MarkWord 中存储的是线程 id,没有 hashCode,如果调用 hashCode 会导致偏向锁被撤销

  • 轻量级锁会在锁记录中记录 hashCode
  • 重量级锁也会在 Monitor 中记录 hashCode
  1. 其它线程使用对象

当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁

  1. 调用 wait/notify

因为 wait/notify 是重量级锁里有的,所以要调用 wait/notify 会将偏向锁升级为重量级锁

  1. 批量重偏向

如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID

当撤销偏向锁阈值超过 20 次后,JVM 会觉得是不是偏向错了,于是会再给这些对象加锁时重新偏向至加锁线程

  1. 批量撤销

当撤销偏向锁阈值超过 40 次后,JVM 会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

synchronized 四种锁状态:

  1. 无锁状态,对象未被锁定,Mark Word 存储对象的哈希码等信息

  2. 偏向锁,当线程第一次获取锁时,会进入偏向模式,Mark Word 会记录线程 ID,后续同一线程再次获取锁时,可以直接进入 synchronized 加锁的代码,无需额外加锁

  3. 轻量级锁,当多个线程在不同时段获取同一把锁,即不存在锁竞争的情况时,JVM 会采用轻量级锁来避免线程阻塞

  4. 重量级锁,如果自旋超过一定的次数,或者一个线程持有锁,一个自旋,又有第三个线程进入 synchronized 加锁的代码时,轻量级锁就会升级为重量级锁

3.7.3.2 总结锁升级过程
  1. 从无锁到偏向锁 当一个线程首次访问同步代码块时,如果此对象处于无锁状态且偏向锁未被禁用,JVM 会将该对象头的锁标记改为偏向锁状态,并记录当前线程 ID。此时,对象头中的 Mark Word 中存储了持有偏向锁的线程 ID 如果另一个线程尝试获取这个已被偏向的锁,JVM 会检查当前持有偏向锁的线程是否活跃,如果持有偏向锁的线程不活跃,可以将锁偏向给新的线程;否则撤销偏向锁,升级为轻量级锁

  2. 偏向锁到轻量级锁 进行偏向锁撤销时,会遍历堆栈的所有锁记录,暂停拥有偏向锁的线程,并检查锁对象,如果这个过程中发现有其它线程试图获取这个锁,JVM 会撤销偏向锁,并将锁升级为轻量级锁 当有两个或以上线程竞争同一个偏向锁时,偏向锁模式不再有效,此时偏向锁会被撤销,对象的锁状态会升级为轻量级锁

  3. 轻量级锁到重量级锁 轻量级锁通过自旋来等待锁释放,如果自旋超过预定次数(自旋次数是可调的,并且是自适应的,失败次数多自旋次数就少),表明锁竞争激烈 当自旋多次失败,或者有线程在等待队列中等待相同的轻量级锁时,轻量级锁会升级为重量级锁,在这种情况下,JVM 会在操作系统层面创建一个互斥锁 Mutex,所有进一步尝试获取该锁的线程将会被阻塞,直到锁被释放

3.8 wait notify

为什么需要 wait?

  • 由于条件不满足,小南不能继续进行计算
  • 但小南如果一直占用着锁,其它人就得一直阻塞,效率太低

  • 于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,其它人可以由老王随机安排进屋
  • 直到小 M 将烟送来,大叫一声你的烟到了(调用 notify 方法)

  • 小南于是可以离开休息室,重新进入竞争锁的队列

3.8.1 wait notify 原理

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 Waiting 状态
  • Blocked 和 Waiting 的线程都处于阻塞状态,不占用 CPU 时间片
  • Blocked 线程会在 Owner 线程释放锁时唤醒
  • Waiting 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入 EntryList 重新竞争

3.8.2 API 介绍

  • obj.wait():让进入 object 监视器的线程到 waitSet 等待
  • obj.notify():在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll():让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法,必须获得此对象的锁,才能调用这几个方法

wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程有机会获取对象的锁。无限制等待,直到 notify 为止

3.8.3 wait notify 的使用

先看 sleep(long n) 和 wait(long n) 的区别:

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同时不会释放对象锁,但 wait 在等待的时候会释放对象锁
  • 它们的状态都是 Timed_Waiting

wait notify 的使用:

先看使用 sleep 会有什么问题:

todo 模式之保护性暂停 todo 模式之生产者消费者

3.9 park unpark

park 和 unpark 是 LockSupport 类中的方法:

java
//暂停当前线程
LockSupport.park();

//恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark:

java
Thread t1 = new Thread(() -> {
	log.debug("start...");
	sleep(1);
	log.debug("park...");
	LockSupport.park();
	log.debug("resume...");
}, "t1");
t1.start();

sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

先 unpark 再 park

java
Thread t1 = new Thread(() -> {
	log.debug("start...");
	sleep(2);
	log.debug("park...");
	LockSupport.park();
	log.debug("resume...");
}, "t1");
t1.start();

sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

3.9.1 特点

与 Object 的 wait 和 notify 相比:

  • wait、notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park、unpark 不需要
  • park、unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不是很精确
  • park、unpark 可以先 unpark,而 wait、notify 不能先 notify

3.9.2 原理

每个线程都有自己的一个 Parker 对象,由三部分组成:_counter_cond_mutex

可以这样理解:

  • 线程就像一个旅人,Parker 就像它随身携带的背包,条件变量就好比背包中的帐篷,_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
  • 调用 park 就是要看需不需要停下来休息
    • 如果备用干粮耗尽,那么钻进帐篷休息
    • 如果备用干粮充足,那么不需要休息,继续前进
  • 调用 unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让它继续前进
    • 如果这时线程还在运行,那么下次它调用 park 时,仅是消耗掉备用干粮,不需要停留而是继续前进
      • 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮

3.10 使用多把锁

使用多把不相干的锁:

一间大屋子有两个功能:睡觉、学习,两者互不相干

现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低

解决方法是准备多个房间(多个对象锁)现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低

解决方法是准备多个房间(多个对象锁)

例如:

java
class BigRoom {
	public void sleep() {
		synchronized (this) {
			log.debug("sleeping 2 小时");
			Sleeper.sleep(2);
		}
	}
	
	public void study() {
		synchronized (this) {
			log.debug("study 1 小时");
			Sleeper.sleep(1);
		}
	}
}

执行:

java
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
	bigRoom.study();
}, "小南").start();

new Thread(() -> {
	bigRoom.sleep();
}, "小女").start();

输出:

使用多把锁进行改进:

java
class BigRoom {
	private final Object studyRoom = new Object();
	private final Object bedRoom = new Object();
	
	public void sleep() {
		synchronized (bedRoom) {
			log.debug("sleeping 2 小时");
			Sleeper.sleep(2);
		}
	}
	
	public void study() {
		synchronized (studyRoom) {
			log.debug("study 1 小时");
			Sleeper.sleep(1);
		}
	}
}

输出:

这里就是将锁的细粒度细分:

  • 好处:可以增强并发度
  • 坏处:如果一个线程需要同时获得多把锁,就容易发生死锁

3.11 锁的活跃性

3.11.1 死锁

情况:一个线程需要同时获取多把锁,这时就容易发生死锁

t1 线程获得 A 对象锁,接下来想获取 B 对象的锁,t2 线程获得 B 对象锁,接下来想获取 A 对象的锁

例:

java
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
	synchronized(A) {
		log.debug("lock A");
		sleep(1);
		synchronized (B) {
			log.debug("lock B");
			log.debug("操作...");
		}
	}
}, "t1");

Thread t2 = new Thread(() -> {
	synchronized(B) {
		log.debug("lock B");
		sleep(0.5);
		synchronized(A) {
			log.debug("lock A");
			log.debug("操作...");
		}
	}
}, "t2");

t1.start();
t2.start();

输出:

死锁发生的四个条件:

  • 互斥:资源不能被多个线程共享,一次只能由一个线程使用,如果一个线程已经占用了一个资源,其它请求该资源的线程必须等待,直到资源被释放
  • 持有并等待:一个线程已经持有一个资源,并且在等待获取其它线程持有的资源
  • 不可抢占:资源不能被强制从线程中夺走,必须等线程自己释放
  • 循环等待:存在一种线程等待链,线程 A 等待线程 B 持有的资源,线程 B 等待线程 C 持有的资源,直到线程 N 又等待线程 A 持有的资源

如何避免死锁:

  • 所有线程都按照固定的顺序来申请资源,例如,先申请 R1 再申请 R2
  • 如果线程发现无法获取某个资源,可以先释放已经持有的资源,重新尝试申请
3.11.1.1 定位死锁:

首先从系统级别上排查,比如说在 Linux 生产环境中,可以先使用 topps 等命令查看进程状态,看看是否有进程占用了过多的资源

接着,使用 JDK 自带的一些性能监控工具进行排查,比如说使用 jps -l 查看当前进程,然后使用 jstack 进程号 查看当前进程的线程堆栈信息,看看是否有线程在等待锁资源

也可以使用一些可视化的性能监控工具,比如说 JConsole、VisualVM 等,查看线程的运行状态、锁的竞争情况等

3.11.1.2 哲学家就餐问题

有五位哲学家,围坐在圆桌旁:

  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考
  • 吃饭时要用两根筷子吃,桌上一共有 5 根筷子,每位哲学家左右手边各有一根筷子
  • 有一个时刻每个哲学家都牢牢握着自己左边的筷子,并无限期地等待右边的哲学家放下筷子。而右边的哲学家也在做同样的事。没有任何一个哲学家能够进餐,也没有任何一个哲学家会放下自己已经拿到的筷子。程序完全卡住,这就是死锁。

3.11.2 活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如:

java
public class TestLiveLock {
	static volatile int count = 10;
	static final Object lock = new Object();
	
	public static void main(String[] args) {
		new Thread(() -> {
			//期望减到 0 退出循环
			while (count > 0) {
				sleep(0.2);
				count--;
				log.debug("count: {}", count);
			}
		}, "t1").start();
		
		new Thread(() -> {
			//期望超过 20 退出循环
			while (count < 20) {
				sleep(0.2);
				count++;
				log.debug("count: {}", count);
			}
		}, "t2").start();
	}
}

3.11.3 饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示

第 4 章 ReentrantLock

相对于 synchronized 它具备如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

基本语法:

java
//获取锁
reentrantLock.lock();
try {
	//临界区
} finally {
	//释放锁
	reentrantLock.unlock();
}

4.1 可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁;如果是不可重入锁,那么第二次获得锁时,自己会被自己的锁挡住

java
static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
	method1();
}

public static void method1() {
	lock.lock();
	try {
		log.debug("execute method1");
		method2();
	} finally {
		lock.unlock();
	}
}

public static void method2() {
	lock.lock();
	try {
		log.debug("execute method2");
		method3();
	} finally {
		lock.unlock();
	}
}

public static void method3() {
	lock.lock();
	try {
		log.debug("execute method3");
	} finally {
		lock.unlock();
	}
}

输出:

4.2 可打断

可打断是指线程在阻塞等待锁的时候是可以被打断的

java
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
	log.debug("启动...");
	try {
		lock.lockInterruptibly(); //可打断锁
	} catch (InterruptedException e) {
		e.printStackTrace();
		log.debug("等锁的过程中被打断");
		return;
	}
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(1);
	t1.interrupt();
	log.debug("执行打断");

} finally {
	lock.unlock();
}

输出:

如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断:

java
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
	log.debug("启动...");
	lock.lock();
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(1);
	t1.interrupt();
	log.debug("执行打断");
	sleep(1);
} finally {
	log.debug("释放了锁");
	lock.unlock();
}

输出:

4.3 锁超时

立刻失败:

java
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
	log.debug("启动...");
	if (!lock.tryLock()) {
		log.debug("获取锁立刻失败,返回");
		return;
	}
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(2);
} finally {
	lock.unlock();
}

输出:

超时失败:

java
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
	log.debug("启动...");
	try {
		if (!lock.tryLock(1, TimeUnit.SECONDS)) {
			log.debug("获取等待 1s 后失败,返回");
			return;
		}
	} catch (InterruptException e) {
		e.printStackTrace();
	}
	try {
		log.debug("获得了锁");
	} finally {
		lock.unlock();
	}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
	sleep(2);
} finally {
	lock.unlock();
}

输出:

使用 tryLock 解决哲学家就餐问题:

java
class Chopstick extends ReentrantLock {
	String name;
	
	public Chopstick(String name) {
		this.name = name;
	}
	
	@Override
	public String toString() {
		return "筷子 {" + name + "}"
	}
}

class Philosopher extends Thread {
	Chopstick left;
	Chopstick right;
	
	public Philosopher(String name, Chopstick left, Chopstick right) {
		super(name);
		this.left = left;
		this.right = right;
	}
	
	@Override
	public void run() {
		while (true) {
			//尝试获得左手筷子
			if (left.tryLock()) {
				try {
					//尝试获得右手筷子
					if (right.tryLock()) {
						try {
							eat();
						} finally {
							right.unlock();
						}
					}
				} finally {
					left.unlock();
				}
			}
		}
	}
	
	private void eat() {
		log.debug("eating...");
		Sleeper.sleep(1);
	}
}

4.4 公平锁

4.5 条件变量

synchronized 中也有条件变量,就是 waitSet 休息室,当条件不满足时进入 waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,就好比:

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • ReentrantLock 支持多件休息室,有专门等烟的休息室、专门等早餐的休息室,唤醒时也是按休息室来唤醒

使用要点:

  • await 前需要获得锁
  • await 执行后会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)后重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行

代码示例:

java
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waibreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args) {
	new Thread(() -> {
		try {
			lock.lock();
			while (!hasCigrette) {
				try {
					waitCigaretteQueue.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			log.debug("等到了它的烟");
		} finally {
			lock.unlock();
		}
	}).start();
	
	new Thread(() -> {
		try {
			lock.lock();
			while (!hasBreakfast) {
				try {
					waitbreakfastQueue.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			log.debug("等到了它的早餐");
		} finally {
			lock.unlock();
		}
	}).start();
	
	sleep(1);
	sendBreakfast();
	sleep(1);
	sendCigarette();
}

private static void sendCigarette() {
	lock.lock();
	try {
		log.debug("送烟来了");
		hasCigrette = true;
		waitCigaretteQueue.signal();
	} finally {
		lock.unlock();
	}
}

private static void sendBreakfast() {
	lock.lock();
	try {
		log.debug("送早餐的来了");
		hasBreakfast = true;
		waitbreakfastQueue.signal();
	} finally {
		lock.unlock();
	}
}

输出:

todo 同步模式之顺序控制

第 5 章 Java 内存模型 JMM

上两个章节讲解的 Monitor 主要关注的是访问共享变量时,保证临界区代码的原子性

这一章深入学习共享变量在多线程间的可见性问题与多条指令执行时的有序性问题

JMM 它在 Java 层面定义了主存(主要存放共享变量,如静态变量、成员变量)、工作内存(主要存放私有变量,如局部变量等)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等

JMM 体现在以下几个方面:

  • 原子性:保证指令不会受到线程上下文切换的影响
  • 可见性:保证指令不会受 CPU 缓存的影响
  • 有序性:保证指令不会受 CPU 指令并行优化的影响

5.1 可见性

现象:退不出的循环

main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

java
static boolean run = true;

public static void main(String[] args) throws InterruptedException {
	Thread t = new Thread(() -> {
		while (run) {
			//...
		}
	});
	
	t.start();
	sleep(1);
	run = false; //线程 t 不会如预想的停下来
}

分析:

  1. 初始状态,t 线程刚开始从主内存读取了 run 的值到工作内存

  1. 因为 t 线程要频繁从主内存中读取 run 的值,JIT(Java 即时编译器)会将 run 的值缓存至自己的工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率

  1. 1 秒之后,main 线程修改了 run 的值并同步至主存,而 t 是从自己工作内存中的高速缓存中读取的这个变量的值,结果永远是旧值

解决方法:

使用 volatile 关键字

  • volatile 关键字可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存

可见性 vs 原子性:

  • 可见性保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见,不能保证原子性,仅用在一个写线程、多个读线程的情况

  • 比较一下之前学习的例子:两个线程一个 i++、一个 i--,只能保证看到最新值,不能解决指令交错:

  • 注意:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性,但缺点是 synchronized 是属于重量级操作,性能相对较低

todo 模式之两阶段终止

todo 模式之 Balking

5.2 有序性

JVM 会在不影响正确性的前提下,调整语句的执行顺序:

java
static int i;
static int j;

//在某个线程内执行如下赋值操作
i = ...;
j = ...;

可以看到,至于是先执行 i 还是先执行 j,对最终的结果不会产生影响,所以,上面代码真正执行时,既可以是:

java
i = ...;
j = ...;

也可以是:

java
j = ...;
i = ...;

这种特性称之为指令重排,多线程下指令重排会影响正确性,[[为什么要有重排指令这项优化呢]]:

指令重排序优化:

现代处理器会设计一个时钟周期完成一条执行时间最长的 CPU 指令,为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如:每条指令都可以分为:取指令-指令译码-执行指令-内存访问-数据写回这 5 个阶段

在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,也就是分阶段以提升效率

现代 CPU 支持多级指令流水线,例如支持同时执行取指令-指令译码-执行指令-内存访问-数据写回的处理器就可以称之为五级指令流水线,这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相的提高了指令的吞吐率

由问题引出有序性的重要性:

java
int num = 0;
boolean ready = false;

//线程 1 执行此方法
public void actor1(I_Result r) {
	if (ready) {
		r.r1 = num + num;
	} else {
		r.r1 = 1;
	}
}

//线程 2 执行此方法
public void actor2(I_Result r) {
	num = 2;
	ready = true;
}

问 r1 可能的结果有几种?

结果有很多种:我们着重分析因为指令重排导致的结果:

线程 2 先执行 ready = true,切换到线程 1,进入 if 分支,相加为 0,再切回线程 2 执行 num = 2,这时 r1 = 0

这种现象就叫做指令重排,指令重排的问题就是程序员编写多线程程序时,通常会默认代码是按照书写顺序执行的,我们会对不同线程的操作交错顺序有一种逻辑假设,并依赖这种假设来保证数据的正确性,问题就在于,编译器和 CPU 的重排操作,完全违背了程序员的这种逻辑假设

解决办法:

java
int num = 0;
volatile boolean ready = false;

//线程 1 执行此方法
public void actor1(I_Result r) {
	if (ready) {
		r.r1 = num + num;
	} else {
		r.r1 = 1;
	}
}

//线程 2 执行此方法
public void actor2(I_Result r) {
	num = 2;
	ready = true;
}

5.3 volatile 原理

volatile 的底层实现原理是内存屏障:

  • 对 volatile 变量的写指令的后面会加入写屏障
  • 对 volatile 变量的读指令的前面会加入读屏障

5.3.1 如何保证可见性

写屏障保证在该屏障之前的对共享变量的改动,都同步到主存中

java
public void actor2(I_Result r) {
	num = 2;
	ready = true; //ready 是 volatile 的,赋值带写屏障
	//写屏障
}

读屏障保证在该屏障之后对共享变量的读取,加载的是主存中最新的数据

java
public void actor1(I_Result r) {
	//读屏障
	if (ready) { //ready 是 volatile 的,赋值带写屏障
		r.r1 = num + num;
	} else {
		r.r1 = 1;
	}
}

volatile 可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存,即一个线程对 volatile 变量的修改,对另一个线程可见

5.3.2 如何保证有序性

写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后

读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

5.4 双重检查锁定 (Double-Checked Locking, DCL)

这是一个用于延迟初始化(Lazy Initialization)的高效设计模式,但它充满了陷阱,是理解 Java 内存模型(JMM)、指令重排序和 volatile 关键字重要性的绝佳案例。


1. 目标:延迟初始化与线程安全

我们想实现一个单例模式(Singleton),并且希望只有在真正使用时才创建实例(延迟初始化),同时保证在多线程环境下的线程安全。

最直接的方法是给获取实例的方法加上 synchronized

java
public class Singleton {
    private static Singleton instance;

    public static synchronized Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton(); // 延迟初始化
        }
        return instance;
    }
}

缺点: 每次调用 getInstance() 都需要获取锁,即使实例早已创建完成。这造成了不必要的同步开销。


2. DCL 的初衷:降低同步开销

DCL 的目的是减少同步的范围,理想情况是:只有第一次创建实例时才需要同步,之后所有调用都应该无锁地直接返回已创建的实例。

它的基本结构如下:

java
// 版本一:天真的 DCL (这是错误的!)
public class Singleton {
    private static Singleton instance; // 注意:这里没有 volatile

    public static Singleton getInstance() {
        if (instance == null) {              // 第一次检查 (无锁)
            synchronized (Singleton.class) { // 加锁
                if (instance == null) {      // 第二次检查 (加锁状态下)
                    instance = new Singleton(); // 问题的根源!
                }
            }
        }
        return instance;
    }
}
  • 第一次检查 (if (instance == null)): 在同步块之外。如果实例已经存在,则直接返回,避免了绝大多数情况下的锁竞争。
  • 同步块 (synchronized): 确保只有一个线程能进入创建实例的代码。
  • 第二次检查 (if (instance == null)): 在同步块内部。为了防止在第一个线程进入同步块并创建实例的过程中,有其他线程已经在同步块外等待。当等待的线程进入同步块时,实例可能已被创建,所以需要再次检查。

这个逻辑看起来完美,但在 Java 1.5 之前,这是一个广为人知的反模式。即使在 Java 1.5 之后,如果缺少一个关键修饰符,它仍然是错误的。


3. 问题的根源:指令重排序

关键问题出在这一行:

java
instance = new Singleton();

这行代码(字节码指令)并不是一个原子操作。它可以抽象为以下三个步骤:

  1. memory = allocate(); // 1. 分配对象的内存空间
  2. ctorInstance(memory); // 2. 初始化对象(调用构造函数)
  3. instance = memory; // 3. 将 instance 引用指向刚分配的内存地址

问题在于:步骤 2 和步骤 3 可能会被编译器或处理器进行重排序,变成:

  1. memory = allocate(); // 1. 分配对象的内存空间
  2. instance = memory; // 3. 将引用指向内存空间 (此时 instance != null,但对象还未初始化!)
  3. ctorInstance(memory); // 2. 初始化对象

现在,我们来看一个并发场景:

  1. 线程 A 进入同步块,执行 new Singleton()
  2. 线程 A 发生了重排序,先执行了步骤 1 和 3。此时 instance 已经不再为 null,但它指向的是一块尚未初始化的原始内存
  3. 线程 B 执行 getInstance(),进行第一次检查 if (instance == null)
  4. 线程 B 发现 instance 不是 null(因为线程 A 已经执行了步骤 3),于是跳过同步块,直接返回 instance
  5. 线程 B 开始使用这个未完全初始化的 Singleton 实例,导致不可预料的错误(如读取到默认值 0 或 null)。

4. 解决方案:使用 volatile

从 Java 5 开始,JMM 增强了 volatile 的内存语义。对一个 volatile 变量的写操作会与后续的读操作建立 happens-before 关系,并且会禁止编译器对围绕 volatile 变量的指令进行重排序。

正确的 DCL 实现:

java
public class Singleton {
    // 关键:使用 volatile 修饰实例变量
    private static volatile Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) {              // 第一次检查 (无锁)
            synchronized (Singleton.class) { // 加锁
                if (instance == null) {      // 第二次检查 (加锁状态下)
                    instance = new Singleton(); // 现在安全了
                }
            }
        }
        return instance;
    }
}

volatile 在这里起到了两个至关重要的作用:

  1. 禁止重排序

    • 写入一个 volatile 变量(instance = new Singleton())就像一个内存屏障(Memory Barrier)。
    • 它确保在 volatile 操作之前的所有操作(步骤 1 和 步骤 2:初始化对象)都不会被重排序到写操作之后。
    • 它也确保在 volatile 操作(if (instance == null)之后的所有操作,都不会被重排序到读操作之前。
    • 这就强制保证了对象的初始化(步骤 2)一定在将引用赋值给 instance(步骤 3)之前完成。
  2. 保证可见性

    • 当线程 A 完成对 volatile 变量 instance 的写入后,它会立即将修改后的值强制刷新到主内存。
    • 当线程 B 读取 volatile 变量 instance 时,它会强制从主内存重新加载最新值,确保能看到线程 A 初始化完成后的完整对象。

5. 替代方案:基于类初始化的解决方案 (Holder Class Idiom)

由于 DCL 的实现略显复杂且容易出错,Java 社区推荐一种更简洁、线程安全且无同步开销的实现方式——静态内部类 Holder

java
public class Singleton {

    // 私有构造函数
    private Singleton() {}

    // 静态内部类
    private static class SingletonHolder {
        // JVM 在类加载初始化阶段会保证这个操作的线程安全性
        private static final Singleton INSTANCE = new Singleton();
    }

    public static Singleton getInstance() {
        return SingletonHolder.INSTANCE;
    }
}

原理:

  • JVM 在加载外部类 Singleton 时,不会立即加载其内部类 SingletonHolder
  • 只有当第一次调用 getInstance() 方法时,才会触发 JVM 加载并初始化 SingletonHolder 类。
  • JVM 在类的初始化阶段(执行静态字段赋值和静态块)会自动获取一个锁,并保证多个线程同时初始化一个类时,只有一个线程能执行,其他线程会阻塞等待。
  • 这个过程天然地保证了线程安全,且 static final 保证了 INSTANCE 只被实例化一次。

优点: 无需同步锁,代码简洁,由 JVM 保证绝对安全,是实现线程安全单例的最佳实践之一。


总结

特性双重检查锁定 (DCL)静态内部类 (Holder)
延迟初始化
线程安全是 (必须加 volatile)是 (由 JVM 保证)
性能高 (绝大多数情况无锁)高 (无锁)
实现复杂度复杂,易出错简单,清晰
Java 版本要求正确实现需要 Java 5+ 的 volatile无特殊要求

结论:

  1. 永远不要使用没有 volatile 的 DCL
  2. 如果你决定使用 DCL,务必用 volatile 修饰实例变量
  3. 在大多数情况下,优先选择使用静态内部类 (Holder) 的方式来实现延迟加载的单例,它更简单、更安全。DCL 通常只在初始化成本极高且需要极致性能的罕见场景下才被考虑。

5.5 happens-before

happens-before 是 Java 内存模型定义的一种保证线程间可见性和有序性的规则

核心思想:一种内存可见性保证

happens-before 关系的核心定义是:

如果操作 A happens-before 操作 B,那么:

  • 操作 A 的结果对操作 B 可见
  • 操作 A 在时间上先于操作 B 执行

换句话说,如果 A happens-before B,那么 A 的修改必须对 B 可见,并且 B 不能重排序到 A 之前

这听起来简单,但它的威力在于它建立了一种跨线程的内存可见性保证。即使操作 A 和 B 发生在不同的线程上,只要它们之间存在确定的 happens-before 关系,JVM 就有义务保证 A 的结果对 B 是可见的,无需开发者手动担心缓存不一致等问题。

为什么需要 happens-before?—— JMM 的简化

没有 happens-before 之前,你可能会想:“我在线程 1 先写了变量 x=1,然后线程 2 去读,肯定读到 1 吧?”

很遗憾,在现代计算机体系结构下(多级缓存、指令重排序),答案是不一定。线程 1 的写入可能暂时停留在自己的本地缓存(CPU Cache)中,没有及时刷回主内存;或者编译器/处理器为了优化对指令进行了重排序,导致写操作“看起来”延迟了。

Java 内存模型(JMM)通过 happens-before 规则向程序员提供了一个强大的、跨平台的承诺:只要你按照这些规则编写代码,JVM 就会保证正确的可见性。至于底层是如何实现的(插入内存屏障、禁止重排序等),那是 JVM 的事情,程序员无需关心。

可以把它想象成一个“江湖规矩” 或者 “承诺书”。在多线程这个江湖里,如果没有规矩,各个线程(Thread大哥们)各自为政,数据就会乱成一锅粥。而 happens-before 就是 Java 立下的几条铁律,只要遵守这些规矩,JVM 就向你保证:你的数据不会错乱!

核心思想:一种“看见”的保证

“A happens-before B” 最直接的意思就是:A 做完的事,B 一定能看见。

它不是严格保证 A 在时间上一定比 B 早发生(虽然大部分时候是),它保证的是 “可见性”:只要 B 发生了,那 A 留下的“烂摊子”(修改的数据),B 推门进来一眼就能看到,不会出现“诶?刚才谁都没来过吗?”这种灵异事件。


八大江湖规矩(Happens-Before规则)

现在,我们来看看 Java 江湖里的八大规矩:

规矩一:代码顺序规矩 (Program Order Rule)

同一个线程里,前面写的代码 happens-before 后面写的代码。 单线程内,代码按顺序执行,比如 a = 1; b = 2; a 先于 b 执行 比喻: 你给自己泡杯茶,肯定是先烧水 -> 再放茶叶 -> 最后倒水。这个顺序不能乱,乱了你就喝不到茶了。JVM 保证在同一个线程里,你的代码逻辑顺序不会出问题。

规矩二:锁规矩 (Monitor Lock Rule)

解锁 happens-before 后续的加锁。unlock() happens-before lock() 比如 synchronized 释放锁后,获取锁的线程能够看到最新的数据 比喻: 一个公共厕所(共享资源)只有一个坑位(锁)。线程 A 上完厕所出来了(解锁),线程 B 才能进去(加锁)。B 进去后,肯定能看到 A 冲干净了的马桶(A 修改过的数据),而不是一个没冲的马桶。这就是 synchronized 关键字的工作原理。

规矩三:Volatile 变量规矩 (Volatile Variable Rule)

写一个 volatile 变量 happens-before 后续读这个变量。 写 volatile 变量 happens-before 读 volatile 比喻: volatile 变量就像一个信号灯。线程 A 把信号灯调成绿色(写操作),然后大喊一声:“绿灯啦!”。这个喊声特别厉害,能确保所有其他线程(线程 B, C, D...)都立刻能听到并看到这个绿灯(读操作)。只要 B 看到了绿灯,那它就知道在亮绿灯之前,A 做的所有事(比如修好了路)都已经完成了。

规矩四:线程启动规矩 (Thread Start Rule)

thread.start() happens-before 这个新线程的任何操作。 线程 A 执行操作 ThreadB.start(),那么 A 线程的 ThreadB.start() 操作 happens-before 于线程 B 中的任意操作 比喻: 妈妈(主线程)在叫孩子(新线程)起床吃饭之前,已经把饭菜都做好摆在桌上了(数据初始化)。孩子起床后(start()),肯定能看到一桌做好的饭菜,而不用自己再做一遍。

规矩五:线程终止规矩 (Thread Termination Rule)

线程里的所有操作 happens-before 其他线程发现它死了(比如用 thread.join() 等到它结束)。 线程的所有操作 happens-before Thread.join(),例如 t.join() 之后,主线程一定能看到 t 的 比喻: 一个工人(子线程)在工地干活,他干完所有活(所有操作),写好最后的报告(最终数据状态),然后下班走了(线程终止)。老板(主线程)第二天早上来工地(调用 join() 方法返回),拿起报告,肯定能看到工人最终完成的全部工作成果。

规矩六:中断规矩 (Thread Interruption Rule)

调用 thread.interrupt() happens-before 线程发现自己被中断了。比喻: 你女朋友给你发微信说“我们分手吧”(interrupt()),这个发送动作 happens-before 你看到消息(isInterrupted())。不可能出现你先看到消息,她后发送的情况。

规矩七:对象终结规矩 (Finalizer Rule)

对象的构造函数执行完毕(生完了) happens-before 它的 finalize() 方法开始(收尸开始)。比喻: 一定要等一个人彻底出生、构造完整了,才能给他办葬礼(finalize())。不能人还没生出来就埋了。

规矩八:传递律 (Transitivity)

如果 A happens-before B,且 B happens-before C,那么 A happens-before C。 A happens-before B 且 B happens-before C,则 A happens-before C,例如 a = 1 先于 b = 2,b = 2 先于 c = 3,则 a = 1 先于 c = 3 比喻: 你爷爷 happened-before 你爸爸,你爸爸 happened-before 你。那么根据传递律,你爷爷 happened-before 你。你肯定继承了你爷爷的遗产(能看到你爷爷和爸爸留下的所有数据)。


为什么要立这些规矩?

如果没有这些规矩,江湖就大乱了:

  • 可见性问题:线程 A 改了数据,线程 B 却看不见,还以为数据是旧的。
  • 有序性问题:代码写的顺序是 A->B->C,但JVM为了优化,可能偷偷改成 C->B->A 来执行,导致意想不到的错误。

happens-before 规矩就是 JVM 给你的一个承诺: “老弟,你只要按我的规矩写代码,我保证你的数据在各线程之间看得见、摸得着,不会出乱子。至于我底层是怎么实现(比如插入了多少内存屏障阻止重排序)的,你不用管,那是我的事!”

总结

下次你再看到 “happens-before”,别去想那些复杂的概念,就想:

“一条强制的【看见】的保证。”

它建立了操作之间的可见性关系,是 synchronizedvolatileLock 等所有同步工具能够生效的理论基础。理解了它,你才能真正理解 Java 并发编程的精髓。

第 6 章 无锁

6.1 问题提出

有如下需求,保证 account.withdraw 取款方法的线程安全

java
interface Account {
	//获取余额
	Integer getBalance();
	
	//取款
	void withdraw(Integer amount);
	
	static void demo(Account account) {
		List<Thread> ts = new ArrayList<>();
		long start = System.nanoTime();
		for (int i = 0; i < 1000; i++) {
			ts.add(new Thread(() -> {
				account.withdraw(10);
			}));
		}
		ts.forEach(Thread::start);
		ts.forEach(t -> {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});
		long end = System.nanoTime();
		System.out.println(account.getBalance() + "cost: " + (end - start) / 1000_000 + "ms");
	}
}
java
class AccountUnsafe implements Account {
	private Integer balance;
	
	public AccountUnsafe(Integer balance) {
		this.balance = balance;
	}
	
	@Override
	public Integer getBalance() {
		return balance;
	}
	
	@Override
	public void withdraw(Integer amount) {
		balance -= amount;
	}
}

执行测试代码:

java
public static void main(String[] args) {
	Account.demo(new AccountUnsafe(10000));
}

输出:

解决思路 - 锁:

首先想到的是给 Account 对象加锁:

java
class AccountUnsafe implements Account {
	private Integer balance;
	
	public AccountUnsafe(Integer balance) {
		this.balance = balance;
	}
	
	@Override
	public synchronized Integer getBalance() {
		return balance;
	}
	
	@Override
	public synchronized void withdraw(Integer amount) {
		balance -= amount;
	}
}

结果为:

解决思路 - 无锁:

java
class AccountUnsafe implements Account {
	private AtomicInteger balance;
	
	public AccountUnsafe(Integer balance) {
		this.balance = new AtomicInteger(balance);
	}
	
	@Override
	public Integer getBalance() {
		return balance.get();
	}
	
	@Override
	public void withdraw(Integer amount) {
		while (true) {
			int prev = balance.get();
			int next = prev - amount;
			if (balance.compareAndSet(prev, next)) {
				break;
			}
		}
	}
}

6.2 CAS

前面使用的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全,那么它是怎么实现的?

java
public void withdraw(Integer amount) {
		while (true) { //需要不断尝试,直到成功为止
			//这里先拿到 balance 此刻的值,快照
			int prev = balance.get();
			//这里对共享变量做运算
			int next = prev - amount;
			//compareAndSet 是一个原子操作:我认为值应该是 A,如果是,就把它改成 B;否则,不要修改它
			//这里就是认为 balance 的值应该是 prev,如果是,说明之前没有线程来修改过,那么我就可以把 balance 的值修改为 next,返回 true;如果不是,说明之前有线程修改过 balance 的值,那么 cas 就失败,返回 false
			if (balance.compareAndSet(prev, next)) {
				break;
			}
		}
	}

其中的关键是 compareAndSet,简称 CAS,它必须是原子操作:

注意:

  • 其实 CAS 的底层是 lock cmpxchg 指令,在单核 CPU 和多核 CPU 下都能够保证比较-交换的原子性
  • 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线,这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的

获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰,CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果

CAS 特点:

  • 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
  • CAS 体现的是无锁并发、无阻塞并发
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

6.3 为什么无锁效率高

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞,打个比喻:
  • 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速、恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换

6.4 原子整数

以下是 JUC 并发包提供了的很多的重要工具,它们都基于 CAS 操作实现无锁线程安全

  • AtomicBoolean

  • AtomicInteger

  • AtomicLong

  • 原子整数是专门用于对基本类型整数进行原子操作的类,最常见的代表是 AtomicInteger

  • 我们都知道,即便是 i++ 这种看似简单的操作,在底层也是非原子的(包含读取、增加、写入),在多线程下会导致竞态条件,使用 synchronized 可以解决,但重量级锁开销大,AtomicInteger 等提供了一种更轻量级、更高效的解决方案

  • AtomicInteger 封装了一个 volatile int value 并提供了一系列原子操作的方法:

    • get() / set(int newValue):普通的获取和设置值(本身具有 volatile 的读写语义)
    • 核心方法:compareAndSet(int expect, int update):CAS 操作,是所有方法的基石

6.5 原子引用

  • AtomicReference

  • AtomicMarkableReference

  • AtomicStampedReference

  • 原子引用是用于对对象引用进行原子操作的类,核心代表是 AtomicReference<V>,其中 V 是泛型参数,代表引用的对象类型

  • 我们有时候需要原子的更新以一个复杂的对象,而不仅仅是简单的整数,例如更新一个配置对象、一个链表的下一个节点指针、或者以一个代表状态的对象,synchronized 同样可以做到,但原子引用提供了无锁的可能性

  • 它封装了一个 volatile V reference

6.5.1 ABA 问题及解决

ABA 问题:

java
static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
	log.debug("main start...");
	//获取值 A
	String prev = ref.get();
	other();
	sleep(1);
	//尝试改为 C
	log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}

private static void other() {
	new Thread(() -> {
		log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
	}, "t1").start();
	sleep(0.5);
	new Thread(() -> {
		log.debug("change B-> A {}", ref.compareAndSet(ref.get(), "A"));
	}, "t2").start();
}

输出:

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望只要有其它线程动过了共享变量,那么自己的 CAS 就失败,这时,仅比较值是不够的,需要再加一个版本号,可以使用 AtomicStampedReference 或者 AtomicMarkableReference

6.5.2 AtomicStampedReference

java
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

public static void main(String[] args) throws InterruptedException {
	log.debug("main start...");
	//获取值 A
	String prev = ref.getReference();
	//获取版本号
	int stamp = ref.getStamp();
	log.debug("版本 {}", stamp);
	//如果中间有其它线程干扰,发生了 ABA 现象
	other();
	sleep(1);
	//尝试改为 C
	log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}

private static void other() {
	new Thread(() -> {
		log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp, ref.getStamp() + 1));
		log.debug("更新版本为 {}", ref.getStamp());
	}, "t1").start();
	sleep(0.5);
	new Thread(() -> {
		log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp, ref.getStamp() + 1));
		log.debug("更新版本为 {}", ref.getStamp());
	}, "t2").start();
}

输出:

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,通过 AtomicStampedReference,我们可以知道引用变量中途被更改了几次

但是有时候并不关心引用变量更改了几次,只是单纯的关心是否被改过,所以就有了 AtomicMarkableReference

6.5.3 AtomicMarkableReference

与 AtomicStampedReference 类似,但它用一个 boolean 标记来替代版本号,适用于那种对象是否被修改过的场景,而不关心修改了多少次

6.6 原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

原子数组是 JUC 包下提供的一组类,用于高效、线程安全的操作数组中的单个元素,而无需使用重量级的同步锁

它们并不是指整个数组的操作都是原子的(例如原子的交换整个数组),而是指对数组中的每个独立元素的读写和更新操作是原子的

为什么需要原子数组?

  • 想象一个场景:你有一个很大的数组,被多个线程频繁地更新,但每个线程只更新其中不同的索引位置。
  • 使用 synchronized 的缺点:如果你用 synchronized 来保护整个数组,那么同一时间只能有一个线程操作数组中的任何一个元素,即使它们操作的是完全不同的位置。这会造成巨大的性能瓶颈,线程之间会频繁争抢锁,导致不必要的阻塞。
  • 使用原子数组的优势:原子数组利用CAS(Compare-And-Swap) 操作,允许多个线程同时修改数组中的不同元素。因为每个元素的操作是独立的,线程间不存在锁竞争(除非它们恰好要修改同一个元素),从而大大提升了高并发场景下的吞吐量。

6.7 字段更新器

  • AtomicReferenceFieldUpdater:用于更新某个类中的 volatile V 类型的引用字段
  • AtomicIntegerFieldUpdater:用于更新某个类中的 volatile int 字段
  • AtomicLongFieldUpdater

字段更新器是 JUC 包下的一组类,它们提供了一种基于反射的、对已存在的类的 volatile 字段进行原子性访问的机制

可以理解为:一个外挂的原子操作工具箱,它不需要你修改类的原始结构(比如把字段类型从 int 改为 AtomicInteger),就能为这个类的某个普通 volatile 字段赋予类似原子类的 CAS 操作能力

6.8 原子累加器

原子累加器是 JUC 包中引入的一组类,用于在高并发环境下实现高性能的累加操作,最常见的两个类是:

  • LongAdder
  • DoubleAdder

它们的设计目标非常明确:在极高的线程竞争环境下,提供比 AtomicLong 和 AtomicDouble 高得多的吞吐量

  1. 为什么需要它?

要理解原子累加器,必须先明白它要解决什么问题。我们先回顾一下 AtomicLong 的工作原理。

AtomicLong 的工作方式: 它内部维护着一个 volatile long value。当线程调用 increment() 或 add() 方法时,它会在一个单一的、共享的 value 变量上执行CAS操作。

优点: 在低至中等的竞争条件下,表现很好。

缺点: 在极高竞争环境下,大量线程会不断地尝试CAS更新同一个变量。这会导致大量的CPU空转(自旋) 和缓存一致性流量(如缓存行失效),从而成为系统瓶颈,显著降低吞吐量。

原子累加器的解决方案: LongAdder 采用了一种“分而治之”的策略。它不再让所有线程去争抢一个共享变量,而是将竞争分散到多个单元上。

  1. 核心工作原理:“空间换时间”与最终一致性

LongAdder 的核心思想非常巧妙: 基础值(Base Value): 它内部仍然有一个 base 字段,类似于 AtomicLong 的 value。在没有竞争或竞争很低时,操作会直接CAS更新 base。

单元数组(Cells[]): 当竞争加剧(即线程在CAS更新 base 时失败),LongAdder 就会初始化一个 Cell 数组。Cell 是一个用 @sun.misc.Contended 注解填充了的 volatile long 变量,其主要目的是避免伪共享(False Sharing)。

分散竞争: 每个线程会通过哈希算法(通常与线程ID相关)映射到 Cell 数组中的某一个槽位(Cell),然后对该槽位进行自己的累加操作。这样,原本所有线程争抢一个 base,就变成了多个线程分散到多个 Cell 上去操作,竞争压力被大大稀释了。

求和(读操作): 当你调用 sum() 方法获取最终结果时,LongAdder 会将所有 Cell 数组中的值加上 base 的值,返回总和。

这个过程体现了“最终一致性”:在累加过程中,总和是不确定的(因为有些值还在Cell里没合并)。只有在调用 sum() 的那一刻,你才能得到一个准确的、最终的有效值。这意味着它不适合要求强一致性的场景(例如序列号生成器),但极其适合统计型场景。

todo 源码之 LongAddeer

todo 原理之伪共享

6.9 Unsafe

[[Unsafe 详解]]

第 7 章 不可变类

7.1 日期转换的问题

下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的,有很大概率出现异常

出现异常:

解决 1 - 同步锁:

这样虽然能解决问题,但是会带来性能上的损失:

解决 2 - 使用不可变类

如果一个对象不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改,这样的对象在 Java 中有很多,例如在 Java 8 之后,提供了一个新的日期格式化类:

7.2 不可变类的设计

另一个更为熟悉的 String 类也是不可变的,以 String 为例,说明不可变类设计的要素:

final 的使用:

  • String 类、类中所有的属性都是 final 的
  • 属性用 final 修饰保证了该属性是只读的,不能修改
  • 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

保护性拷贝:

  • 当在使用字符串时,也有一些跟修改相关的方法,比如 substring 等,那这些方法是如何实现的?

  • 发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造方法,看是否对 final char[] value 做出了修改?

  • 结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制,这种通过创建副本对象来避免共享的手段称之为保护性拷贝

7.3 享元模式

享元模式用于当需要重用数量有限的同一类对象时

体现:

包装类:在 JDK 中的 Boolean、Byte、Short、Integer、Long、Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:

应用:

例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。

java
class Pool {
	//1. 连接池大小
	private final int poolSize;
	
	//2. 连接对象数组
	private Connection[] connections;
	
	//3. 连接状态数组 0 表示空闲、1 表示繁忙
	private AtomicIntegerArray states;
	
	//4. 构造方法初始化
	public Pool(int poolSize) {
		this.poolSize = poolSize;
		this.connections = new Connection[poolSize];
		this.states = new AtomicIntegerArray(new int[poolSize]);
		for (int i = 0; i < poolSize; i++) {
			connections[i] = new MockConnection("连接" + (i + 1));
		}
	}
	
	//5. 借连接
	public Connection borrow() {
		while(true) {
			for (int i = 0; i < poolSize; i++) {
				//获取空闲连接
				if (states.get(i) == 0) {
					if (states.compareAndSet(i, 0, 1)) {
						log.debug("borrow {}", connections[i]);
						return connections[i];
					}
				}
			}
			// 如果没有空闲连接,当前线程进入等待
			synchronized (this) {
				try {
					log.debug("wait...");
					this.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	//6. 归还连接
	public void free(Connection conn) {
		for (int i = 0; i < poolSize; i++) {
			if (connections[i] == conn) {
				states.set(i, 0);
				synchronized (this) {
					log.debug("free {}", conn);
					this.notifyAll();
				}
				break;
			}
		}
	}
}

class MockConnection implements Connection {
	//实现略
}

//使用连接池
public static void main(Sting[] args) {
	Pool pool = new Pool(2);
	for (int i = 0; i < 5; i++) {
		new Thread(() -> {
			Connection conn = pool.borrow();
			try {
				Thread.sleep(new Random().nextInt(1000));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			pool.free(conn);
		}).start();
	}
}

7.4 final 原理

理解了 volatile 原理,再对比 final 的实现就比较简单了

字节码:

发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况

第 8 章 工具

8.1 线程池

8.1.1 自定义线程池

main 线程创建任务交给 Thread Pool 处理,Thread Pool 处理不完任务的就放到 Blocking Queue 等待

  1. 自定义拒绝策略接口
java
@FuntionalInterface //拒绝策略
interface RejectPolicy<T> {
	void reject(BlockingQueue<T> queue, T task);
}
  1. 自定义任务队列
java
class BlockingQueue<T> {
	//1. 任务队列
	private Deque<T> queue = new ArrayDeque<>();
	
	//2. 锁
	private ReentrantLock lock = new ReentantLock();
	
	//3. 生产者条件变量
	private Condition fullWaitSet = lock.newCondition();
	
	//4. 消费者条件变量
	private Condition emptyWaitSet = lock.newCondition();
	
	//5. 容量
	private int capcity;
	
	public BlockingQueue(int capcity) {
		this.capcity = capcity;
	}
	
	//带超时阻塞获取
	public T poll(long timeout, TimeUnit unit) {
		lock.lock();
		try {
			//将 timeout 统一转换为纳秒
			long nanos = unit.toNanos(timeout);
			while (queue.isEmpty()) {
				try {
					//返回值是剩余时间
					if (nanos <= 0) {
						return null;
					}
					nanos = emptyWaitSet.awaitNanos(nanos);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			T t = queue.removeFirst();
			fullWaitSet.signal();
			return t;
		} finally {
			lock.unlock();
		}
	}
	
	//阻塞获取
	public T take() {
		lock.lock();
		try {
			while (queue.isEmpty()) {
				try {
					emptyWaitSet.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			T t = queue.removeFirst();
			fullWaitSet.signal();
			return t;
		} finally {
			lock.unlock();
		}
	}
	
	//阻塞添加
	public void put(T task) {
		lock.lock();
		try {
			while(queue.size() == capcity) {
				try {
					log.debug("等待加入任务队列 {} ...", task);
					fullWaitSet.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			log.debug("加入任务队列 {}", task);
			queue.addLast(task);
			emptyWaitSet.signal();
		} finally {
			lock.unlock();
		}
	}
	
	//带超时时间阻塞添加
	public boolean offer(T task, long timeout, TimeUnit timeUnit) {
		lock.lock();
		try {
			long nanos = timeUnit.toNanos(timeout);
			while (queue.size() == capcity) {
				try {
					if (nanos <= 0) {
						return false;
					}
					log.debug("等待加入任务队列 {} ...", task);
					nanos = fullWaitSet.awaitNanos(nanos);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			log.debug("加入任务队列 {}", task);
			queue.addLast(task);
			emptyWaitSet.signal();
			return true;
		} finally {
			lock.unlock();
		}
	}
	
	public int size() {
		lock.lock();
		try {
			return queue.size();
		} finally {
			lock.unlock();
		}
	}
	
	public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
		lock.lock();
		try {
			//判断队列是否满
			if (queue.size() == capcity) {
				rejectPolicy.reject(this, task);
			} else { //有空闲
				log.debug("加入任务队列 {}", task);
				queue.addLast(task);
				emptyWaitSet.signal();
			}
		} finally {
			lock.unlock();
		}
	}
}
  1. 自定义线程池
java
class ThreadPool {
	//任务队列
	private BlockingQueue<Runnable> taskQueue;
	
	//线程集合
	private HashSet<Worker> workers = new HashSet<>();
	
	//核心线程数
	private int coreSize;
	
	//获取任务时的超时时间
	private long timeout;
	
	private TimeUnit timeUnit;
	
	private RejectPolicy<Runnable> rejectPolicy;
	
	public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
		this.coreSize = coreSize;
		this.timeout = timeout;
		this.timeUnit = timeUnit;
		this.taskQueue = new BlockingQueue<>(queueCapcity);
		this.rejectPolicy = rejectPolicy;
	}
	
	class Worker extends Thread {
		private Runnable task;
		
		public Worker(Runnable task) {
			this.task = task;
		}
		
		@Override
		public void run() {
			//执行任务
			//当 task 不为空,执行任务
			//当 task 执行完毕,再接着从任务队列获取任务并执行
			while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
				try {
					log.debug("正在执行...{}", task);
					task.run();
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					task = null;
				}
			}
			synchronized (workers) {
				log.debug("worker 被移除{}", this);
				workers.remove(this);
			}
		}
	}
	
	//执行任务
	public void execute(Runnable task) {
		//当任务数没有超过 coreSize 时,直接交给 worker 对象执行
		//如果任务数超过 coreSize 时,加入任务队列暂存
		synchronized (workers) {
			if (workers.size() < coreSize) {
				Worker worker = new Worker(task);
				log.debug("新增 worker{}, {}", worker, task);
				workers.add(worker);
				worker.start();
			} else {
				taskQueue.put(task);
				//1.死等
				//2.带超时等待
				//3.让调用者放弃任务执行
				//4.让调用者抛出异常
				//5.让调用者自己执行任务
				taskQueue.tryPut(rejectPolicy, task);
			}
		}
	}
}
  1. 测试
java
public static void main(String[] args) {
	ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
		//1.死等
		//queue.put(task);
		//2.超时等待
		//queue.offer(task, 1500, TimeUnit.MILLISECONDS);
		//3.让调用者放弃任务执行
		//log.debug("放弃{}", task);
		//4.让调用者抛出异常
		//throw new RuntimeException("任务执行失败 " + task);
		//5.让调用者自己执行任务
		task.run();
	});
	for (int i = 0; i < 4; i++) {
		int j = i;
		threadPool.execute(() -> {
			try {
				Thread.sleep(1000L);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			log.debug("{}", j);
		});
	}
}

8.1.2 ThreadPoolExecutor

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值

8.1.2.1 构造方法

  • corePoolSize:核心线程数目(最多保留的线程数)
  • maximumPoolSize:最大线程数目
  • keepAliveTime:生存时间-针对救急线程
  • unit:时间单位-针对救急线程
  • workQueue:阻塞队列
  • threadFactory:线程工厂
  • handler:拒绝策略

工作方式:

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排队,直到有空闲的线程
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急
  • 如果线程到达 maximumPoolSize 并且任务队列也满了仍然有新任务时会执行拒绝策略,拒绝策略 JDK 有四种实现,其它著名框架也提供了实现:
    • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy:让调用者运行任务
    • DiscardPolicy:放弃本次任务
    • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现:在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
    • Netty 的实现:创建一个新线程来执行任务
    • ActiveMQ 的实现:带超时等待(60s)尝试放入队列
    • PinPoint 的实现:使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制

线程池的几种阻塞队列:

常用的有五种,有界队列 ArrayBlockingQueue;无界队列 LinkedBlockingQueue;优先级队列 PriorityBlockingQueue;延迟队列 DelayQueue;同步队列 SynchronousQueue

(1)ArrayBlockingQueue:一个有界的先进先出的阻塞队列,底层是一个数组,适合固定大小的线程池

java
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10, true);

(2)LinkedBlockingQueue:底层是链表,如果不指定大小,默认大小是 Integer.MAX_VALUE,几乎相当于一个无界队列

(3)PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列,任务按照其自然顺序或 Comparator 来排序,适用于需要按照给定优先级处理任务的场景,比如优先处理紧急任务

(4)DelayQueue:类似于 PriorityBlockingQueue,由二叉堆实现的无界优先级阻塞队列,Executors 中的 newScheduledThreadPool() 就使用了 DelayQueue 来实现延迟执行

(5)SynchronousQueue:每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都必须等待另一个线程的插入操作,Executors.newCachedThreadPool() 就使用了 SynchronousQueue,这个线程池会根据需要创建新线程,如果有空闲线程则会重复使用,线程空闲 60 秒后会被回收

线程池的几种状态:

线程池有 5 种状态,它们的转换遵循严格的状态流转规则,不同状态控制着线程池的任务调度和关闭行为,状态由 running -> shutdown -> stop -> tidying -> terminated 依次流转

running 状态的线程池可以接收新任务,并处理阻塞队列中的任务;shutdown 状态的线程池不会接收新任务,但会处理阻塞队列中的任务;stop 状态的线程池不会接收新任务,也不会处理阻塞队列中的任务,并且会尝试中断正在执行的任务;tidying 状态表示所有任务已经终止;terminated 状态表示线程池安全关闭,所有线程销毁

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,如下:

8.1.2.2 newFixedThreadPool

特点:

  • 核心线程数 = 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务
  • 固定大小的线程池,适合用于任务数量确定,且对线程数有明确要求的场景,例如,IO 密集型任务、数据库连接池等
8.1.2.3 newCachedThreadPool

特点:

  • 核心线程数是 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收),救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
  • 缓存线程池,适用于短时间内任务量波动较大的场景,例如,短时间内有大量的文件处理任务或网络请求
8.1.2.4 newSingleThreadExecutor

使用场景:希望多个任务排队执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放,适用于日志记录、文件处理等

8.1.2.5 newScheduledThreadPool

定时任务线程池,适用于需要定时执行任务的场景,例如,定时发送邮件、定时备份数据等

8.1.3 线程池常用方法

8.1.3.1 提交任务的方法

线程池提交 execute 和 submit 有什么区别:

execute 方法没有返回值,适用于不关心结果和异常的简单任务

java
threadsPool.execute(new Runnable() {
	@Override
	public void run() {
		System.out.println("execute() 方法提交的任务");
	}
})

submit 有返回值,适用于需要获取结果或处理异常的场景

java
Future<Object> future = executor.submit(harReturnValuetask);
try {
	Object s = future.get();
} catch (InterruptedException e | ExecutionException e) {
	//处理无法执行任务异常
} finally {
	//关闭线程池
	executor.shutdown();
}
8.1.3.2 关闭线程池的方法

可以调用线程池的 shutdownshutdownNow 方法来关闭线程池

shutdown 不会立即停止线程池,而是等待所有任务执行完毕后再关闭线程池

java
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));

executor.shutdown(); //不会立刻关闭,而是等待所有任务执行完毕

shutdownNow 会尝试通过一系列动作来停止线程池,包括停止接收外部提交的任务、忽略队列里等待的任务、尝试将正在跑的任务 interrupt 中断

java
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> {
	try {
		Thread.sleep(5000); //模拟长时间运行任务
		System.out.println("Task executed");
	} catch (InterruptedException e) {
		System.out.println("任务被中断";)
	}
});

List<Runnable> unexecutedTasks = executor.shutdownNow(); //立即关闭线程池
System.out.println("未执行的任务数: " + unexecutedTasks.size());

需要注意的是,shutdownNow 不会真正终止正在运行的任务,只是给任务线程发送 interrupt 信号,任务是否能真正终止取决于线程是否响应 InterruptedException

8.1.3.4 其它方法

8.1.4 线程池如何实现参数的动态修改

线程池提供的 setter 方法就可以在运行时动态修改参数,比如说 setCorePoolSize 可以用来修改核心线程数、setMaximumPoolSize 可以用来修改最大线程数

需要注意的是,调用 setCorePoolSize() 时如果新的核心线程数比原来的大,线程池会创建新的线程;如果更小线程池不会立即销毁多余的线程,除非有空闲线程超过 keepAliveTime

当然了,还可以利用 Nacos 配置中心,或者实现自定义的线程池,监听参数变化去动态调整参数

8.1.5 线程池在使用的时候需要注意什么

第一,选择合适的线程池大小,过小的线程池可能会导致任务一直在排队;过大的线程池可能会导致大家都在竞争 CPU 资源,增加上下文切换的开销

第二,选择合适的任务队列,使用有界队列可以避免资源耗尽的风险,但是可能会导致任务被拒绝;使用无界队列虽然可以避免任务被拒绝,但是可能会导致内存耗尽

第三,尽量使用自定义的线程池,而不是使用 Executors 创建的线程池,因为 newFixedThreadPool 线程池由于使用了 LinkedBlockingQueue,队列的容量默认无限大,任务过多时会导致内存溢出;newCachedThreadPoo 线程池由于核心线程数无限大,当任务过多的时候会导致创建大量的线程,导致服务器负载过高宕机

8.1.6 线程池执行中断电了应该怎么处理

线程池本身只能在内存中进行任务调度,并不会持久化,一旦断电,线程池里的所有任务和状态都会丢失

可以从以下几个方面考虑:

第一,持久化任务,可以将任务持久化到数据库或者消息队列中,等电恢复后再重新执行 第二,任务幂等性,需要保证任务是幂等的,也就是无论执行多少次,结果都一致 第三,恢复策略,当系统重启时,应该有一个恢复流程,检测上次是否有未完成的任务,将这些任务重新加载到线程池中执行,确保断电前的工作能够恢复

8.2 Worker Thread

8.2.1 定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务,也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了

注意,不同的任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的工人既要招呼客人(任务类型 A),又要到后厨做菜(任务类型 B)显然效率不咋地,分成服务员(线程池 A)与厨师(线程池 B)更为合理

8.2.2 饥饿

固定大小线程池会有饥饿现象

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:做菜
  • 比如工人 A 处理了点餐任务,接下来它要等着工人 B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,这时没人做饭了,饥饿
java
public class TestDeadLock {
	static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
	static Random RANDOM = new Random();
	static String cooking() {
		return MENU.get(RANDOM.nextInt(MENU.size()));
	}
	
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		
		executorService.execute(() -> {
			log.debug("处理点餐...");
			Future<String> f = executorService.submit(() -> {
				log.debug("做菜");
				return cooking();
			});
			try {
				log.debug("上菜: {}", f.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		});
		
		/*executorService.execute(() -> {
			log.debug("处理点餐...");
			Future<String> f = executorService.submit(() -> {
				log.debug("做菜");
				return cooking();
			});
			try {
				log.debug("上菜: {}", f.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		});*/
		
	}
}

有注释时输出:

当注释取消后,可能会输出:

解决方法可以是增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:

java
public class TestDeadLock {
	static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
	static Random RANDOM = new Random();
	static String cooking() {
		return MENU.get(RANDOM.nextInt(MENU.size()));
	}
	
	public static void main(String[] args) {
		ExecutorService waiterPool = Executors.newFixedThreadPool(1);
		ExecutorService cookPool = Executors.newFixedThreadPool(1);
		
		waiterPool.execute(() -> {
			log.debug("处理点餐...");
			Future<String> f = cookPool.submit(() -> {
				log.debug("做菜");
				return cooking();
			});
			try {
				log.debug("上菜: {}", f.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		});
		
		waiterPool.execute(() -> {
			log.debug("处理点餐...");
			Future<String> f = cookPool.submit(() -> {
				log.debug("做菜");
				return cooking();
			});
			try {
				log.debug("上菜: {}", f.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		});
		
	}
}

8.3 线程池的线程数应该怎么配置

  • 过小会导致程序不能充分的利用系统资源,容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存

8.3.1 CPU 密集型

对于 CPU 密集型任务,目标是尽量减少线程上下文切换,通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当前线程由于页缺失故障或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

8.3.2 IO 密集型

对于 IO 密集型任务,由于线程经常处于等待状态,等待 IO 操作完成,所以 CPU 不总是处于繁忙状态,所以可以设置更多的线程来提高并发,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 IO 操作、远程 RPC 调用、数据库操作时,这时候 CPU 就闲下来了,可以利用多线程提高它的利用率,通过设置为 CPU 核心数的两倍

经验公式如下:

8.4 任务调度线程池

在任务调度线程池功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务

java
public static void main(String[] args) {
	Timer timer = new Timer();
	TimerTask task1 = new TimerTask() {
		@Override
		public void run() {
			log.debug("task 1");
			sleep(2);
		}
	};
	TimerTask task2 = new TimerTask() {
		@Override
		public void run() {
			log.debug("task 2");
		}
	};
	//使用 timer 添加两个任务,希望它们都在 1s 后执行
	//但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
	timer.schedule(task1, 1000);
	timer.schedule(task2, 1000);
}

todo

8.5 Fork/Join

第 9 章 JUC

9.1 AQS

9.1.1 AQS 定义

AQS 是 Java 并发包 (java.util.concurrent.locks) 中一个用于构建同步器的框架。你可以把它想象成一个万能模板或地基

几乎所有你在 JUC 包里看到的同步器,其底层实现都依赖于 AQS,包括:

  • ReentrantLock (可重入锁)
  • ReentrantReadWriteLock (可重入读写锁)
  • Semaphore (信号量)
  • CountDownLatch (倒计时门闩)
  • ThreadPoolExecutor 中的 Worker (线程池中的工作线程也用它来管理状态)

AQS 提供了一个通用的机制,来管理以下两个关键部分:

  1. 同步状态 (State):用一个 volatile 修饰的 int 成员变量 state 来表示资源的状态。
    • 对于锁:state=0 表示锁未被占用,state=1 表示已被占用,state>1 表示被同一个线程重入。
    • 对于信号量:state 表示可用的许可数量。
    • 对于 CountDownLatch:state 表示还需要倒计数的数量。
  2. 线程等待队列:一个先入先出 (FIFO) 的双向链表(CLH队列的变体)。当一个线程尝试获取资源失败时(比如尝试加锁但锁已被别人持有),AQS 会将这个线程包装成一个节点 (Node) 并放入队列中阻塞等待。当持有资源的线程释放资源时,AQS 会负责唤醒队列中下一个等待的线程。

AQS 的工作模式:模板方法设计模式:

AQS 使用了模板方法模式。它定义了构建一个同步器所需的核心流程和骨架,而将一些特定化的操作留给子类去实现。

AQS 提供的主要方法:

  • getState(): 获取当前同步状态。
  • setState(int newState): 设置同步状态。
  • compareAndSetState(int expect, int update): 使用 CAS 原子操作设置状态,这是实现并发安全的基础。

需要子类重写的关键方法 (protected):

  • tryAcquire(int arg)尝试独占式获取资源。成功返回 true,失败返回 false。
  • tryRelease(int arg)尝试独占式释放资源。成功返回 true,失败返回 false。
  • tryAcquireShared(int arg)尝试共享式获取资源。返回负数表示失败;0表示成功,但无剩余资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int arg)尝试共享式释放资源。如果释放后允许唤醒后续等待节点返回 true,否则返回 false。

开发者要做一个新的同步器怎么办?

只需要继承 AQS,然后根据你的需求(是独占锁还是共享锁?),重写上面那几个 tryXXX 方法,来定义“如何获取和释放状态”的规则。而线程排队、阻塞、唤醒等复杂且易错的底层操作,AQS 已经在父类中帮你完美实现了。

一句话概括:

AQS 是 JDK 提供的一个基础框架,用于高效地构建各种类型的同步器(锁和其他同步工具)。它解决了同步器设计中最复杂的部分(线程排队、阻塞、唤醒),让开发者可以专注于定义资源访问的规则。

9.1.2 AQS 原理

AQS 的核心原理可以概括为三点:一个状态一个队列一套模板方法

AQS 是一个用于构建锁和同步器的框架。它使用了模板方法模式,将复杂的线程等待、排队、唤醒等底层操作封装起来,只暴露出几个关键的方法(如tryAcquire)让子类去实现自定义的资源获取和释放规则。

它的设计目标是:如果一个线程请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制AQS是用 CLH队列锁的变体来实现的,即将暂时获取不到锁的线程加入到队列中。

AQS 三大核心组件:

  1. 一个状态:state (volatile int)

这是一个使用volatile修饰的关键字段,表示共享资源的状态。所有并发问题的核心,其实就是对这个state值的原子性操作和同步

  • 对于锁 (如 ReentrantLock):
    • state = 0:表示锁未被任何线程持有。
    • state = 1:表示锁已被一个线程持有。
    • state > 1:表示锁被同一个线程重入了多次(可重入性的体现)。
  • 对于信号量 (如 Semaphore):
    • state 表示可用的许可数量acquire() 会减少 staterelease() 会增加 state
  • 对于计数器 (如 CountDownLatch):
    • state 表示需要倒计数的数量countDown() 会减少 stateawait() 会等待 state 变为0。

state的修改采用 CAS 操作,保证了原子性和可见性。

  1. 一个队列:CLH 变体的同步队列

这是一个 FIFO 的双向链表(不再是严格的CLH单向链表),用于存放所有等待资源的线程。当线程获取资源失败时,AQS 会将该线程以及当前的等待状态封装成一个 Node 节点,并将其加入队列尾部。

  • Node 节点:包含了需要同步的线程本身、线程的等待状态(WAITINGCANCELLED等)、前驱节点(prev)和后继节点(next)。
  • 队列头尾:AQS持有队列的头节点(head)和尾节点(tail)的引用。头节点可以理解为当前正持有锁的线程节点

这个队列是AQS实现阻塞锁公平锁的关键。

  1. 一套模板方法:获取与释放资源

这是AQS提供给外部的API,也是子类继承时需要依赖的骨架。主要分为独占共享两种模式。

  • 独占模式 (Exclusive):同一时刻只有一个线程能获取资源,如 ReentrantLock
    • acquire(int arg):模板方法。尝试获取资源的主入口。
      1. 调用子类实现的 tryAcquire(arg) 方法尝试直接获取资源。
      2. 如果成功,直接返回。
      3. 如果失败,则调用 addWaiter(Node.EXCLUSIVE) 将当前线程包装成独占模式的Node节点并加入同步队列尾部。
      4. 调用 acquireQueued(...) 让节点在队列中自旋循环尝试获取资源,如果还是失败,则判断是否需要阻塞线程。当被前驱节点的唤醒或线程被中断时,会解除阻塞并再次尝试。
    • release(int arg):模板方法。尝试释放资源的主入口。
      1. 调用子类实现的 tryRelease(arg) 方法尝试释放资源。
      2. 如果释放成功(例如state变为0),则调用 unparkSuccessor(Node node) 方法,唤醒头节点的后继节点(即队列中第一个等待的线程)。
  • 共享模式 (Shared):同一时刻可以有多个线程获取资源,如 SemaphoreCountDownLatch
    • 逻辑与独占模式类似,但调用的是 acquireSharedtryAcquireSharedreleaseSharedtryReleaseShared 等方法。释放资源时,会传播唤醒事件,让多个等待的线程可以同时被唤醒并获取资源。

关键在于tryAcquiretryRelease 等方法在AQS中只是抛出 UnsupportedOperationException需要子类根据具体的同步需求(是锁?是信号量?)来重写这些方法,定义获取和释放资源的具体规则。

AQS 工作流程:(以 ReentrantLock 的独占模式为例)

假设线程A、B、C竞争一把锁。

  1. 初始状态state = 0,队列为空。

  2. 线程A调用lock()

    • 底层调用 AQS.acquire(1)
    • acquire() 先调用子类重写的 tryAcquire(1) 方法。
    • tryAcquire 使用CAS尝试将 state 从0改为1,成功。
    • 线程A获取锁,exclusiveOwnerThread 被设为线程A,直接返回。
  3. 线程B调用lock()

    • 调用 AQS.acquire(1)
    • tryAcquire(1) 失败(因为state已经是1,且当前持有线程是A)。
    • 调用 addWaiter(Node.EXCLUSIVE),将线程B包装成Node,加入同步队列尾部。此时队列:head -> [B] <- tail
    • 调用 acquireQueued,Node B会自旋检查自己是否是头节点的后继节点(即下一个该轮到的节点)。如果是,会再次尝试 tryAcquire。此时还不是,所以最终调用 LockSupport.park() 阻塞线程B
  4. 线程C调用lock()

    • 同样,tryAcquire 失败。
    • 被加入队列尾部并阻塞。此时队列:head -> [B] -> [C] <- tail
  5. 线程A调用unlock()

    • 底层调用 AQS.release(1)
    • release() 调用子类重写的 tryRelease(1),将 state 减1,成功变为0。
    • 调用 unparkSuccessor(Node node),找到头节点的下一个节点(Node B),使用 LockSupport.unpark(threadB) 唤醒线程B
  6. 线程B被唤醒后

    • 在之前阻塞的地方(acquireQueued循环中)恢复运行。
    • 再次尝试 tryAcquire(1),这次成功(因为state又变回0了)。
    • 线程B获取锁成功,将自己所在的Node设置为新的头节点(原头节点出队)。此时队列:head -> [C] <- tail
  7. 后续:线程B释放锁时会唤醒线程C,重复上述过程。

AQS的原理就像一个高度抽象和可定制的资源分配管理器

  1. 状态管理:用一个 state 变量来量化资源。
  2. 队列管理:用一个FIFO队列来公平地管理所有无法立即获取资源的线程,让它们排队等待。
  3. 模板设计:将“如何获取/释放资源”的规则交给子类定义,而自己负责“线程排队、阻塞、唤醒”的底层机制。

9.1.3 AQS 深入理解

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之,这显然不够优雅,于是在 JSR166 中创建了 AQS,提供了这种通用的同步器机制

AQS 要实现的功能目标:

  • 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
  • 获取锁超时机制
  • 通过打断取消机制
  • 独占机制及共享机制
  • 条件不满足时的等待机制

AQS 的基本思想其实很简单:

  1. 获取锁的逻辑
java
while(state 状态不允许获取) {
	if(队列中还没有此线程) {
		入队并阻塞
	}
}
当前线程出队
  1. 释放锁的逻辑
java
if(state 状态允许了) {
	恢复阻塞的线程
}

要点:

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列

设计:

  1. state 设计
  • state 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
  1. 阻塞恢复设计
  • 早期控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用 resume,那么 suspend 将感知不到
  • 解决方法就是使用 park unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题
  • park unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程还可以通过 interrupt 打断
  1. 队列设计
  • 使用了 FIFO 先入先出队列,并不支持优先级队列
  • 设计时借鉴了 CLH 队列,它是一种单向无锁队列

队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态

入队伪代码:

java
do {
	//原来的 tail
	Node prev = tail;
} while (tail.compareAndSet(prev, node)) //用 cas 在原来 tail 的基础上改为 node

出队伪代码:

java
//prev 是上一个节点
while((Node prev = node.prev).state != 唤醒状态) {
}
//设置头节点
head = node;

CLH 的好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 在一些方面改进了 CLH:

java
private Node enq(final Node node) {
	for(;;) {
		Node t = tail;
		//队列中还没有元素 tail 为 null
		if (t == null) {
			//将 head 从 null -> dummy
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			//将 node 的 prev 设置为原来的 tail
			node.prev = t;
			//将 tail 从原来的 tail 设置为 node
			if (compareAndSetTail(t. node)) {
				//原来 tail 的 next 设置为 node
				t.next = node;
				return t;
			}
		}
	}
}

主要用到 AQS 的并发工具类:

9.2 ReentrantLock 原理

ReentrantLock 是基于 AQS 框架实现的一种可重入的独占锁,它的名字就揭示了它的两个核心特性:

  • Reentrant(可重入性):同一个线程可以多次获取同一把锁,例如:在递归调用中或者在一个 synchronized 方法中调用另一个 synchronized 方法时,线程可以重复进入被它自己锁住的代码区域,ReentrantLock 也支持这个特性,每次获取锁后,内部会有一个计数器加一,每次释放锁,计数器减一,只有当计数器归零时,锁才被完全释放,其它线程才能获取
  • Lock(显式锁):它的加锁 lock() 和解锁 unlock() 操作都是显式的,需要程序员手动编写代码控制,这相比于 synchronized 的隐式加解锁,提供了更大的灵活性,但也要求更高的责任心(通常必须在 finally 块中解锁)

要理解 ReentrantLock,必须先理解 AQS,AQS 是 JUC 包中几乎所有同步器(如 Semaphore,CountDownLatch)的底层框架,它解决了一个核心问题:

  • 如何在状态位被占有时,高效且线程安全地管理阻塞线程的队列?

AQS 的核心是一个 FIFO 的双向队列(CLH 队列的变体)和一个用 volatile 修饰的 state 状态位:

  • state 状态变量:一个 int 类型的 volatile 变量,表示锁的状态,对于 ReentrantLock,state = 0 表示锁未被任何线程持有,state > 0 表示锁被某个线程持有,并且数值表示该线程重入的次数
  • CLH 队列:一个双向链表结构的队列,用于存放所有等待获取锁的线程,当线程尝试获取锁失败时,它会被包装成一个节点 Node 并加入到这个队列中排队等待

AQS 采用了模板方法模式,它将大量的并发控制逻辑(如队列的维护、线程的阻塞与唤醒)已经实现好,只留下了几个关键方法供子类重写:

  • tryAcquire(int arg):尝试以独占的方式获取资源,成功返回 true,失败返回 false,ReentrantLock 在这里实现了它的重入逻辑
  • tryRelease(int arg):尝试以独占的方式释放资源,成功返回 true,失败返回 false,ReentrantLock 在这里实现重入次数的减少和锁的完全释放
  • 对于共享锁还有 tryAcquireShared 和 tryReleaseShared,ReentrantLock 用不到

ReentrantLock 的核心就是通过内部类 Sync、FairSync 和 NonfairSync 来继承 AQS 并重写这些方法,[[详解这句话]]

获取锁 lock() 的详细流程:

我们以默认的非公平锁为例,拆解 lock() 方法的调用过程:

java
final void lock() {
    if (compareAndSetState(0, 1)) // 1. 直接尝试“插队”CAS抢锁
        setExclusiveOwnerThread(Thread.currentThread()); // 抢成功,设置当前线程为锁所有者
    else
        acquire(1); // 2. 抢失败,进入AQS标准获取流程
}
  1. 没有竞争时:

线程调用 lock() 时,不会先去看队列,而是直接尝试用 CAS (Compare-And-Swap) 操作将 state 从 0 改为 1。如果成功,说明锁之前是空闲的,它瞬间就抢到了锁,非常高效。这是一种非公平的体现,因为它没管队列里是否已经有线程在排队等待。

  1. 如果快速抢锁失败(CAS 失败,说明 state 已经不是 0,锁被人占了),则调用 acquire(1):
java
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 2.1 再次尝试获取一次(非公平的又一次体现)
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2.2 如果还是失败,就入队并可能阻塞
        selfInterrupt(); // 如果在阻塞等待过程中被中断了,补上中断标志
}

tryAcquire(arg):这里调用了子类 NonfairSync 重写的 tryAcquire 方法:

它会再次尝试获取锁(前面 CAS 获取过一次,因为没获取到进入 tryAcquire(arg) 再获取一次):

  • 如果 state 为 0,再尝试用 CAS 抢一次
  • 如果 state 不为 0,但当前持有锁的线程就是自己,那么 state + 1(重入),返回 true
  • 否则,获取失败,返回 false,接下来进入 addWaiter 方法,构造 Node 队列

addWaiter(Node.EXCLUSIVE):前面 tryAcquire 失败,说明确实抢不到锁了,这时 AQS 会调用 addWaiter 方法将当前线程包装成一个独占模式(EXCLUSIVE)的 Node 节点,并用 CAS 操作将这个节点快速添加到等待队列的尾部

  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

acquireQueued(final Node node, int arg):这是最复杂的部分,是线程入队后阻塞等待的核心逻辑:

  • 在一个自旋循环里,线程会检查自己的前一个节点是不是头节点(head)。只有头节点的下一个节点(即第二个节点)有资格尝试获取锁。
  • 如果自己是第二个节点,就会再次调用 tryAcquire 尝试获取锁。如果成功,将自己设置为新的头节点,并返回。
  • 如果自己不是第二个节点,或者尝试获取又失败了,则会调用 shouldParkAfterFailedAcquire 方法,检查是否需要阻塞当前线程(通常是将前驱节点的 waitStatus 标记为 SIGNAL(-1),表示“你释放锁后需要唤醒我”,返回 false,表示不应该 park)。

  • shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued,再次 tryAcquire 尝试获取锁,当然这次 state 仍为 1,失败
  • 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true,则调用 parkAndCheckInterrupt() 方法,底层使用 LockSupport.park() 将当前线程挂起(阻塞),等待被唤醒,阻塞用灰色表示

  • 当有多个线程经历上述竞争失败,变成这个样子:

  • 当线程被唤醒后(通常是持有锁的线程释放锁时唤醒它),它会再次在自旋中检查自己是否是第二个节点,并尝试获取锁。

释放锁 unlock() 的详细流程:

Thread-0 释放锁,进入 tryRelease 流程,如果成功:

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程:

找到队列中离 head 最近的一个 Node,unpark 恢复其运行,本例中即为 Thread-1,回到 Thread-1 的 acquireQueued 流程:

如果加锁成功(没有竞争),会设置:

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平),例如这时有 Thread-4 来了:

如果不巧又被 Thread-4 抢了先:

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

9.2.1 源码

加锁源码:todo

解锁源码:todo

可重入原理:todo

可打断原理:todo

公平锁实现原理:todo

9.3 条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

await 流程:

刚开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁:

unpark AQS 队列中的下一个节点,竞争锁,假设没有其它竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0:

signal 流程:

假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1

Thread-1 释放锁,进入 unlock 流程

9.3.1 源码

todo

9.4 读写锁 ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能

提供一个数据容器类,内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

java
class DataContainer {
	private Object data;
	private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
	private ReentrantReadWriteLock.ReadLock r = rw.readLock();
	private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
	
	public Object read() {
		log.debug("获取读锁...");
		r.lock();
		try {
			log.debug("读取");
			sleep(1);
			return data;
		} finally {
			log.debug("释放读锁...");
			r.unlock();
		}
	}
	
	public void write() {
		log.debug("获取写锁...");
		w.lock();
		try {
			log.debug("写入");
			sleep(1);
		} finally {
			log.debug("释放写锁...");
			w.unlock();
		}
	}
}

测试读锁-读锁可以并发

java
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
	dataContainer.read();
}, "t1").start();

new Thread(() -> {
	dataContainer.read();
}, "t2").start();

输出:可以看出 Thread-0 锁定期间,Thread-1 的读操作不受影响:

测试读锁-写锁相互阻塞:

java
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
	dataContainer.read();
}, "t1").start();

Thread.sleep(100);
new Thread(() -> {
	dataContainer.write();
}, "t2").start();

输出:可以看出读锁与写锁相互阻塞

测试写锁-写锁相互阻塞:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  • 重入时降级支持:即持有写锁的情况下去获取读锁
java
class CachedData {
	Object data;
	//是否有效,如果失效,需要重新计算 data
	volatile boolean cacheValid;
	final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	void processCachedData() {
		rwl.readLock().lock();
		if (!cacheValid) {
			//获取写锁前必须释放读锁
			rwl.readLock().unlock();
			rwl.writeLock().lock();
			try {
				//判断是否有其它线程已经获取了写锁,更新了缓存,避免重复更新
				if (!cacheValid) {
					data = ...;
					cacheValid = true;
				}
				//降级为读锁,释放写锁,这样能够让其它线程读取缓存
				rwl.readLock().lock();
			} finally {
				rwl.writeLock().unlock();
			}
		}
		//自己用完数据,释放读锁
		try {
			use(data);
		} finally {
			rwl.readLock().unlock();
		}
	}
}

9.4.1 原理

普通的互斥锁如 synchronized 或 ReentrantLock 只允许一个线程访问共享资源,无论这个线程是读操作还是写操作,但在很多实际场景中,读操作通常不会修改数据,是线程安全的,可以并发进行,而写操作会修改数据,必须互斥进行

读写锁就是基于这种读写分离的思想设计的,它指定了如下规则:

  • 读-读不互斥:多个线程可以同时持有读锁,进行并发读取
  • 读-写互斥:如果一个线程已经持有读锁,则其它请求写锁的线程必须等待,直到所有读锁被释放,反之,如果一个线程已经持有写锁,则其它请求读锁或写锁的线程必须等待
  • 写-写互斥:同一时刻只允许一个线程持有写锁

和 ReentrantLock 一样,ReentrantReadWriteLock 也是基于 AQS (AbstractQueuedSynchronizer) 实现的。这是理解其原理的关键。

面临的挑战是:AQS 只有一个 int 类型的 state 状态变量。如何用一个变量同时表示读锁的数量和写锁的状态?

解决方案:按位切割使用 state

ReentrantReadWriteLock 巧妙地将一个 32 位的 int 类型的 state 变量拆成了两部分:

  • 高 16 位:用来表示读锁的状态(即当前所有线程获取读锁的总次数)。因为读锁是共享的,多个线程都可以获取,所以这里记录的是重入次数的总和。
  • 低 16 位:用来表示写锁的状态(即写锁的重入次数)。因为写锁是独占的,所以它的重入次数就是持有写锁的线程的重入次数。

例如:

  • state = 0:锁未被任何线程持有。
  • state = 0x00000001:低 16 位为 1,表示写锁被持有 1 次。
  • state = 0x00030000:高 16 位为 3,表示读锁被总共获取了 3 次(可能是一个线程重入 3 次,也可能是三个线程各获取 1 次)。
  • state = 0x00030001:读写锁都被持有,这是不允许的,违反了读-写互斥规则。

通过位运算可以轻松地分离和操作这两部分:

  • 获取写锁状态:state & 0x0000FFFF(掩码操作,保留低16位)
  • 获取读锁状态:state >>> 16(无符号右移16位)
  • 计算新的 state:(读状态 << 16) | 写状态
9.4.1.1 写锁(WriteLock)的获取与释放

写锁是独占锁,原理与 ReentrantLock 非常相似。

获取写锁 (writeLock().lock()):

  1. 检查状态:首先检查 state。
  • 如果 state != 0,说明有锁被持有(可能是读锁,也可能是写锁)。
  1. 判断是否获取成功:
  • 成功条件 1:state == 0(锁完全空闲),那么当前线程可以尝试 CAS 获取写锁。
  • 成功条件 2:state != 0,但写锁状态不为零 (低16位 != 0),并且当前线程正是写锁的持有者(可重入)。此时只需增加写状态即可。
  1. 失败入队:如果上述条件都不满足(例如,有其他线程持有读锁或写锁),则获取失败。当前线程会被包装成一个独占模式(EXCLUSIVE)的 Node 加入到 AQS 的 CLH 队列中排队,并可能被挂起等待。

释放写锁 (writeLock().unlock()):

  1. 检查当前线程是否是写锁的持有者,否则抛出异常。
  2. 将写状态(低 16 位)减 1。
  3. 如果减后写状态为 0,则表示写锁被完全释放,将独占线程设为 null。
  4. 唤醒队列中等待的线程(可能是下一个写锁线程,也可能是读锁线程——如果队列头节点后面有连续的读锁等待节点)。
9.4.1.2 读锁(ReadLock)的获取与释放

读锁是共享锁,这是最复杂也是最具特色的部分。

获取读锁 (readLock().lock()):

  1. 检查写锁:如果写锁状态不为 0 (低16位 != 0),且持有写锁的线程不是当前线程,则获取失败。(读-写互斥,写锁优先) 如果持有写锁的就是自己,那么可以获取读锁,这涉及“锁降级”,后面会讲。

  2. 判断是否获取成功:

  • 读锁此时是否应该阻塞?对于公平锁,会检查队列中是否有比自己更早的等待者;对于非公平锁,可以尝试直接获取。
  • 检查读锁数量是否超过上限(65535)。
  • 如果条件允许,则尝试用 CAS 更新 高 16 位的状态(即读状态 + 1)。
  1. 成功后的记录:获取读锁成功后,需要在当前线程的 ThreadLocal 中记录重入次数。这是因为读锁是共享的,多个线程都可以获取,AQS 的 state 高 16 位只记录了全局的读锁总次数,而每个线程自己重入了多少次,需要自己单独记录,以便后续正确释放。

  2. 失败入队:如果获取失败,线程会被包装成一个共享模式(SHARED)的 Node 加入到 CLH 队列中排队等待。

释放读锁 (readLock().unlock()):

  1. 获取当前线程的 ThreadLocal 中记录的重入次数。

  2. 计算新的 state:将全局 state 的高 16 位(读状态)减去相应的值。

  3. 使用 CAS 更新 state。

  4. 如果更新后的 state 为 0,说明读锁和写锁都完全释放了,然后会唤醒队列中后续的等待线程。

9.4.1.3 锁降级 (Lock Downgrading)

这是一个重要的特性:允许一个已经持有写锁的线程,继续获取读锁,然后再释放写锁的过程。

java
// 锁降级的示例代码
writeLock.lock(); // 获取写锁
try {
    // 修改数据...
    readLock.lock(); // 在释放写锁前获取读锁 -> 锁降级开始
} finally {
    writeLock.unlock(); // 释放写锁 -> 锁降级完成。此时本线程还持有读锁
}
try {
    // ... 读取数据,其他读线程也可以并发读取了
} finally {
    readLock.unlock(); // 最终释放读锁
}

为什么需要锁降级?

主要是为了保证数据的可见性。如果在写完数据后直接释放写锁,那么在释放写锁和获取读锁之间的极短间隙内,可能有另一个写线程获取了写锁并修改了数据,这样当前线程之后读到的就是脏数据。锁降级保证了当前线程在释放写锁后,依然持有读锁,使得数据对自己永远是可见的,从而避免了其他写线程的干扰。

注意:读写锁不支持锁升级(读锁 -> 写锁)!因为多个线程可能同时持有读锁,如果其中一个线程试图升级为写锁,它必须等待所有其他读锁释放,这很容易造成死锁。

核心原理总结:

ReentrantReadWriteLock 通过按位切割 AQS 的 state 变量,巧妙地用一个整数同时管理了读锁和写锁的状态。它通过共享模式和独占模式的配合,以及 ThreadLocal 的辅助,实现了“读-读共享、读-写互斥、写-写互斥”的复杂同步规则,从而在保证数据安全的前提下,显著提升了读操作的并发性能。

9.4.2 源码

9.5 StampedLock

StampedLock 是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合戳使用

加解读锁:

java
long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁:

java
long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读:StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次戳校验如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全

java
long stamp = lock.tryOptimisticRead();
//验戳
if (!lock.validate(stamp)) {
	//锁升级
}

[[StampedLock 详解]]

9.6 Semaphore

Semaphore 是信号量,用来限制能同时访问共享资源的线程上限

java
public static void main(String[] args) {
	//1. 创建 Semaphore 对象
	Semaphore semaphore = new Semaphore(3);
	
	//2. 10 个线程同时运行
	for (int i = 0; i < 10; i++) {
		new Thread(() -> {
			//3. 获取许可
			try {
				semaphore.acquire();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				log.debug("running...");
				sleep(1);
				log.debug("end...");
			} finally {
				//4. 释放许可
				semaphore.release();
			}
		}).start();
	}
}

输出:

9.6.1 限制对共享资源的使用

使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数

用 Semaphore 实现简单连接池,对比享元模式下的实现(用 wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接池数是相等的

java
class Pool {
	//1. 连接池大小
	private final int poolSize;
	
	//2. 连接对象数组
	private Connection[] connections;
	
	//3. 连接状态数组 0 表示空闲,1 表示繁忙
	private AtomicIntegerArray states;
	
	private Semaphore semaphore;
	
	//4. 构造方法初始化
	public Pool(int poolSize) {
		this.poolSize = poolSize;
		//让许可数与资源数一致
		this.semaphore = new Semaphore(poolSize);
		this.connections = new Connection[poolSize];
		this.states = new AtomicIntegerArray(new int[poolSize]);
		for (int i = 0; i < poolSize; i++) {
			connections[i] = new MockConnection("连接" + (i + 1));
		}
	}
	
	//5. 借连接
	public Connection borrow() {
		//获取许可
		try {
			semaphore.acquire(); //没有许可的线程,在此等待
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		for (int i = 0; i < poolSize; i++) {
			//获取空闲连接
			if (states.get(i) == 0) {
				if (states.compareAndSet(i, 0, 1)) {
					log.debug("borrow {}", connections[i]);
					return connections[i];
				}
			}
		}
		//不会执行到这里
		return null;
	}
	
	//6. 归还连接
	public void free(Connection conn) {
		for (int i = 0; i < poolSize; i++) {
			if (connections[i] == conn) {
				states.set(i, 0);
				log.debug("free {}", conn);
				semaphore.release();
				break;
			}
		}
	}
}

9.6.2 原理

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

  • 刚开始,permits(state)为 3,这时 5 个线程来获取资源

  • 假设其中 Thread-1、Thread-2、Thread-4 CAS 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

  • 这时 Thread-4 释放了 permits,状态如下:

  • 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

9.6.3 源码

9.6.4 为什么要有 propagate

9.7 CountDownLatch

用来进行线程同步协作,等待所有线程完成倒计时

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

java
public static void main(String[] args) throws InterruptedException {
	CountDownLatch latch = new CountDownLatch(3);
	
	new Thread(() -> {
		log.debug("begin...");
		sleep(1);
		latch.countDown();
		log.debug("end...{}", latch.getCount());
	}).start();
	
	new Thread(() -> {
		log.debug("begin...");
		sleep(2);
		latch.countDown();
		log.debug("end...{}", latch.getCount());
	}).start();
	
	new Thread(() -> {
		log.debug("begin...");
		sleep(1.5);
		latch.countDown();
		log.debug("end...{}", latch.getCount());
	}).start();
	
	log.debug("waiting...");
	latch.await();
	log.debug("wait end...");
	
}

输出:

9.7.1 同步等待多线程准备完毕

java
AtomicInteger num = new AtomicInteger(0);

ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
	return new Thread(r, "t" + num.getAndIncrement());
});

CountDownLatch latch = new CountDownLatch(10);

String[] all = new String[10];

Random r = new Random();

for (int j = 0; j < 10; j++) {
	int x = j;
	service.submit(() -> {
		for (int i = 0; i <= 100; i++) {
			try {
				Thread.sleep(r.nextInt(100));
			} catch (InterruptedException e) {
			}
			all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
			System.out.print("\r" + Arrays.toString(all));
		}
		latch.countDown();
	});
}
latch.await();
System.out.println("\n游戏开始...");
service.shutdown();

中间输出:

最后输出:

9.8 CycliBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数,构造时设置计数个数,每个线程执行到某个需要同步的时刻调用 await() 方法进行等待,当等待的线程数满足计数个数时,继续执行

java
CyclicBarrier cb = new CyclicBarrier(2); //个数为 2 时才会继续执行

new Thread(() -> {
	System.out.println("线程 1 开始.." + new Date());
	try {
		cb.await(); //当个数不足时,等待
	} catch (InterruptedException | BrokenBarrierException e) {
		e.printStackTrace()'
	}
	System.out.println("线程 1 继续向下运行..." + new Date());
}).start();

new Thread(() -> {
	System.out.println("线程 2 开始.." + new Date());
	try {
		Thread.sleep(2000);
	} catch (InterruptedException e) {
	
	}
	try {
		cb.await(); //2 秒后,线程个数够 2,继续运行
	} catch (InterruptedException | BrokenBarrierException e) {
		e.printStackTrace()'
	}
	System.out.println("线程 2 继续向下运行..." + new Date());
}).start();

9.9 CyclicBarrier 与 CountDownLatch 的区别

CyclicBarrier 让所有线程相互等待,全部到达后再继续,CountDownLatch 让主线程等待所有子线程执行完再继续

[[CyclicBarrier 与 CountDownLatch 的区别]]

补:Exchanger

Exchanger 交换者,用于在两个线程之间进行数据交换

支持双向数据交换,比如说线程 A 调用 exchange(dataA),线程 B 调用 exchange(dataB),它们会在同步点交换数据,即 A 得到 B 的数据,B 得到 A 的数据

如果一个线程先调用 exchange(),它会阻塞等待,直到另一个线程也调用 exchange()

使用 Exchanger 的时候,需要先创建一个 Exchanger 对象,然后在两个线程中调用 exchange() 方法,就可以进行数据交换了

9.10 线程安全的集合类

遗留的安全集合:

  • Hashtable
  • Vector

使用 Collections 修饰的线程安全集合,这些集合的底层实现借助修饰器模式,把不安全的集合当做构造器参数传入后,每次调用操作集合的方法实际会在方法上加上 synchronized 锁,所以这类线程安全的集合开销比较大:

  • Collections.synchronizedCollection
  • Collections.synchronizedList
  • Collections.synchronizedMap
  • Collections.synchronizedSet
  • Collections.synchronizedNavigableMap
  • Collections.synchronizedNavigableSet
  • Collections.synchronizedSortedMap
  • Collections.synchronizedSortedSet

JUC 安全集合:

  • Blocking 类:大部分实现基于锁,并提供用来阻塞的方法
  • CopyOnWrite 类:写时复制技术,这类容器对于写操作开销相对较重
  • Concurrent 类:
    • 内部很多操作使用 CAS 优化,一般可以提供较高吞吐量
    • 缺点:弱一致性:
      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
      • 求大小的操作时弱一致性的,size 操作未必是 100% 准确
      • 读取操作弱一致性

对于非安全容器来讲,遍历时如果发生了修改,使用 fail-fast(立刻失败) 机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历

9.10.1 ConcurrentHashMap

9.10.1.1 JDK7 HashMap 的并发死链问题

[[并发死链]]

9.10.1.2 ConcurrentHashMap 原理

设计思想的演进:从分段锁到 CAS + Synchronized

  1. JDK 7 的分段锁(Segment Locking)
  • 原理:CHM(ConcurrentHashMap) 内部维护了一个 Segment 数组,每个 Segment 本质上就是一个独立的、小的 HashMap,每次操作只锁住其中一个 Segment,而不是整个表
  • 优点:相比 Hashtable 的全局锁,并发度大大提升(默认 16 个段,并发度就是 16)
  • 缺点:设计稍显繁琐,在某些场景下性能并非最优
  1. JDK 8 的全新设计(CAS + Synchronized)
  • 抛弃了分段锁,采用了与 HashMap 更相似的结构:Node 数组 + 链表 + 红黑树
  • 核心并发策略:
    • CAS:一种乐观锁,用于无锁化的初始化数组、插入头节点、扩容等操作,没有线程争用时性能极高
    • Synchronized:用于锁定单个桶(链表头节点/红黑树根节点),锁的粒度从 JDK 7 的一个段(包含多个桶)细化到了一个桶,并发度理论上等于数组的长度,极大的降低了线程冲突的概率
    • 同时利用了 volatile 关键字保证内存可见性

核心源码解析(基于 JDK 8+)

  1. 重要的属性与节点类
java
// 核心表数组,懒初始化,大小总是 2 的幂次。
transient volatile Node<K,V>[] table;

//扩容时的新 hash 表
private transient volatile Node<K, V>[] nextTable;

// 控制标识符,用于扩容和控制初始化。
// 负数:表示正在初始化或扩容。初始化时为 -1,扩容时为 -(1 + 扩容线程数)
// 正数:下一次扩容的阈值。
private transient volatile int sizeCtl;

// 基本的节点类,用于链表。注意 val 和 next 都是 volatile 的!
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;    // 保证可见性
    volatile Node<K,V> next; // 保证可见性
    // ... 
}

// 用于红黑树的节点类
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // 删除时需要取消链接
    boolean red;
    // ...
}

// 一个特殊的节点类,在扩容时用于标记已经迁移的桶位
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null); // hash 值为 MOVED
        this.nextTable = tab;
    }
}
  1. 重要方法
java
//获取 Node[] 中第 i 个 Node
static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i)

//CAS 修改 Node[] 中第 i 个 Node 的值,c 为旧值,v 为新值
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v)

//直接修改 Node[] 中第 i 个 Node 的值,v 为新值
static final <K, V> void setTabAt(Node<K, V>[] tab, int i, Node<K, V> v)
  1. 构造器分析

可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

java
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
	if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
		throw new IllegalArgumentException();
	if (initialCapacity < concurrencyLevel)
		initialCapacity = concurrencyLevel;
	long size = (long)(1.0 + (long)initialCapacity / loadFactor);
	// tableSizeFor 仍然是保证计算的大小是 2^n,即 16,32,64 ...
	int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
	this.sizeCtl = cap;
}
  1. get 流程

get 过程核心总结:

  1. 计算哈希,定位桶位
  2. 通过 tabAt() (底层是 Unsafe.getObjectVolatile)安全的获取头节点,保证能读到其它线程的最新写入
  3. 如果头节点匹配,直接返回
  4. 如果头节点是特殊节点(哈希为负),调用其 find 方法,ForwardingNode 的 find 方法会去新数组 nextTable 中查找,TreeBin 的 find 方法会在红黑树中查找,[[理解这句话]]
  5. 否则,遍历链表
  6. 全程无锁,仅通过 volatile 读保证可见性
java
public V get(Object key) { 
	Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 
	// spread 方法能确保返回结果是正数 
	int h = spread(key.hashCode()); 
	if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { 
	// 如果头结点已经是要查找的 key 
		if ((eh = e.hash) == h) { 
			if ((ek = e.key) == key || (ek != null && key.equals(ek))) 
				return e.val; //直接返回
		} 
		// hash 为负数表示该 bin 正在在扩容中(就会去新表查找)或是 treebin(树), 这时调用特殊节点的 find 方法来查找 
		else if (eh < 0) 
			return (p = e.find(h, key)) != null ? p.val : null; 
		// 正常遍历链表, 用 equals 比较 
		while ((e = e.next) != null) { 
			if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
				 return e.val; 
		} 
	} 
	return null; 
}
  1. put(K key, V value) 方法详解:

这是理解 CHM 并发控制的精髓:

java
public V put(K key, V value) {
	return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // 计算哈希,让其分布更均匀,混合高低位
    int binCount = 0; // 记录链表长度,用于判断是否要树化

    // 使用一个循环,CAS 操作可能失败,需要重试
    for (Node<K,V>[] tab = table;;) {
	    //f 是链表头节点,fh 是链表头节点的 hash,i 是链表在 table 中的下标
        Node<K,V> f; int n, i, fh;

        // 情况1:表还未初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化表(使用 CAS 竞争初始化权),无需 synchronized 创建成功,进入下一轮循环

        // 情况2:目标桶位是空的
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 使用 CAS 无锁化地插入新节点作为头节点
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // CAS 成功,插入完成,直接跳出循环
            // CAS 失败,说明有其他线程抢先插入了,进入下一轮循环重试
        }

        // 情况3:发现特殊节点,说明正在扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f); // 当前线程帮忙一起扩容

        // 情况4:目标桶位不为空,且不在扩容,则锁住这个桶位的头节点
        else {
            V oldVal = null;
            //锁住链表头结点
            synchronized (f) { // 细粒度的锁!只锁住这个桶
                if (tabAt(tab, i) == f) { // 再次验证链表头结点没有被移动
                    if (fh >= 0) { // 头节点哈希大于0,说明是链表
                        binCount = 1;
                        //遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到Key相同的节点,更新Value
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 遍历到链表尾部,插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 如果是红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插入方法,putTreeVal 会看 key 是否已经在树中,是,则返回对应的 TreeNode
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD) // 如果链表长度达到阈值
                    treeifyBin(tab, i); // 将链表转化为红黑树
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 增加计数,并检查是否需要扩容
    return null;
}

put 过程核心总结:

(1)计算哈希,定位桶位 (2)桶为空:用 CAS 无锁插入,失败则循环重试 (3)桶在扩容:当前线程协助扩容 (4)桶有数据:Synchronized 锁住头节点,进行链表或树的插入/更新 (5)插入后判断是否需要树化 (6)最后更新计数,并判断是否需要触发扩容

  1. 初始化表方法 initTable 方法
java
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0 表示有其他线程正在初始化或扩容
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // 让出CPU,等待初始化完成
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS成功,当前线程获得初始化权利
            //获得锁,创建 table,这时其它线程会在 while() 循环中 yield 直至 table 创建
            try {
                // 双重检查,防止其他线程已经初始化完成
                if ((tab = table) == null || tab.length == 0) {
                    // 计算初始容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 计算扩容阈值:n - n/4 = 0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 将sizeCtl设置为扩容阈值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
  1. 计数 addCount() 方法
java
/**
 * 增加计数值,并检查是否需要扩容
 * @param x 要增加的数量(通常为1,在删除操作中可能为负数)
 * @param check 检查扩容的阈值,如果binCount大于等于此值则检查扩容
 */
private final void addCount(long x, int check) {
    CounterCell[] as; 
    long b, s;
    
    // ==================== 第一部分:计数更新 ====================
    
    // 如果counterCells不为null,说明已经存在竞争,需要使用CounterCell
    // 或者如果counterCells为null,但CAS更新baseCount失败,说明出现竞争
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        
        CounterCell a; 
        long v; 
        int m;
        boolean uncontended = true; // 表示是否无竞争
        
        // 检查CounterCell数组是否未初始化,或者长度为空
        // 或者当前线程对应的CounterCell槽位为null
        // 或者CAS更新CounterCell的值失败
        if (as == null || 
            (m = as.length - 1) < 0 || 
            (a = as[getProbe() & m]) == null || 
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            
            // 如果上述条件任一满足,说明竞争激烈,需要完整地处理计数
            fullAddCount(x, uncontended);
            return; // 在fullAddCount中会完成计数,这里直接返回
        }
        
        // 如果check <= 1,说明不需要检查扩容(比如在replace操作中)
        if (check <= 1)
            return;
            
        // 计算当前总元素数量
        s = sumCount();
    }
    
    // ==================== 第二部分:扩容检查 ====================
    
    // 如果check >= 0,需要检查是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; 
        int n, sc;
        
        // 循环检查扩容条件:
        // 1. 当前元素数量s >= 扩容阈值sizeCtl
        // 2. table不为null
        // 3. 当前table长度小于最大容量
        while (s >= (long)(sc = sizeCtl) && 
               (tab = table) != null && 
               (n = tab.length) < MAXIMUM_CAPACITY) {
            
            // 生成当前容量的扩容标识戳
            // resizeStamp(n)返回一个基于n的唯一标识,用于区分不同容量的扩容
            int rs = resizeStamp(n);
            
            // 如果sc < 0,说明已经有其他线程在扩容
            if (sc < 0) {
                // 检查扩容状态是否仍然有效,如果以下任一条件满足则退出:
                // 1. 扩容标识不匹配(说明不是同一轮扩容)
                // 2. 扩容已经完成(sc == rs + 1,这个条件在JDK8中实际不会出现,是历史遗留)
                // 3. 扩容线程数达到最大值
                // 4. nextTable为null(扩容目标数组不存在)
                // 5. transferIndex <= 0(所有迁移任务已被分配完)
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                    sc == rs + 1 || 
                    sc == rs + MAX_RESIZERS || 
                    (nt = nextTable) == null || 
                    transferIndex <= 0)
                    break; // 退出扩容检查循环
                
                // CAS尝试增加扩容线程数,成功则参与协助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt); // 协助数据迁移
            }
            // 当前没有线程在扩容,当前线程发起扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) {
                // CAS成功设置sizeCtl为负值,表示扩容开始
                // (rs << RESIZE_STAMP_SHIFT) + 2 的高16位是扩容戳,低16位是扩容线程数+1
                transfer(tab, null); // 开始数据迁移,nextTable由transfer创建
            }
            
            // 重新计算元素数量,继续检查是否还需要扩容
            s = sumCount();
        }
    }
}

// ==================== 相关辅助方法 ====================

/**
 * 获取当前线程的探针值,用于定位CounterCell
 */
final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

/**
 * 计算总元素数量 = baseCount + 所有CounterCell的值
 */
final long sumCount() {
    CounterCell[] as = counterCells; 
    CounterCell a;
    long sum = baseCount; // 基础计数值
    
    // 累加所有CounterCell的值
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

/**
 * 生成扩容标识戳
 * @param n 当前table容量
 * @return 基于容量的唯一标识
 */
static final int resizeStamp(int n) {
    // numberOfLeadingZeros(n)返回n的前导0个数
    // 与一个标志位进行或运算,确保结果不为0
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
  1. size() 方法
java
/**
 * 返回此映射中的键值映射数。
 * 如果映射包含的元素数量大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。
 *
 * @return 此映射中的键值映射数量
 */
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

/**
 * 返回映射中的键值映射数量作为long类型。
 * 这是size()的替代方案,可以避免Integer.MAX_VALUE的限制。
 *
 * @return 映射中的键值映射数量
 * @since 1.8
 */
public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // 当并发时可能为负值,但我们会将其视为0
}

/**
 * 计算当前总元素数量 = baseCount + 所有CounterCell的值
 * 这个方法被size(), mappingCount()等多个方法调用
 * 
 * @return 当前估计的元素总数
 */
final long sumCount() {
    // 获取CounterCell数组引用
    CounterCell[] as = counterCells;
    
    // 获取基础计数值
    long sum = baseCount;
    
    // 如果CounterCell数组不为null,则累加所有CounterCell的值
    if (as != null) {
        // 遍历CounterCell数组中的所有单元格
        for (int i = 0; i < as.length; ++i) {
            CounterCell a;
            // 如果当前单元格不为null,则将其值加到总和中
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

size 方法:

ConcurrentHashMap 的 size() 方法返回的是一个近似值,在并发环境下,它可能不是完全精确的。但它并非总是完全不准,它会尝试通过多次计算来得到一个相对准确的值。如果程序中需要严格的准确性,应该使用 mappingCount() 方法(返回 long)或依赖其他业务逻辑保证。

它的设计目标是:避免使用全局锁,以极高的性能获取一个在大多数情况下足够有用的尺寸值。

为什么 size() 很难计算?

想象一下,一个多线程环境下的计数器:线程 A 在读 size。线程 B 和 C 同时在执行 put 和 remove,增加和减少这个 size。如果你只用简单的 volatile int size,然后使用 synchronized 或 AtomicInteger 来更新,那么所有 put/remove 操作都会竞争同一个锁,这会成为巨大的性能瓶颈,违背了 CHM 的设计初衷。

解决方案:分而治之的计数 (JDK8 方案) CHM 采用了一种称为 “分桶计数” 或 “累加计数” 的方法。其核心思想是:不让所有线程更新同一个计数器,而是让它们分散地去更新多个不同的计数器,最后需要 size() 时,再把所有计数器的值加起来。

  1. 核心属性

首先,我们看支撑这个机制的核心属性:

java
// 1. 基础计数器:没有竞争时,线程通过CAS尝试更新这个值
private transient volatile long baseCount;

// 2. 计数器细胞数组(CounterCell array):当线程竞争更新baseCount失败时,不再重试,而是转而在这个数组中为自己找一个“坑位”进行计数。
//    它的思想跟LongAdder类一模一样,可以认为CHM内部嵌入了一个LongAdder。
private transient volatile CounterCell[] counterCells;

// 3. 锁状态标识:这个sizeCtl属性我们见过,它在计数时也扮演重要角色,用于初始化、扩容和创建CounterCell数组时的锁控制。
private transient volatile int sizeCtl;

CounterCell 是一个简单的包装类:

java
// 一个用于分配计数的填充单元。改编自LongAdder和Striped64。
@jdk.internal.vm.annotation.Contended // 避免伪共享(False Sharing)的重要注解
static final class CounterCell {
    volatile long value; // 每个Cell自己持有的计数值
    CounterCell(long x) { value = x; }
}
  1. 计数过程:addCount(long x, int check)

每次执行 put 或 remove 操作后,都会调用 addCount(1L, binCount) 或 addCount(-1L, -1) 来更新计数。

addCount 方法是理解计数如何工作的核心:

java
// x 是增加的数量,例如 put 是 1,remove 是 -1
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;

    // 第一步:尝试更新 baseCount
    // 如果 counterCells 还未初始化(as == null),则尝试用 CAS 直接更新 baseCount
    if ((as = counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 进入这个if块,说明两种情况之一:
        // 1. counterCells 数组已经初始化了(不为null),或者...
        // 2. 尝试用CAS更新baseCount失败了(说明有其他线程也在更新,产生了竞争)

        CounterCell a; long v; int m;
        boolean uncontended = true; // 表示“未竞争”

        // 第二步:操作CounterCell数组
        // 情况A:计数器细胞数组还未初始化或长度为空
        // 情况B:当前线程通过探针哈希算出来的数组槽位是null
        // 情况C:CAS更新这个槽位的Cell的value值也失败了
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null || // 获取当前线程的随机探针,计算槽位
            !(uncontended = U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))) {

            // 如果上述任何一种情况发生,就调用fullAddCount方法
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算当前总元素数,判断是否需要扩容(这里不是size()的逻辑,略过)
        s = sumCount();
    }
    // ... 后续是检查扩容的代码
}

这个过程可以简化为:

  • 首选路径:尝试用 CAS 更新 baseCount。成功则返回。

  • 竞争路径:如果 CAS 更新 baseCount 失败(说明有竞争),则线程会有一个独有的 ThreadLocalRandom 探针值,像计算哈希一样,在 CounterCell[] 数组中找到一个属于自己的“坑位”(Cell)。

  • 最终路径:尝试用 CAS 更新自己那个“坑位”里的值。如果连这个都失败了(非常罕见的高并发),则进入 fullAddCount 方法进行重试、初始化数组或扩容数组。

这样,更新的压力就被分散到了 baseCount 和多个 CounterCell 上,极大地减少了线程间的竞争。

  1. 获取 Size:size() 和 sumCount()

现在来看 size() 方法本身,它就非常简单了:

java
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

// mappingCount() 方法更推荐,因为它返回long,避免溢出
public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}

// 核心求和方法:把 baseCount 和所有 CounterCell 里的值加起来
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount; // 先以baseCount为基础
    if (as != null) {
        for (int i = 0; i < as.length; ++i) { // 遍历整个CounterCell数组
            if ((a = as[i]) != null)
                sum += a.value; // 把每个Cell的值累加上去
        }
    }
    return sum;
}

所以,size() 方法做的就是:

总元素数 = baseCount + (所有CounterCell[i].value的和)

为什么这是“近似值”?

因为 sumCount() 操作没有加锁

考虑这个场景:

  1. 线程 T1 开始计算 sumCount()。它读到 baseCount = 100

  2. 当 T1 正在遍历 counterCells 数组时,线程 T2 完成了一个 put 操作,成功地将数据 +1 记录到了某个 CounterCell[3] 里,CounterCell[3].value 从 5 变成了 6

  3. 线程 T3 完成了一个 remove 操作,成功地将 baseCount 通过 CAS 从 100 更新到了 99

  4. 线程 T1 继续计算,它读到了最新的 CounterCell[3].value = 6,但它之前读到的 baseCount 是旧的 100

  5. 最终 T1 计算出的结果是 100 + ... + 6 + ... = 106,而真实的瞬间状态可能是 99 + ... + 6 + ... = 105

这个结果 106 就是一个过时(Stale)的近似值。它既不是计算开始时的状态,也不是计算结束时的状态,而是一个中间状态的混合体。

  1. 扩容方法 transfer 方法
java
/**
 * 将节点从当前table转移到新table。
 * 这是扩容过程中的核心方法,支持多线程协同迁移。
 *
 * @param tab     当前table(旧table)
 * @param nextTab 新table,如果为null则初始化
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length;   // 旧table的长度
    int stride;           // 每个线程处理的桶区间大小(步长)
    
    // ==================== 第一步:初始化新table和迁移参数 ====================
    
    // 计算每个线程处理的步长,最小为MIN_TRANSFER_STRIDE(16)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // 确保步长不小于最小值
    
    // 如果nextTab为null,说明是扩容的发起线程,需要初始化新table
    if (nextTab == null) {
        try {
            @SuppressWarnings("unchecked")
            // 创建新table,容量是旧table的2倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            // 处理内存不足等情况
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;        // 更新全局nextTable引用
        transferIndex = n;          // 从后往前迁移,初始值为旧table长度
    }
    
    int nextn = nextTab.length;     // 新table的长度
    
    // 创建ForwardingNode节点,用于标记已迁移的桶
    // ForwardingNode的hash值为MOVED(-1),指向新table
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    
    boolean advance = true;         // 控制是否推进到下一个桶
    boolean finishing = false;      // 标记迁移是否完成
    
    // ==================== 第二步:迁移循环 - 每个线程处理自己的区间 ====================
    
    // i: 当前处理的桶索引, bound: 当前线程处理的区间下界
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; 
        int fh;
        
        // 这个while循环负责分配迁移任务(确定当前线程要处理的桶区间)
        while (advance) {
            int nextIndex, nextBound;
            
            // 情况1:如果还有未处理的桶,继续处理当前区间
            if (--i >= bound || finishing)
                advance = false;
            
            // 情况2:所有桶都已分配完,没有更多迁移任务
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            
            // 情况3:为当前线程分配新的迁移区间
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                // CAS成功,当前线程获得[nextBound, nextIndex-1]区间的迁移任务
                bound = nextBound;    // 区间下界
                i = nextIndex - 1;    // 从区间末尾开始处理
                advance = false;
            }
        }
        
        // ==================== 第三步:检查迁移状态 ====================
        
        // 情况1:所有迁移任务已完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            
            // 如果是最后一个完成迁移的线程,执行收尾工作
            if (finishing) {
                nextTable = null;        // 清理临时引用
                table = nextTab;         // 更新主table为新table
                sizeCtl = (n << 1) - (n >>> 1); // 更新sizeCtl为新容量的0.75倍
                return;                  // 迁移完成,退出方法
            }
            
            // 当前线程完成自己的迁移任务,减少扩容线程计数
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 检查是否所有线程都完成了迁移工作
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return; // 还有其他线程在工作,直接返回
                
                // 当前线程是最后一个工作线程,设置完成标志
                finishing = advance = true;
                i = n; // 重新检查所有桶,确保没有遗漏
            }
        }
        
        // 情况2:当前桶为空,直接标记为已迁移
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd); // CAS设置ForwardingNode
        
        // 情况3:当前桶已经是ForwardingNode,说明已被其他线程迁移
        else if ((fh = f.hash) == MOVED)
            advance = true; // 推进到下一个桶
        
        // 情况4:当前桶需要迁移,开始迁移工作
        else {
            // 对当前桶加锁,确保线程安全
            synchronized (f) {
                // 双重检查,防止在加锁期间桶已被其他线程修改
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn; // 低位链表(low node)和高位链表(high node)
                    
                    // 情况4a:普通链表节点
                    if (fh >= 0) {
                        // 计算扩容后的分布:原位置 或 原位置 + 旧容量
                        int runBit = fh & n; // 用于优化链表迁移
                        Node<K,V> lastRun = f;
                        
                        // 遍历链表,找到最后一段连续相同runBit的节点
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        
                        // 根据runBit决定lastRun节点应该放在低位还是高位
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        
                        // 重新遍历链表,构建低位和高位两个链表
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; 
                            K pk = p.key; 
                            V pv = p.val;
                            
                            // 根据hash & n的结果决定节点位置
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln); // 添加到低位链表
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn); // 添加到高位链表
                        }
                        
                        // 将低位链表放在新table的i位置(原位置)
                        setTabAt(nextTab, i, ln);
                        // 将高位链表放在新table的i+n位置(原位置+旧容量)
                        setTabAt(nextTab, i + n, hn);
                        // 将旧table的当前桶标记为已迁移
                        setTabAt(tab, i, fwd);
                        advance = true; // 推进到下一个桶
                    }
                    
                    // 情况4b:红黑树节点
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;   // 低位树
                        TreeNode<K,V> hi = null, hiTail = null;   // 高位树
                        int lc = 0, hc = 0; // 节点计数
                        
                        // 遍历树节点,分离为低位树和高位树
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            
                            // 根据hash & n的结果决定节点位置
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        
                        // 判断是否需要将树退化为链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        
                        // 设置新table中的节点
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

扩容机制:

这是 CHM 中最复杂的一部分,其核心思想是多线程协同扩容

  • 触发时机:在 put 方法最后的 addCount 中,如果元素总数超过 sizeCtl,会触发 transfer。

  • 如何协同:

    • 第一个触发扩容的线程会初始化一个比原数组大一倍的 nextTable。
    • 它会把原数组分成若干个 stride(步长/任务区间)**。
    • 其他线程在 put 时如果发现 fwd 节点(情况3),就会调用 helpTransfer 来协助扩容。参与扩容的线程数记录在 sizeCtl 的高 16 位。
    • 每个线程领取一个 stride 任务,负责将自己区间内的桶位从 table 迁移到 nextTable。迁移一个桶位后,会在原位置放置一个 ForwardingNode 节点作为标记。
    • 当所有线程都完成自己的任务后,整个扩容结束,nextTable 被设置为新的 table。
  • 精妙之处:

    • 高并发:多线程可以并行迁移不同的桶位,极大提高了扩容速度。
    • 无阻塞查询:get 操作遇到 ForwardingNode,可以转到 nextTable 上去查找,不会阻塞。
    • 无阻塞插入:put 操作遇到 ForwardingNode,会先帮忙扩容,扩容完成后再在新的 table 上进行插入。

JDK8 的 ConcurrentHashMap 总结:

  • 底层数据结构:数组(Node)+ 链表(Node)+ 红黑树(TreeNode)以下数组简称 table,链表简称 bin
  • 初始化:使用 cas 来保证并发安全,懒惰初始化 table
  • 树化:当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头
  • put:如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素添加至 bin 的尾部
  • get:无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索
  • 扩容:扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会被复制到新 table 中
  • size:元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中,最后统计数量时累加即可
9.10.1.3 JDK 7 的 ConcurrentHashMap

它维护了一个 segment 数组,每个 segment 对应一把锁

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 JDK8 中是类似的
  • 缺点:Segment 数组默认大小为 16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

其中 Segments 数组中每一个元素中存一个哈希表

  • 分段锁架构:Segment数组 + 锁分离

  • 链表解决冲突:只有链表,没有红黑树

  • 独立扩容:每个Segment独立扩容

  • 无锁读取:get操作完全无锁

  1. 构造方法
java
/**
 * 创建一个具有指定初始容量、负载因子和并发级别的ConcurrentHashMap
 * 
 * @param initialCapacity 初始容量,整个Map的初始容量,会平均分配到各个Segment
 * @param loadFactor 负载因子,每个Segment的负载因子
 * @param concurrencyLevel 并发级别,即Segment的数量
 * @throws IllegalArgumentException 如果初始容量为负、负载因子非正或并发级别非正
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // ==================== 第一步:参数校验 ====================
    
    // 校验参数合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    
    // 如果并发级别大于最大分段数,使用最大分段数
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // ==================== 第二步:计算Segment相关参数 ====================
    
    // 找到大于等于concurrencyLevel的最小的2的幂
    // 这样可以用位运算代替取模运算,提高性能
    int sshift = 0;      // 移位次数
    int ssize = 1;       // Segment数组的实际大小(2的幂次方)
    
    // 循环计算,直到ssize >= concurrencyLevel
    while (ssize < concurrencyLevel) {
        ++sshift;        // 移位次数加1
        ssize <<= 1;     // ssize乘以2
    }
    
    // 计算用于定位Segment的移位值和掩码值
    this.segmentShift = 32 - sshift;  // 用于计算Segment索引的右移位数
    this.segmentMask = ssize - 1;     // 用于计算Segment索引的掩码
    
    // ==================== 第三步:计算每个Segment的容量 ====================
    
    // 如果初始容量大于最大容量,使用最大容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    
    // 计算每个Segment的初始容量
    // c = 总容量 / Segment数量,向上取整
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;  // 如果除不尽,向上取整
    
    // 每个Segment中table的最小容量
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    
    // 找到大于等于c的最小的2的幂
    while (cap < c)
        cap <<= 1;  // cap乘以2,直到cap >= c

    // ==================== 第四步:创建Segment数组和第一个Segment ====================
    
    // 创建Segment数组,长度为ssize
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    
    // 创建Segment数组
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    
    // 使用UNSAFE将第一个Segment放入数组的第0个位置
    // 这里使用有序写入(putOrderedObject)来保证初始化顺序
    UNSAFE.putOrderedObject(ss, SBASE, s0); 
    
    // 将Segment数组赋值给segments字段
    this.segments = ss;
}

可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShiftthis.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位,结果再与 this.segmentMask 做位运算,最终得到 1010 即下标为 10 的 segment

  1. put 方法
java
public V put(K key, V value) {
	Segment<K, V> s;
	if (value == null)
		throw new NullPointerException();
	int hash = hash(key);
	//计算出 segment 下标
	int j = (hash >>> segmentShift) & segmentMask;
	
	//获得 segment 对象,判断是否为 null,是则创建该 segment
	if ((s = (Segment<K, V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
		//这时不能确定是否有真的为 null,因为其它线程也发现该 segment 为 null,因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
		s = ensureSegment(j);
	}
	//进入 segment 的 put 流程
	return s.put(key, hash, value, false);
}

segment 继承了可重入锁(ReentrantLock),它的 put 方法为:

java
/**
 * Segment内部的put方法
 * 
 * @param key
 * @param hash 哈希值
 * @param value
 * @param onlyIfAbsent 如果为true,则仅当key不存在时才put
 * @return 旧值,如果不存在则返回null
 */
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // ==================== 第一步:尝试获取锁 ====================
    
    // 尝试获取锁
    //如果不成功,进入 scanAndLockForPut 流程
    //如果是多核 cpu 最多 tryLock 64 次,进入 lock 流程
    //在尝试期间,还可以顺便看该节点在链表中有没有,如果没有顺便创建出来
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    
    //执行到这里 segment 已经被成功加锁,可以安全执行
    V oldValue;
    try {
        // ==================== 第二步:获取Segment内部数组 ====================
        
        HashEntry<K,V>[] tab = table;
        
        // 计算在Segment内部的桶索引
        int index = (tab.length - 1) & hash;
        
        // 获取桶的第一个节点(volatile读取)
        HashEntry<K,V> first = entryAt(tab, index);
        
        // ==================== 第三步:遍历链表查找key ====================
        
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                // 当前节点不为空,检查key是否匹配
                K k;
                if ((k = e.key) == key || 
                    (e.hash == hash && key != null && key.equals(k))) {
                    // 找到匹配的key
                    oldValue = e.value;
                    
                    // 如果onlyIfAbsent为false,或者旧值为null,则更新值
                    if (!onlyIfAbsent || oldValue == null) {
                        e.value = value;
                        ++modCount;  // 结构性修改计数
                    }
                    break;  // 退出循环
                }
                e = e.next;  // 继续遍历下一个节点
            }
            else {
	            //新增
                // 到达链表末尾,需要插入新节点
                
                // 如果scanAndLockForPut返回了预创建的节点,则使用它
                if (node != null)
                    node.setNext(first);  // 新节点指向原头节点
                else
                    // 否则创建新节点
                    node = new HashEntry<K,V>(hash, key, value, first);
                
                // 节点计数加1
                int c = count + 1;
                
                // ==================== 第四步:检查是否需要扩容 ====================
                
                // 如果超过阈值且未达到最大容量,则进行扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);  // 扩容并插入新节点
                else
                    // 不需要扩容,直接将新节点设置为桶的头节点
                    setEntryAt(tab, index, node);
                
                // 更新Segment的节点计数(volatile写入)
                count = c;
                
                // 新插入节点,旧值为null
                oldValue = null;
                break;
            }
        }
    } finally {
        // ==================== 第五步:释放锁 ====================
        
        unlock();
    }
    return oldValue;
}
  1. 扩容 rehash 方法

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

java
/**
 * Segment内部的扩容方法。
 * 将Segment的table容量扩大一倍,并重新分布所有节点。
 * 调用此方法时,必须持有Segment锁。
 * 
 * @param node 新插入的节点(在扩容期间插入)
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    // ==================== 第一步:准备工作 ====================
    
    // 获取旧的哈希表
    HashEntry<K,V>[] oldTable = table;
    
    // 获取旧表的容量
    int oldCapacity = oldTable.length;
    
    // 新容量是旧容量的2倍(左移1位相当于乘以2)
    int newCapacity = oldCapacity << 1;
    
    // 重新计算扩容阈值:新容量 * 负载因子
    threshold = (int)(newCapacity * loadFactor);
    
    // 创建新的哈希表,容量为原来的2倍
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    
    // 计算新表的掩码,用于计算节点在新表中的位置
    int sizeMask = newCapacity - 1;
    
    // ==================== 第二步:遍历旧表,重新分布节点 ====================
    
    for (int i = 0; i < oldCapacity; i++) {
        // 获取旧表中第i个桶的第一个节点
        HashEntry<K,V> e = oldTable[i];
        
        // 如果桶不为空,处理该桶中的所有节点
        if (e != null) {
            HashEntry<K,V> next = e.next;
            
            // 计算该节点在新表中的位置
            // 由于新容量是旧容量的2倍,节点的新位置有两种可能:
            // 1. 原位置 i
            // 2. 原位置 i + oldCapacity
            int idx = e.hash & sizeMask;
            
            // ==================== 情况1:单节点链表 ====================
            
            if (next == null) {
                // 如果桶中只有一个节点,直接放入新表的对应位置
                newTable[idx] = e;
            }
            
            // ==================== 情况2:多节点链表 ====================
            
            else {
                // 用于优化的变量:最后一段连续相同位置的节点
                HashEntry<K,V> lastRun = e;
                
                // 最后一段连续节点的位置
                int lastIdx = idx;
                
                // 遍历整个链表,找到最后一段连续相同位置的节点
                // 这个优化可以避免对部分节点进行不必要的复制
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    // 计算当前节点在新表中的位置
                    int k = last.hash & sizeMask;
                    
                    // 如果位置发生变化,更新lastRun和lastIdx
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                
                // 将最后一段连续节点直接移动到新表的对应位置
                // 这样lastRun及其后续节点都不需要重新创建
                newTable[lastIdx] = lastRun;
                
                // ==================== 复制剩余节点 ====================
                
                // 从链表头开始,复制lastRun之前的所有节点
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    // 获取当前节点的属性
                    int h = p.hash;
                    int k = h & sizeMask;
                    
                    // 获取新表k位置的当前头节点
                    HashEntry<K,V> n = newTable[k];
                    
                    // 创建新节点,并插入到链表的头部
                    // 注意:这里创建了新节点,而不是重用旧节点
                    // 这样可以避免旧链表的变化影响新链表
                    newTable[k] = new HashEntry<K,V>(h, p.key, p.value, n);
                }
            }
        }
    }
    
    // ==================== 第三步:处理新插入的节点 ====================
    
    // 计算新节点在新表中的位置
    int nodeIndex = node.hash & sizeMask;
    
    // 将新节点插入到对应桶的头部
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    
    // ==================== 第四步:更新Segment的table引用 ====================
    
    table = newTable;
}
  1. get 方法

get 时并未加锁,用了 unsafe 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新表取内容

java
/**
 * 返回指定键所映射的值,如果此映射不包含该键的映射关系,则返回 {@code null}。
 * 
 * 更正式地说,如果此映射包含从键 {@code k} 到值 {@code v} 的映射关系,使得 {@code key.equals(k)},
 * 则此方法返回 {@code v};否则返回 {@code null}。(最多只能有一个这样的映射关系。)
 *
 * @param key 要返回其关联值的键
 * @return 指定键所映射的值,如果此映射不包含该键的映射关系,则返回 {@code null}
 * @throws NullPointerException 如果指定的键为 null
 */
public V get(Object key) {
    // ==================== 第一步:参数校验和哈希计算 ====================
    
    // 获取Segment数组引用
    Segment<K,V> s;
    
    // 获取HashEntry数组引用
    HashEntry<K,V>[] tab;
    
    // 计算key的哈希值
    int h = hash(key);
    
    // ==================== 第二步:计算Segment和桶的位置 ====================
    
    // 计算key的哈希值在内存中的偏移量,用于后续volatile读取
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    
    // 使用UNSAFE volatile读取获取Segment(保证可见性)
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        
        // ==================== 第三步:遍历链表查找key ====================
        
        // 遍历Segment中的桶链表查找key
        // 使用volatile读取保证获取到最新的链表头节点
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            
            K k;
            
            // 检查当前节点是否匹配目标key
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                // 找到匹配的key,返回对应的value
                // value字段是volatile的,保证读取到最新值
                return e.value;
        }
    }
    
    // 没有找到对应的key,返回null
    return null;
}
  1. size 方法

计算元素个数前,先不加锁计算两次,如果前后两次结果一样,认为个数结果正确并返回;如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回

java
/**
 * 返回此映射中的键值映射数。
 * 如果映射包含的元素数量大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。
 * 
 * 注意:与大多数集合不同,此方法不是常量时间操作。由于映射的异步性质,
 * 确定当前元素数量需要遍历所有Segment,因此如果在遍历期间有写入操作,
 * 返回的结果可能不准确。因此,此方法通常用于监控或估计大小,而不用于程序控制。
 *
 * @return 此映射中的键值映射数量
 */
public int size() {
    // 尝试多次无锁统计,如果结果稳定则返回
    // 如果多次统计结果不一致,则最终通过加锁所有Segment来获得准确大小
    final Segment<K,V>[] segments = this.segments;
    
    int size;
    boolean overflow; // 是否溢出(超过Integer.MAX_VALUE)
    long sum;         // 所有Segment的modCount总和
    long last = 0L;   // 上一次的sum值
    int retries = -1; // 重试次数,初始为-1
    
    try {
        for (;;) {
            // 如果重试次数达到阈值,强制加锁统计
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // 加锁所有Segment
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // 确保Segment存在并加锁
            }
            
            sum = 0L;
            size = 0;
            overflow = false;
            
            // 遍历所有Segment,统计元素数量和modCount
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // 累加modCount,用于检测是否有结构性修改
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            
            // 如果是第一次迭代,或者sum与last不同(说明有修改),继续循环
            // 如果sum等于last,说明两次统计期间没有结构性修改,结果可信
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 如果之前加锁了,现在需要释放所有锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    
    // 处理溢出情况
    return overflow ? Integer.MAX_VALUE : size;
}
9.10.1.4 ConcurrentHashMap 对 HashMap 的改进

首先是 hash 的计算方法上,ConcurrentHashMap 的 spread 方法接收一个已经计算好的 hashCode,然后将这个哈希码的高 16 位与自身进行异或运算

java
static final int spread(int h) {
	return (h ^ (h >>> 16)) & HASH_BITS;
}

比 HashMap 的 hash 计算多了一个 & HASH_BITS 的操作,这里的 HASH_BITS 是一个常数,值为 0x7fffffff,它确保结果是一个非负整数

java
static final int hash(Object key) {
	int h;
	return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

另外,ConcurrentHashMap 对节点 Node 做了进一步的封装,比如说用 ForwardingNode 来表示正在进行扩容的节点

java
static final class ForwardingNode<K, V> extends Node<K, V> {
	final Node<K, V>[] nextTable;
	ForwardingNode(Node<K, V>[] tab) {
		super(MOVED, null, null, null);
		this.nextTable = tab;
	}
}

最后就是 put 方法,通过 CAS + synchronized 代码块来进行并发写入

9.10.1.5 为什么 ConcurrentHashMap 在 JDK1.7 中要用 ReentrantLock,而在 JDK1.8 要用 synchronized

JDK1.7 中的 ConcurrentHashMap 使用了分段锁机制,每个 Segment 都继承了 ReentrantLock,这样可以保证每个 Segment 都可以独立的加锁

而在 JDK1.8 中,ConcurrentHashMap 取消了 Segment 分段锁,采用了更加精细化的桶锁,以及 CAS 无锁算法,每个桶都可以独立的加锁,只有在 CAS 失败时才会使用 synchronized 代码块加锁,这样可以减少锁的竞争,提高并发性能

9.10.1.6 为什么 ConcurrentHashMap 比 Hashtable 效率高

Hashtable 在任何时刻只允许一个线程访问整个 Map,是通过对整个 Map 加锁来实现线程安全的,比如 get 和 put 方法,是直接在方法上加的 synchronized 关键字

而 ConcurrentHashMap 在 JDK8 中采用 CAS + synchronized 实现的,仅在必要时加锁 比如说 put 的时候优先使用 CAS 尝试插入,如果失败再使用 synchronized 代码块加锁,get 的时候是完全无锁的,因为 value 是 volatile 变量修饰的,保证了内存可见性

9.10.2 BlockingQueue

BlockingQueue 是 JUC 包下的一个线程安全队列,支持阻塞式的生产者-消费者模型,当队列容器已满,生产者线程会被阻塞,直到消费者线程取走元素后为止;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止

BlockingQueue 的实现类有很多,比如说 ArrayBlockingQueue、PriorityBlockingQueue 等

阻塞队列使用 ReentrantLock + Condition 来确保并发安全

以 ArrayBlockingQueue 为例,它内部维护了一个数组,使用两个指针分别指向队头和队尾,put 的时候先用 ReentrantLock 加锁,然后判断队列是否已满,如果已满就阻塞等待,否则插入元素

java
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

public void put(E e) throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly(); //加锁,确保线程安全
	try {
		while (count == items.length) { //队列满,阻塞
			notFull.await();
		}
		enqueue(e); //插入元素
	} finally {
		lock.unlock(); //释放锁
	}
}
9.10.2.1 LinkedBlockingQueue
  1. 基本的入队出队
java
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	static class Node<E> {
		E item;
		
		//next 指向有三种情况:
		//1. 指向真正的后继节点
		//2. 指向自己,发生在出队时
		//3. 指向 null,表示没有后继节点了,到最后了
		Node<E> next;
		
		Node(E x) {
			item = x;
		}
	}
}

初始化链表 last = head = new Node<E>(null); Dummy 节点用来占位,item 为 null

当一个节点入队 last = last.next = node;

再来一个节点入队 last = last.next = node;

出队:

java
Node<E> h = head;
Node<E> first = h.next;
h.next = h; //help GC
head = first;
E x = first.item;
first.item = null;
return x;

h = head

first = h.next

h.next = h

head = first

java
E x = first.item;
first.item = null;
return x;

  1. 加锁分析

用了两把锁和 Dummy 节点

  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行

线程安全分析:

  • 当节点总数大于 2 时(包括 Dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全,两把锁保证了入队和出队没有竞争
  • 当节点总数等于 2 时(即一个 Dummy 节点,一个正常节点)的时候,仍然是两把锁锁两个对象,不会竞争
  • 当节点总数等于 1 时(就一个 Dummy 节点),这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
java
// 用于 put(阻塞)offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();

//用户 take(阻塞)poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();

put 操作源码:

take 操作源码:

LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较:

  • Linked 支持有界,Array 强制有界
  • Linked 的实现是链表,Array 的实现是数组
  • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 是一把锁

9.10.3 ConcurrentLinkedQueue

todo

9.10.4 CopyOnWriteArrayList

CopyOnWriteArrayList 是 ArrayList 的线程安全版本

CopyOnWriteArraySet 是它的马甲,底层实现采用了写入时拷贝的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,读写分离,以新增为例:

java
public boolean add(E e) {
	synchronized (lock) {
		//获取旧的数组
		Object[] es = getArray();
		int len = es.length;
		//拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)
		es = Arrays.copyOf(es, len + 1);
		//添加新元素
		es[len] = e;
		//替换旧的数组
		setArray(es);
		return true;
	}
}

这里的源码版本是 Java11,在 Java1.8 中使用的是可重入锁而不是 synchronized

其它读操作并未加锁,例如:

java
public void forEach(Consumer<? super E> action) {
	Objects.requireNonNull(action);
	for (Object x : getArray()) {
		@SuppressWarnings("unchecked") E e = (E) x;
		action.accept(e);
	}
}

适合读多写少的应用场景

get 的弱一致性:

迭代器弱一致性:

不要觉得弱一致性就不好:

  • 数据库的 MVCC 都是弱一致性的表现
  • 并发高和一致性是矛盾的,需要权衡

第 10 章 ThreadLocal

ThreadLocal 是一种用于实现线程局部变量的工具类,它允许每个线程都拥有自己的独立副本,从而实现线程隔离

在 Web 应用中,可以使用 ThreadLocal 存储用户会话信息,这样每个线程在处理用户请求时都能方便地访问当前用户的会话信息

在数据库操作中,可以使用 ThreadLocal 存储数据库连接对象,每个线程有自己独立的数据库连接,从而避免了多线程竞争同一数据库连接的问题

在格式化操作中,例如日期格式化,可以使用 ThreadLocal 存储 SimpleDateFormat 实例,避免多线程共享同一实例导致的线程安全问题

ThreadLocal 的优点:每个线程访问的变量副本都是独立的,避免了共享变量引起的线程安全问题,由于 ThreadLocal 实现了变量的线程独占,使得变量不需要同步处理,因此能够避免资源竞争,ThreadLocal 可用于跨方法、跨类时传递上下文数据,不需要在方法间传递参数

ThreadLocal 的实现:当我们创建一个 ThreadLocal 对象并调用 set 方法时,其实是在当前线程中初始化了一个 ThreadLocalMap,ThreadLocalMap 是 ThreadLocal 的一个静态内部类,它内部维护了一个 Entry 数组,key 是 ThreadLocal 对象,value 是线程的局部变量,这样就相当于为每个线程维护了一个变量副本

Entry 继承了 WeakReference,它限定了 key 是一个弱引用,弱引用的好处是当内存不足时,JVM 会回收 ThreadLocal 对象,并且将其对应的 Entry.value 设置为 null,这样可以在很大程度上避免内存泄漏

ThreadLocal 内存泄漏:ThreadLocalMap 的 key 是弱引用,但 value 是强引用,如果一个线程一直在运行,并且 value 一直指向某个强引用对象,那么这个对象就不会被回收,从而导致内存泄漏 怎么解决?使用完 ThreadLocal 后,及时调用 remove() 方法释放内存空间

ThreadLocalMap 源码:todo

第 11 章 面试题

(1)什么是线程安全

如果一段代码块或者一个方法被多个线程同时执行还能够正确的处理共享数据,那么这段代码块或者这个方法就是线程安全的

  • 原子性
  • 可见性
  • 有序性:确保线程不会因为死锁、饥饿、活锁等问题导致无法继续执行

(2)线程间有哪些通信方式

线程之间传递信息的方式有多种,比如说使用 volatile 和 synchronized 关键字共享对象、使用 wait() 和 notify() 方法实现生产者-消费者模式、使用 Exchanger 进行数据交换、使用 Condition 实现线程间的协调等

  • Exchanger 是一个同步点,可以在两个线程之间交换数据,一个线程调用 exchange() 方法,将数据传递给另一个线程,同时接收另一个线程的数据
  • CompletableFuture 支持异步编程,允许线程在完成计算后将结果传递给其它线程

(3)sleep 和 wait 的区别

  • 所属类不同,sleep 会让当前线程休眠,不需要获取对象锁,属于 Thread 类的方法;wait 会让获得对象锁的线程等待,要提前获得对象锁,属于 Object 类的方法

    • sleep() 方法属于 Thread 类
    • wait() 方法属于 Object 类
  • 锁行为不同 如果一个线程在持有某个对象锁时调用了 sleep 方法,它在睡眠期间仍然会持有这个锁;当线程执行 wait 方法时,它会释放持有的对象锁,因此其它线程也有机会获取该对象的锁

  • 使用条件不同

    • sleep() 方法可以在任何地方被调用
    • wait() 方法必须在同步代码块或同步方法中被调用,这是因为调用 wait() 方法的前提是当前线程必须持有对象锁
  • 唤醒方式不同

    • 调用 sleep 方法后,线程会进入 Timed_Waiting 状态,即在指定的时间内暂停执行,当指定的时间结束后,线程会自动恢复到 runnable 状态,等待 CPU 调度再次执行
    • 调用 wait 方法后,线程会进入 waiting 状态,直到有其它线程在同一对象上调用 notify 或 notifyAll 方法,线程才会从 waiting 状态转变为 runnable 状态,准备再次获得 CPU 的执行权

(4)怎么保证线程安全

线程安全是指在并发环境下,多个线程访问共享资源时,程序能够正确地执行,而不会出现数据不一致的问题

为了保证线程安全,可以使用 synchronized 关键字对方法加锁,对代码块加锁。线程在执行同步方法、同步代码块时,会获取锁,其它线程就会阻塞并等待锁

如果需要更细粒度的锁,可以使用 ReentrantLock 并发重入锁等

如果需要保证变量的内存可见性,可以使用 volatile 关键字

对于简单的原子变量操作,还可以使用 Atomic 原子类

对于线程独立的数据,可以使用 ThreadLocal 来为每个线程提供专属的变量副本

对于需要并发容器的地方,可以使用 ConcurrentHashMap、CopyOnWriteArrayList 等

(5)线程安全的使用场景

单例模式,在多线程环境下,如果多个线程同时尝试创建实例,单例类必须确保只创建一个实例,并提供一个全局访问点

饿汉式是一种比较直接的实现方式,它通过在类加载时就立即初始化单例对象来保证线程安全

java
class Singleton {
	private static final Singleton instance = new Singleton();
	
	private Singleton() {
	}
	
	public static Singleton getInstance() {
		return instance;
	}
}

懒汉式单例则在第一次使用时初始化单例对象,这种方式需要使用双重检查锁来确保线程安全,volatile 关键字用来保证可见性,synchronized 关键字用来保证同步

java
class LazySingleton {
	private static volatile LazySingleton instance;
	
	private LazySingleton() {}
	
	public static LazySingleton getInstance() {
		if (instance == null) { //第一次检查
			synchronized (LazySingleton.class) {
				if (instance == null) { //第二次检查
					instance = new LazySingleton();
				}
			}
		}
		return instance;
	}
}

Released under the MIT License.

本站访客数 人次 本站总访问量