Java多线程(五)---显式锁和ReentrantLock

Java并发包中的显示锁接口和类位于包java.util.concurrent.locks下,主要的接口和类有:

  • 锁接口Lock,主要实现类是ReentrantLock;
  • 读写锁接口ReadWriteLock,主要实现类是ReentrantReadWriteLock

本篇文章主要介绍接口Lock和实现类ReentrantLock。

Lock

Lock作为显式锁,其提供了一种无条件的、可轮询和定时的、可中断的锁操作,
其获得锁和释放锁的操作都是显示。

显式锁Lock接口的定义为:

1
2
3
4
5
6
7
8
public interface Lock {
void lock(); //获取锁
void lockInterruptibly() throws InterruptedException; //可中断的获取锁操作
boolean tryLock(); //尝试获取锁,不会被拥塞,如果失败立刻返回
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //在一定时间内尝试获得锁,如果超时则失败
void unlock(); // 释放锁
Condition newCondition();
}

void lock();获取锁。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。

void lockInterruptibly() throws InterruptedException;如果当前线程未被中断,则获取锁。如果锁可用,则获取锁,并立即返回。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:锁由当前线程获得;或者其他某个线程中断 当前线程,并且支持对锁获取的中断。如果当前线程:在进入此方法时已经设置了该线程的中断状态;或者在获取锁时被中断 ,并且支持对锁获取的中断,则将抛出 InterruptedException ,并清除当前线程的已中断状态。

boolean tryLock();仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true 。如果锁不可用,则此方法将立即返回值 false 。通常对于那些不是必须获取锁的操作可能有用。

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。如果锁可用,则此方法将立即返回值 true 。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下三种情况之一前,该线程将一直处于休眠状态:

void unlock();释放锁。对应于lock()、tryLock()、tryLock(xx)、lockInterruptibly()等操作,如果成功的话应该对应着一个unlock(),这样可以避免死锁或者资源浪费。

newCondition() 返回用来与此 Lock 实例一起使用的 Condition 实例。

可重入锁ReentrantLock

ReentrantLock它的基本用法lock/unlock实现了与synchronized一样的语义:

  • 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁;
  • 可以解决竟态条件问题;
  • 可以保证内存可见性。
    Lock的标准用法:
    1
    2
    3
    4
    5
    6
    7
    8
    Lock lock = new ReentrantLock();

    lock.lock();
    try{
    // to do sth.
    } finally{
    lock.unlock(); //须要在finally中释放锁
    }

tryLock避免死锁

使用tryLock(),可以避免死锁。在持有一个锁,获取另一个锁,获取不到的时候,可以释放已持有的锁,给其他线程机会获取锁,然后再重试获取所有锁。

我们看一个银行账户之间转账的例子,来理解一下tryLock的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class DeadlockAvoidance {
private static Random rnd = new Random();

// 转账
public boolean transferMoney(Account fromAcct, Account toAcct,DollarAmount amount,long timeout,TimeUnit unit)
throws InsufficientFundsException, InterruptedException {
long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
long randMod = getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() + unit.toNanos(timeout);

while (true) {
// 尝试获得fromAcct的锁
if (fromAcct.lock.tryLock()) {
try {
// 尝试获得toAcct的锁
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0) //余额不足
throw new InsufficientFundsException();
else { // 余额满足,转账
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally { //释放toAcct锁
toAcct.lock.unlock();
}
}
} finally { //释放fromAcct锁
fromAcct.lock.unlock();
}
}
// 获得锁失败
// 判断是否超时 如果超时则立刻失败
if (System.nanoTime() < stopTime)
return false;

// 如果没有超时,随机睡眠一段时间
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}


class Account {
//显示锁
public Lock lock;

void debit(DollarAmount d) {
}

void credit(DollarAmount d) {
}

DollarAmount getBalance() {
return null;
}
}

class InsufficientFundsException extends Exception {
}
}

