首先从乐观锁和悲观锁的设计思想开始。

乐观锁和悲观锁

悲观锁

悲观锁总认为最坏的情况可能会出现,即数据很可能会被其他人所修改,所以悲观锁在持有数据的时候总会把资源或者数据锁住,其他线程想要请求这个资源时阻塞,直到悲观锁释放资源读写操作均加锁)。传统的关系型数据库里边就用到了很多悲观锁机制,比如行锁,表锁等,读锁,写锁等,均在做操作之前先上锁。悲观锁的实现往往依靠数据库本身的锁功能实现。

Java 中的 SynchronizedReentrantLock 等独占锁(排他锁)也是一种悲观锁思想的实现,因为 SynchronziedReetrantLock 不管是否持有资源,它都会尝试去加锁。

一个悲观锁的运用场景: select * from student where name="cxuan" for update 这条 sql 语句从 Student 表中选取 name = "cxuan" 的记录并对其加锁,那么其他写操作再这个事务提交之前都不会对这条数据进行操作,起到了独占和排他的作用。

悲观锁因为对读写都加锁,所以性能较差,不适用多并发场景。

乐观锁

乐观锁的思想与悲观锁的思想相反,它总认为资源和数据不会被别人所修改,所以乐观锁读取不上锁,但是在写入操作时会判断当前数据是否被修改过。乐观锁的实现方案一般有两种:版本号机制CAS实现。乐观锁多适用于多读的应用类型,这样可以提高吞吐量。

在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

乐观锁的实现方式

版本号机制

通过数据表中加上一个 version 字段实现:表示数据被修改的次数当执行写操作并且写入成功后,version = version + 1。当线程A要更新数据时,在读取数据的同时也会读取 version 值;在提交更新时,若刚才读取到的 version 值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。

举一个栗子:

  1. 事务一开启,男柜员先执行读操作,取出金额和版本号,执行写操作:
1
updateset 金额 = 120,version = version + 1 where 金额 = 100 and version = 0

此时金额改为 120,版本号为1,事务未提交。

  1. 事务二开启,女柜员先执行读操作,取出金额和版本号,执行写操作
1
updateset 金额 = 50,version = version + 1 where 金额 = 100 and version = 0

此时金额改为 50,版本号变为 1,事务未提交。

  1. 现在提交事务一,金额改为 120,版本变为1,提交事务。理想情况下应该变为 金额 = 50,版本号 = 2,但是实际上事务二 的更新是建立在金额为 100 和 版本号为 0 的基础上的,所以事务二不会提交成功,应该重新读取金额和版本号,再次进行写操作。(这样,就避免了女柜员 用基于 version=0 的旧数据修改的结果覆盖男操作员操作结果的可能)
CAS算法

CAS 即 compare and swap(比较与交换),是一种有名的无锁算法。即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步。

CAS涉及3个要素:需要读写的内存值 V;进行比较的值 A;拟写入的新值 B

当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

JAVA对CAS的支持:在JDK1.5 中新添加 java.util.concurrent (J.U.C) 就是建立在 CAS 之上的。

乐观锁的缺点

ABA问题

如果一个变量第一次读取的值是 A,准备好需要对 A 进行写操作的时候,发现值还是 A,那么这种情况下,能认为 A 的值没有被改变过吗?可以是由 A -> B -> A 的这种情况,(AtomicInteger没有考虑这种情况)

JDK 1.5 以后的 AtomicStampedReference类提供该能力:其中的 compareAndSet 方法首先检查当前引用是否等于预期引用,以及当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

也可以采用CAS的一个变种DCAS解决上述问题:对于每一个V增加一个表示引用修改次数的标记符。对于每个V,如果引用修改了一次,这个计数器就加1。当该变量需要update的时候,就同时检查变量的值和计数器的值。

循环开销大

乐观锁在进行写操作时判断是否能够写入成功;如果写入不成功,将触发等待 -> 重试机制。这通常通过一个自旋锁实现:适用于短期内获取不到、等待重试的锁;但不适用于长期获取不到锁的情况。另外,自旋循环对于性能开销比较大。

Java的CAS和synchronized使用场景

CAS 适用于多读场景(写冲突较少);synchronized 适用于多写场景(写冲突较多)

  1. 对于资源竞争较少(线程冲突较轻)的情况:使用 synchronized 同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗 cpu 资源;而 CAS 基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。
  2. 对于资源竞争严重(线程冲突严重)的情况:CAS 自旋的概率较大,从而浪费更多的 CPU 资源,效率低于 synchronized