只有同时获得转出账户和转入账户的锁后,才会进行转账。如果不能同时获得两个锁,就释放掉已经获得的锁,并随机随眠一段时间,再去尝试获得全部的锁,循环这个过程直到超时。
tryLock还有另外一个方法
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //在一定时间内尝试获得锁,如果超时则失败
通过该方法可以实现定时锁。

另外lockInterruptibly方法不仅能获得锁,还能保持对于中断的响应,实现中断锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class InterruptibleLocking {
private Lock lock = new ReentrantLock();

public boolean sendOnSharedLine(String message)
throws InterruptedException {
// 可以响应中断的锁
lock.lockInterruptibly();
try {
return cancellableSendOnSharedLine(message);
} finally {
lock.unlock();
}
}

// 可能会抛出中断异常
private boolean cancellableSendOnSharedLine(String message) throws InterruptedException {
/* send something */
return true;
}

}

ReentrantLock实现原理

可重入锁在底层依赖于CAS,另外依赖于类LockSupport中的一些方法。

  • LockSupport
    LockSupport基本方法有park/unpark,park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不在对他进行调度,当有其他线程对它调用了unpark,unpark使参数指定的线程恢复可运行状态。

park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。与CAS方法一样,park/unpark也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作。

  • AQS
    利用CAS和LockSupport提供的基本方法,就可以用来实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueuedSynchronizer,我们简称为AQS,它简化了并发工具的实现。

AQS封装了一个状态,给子类提供了查询和设置状态的方法:

1
2
3
4
private volatile int state;
protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)

用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:

1
2
3
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t)
protected final Thread getExclusiveOwnerThread()

  • ReentrantLock
    ReentrantLock内部使用AQS,有三个内部类:
    1
    2
    3
    abstract static class Sync extends AbstractQueuedSynchronizer
    static final class NonfairSync extends Sync
    static final class FairSync extends Sync

Sync是抽象类,NonfairSync是fair为false时使用的类,FairSync是fire为true时使用的类。

我们看一下Sync的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = -5179523762034025860L;

// 之所以这里没有实现关键的lock操作,是因为不同的策略(公平和非公平)有不同的lock方式

abstract void lock();



/** 看名字就知道是非公平策略下尝试获取锁所调用的操作 */

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

// c==0表明现在还没有线程来获取这把锁

if (c == 0) {

if (compareAndSetState(0, acquires)) {

// 如果设置成功,则此线程就是独占锁的拥有者啦!

setExclusiveOwnerThread(current);

return true;

}

}

// 只有拥有该锁的线程能再次获取锁的许可(可重入性)

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

// 再次获取锁的许可,这里需要将许可数添加到state中

setState(nextc);

return true;

}

return false;

}



/** 无需多说,释放锁的通用操作*/

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

// 如果此时state值为0,说明此时该线程已经和该锁脱离关系了,所以锁的拥有者得设置成null

if (c == 0) {

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}



/** 判断当前线程是否是获取该锁的线程*/

protected final boolean isHeldExclusively() {

return getExclusiveOwnerThread() == Thread.currentThread();

}



final ConditionObject newCondition() {

return new ConditionObject();

}



/** 返回锁的拥有者线程*/

final Thread getOwner() {

return getState() == 0 ? null : getExclusiveOwnerThread();

}



/** 当前线程持有该锁的次数*/

final int getHoldCount() {

return isHeldExclusively() ? getState() : 0;

}



/** 当state为0,表明已经没有线程持有该锁*/

final boolean isLocked() {

return getState() != 0;

}



private void readObject(java.io.ObjectInputStream s)

throws java.io.IOException, ClassNotFoundException {

s.defaultReadObject();

setState(0); // reset to unlocked state

}

}

从ReentrantLock的Sync的实现来看,我们得到两个重要信息:1.只能有一个线程拥有该锁,所以ReentrantLock是使用AQS的独占模式。2.锁可以被拥有者线程重复持有,即可重入,并用AQS中state来记录拥有者线程当前持有该锁的次数,tryAcquire一次则state+1,tryRelease一次则state-1,所以当state为0时,表明该锁目前没有被任何线程持有。

ReentrantLock构造函数

1
2
3
4
5
6
7
8
9
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {

sync = fair ? new FairSync() : new NonfairSync();

}

很显然FairSync代表公平策略的实现,而NonfairSync代表非公平的实现,而且FairSync和NonfairSync都继承于上面提到的Sync。

我们先来看非公平NonfairSync的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static final class NonfairSync extends Sync {

private static final long serialVersionUID = 7316153563782823691L;



final void lock() {

if (compareAndSetState(0, 1))

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(1);

}



protected final boolean tryAcquire(int acquires) {

return nonfairTryAcquire(acquires);

}

}

可以看出,有了Sync,NonfairSync变得异常简单,它实现了Sync中没有实现的lock方法,lock方法会先看state值是否为0,为0表明该锁还没被线程拥有,所以立马将其设为1(是个CAS过程),如果成功,则把当前线程设置成独占锁的拥有者,如果失败或者state根本不为0,则说明当前线程可能没有抢锁成功或者当前线程不是第一次持有该锁了(可重入性),此时则会调用AQS的独占获取资源的方法acquire(acquire(1);AQS中acquire的代码:

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

在这里,acquire先会调用我们自己tryAcquire方法的实现,如果获取资源数成功,则返回,否则则以独占的方式进入队列中等待,而我们NonfairSync实现的tryAcquire方法是直接调用Sync中的nonfairTryAcquire实现.

我们再来看看公平的FairSync的内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static final class FairSync extends Sync {

private static final long serialVersionUID = -3000897897090466540L;



final void lock() {

acquire(1);

}



protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

// 对于公平策略,当state为0时,不能直接去尝试作为锁的拥有者,因为有可能队列中还有线程在等

if (c == 0) {

// 如果队列中没有线程且将state由0设置成1成功,则说明已经成功拥有该锁

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

// 拥有该锁后,将拥有者线程设置成当前线程

setExclusiveOwnerThread(current);

return true;

}

}

// 当前线程本来就是锁的拥有者,则直接给state值+1

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

// 这里之所有没有用CAS操作给state设值,是因为当锁找到拥有者后,只有拥有者线程能修改state

setState(nextc);

return true;

}

return false;

}

}

和NonfairSync中的lock方法相比,FairSync的lock是直接调用AQS中的acquire操作,而没有经过compareAndSetState(0, 1)的CAS操作,因为对于公平策略来说,即使这时碰巧锁的拥有者放弃了锁的使用权,它也不能通过CAS操作尝试去获取该锁,因为有可能AQS的队列中还有线程同样也在等待这个机会,所以不能直接让它走NonfairSync的lock中的CAS这一步。

因为AQS中的acquire方法会先调用tryAcquire,所以在tryAcquire不能光去看state是否为0,即使state为0当前线程也不能去尝试拥有该锁,所以tryAcquire还通过调用AQS中的hasQueuedPredecessors方法来看队列中是否有排队线程,如果没有,才去使用CAS来尝试获取该锁(compareAndSetState(0, acquires)),当此线程在此时队列中没有等待线程却还争夺锁失败或者当此线程不是锁的拥有者线程时,tryAcquire会返回false,于是AQS的acquireQueued操作直接让它去排队了(acquireQueued(addWaiter(Node.EXCLUSIVE), arg)))。

ReentrantLock和synchronized对比

相比synchronized,ReentrantLock可以实现与synchronized相同的语义,但还支持以非阻塞方式获取锁、可以响应中断、可以限时等,更为灵活。

synchronized代表一种声明式编程,程序员更多的是表达一种同步声明,由Java系统负责具体实现,程序员不知道其实现细节,显式锁代表一种命令式编程,程序员实现所有细节。

能用synchronized就用synchronized,不满足要求,再考虑ReentrantLock。

参考:
https://www.cnblogs.com/swiftma/p/6517198.html