自旋锁

提出背景

多处理器环境中的资源常常通过锁实现资源的互斥访问。同一时间,只能有一个线程获取到锁。那没有获取到锁的线程应该怎么办呢?通常有两种处理方式:

  1. 自旋锁(非阻塞):循环等待,判断该资源是否已经释放锁;
  2. 互斥锁(阻塞):自己阻塞,等待重新调度请求。

工作原理

自旋锁的核心思想是通过一个共享变量(通常是原子变量)来控制锁的状态。当一个线程尝试获取锁时,它会检查该变量的值:

  1. 如果锁未被占用(变量值为 false),线程可以成功获取锁,并将变量值设置为 true
  2. 如果锁已被占用(变量值为 true),线程会进入一个循环,不断检查变量值,直到锁被释放。

优缺点

优点:

  1. 减少线程上下文切换开销:当锁已经被占用时,线程不会进入操作系统的调度队列,也即不需要做内核态和用户态之间的切换进入阻塞状态,它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户进程和内核切换的消耗。

  2. 自旋锁通常适用在时间比较短的情况

缺点:

  1. 忙等待浪费CPU资源:如果长时间上锁,会有大量的线程处于自旋状态占用 CPU 资源,进而会影响整体系统的性能;
  2. 可能导致活锁:线程持有锁的时间越长,则持有该锁的线程被OS调度程序中断的风险越大。如果发生中断情况,那么其他线程将保持旋转状态(反复尝试获取锁),而持有该锁的线程并不打算释放锁,这样导致的是结果是无限期推迟,直到持有锁的线程可以完成并释放它为止
    • 解决方法:设定自旋时间,到时间后立即释放自旋锁。
  3. 不适用于高负载的情况:在高并发的环境下,自旋锁可能会造成性能下降,因为自旋时的CPU消耗可能会比阻塞等待的成本还要高。

示例

一个C++栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

class SpinLock {
private:
std::atomic_flag flag = ATOMIC_FLAG_INIT; // 初始化一个原子布尔类型变量:专门用于标记锁的状态

public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire)) {
// 自旋等待锁释放:不断循环,直到flag变为false,即锁被释放
/*
flag.test_and_set是一个原子操作, 它会检查 flag 的值并将其设置为 true:
如果原本是 false, 它就设置为true,并返回false表示锁被成功获得;否则返回 true,表示锁已经被占用,此时进入循环等待。
当 flag 被设置为 true 时,表示锁被占用,此时会不断循环直到 flag 被清除。

std::memory_order_acquire 是一个内存顺序, 保证了当前线程获取锁时,所有先前对共享数据的读写操作会在获取锁之前完成,从而保证内存顺序。
*/
}
}

void unlock() {
// 释放锁的原子操作,清除atomic_flag, 将其设置为 false,允许其他线程获取锁
//std::memory_order_relase 保证了释放锁时,前面的操作会在此锁释放前可见
flag.clear(std::memory_order_release);
}
};

// 示例:使用自旋锁保护共享变量
int shared_data = 0;

void thread_task(SpinLock& lock, int id) {
// 1. 获取锁
lock.lock();
std::cout << "Thread " << id << " is working with shared data.\n";
// 2. 操作共享数据
++shared_data;
// 3. 模拟线程工作时的一段时间延迟
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "Thread " << id << " finished working. Shared data = " << shared_data << "\n";
// 4. 释放锁
lock.unlock();
}

int main() {
SpinLock spinlock;
std::vector<std::thread> threads;

for (int i = 0; i < 5; ++i) {
threads.emplace_back(thread_task, std::ref(spinlock), i);
}

for (auto& t : threads) {
t.join();
}

std::cout << "Final value of shared data: " << shared_data << std::endl;
return 0;
}

一个Java栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SpinLock {
private AtomicReference<Thread> cas = new AtomicReference<Thread>();
public void lock() {
Thread current = Thread.currentThread();
// 利用CAS
while (!cas.compareAndSet(null, current)) {
// DO nothing
}
}
public void unlock() {
Thread current = Thread.currentThread();
cas.compareAndSet(current, null);
}
}

这段代码不支持重入:一个线程第一次已经获取到了该锁,如果在锁释放之前又一次重新获取该锁,第二次就不能成功获取到(由于不满足CAS,所以第二次获取会进入while循环等待,而如果是可重入锁,第二次也是应该能够成功获取到的)

可重入自旋锁

如何实现可重入锁呢?引入一个计数器,记录获取锁的线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReentrantSpinLock {
private AtomicReference<Thread> cas = new AtomicReference<Thread>();
private int count;
public void lock() {
Thread current = Thread.currentThread();
if (current == cas.get()) { // 如果当前线程已经获取到了锁,线程数增加一,然后返回
count++;
return;
}
// 如果没获取到锁,则通过CAS自旋
while (!cas.compareAndSet(null, current)) {
// DO nothing
}
}
public void unlock() {
Thread cur = Thread.currentThread();
if (cur == cas.get()) {
if (count > 0) {// 如果大于0,表示当前线程多次获取了该锁,释放锁通过count减一来模拟
count--;
} else {// 如果count==0,可以将锁释放,这样就能保证获取锁的次数与释放锁的次数是一致的了。
cas.compareAndSet(cur, null);
}
}
}
}

自旋锁的其他变种

TicketLock

思路:每当有线程获取锁的时候,就给该线程分配一个递增的id,称之为排队号锁对应一个服务号每当有线程释放锁,服务号就会递增,此时如果服务号与某个线程排队号一致,那么该线程就获得锁,由于排队号是递增的,所以就保证了最先请求获取锁的线程可以最先获取到锁,就实现了公平性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TicketLock {
// 服务号:当前持有锁的线程号
private AtomicInteger serviceNum = new AtomicInteger();
// 排队号
private AtomicInteger ticketNum = new AtomicInteger();
// 新增一个ThreadLocal,用于存储每个线程的排队号
private ThreadLocal<Integer> ticketNumHolder = new ThreadLocal<Integer>();
public void lock() {
// 1. 为当前线程分配一个新的排队号 currentTicketNum
int currentTicketNum = ticketNum.incrementAndGet();
// 2. 每个线程获取锁的时候(即调用lock()的时候):将当前线程的排队号保存在ticketNumHolder中
ticketNumHolder.set(currentTicketNum);
// 3. 自旋等待:循环判断排队号是否等于服务号
while (currentTicketNum != serviceNum.get()) {
// Do nothing
}
}
public void unlock() {
// 1. 从ThreadLocal中获取当前线程的排队号
Integer currentTickNum = ticketNumHolder.get();
// 2. 释放锁
// 检查 serviceNum 是否等于 currentTickNum:如果相等,则将 serviceNum 更新为 currentTickNum + 1,表示下一个线程可以获得锁;如果不相等,不做任何更改
serviceNum.compareAndSet(currentTickNum, currentTickNum + 1);
}
}

优点:保证了线程按照请求的顺序获取锁,不会出现线程饥饿的情况;

缺点:多处理系统上,每个进程/线程占用的处理器都在读写同一个变量serviceNum;每次读写操作都必须在多个处理器缓存之间进行缓存同步,这会导致繁重的系统总线和内存的流量,大大降低系统整体的性能。

CLHLock

CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态:如果发现前驱释放锁就结束自旋,获得锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class CLHLock {
// 锁的基本单元:每个线程在请求锁时,都会创建一个 CLHNode 对象,节点的状态由 isLocked 字段表示
public static class CLHNode {
private volatile boolean isLocked = true;
}
// 尾部节点:表示最后一个请求锁的线程
private volatile CLHNode tail;
// 线程本地存储:保存每个线程自己创建的 CLHNode 对象(每个线程只关心自己的节点,而不需要知道其他线程的节点)
private static final ThreadLocal<CLHNode> LOCAL = new ThreadLocal<CLHNode>();
// 原子更新器:用于原子地更新 CLHLock 对象中的 tail 字段
private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(CLHLock.class, CLHNode.class,
"tail");

public void lock() {
// 1. 创建节点并保存到 LOCAL:
CLHNode node = new CLHNode();
LOCAL.set(node);
// 2. 将新建的节点设置为尾部节点,并返回旧的节点(原子操作),这里旧的节点实际上就是当前节点的前驱节点
CLHNode preNode = UPDATER.getAndSet(this, node);
if (preNode != null) {
// 3. 等待前驱节点释放锁:前驱节点不为null表示当锁被其他线程占用,通过不断轮询判断前驱节点的锁标志位,等待前驱节点释放锁
while (preNode.isLocked) {
}
preNode = null;
LOCAL.set(node);
}
// 4. 如果不存在前驱节点,表示该锁没有被其他线程占用,则当前线程获得锁
}
public void unlock() {
// 1. 获取当前线程对应的节点
CLHNode node = LOCAL.get();
// 2.1 如果tail节点等于node(即当前线程是最后一个请求锁的线程):则将tail节点更新为null;
// 2.2 如果tail节点不等于node:将node的isLocked状态设置为false,表示当前线程释放了锁
if (!UPDATER.compareAndSet(this, node, null)) {
node.isLocked = false;
}
node = null;
}
}

工作原理小结:

线程获取锁

  1. 每个线程调用lock()时:创建一个新的CLHNode节点,将其与当前线程关联;
  2. 当前线程(通过原子操作)将自身节点设置为锁的尾部tail
  3. 如果有前驱节点(即当前线程不是第一个请求锁的线程):则进入自旋状态,等待前驱节点释放锁
  4. 只有前驱节点的isLocked被设置为false时:当前节点才能获取锁并继续执行。

线程释放锁

  1. 当前线程调用unlock()时:获取自己对应的CLHNode节点;
  2. 如果当前线程的节点是 tail(最后一个请求锁的线程):则将 tail 设置为 null,表示锁已被释放;
  3. 如果当前线程不是 tail:则将 isLocked 设置为 false,释放锁并允许其他线程获取锁

优点将获取锁的线程状态借助节点(node)保存,每个线程都有一份独立的节点,解决了TicketLock多处理器缓存同步的问题;每个线程都只关心前一个线程的锁状态,而不需要直接访问其他线程的节点,减少了共享内存的访问,从而减少了锁竞争; 缺点:每个线程都需要一个独立的 CLHNode 节点,这可能导致较高的内存消耗,尤其在高并发情况下。

MCSLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* MCS:发明人名字John Mellor-Crummey和Michael Scott
* 代码来源:http://ifeve.com/java_lock_see2/
*/
public class MCSLock {
// 每个线程在等待锁时,创建的节点
public static class MCSNode {
volatile MCSNode next; // 指向后驱节点
volatile boolean isLocked = true; // 当前线程是否在等待锁
}
private static final ThreadLocal<MCSNode> NODE = new ThreadLocal<MCSNode>();
// 队列:存储正在等待锁的线程的节点
@SuppressWarnings("unused")
private volatile MCSNode queue;
// queue原子更新器
private static final AtomicReferenceFieldUpdater<MCSLock, MCSNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(MCSLock.class, MCSNode.class,
"queue");
public void lock() {
// 1. 创建节点,并保存到ThreadLocal中
MCSNode currentNode = new MCSNode();
NODE.set(currentNode);
// 2. 将当前线程的节点设置为队列尾部,并且返回前一个节点
MCSNode preNode = UPDATER.getAndSet(this, currentNode);
// 3. 等待锁
if (preNode != null) {
// 如果之前节点不为null,表示锁已经被其他线程持有
preNode.next = currentNode;
// 循环判断,直到当前节点的锁标志位为false
while (currentNode.isLocked) {
}
}
}
public void unlock() {
// 1. 获取当前线程的节点
MCSNode currentNode = NODE.get();
// 2. 检查当前线程是否为队尾节点:
// 2.1 是队尾节点:next为null表示没有正在等待获取锁的线程
if (currentNode.next == null) {
// 更新状态并设置queue为null
if (UPDATER.compareAndSet(this, currentNode, null)) {
// 如果成功了,表示queue==currentNode,即当前节点后面没有节点了
return;
} else {
// 如果不成功,表示queue!=currentNode,即当前节点后面多了一个节点,表示有线程在等待
// 如果当前节点的后续节点为null,则需要等待其不为null
while (currentNode.next == null) {
}
}
} else {
// 2.2 不是队尾节点:唤醒后继节点
// 如果不为null,表示有线程在等待获取锁,此时将等待线程对应的节点锁状态更新为false(表示该线程可以获取锁),同时将当前线程的后继节点设为null
currentNode.next.isLocked = false;
currentNode.next = null;
}
}
}

CLHLock和MCSLock的共同点是什么呢?每个线程都有一份独立的节点,都通过链表的方式避免了减少了处理器缓存同步,极大的提高了性能;

CLHLock和MCSLock的区别是什么呢?CLHLock轮询其前驱节点的状态;而MCS则是查看当前节点的锁状态。

参考

看完你就应该能明白的悲观锁和乐观锁

看完你就明白的锁系列之自旋锁

自旋锁:原理、实现与应用

面试必备之深入理解自旋锁