volatile和lock是Java中用于线程协同同步的两种机制。本文基于JDK1.8和Hotspot进行分析。
Volatile
volatile是Java中的一个关键字,它的作用有
- 保证变量的可见性
- 防止重排序
- 保证64位变量(long,double)的原子性读写
volatile在Java语言规范中规定的是
The Java programming language allows threads to access shared variables (§17.1). As a rule, to ensure that shared variables are consistently and reliably updated, a thread should ensure that it has exclusive use of such variables by obtaining a lock that, conventionally, enforces mutual exclusion for those shared variables.
The Java programming language provides a second mechanism, volatile fields, that is more convenient than locking for some purposes.
A field may be declared volatile, in which case the Java Memory Model ensures that all threads see a consistent value for the variable .
It is a compile-time error if a final variable is also declared volatile.
Java内存模型中规定了volatile的happen-before效果,对volatile变量的写操作happen-before于后续的读。这样volatile变量能够确保一个线程的修改对其他线程可见。volatile因为不能保证原子性,所以不能在有约束或后验条件的场景下使用,如i++,常用的场景是stop变量保证系统停止对其他线程可见,double-check lock单例中防止重排序来保证安全发布等。
以下面这段代码为例
public class TestVolatile {
private static volatile boolean stop = false;
public static void main(String[] args) {
stop = true;
boolean b = stop;
}
}
stop字段声明为volatile类型后,编译后的字节码中其变量的access_flag中ACC_VOLATILE位置为1。
关键的字节码内容如下
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=1, locals=2, args_size=1
0: iconst_1
1: putstatic #2 // Field stop:Z
4: getstatic #2 // Field stop:Z
7: istore_1
8: return
LineNumberTable:
line 14: 0
line 15: 4
line 16: 8
LocalVariableTable:
Start Length Slot Name Signature
0 9 0 args [Ljava/lang/String;
8 1 1 b Z
static {};
descriptor: ()V
flags: ACC_STATIC
Code:
stack=1, locals=0, args_size=0
0: iconst_0
1: putstatic #2 // Field stop:Z
4: return
LineNumberTable:
line 11: 0
}
通过hsdis查看虚拟机产生的汇编代码。
测试环境为java version “1.8.0_45”,MACOS10.12.1 i386:x86-64
在执行参数上添加
-XX:+UnlockDiagnosticVMOptions
-XX:+LogCompilation
-XX:+PrintAssembly
-Xcomp
-XX:CompileCommand=dontinline,*TestVolatile.main
-XX:CompileCommand=compileonly,*TestVolatile.main
查看main方法的汇编指令结果
Decoding compiled method 0x000000010c732c50:
Code:
[Disassembling for mach='i386:x86-64']
[Entry Point]
[Verified Entry Point]
[Constants]
# {method} {0x000000012422a2c8} 'main' '([Ljava/lang/String;)V' in 'com/concurrent/volatiles/TestVolatile'
# parm0: rsi:rsi = '[Ljava/lang/String;'
# [sp+0x40] (sp of caller)
0x000000010c732da0: mov %eax,-0x14000(%rsp)
0x000000010c732da7: push %rbp
0x000000010c732da8: sub $0x30,%rsp
0x000000010c732dac: movabs $0x12422a448,%rdi ; {metadata(method data for {method} {0x000000012422a2c8} 'main' '([Ljava/lang/String;)V' in 'com/concurrent/volatiles/TestVolatile')}
0x000000010c732db6: mov 0xdc(%rdi),%ebx
0x000000010c732dbc: add $0x8,%ebx
0x000000010c732dbf: mov %ebx,0xdc(%rdi)
0x000000010c732dc5: movabs $0x12422a2c8,%rdi ; {metadata({method} {0x000000012422a2c8} 'main' '([Ljava/lang/String;)V' in 'com/concurrent/volatiles/TestVolatile')}
0x000000010c732dcf: and $0x0,%ebx
0x000000010c732dd2: cmp $0x0,%ebx
0x000000010c732dd5: je 0x000000010c732e03 ;*iconst_1
; - com.concurrent.volatiles.TestVolatile::main@0 (line 14)
0x000000010c732ddb: movabs $0x76adce798,%rsi ; {oop(a 'java/lang/Class' = 'com/concurrent/volatiles/TestVolatile')}
0x000000010c732de5: mov $0x1,%edi
0x000000010c732dea: mov %dil,0x68(%rsi)
0x000000010c732dee: lock addl $0x0,(%rsp) ;*putstatic stop
; - com.concurrent.volatiles.TestVolatile::main@1 (line 14)
0x000000010c732df3: movsbl 0x68(%rsi),%esi ;*getstatic stop
; - com.concurrent.volatiles.TestVolatile::main@4 (line 15)
0x000000010c732df7: add $0x30,%rsp
0x000000010c732dfb: pop %rbp
0x000000010c732dfc: test %eax,-0x3adbd02(%rip) # 0x0000000108c57100
; {poll_return}
0x000000010c732e02: retq
0x000000010c732e03: mov %rdi,0x8(%rsp)
0x000000010c732e08: movq $0xffffffffffffffff,(%rsp)
0x000000010c732e10: callq 0x000000010c7267e0 ; OopMap{rsi=Oop off=117}
;*synchronization entry
; - com.concurrent.volatiles.TestVolatile::main@-1 (line 14)
; {runtime_call}
0x000000010c732e15: jmp 0x000000010c732ddb
0x000000010c732e17: nop
0x000000010c732e18: nop
0x000000010c732e19: mov 0x2a8(%r15),%rax
0x000000010c732e20: movabs $0x0,%r10
0x000000010c732e2a: mov %r10,0x2a8(%r15)
0x000000010c732e31: movabs $0x0,%r10
0x000000010c732e3b: mov %r10,0x2b0(%r15)
0x000000010c732e42: add $0x30,%rsp
0x000000010c732e46: pop %rbp
0x000000010c732e47: jmpq 0x000000010c6940e0 ; {runtime_call}
[Exception Handler]
可以看到在mov %dil,0x68(%rsi)
给stop赋值后增加了lock addl $0x0,(%rsp)
,IA32中对lock的说明是:
The LOCK # signal is asserted during execution of the instruction following the lock prefix. This signal can be used in a multiprocessor system to ensure exclusive use of shared memory while LOCK # is asserted
lock用于在多处理器中执行指令时对共享内存的独占使用。它的副作用是能够将当前处理器对应缓存的内容刷新到内存,并使其他处理器对应的缓存失效。另外还提供了有序的指令无法越过这个内存屏障的作用。
Lock
synchronized
Java中提供的锁的关键字是synchronized, 可以加在方法块上,也可以加在方法声明中。
synchronized关键字起到的作用是设置一个独占访问临界区,在进入这个临界区前要先获取对应的监视器锁,任何Java对象都可以成为监视器锁,声明在静态方法上时监视器锁是当前类的Class对象,实例方法上是当前实例。
synchronized提供了原子性、可见性和防止重排序的保证。
JMM中定义监视器锁的释放操作happen-before与后续的同一个监视器锁获取操作。再结合程序顺序规则就可以形成内存传递可见性保证。
下面以一段代码查看各个层次的实现
public class TestSynchronize {
private int count;
private void inc() {
synchronized (this) {
count++;
}
}
public static void main(String[] args) {
new TestSynchronize().inc();
}
}
编译后inc方法的字节码为
private void inc();
descriptor: ()V
flags: ACC_PRIVATE
Code:
stack=3, locals=3, args_size=1
0: aload_0
1: dup
2: astore_1
3: monitorenter
4: aload_0
5: dup
6: getfield #2 // Field count:I
9: iconst_1
10: iadd
11: putfield #2 // Field count:I
14: aload_1
15: monitorexit
16: goto 24
19: astore_2
20: aload_1
21: monitorexit
22: aload_2
23: athrow
24: return
Exception table:
from to target type
4 16 19 any
19 22 19 any
LineNumberTable:
line 14: 0
line 15: 4
在synchronized代码块前后增加的monitorenter和monitorexist两个JVM字节码指令,指令的参数是this引用。
hotspot中对于monitor_enter和monitor_exit的处理是
void LIRGenerator::monitor_enter(LIR_Opr object, LIR_Opr lock, LIR_Opr hdr, LIR_Opr scratch, int monitor_no, CodeEmitInfo* info_for_exception, CodeEmitInfo* info) {
if (!GenerateSynchronizationCode) return;
// for slow path, use debug info for state after successful locking
CodeStub* slow_path = new MonitorEnterStub(object, lock, info);
__ load_stack_address_monitor(monitor_no, lock);
// for handling NullPointerException, use debug info representing just the lock stack before this monitorenter
__ lock_object(hdr, object, lock, scratch, slow_path, info_for_exception);
}
void LIRGenerator::monitor_exit(LIR_Opr object, LIR_Opr lock, LIR_Opr new_hdr, LIR_Opr scratch, int monitor_no) {
if (!GenerateSynchronizationCode) return;
// setup registers
LIR_Opr hdr = lock;
lock = new_hdr;
CodeStub* slow_path = new MonitorExitStub(lock, UseFastLocking, monitor_no);
__ load_stack_address_monitor(monitor_no, lock);
__ unlock_object(hdr, object, lock, scratch, slow_path);
}
inc方法在本机上输出的汇编代码为
Decoding compiled method 0x0000000115be3e50:
Code:
[Entry Point]
[Constants]
# {method} {0x0000000113082328} 'inc' '()V' in 'com/concurrent/lock/TestSynchronize'
# [sp+0x50] (sp of caller)
0x0000000115be3fc0: mov 0x8(%rsi),%r10d
0x0000000115be3fc4: shl $0x3,%r10
0x0000000115be3fc8: cmp %rax,%r10
0x0000000115be3fcb: jne 0x0000000115b1de20 ; {runtime_call}
0x0000000115be3fd1: data32 data32 nopw 0x0(%rax,%rax,1)
0x0000000115be3fdc: data32 data32 xchg %ax,%ax
[Verified Entry Point]
0x0000000115be3fe0: mov %eax,-0x14000(%rsp)
0x0000000115be3fe7: push %rbp
0x0000000115be3fe8: sub $0x40,%rsp
0x0000000115be3fec: movabs $0x113082848,%rax ; {metadata(method data for {method} {0x0000000113082328} 'inc' '()V' in 'com/concurrent/lock/TestSynchronize')}
0x0000000115be3ff6: mov 0xdc(%rax),%edi
0x0000000115be3ffc: add $0x8,%edi
0x0000000115be3fff: mov %edi,0xdc(%rax)
0x0000000115be4005: movabs $0x113082328,%rax ; {metadata({method} {0x0000000113082328} 'inc' '()V' in 'com/concurrent/lock/TestSynchronize')}
0x0000000115be400f: and $0x0,%edi
0x0000000115be4012: cmp $0x0,%edi
0x0000000115be4015: je 0x0000000115be418d ;*aload_0
; - com.concurrent.lock.TestSynchronize::inc@0 (line 14)
0x0000000115be401b: lea 0x20(%rsp),%rdi
0x0000000115be4020: mov %rsi,0x8(%rdi)
0x0000000115be4024: mov (%rsi),%rax
0x0000000115be4027: mov %rax,%rbx
0x0000000115be402a: and $0x7,%rbx
0x0000000115be402e: cmp $0x5,%rbx
0x0000000115be4032: jne 0x0000000115be40b9
0x0000000115be4038: mov 0x8(%rsi),%ebx
0x0000000115be403b: shl $0x3,%rbx
0x0000000115be403f: mov 0xa8(%rbx),%rbx
0x0000000115be4046: or %r15,%rbx
0x0000000115be4049: xor %rax,%rbx
0x0000000115be404c: and $0xffffffffffffff87,%rbx
0x0000000115be4050: je 0x0000000115be40e1
0x0000000115be4056: test $0x7,%rbx
0x0000000115be405d: jne 0x0000000115be40a6
0x0000000115be405f: test $0x300,%rbx
0x0000000115be4066: jne 0x0000000115be4085
0x0000000115be4068: and $0x37f,%rax
0x0000000115be406f: mov %rax,%rbx
0x0000000115be4072: or %r15,%rbx
0x0000000115be4075: lock cmpxchg %rbx,(%rsi)
0x0000000115be407a: jne 0x0000000115be41a4
0x0000000115be4080: jmpq 0x0000000115be40e1
0x0000000115be4085: mov 0x8(%rsi),%ebx
0x0000000115be4088: shl $0x3,%rbx
0x0000000115be408c: mov 0xa8(%rbx),%rbx
0x0000000115be4093: or %r15,%rbx
0x0000000115be4096: lock cmpxchg %rbx,(%rsi)
0x0000000115be409b: jne 0x0000000115be41a4
0x0000000115be40a1: jmpq 0x0000000115be40e1
0x0000000115be40a6: mov 0x8(%rsi),%ebx
0x0000000115be40a9: shl $0x3,%rbx
0x0000000115be40ad: mov 0xa8(%rbx),%rbx
0x0000000115be40b4: lock cmpxchg %rbx,(%rsi)
0x0000000115be40b9: mov (%rsi),%rax
0x0000000115be40bc: or $0x1,%rax
0x0000000115be40c0: mov %rax,(%rdi)
0x0000000115be40c3: lock cmpxchg %rdi,(%rsi)
0x0000000115be40c8: je 0x0000000115be40e1
0x0000000115be40ce: sub %rsp,%rax
0x0000000115be40d1: and $0xfffffffffffff007,%rax
0x0000000115be40d8: mov %rax,(%rdi)
0x0000000115be40db: jne 0x0000000115be41a4 ;*monitorenter
; - com.concurrent.lock.TestSynchronize::inc@3 (line 14)
0x0000000115be40e1: mov 0xc(%rsi),%eax ;*getfield count
; - com.concurrent.lock.TestSynchronize::inc@6 (line 15)
0x0000000115be40e4: inc %eax
0x0000000115be40e6: mov %eax,0xc(%rsi) ;*putfield count
; - com.concurrent.lock.TestSynchronize::inc@11 (line 15)
0x0000000115be40e9: lea 0x20(%rsp),%rax
0x0000000115be40ee: mov 0x8(%rax),%rdi
0x0000000115be40f2: mov (%rdi),%rsi
0x0000000115be40f5: and $0x7,%rsi
0x0000000115be40f9: cmp $0x5,%rsi
0x0000000115be40fd: je 0x0000000115be411a
0x0000000115be4103: mov (%rax),%rsi
0x0000000115be4106: test %rsi,%rsi
0x0000000115be4109: je 0x0000000115be411a
0x0000000115be410f: lock cmpxchg %rsi,(%rdi)
0x0000000115be4114: jne 0x0000000115be41b7 ;*monitorexit
; - com.concurrent.lock.TestSynchronize::inc@15 (line 16)
0x0000000115be411a: movabs $0x113082848,%rax ; {metadata(method data for {method} {0x0000000113082328} 'inc' '()V' in 'com/concurrent/lock/TestSynchronize')}
0x0000000115be4124: incl 0x108(%rax) ;*goto
; - com.concurrent.lock.TestSynchronize::inc@16 (line 16)
0x0000000115be412a: add $0x40,%rsp
0x0000000115be412e: pop %rbp
0x0000000115be412f: test %eax,-0x684e035(%rip) # 0x000000010f396100
; {poll_return}
0x0000000115be4135: retq ;*return
; - com.concurrent.lock.TestSynchronize::inc@24 (line 17)
0x0000000115be4136: mov 0x2a8(%r15),%rax
0x0000000115be413d: xor %r10,%r10
0x0000000115be4140: mov %r10,0x2a8(%r15)
0x0000000115be4147: xor %r10,%r10
0x0000000115be414a: mov %r10,0x2b0(%r15)
0x0000000115be4151: mov %rax,%rsi
0x0000000115be4154: lea 0x20(%rsp),%rax
0x0000000115be4159: mov 0x8(%rax),%rbx
0x0000000115be415d: mov (%rbx),%rdi
0x0000000115be4160: and $0x7,%rdi
0x0000000115be4164: cmp $0x5,%rdi
0x0000000115be4168: je 0x0000000115be4185
0x0000000115be416e: mov (%rax),%rdi
0x0000000115be4171: test %rdi,%rdi
0x0000000115be4174: je 0x0000000115be4185
0x0000000115be417a: lock cmpxchg %rdi,(%rbx)
0x0000000115be417f: jne 0x0000000115be41ca ;*monitorexit
; - com.concurrent.lock.TestSynchronize::inc@21 (line 16)
0x0000000115be4185: mov %rsi,%rax
0x0000000115be4188: jmpq 0x0000000115be4205
0x0000000115be418d: mov %rax,0x8(%rsp)
0x0000000115be4192: movq $0xffffffffffffffff,(%rsp)
0x0000000115be419a: callq 0x0000000115bd5be0 ; OopMap{rsi=Oop off=479}
;*synchronization entry
; - com.concurrent.lock.TestSynchronize::inc@-1 (line 14)
; {runtime_call}
0x0000000115be419f: jmpq 0x0000000115be401b
0x0000000115be41a4: mov %rsi,0x8(%rsp)
0x0000000115be41a9: mov %rdi,(%rsp)
0x0000000115be41ad: callq 0x0000000115bd4060 ; OopMap{rsi=Oop [40]=Oop off=498}
;*monitorenter
; - com.concurrent.lock.TestSynchronize::inc@3 (line 14)
; {runtime_call}
0x0000000115be41b2: jmpq 0x0000000115be40e1
0x0000000115be41b7: lea 0x20(%rsp),%rax
0x0000000115be41bc: mov %rax,(%rsp)
0x0000000115be41c0: callq 0x0000000115bd4420 ; {runtime_call}
0x0000000115be41c5: jmpq 0x0000000115be411a
0x0000000115be41ca: lea 0x20(%rsp),%rax
0x0000000115be41cf: mov %rax,(%rsp)
0x0000000115be41d3: callq 0x0000000115bd4420 ; {runtime_call}
0x0000000115be41d8: jmp 0x0000000115be4185
0x0000000115be41da: nop
0x0000000115be41db: nop
0x0000000115be41dc: mov 0x2a8(%r15),%rax
0x0000000115be41e3: movabs $0x0,%r10
0x0000000115be41ed: mov %r10,0x2a8(%r15)
0x0000000115be41f4: movabs $0x0,%r10
0x0000000115be41fe: mov %r10,0x2b0(%r15)
0x0000000115be4205: add $0x40,%rsp
0x0000000115be4209: pop %rbp
0x0000000115be420a: jmpq 0x0000000115b440e0 ; {runtime_call}
[Exception Handler]
其中lock cmpxchg为Compare And Exchange
CMPXCHG compares its destination (first) operand to the value in AL, AX or EAX (depending on the size of the instruction). If they are equal, it copies its source (second) operand into the destination and sets the zero flag. Otherwise, it clears the zero flag and leaves the destination alone.
CMPXCHG is intended to be used for atomic operations in multitasking or multiprocessor environments. To safely update a value in shared memory, for example, you might load the value into EAX, load the updated value into EBX, and then execute the instruction lock cmpxchg [value],ebx. If value has not changed since being loaded, it is updated with your desired new value, and the zero flag is set to let you know it has worked. (The LOCK prefix prevents another processor doing anything in the middle of this operation: it guarantees atomicity.) However, if another processor has modified the value in between your load and your attempted store, the store does not happen, and you are notified of the failure by a cleared zero flag, so you can go round and try again.
wait()和notify()
和synchronized紧密关联的还有wait()和notify()的等待通知机制。wait()和notify()能够实现基于条件的等待通知方式。但是wait()的对象和等待的条件谓词以及监视器锁,这三者必须统一起来,并且为了防止假通知要在循环中判断条件。
常见的使用方式是
condition protected by lock
synchronized(lock) {
while(condition not satified) {
lock.wait();
}
// now do something
if(change others' condition) {
lock.notify()/notifyAll()
}
}
java.util.concurrent.locks.Lock
Java中除了使用synchronized关键字外,还提供了Lock类及实现提供同样的功能,
java.util.concurrent.locks.ReentrantLock提供了和synchronized相同的独占可重入锁以及相同的内存语义。
Lock的区别是,提供了tryLock、定时、可中断的lock,并且使用时unlock要用finally保护起来确保一定能够解锁。
ReentrantLock中内置一个AbstractQueuedSynchronizer类来实现基于状态的等待和同步。
lock方法和unlock方法等都委托给这个AQS的子类去实现。
class ReentrantLock{
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
}
Lock分为公平锁和非公平锁两种实现,公平锁按照先到先得的策略,而非公平锁则可以抢占。
下面是AQS的子类实现。使用state表示加锁状态,0表示没有加锁,大于0表示加锁的次数,说明可以一个线程多次重入。
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
// 按照是否公平进行不同的实现
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
// 非公平的尝试获取
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前状态
int c = getState();
if (c == 0) {
// 如果状态为0, 使用CAS进行一次尝试占用
if (compareAndSetState(0, acquires)) {
// 成功CAS,则将自己设置为占有线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 否则判断如果自己就是占有线程,则无需加锁进行数量修改。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 抢占失败,进入等待队列
return false;
}
protected final boolean tryRelease(int releases) {
// 调用释放锁,说明已经持有了该锁,无需进行CAS操作。
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 状态为0说明锁已经释放,可以唤醒其他等待线程。
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
NonFaiSync
非公平锁的实现是
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
AbstractQueuedSynchronizer
AQS提供了一个实现基于状态(state)和等待队列的阻塞同步框架。它是实现Lock,CountDownLatch,Smepaphore等同步器的基础。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released.
它提供了不同形式的获取(acquire)和释放(release)操作。获取操作是依赖状态的操作,state由具体的实现来定义具体的语义,当条件不足时会阻塞,如Lock中当已获取锁时或成功抢到锁获取动作成功,否则失败进入阻塞状态。表示CountDownLatch闭锁时,state表示剩余需要countDown()的次数。在表示信号量时,state表示剩余的许可的个数,获取后state小于0则会阻塞。
子类可以通过getState(),setState(),compareAndSetState()几个方法来访问和控制state值。
AQS由exclusive和shared两种模式,在独占模式成功获取了AQS时,其他线程不可能获取成功。
子类通过覆写以下几个方法
- tryAcquire(int)
- tryRelease(int)
- tryAcquireShared(int)
- tryReleaseShared(int)
- isHeldExclusively()
AQS对外暴露的acquire(int),acquireShard(int), release(int)等方法会调用上述覆写方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果tryAcquire(arg)
返回成功,则直接返回,说明获取操作成功。否则,进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,addWaiter将创建一个等待者并加入到等待队列中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 尝试通过CAS将自己设置成Tail节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(node)实现为
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果tail为空说明队列没有创建
if (t == null) { // Must initialize
// 尝试创建第一个
if (compareAndSetHead(new Node()))
创建并抢占替换成功,则设置tail为自己临时创建的head节点,marker节点
tail = head;
} else {
node.prev = t;
// 否则,则将自己的prev指向tail,然后CAS替换tail为自己
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
获取前置节点
final Node p = node.predecessor();
// 如果前置节点是head并且成功acquire
if (p == head && tryAcquire(arg)) {
设置自己为新的head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否应该park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// park直到得到通知
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对应的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 如果tryRelease(arg)返回true,则将后续等待节点unpark
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Node是AQS中双向链表的节点
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
acquireQueued实现为
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Unsafe
神奇的Unsafe类在JDK很多源码中都实现,它能够提供park
,unpark
,compareAndSwapInt
,staticFieldOffset
等非常底层的方法。很多LockSupport
,AtomicIntegerUnsafe
类都使用它来实现线程等待、恢复、CAS等操作。这个类并不推荐开发者使用,并增加了调用安全验证,但是可以通过反射的方式获取到。
Unsafe
中的方法都是native
类型,实现在unsafe.cpp
中,如Unsafe.compareAndSwapInt
:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
} UNSAFE_END
inline jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest,
jbyte compare_value, cmpxchg_memory_order order) {
STATIC_ASSERT(sizeof(jbyte) == 1);
volatile jint* dest_int =
static_cast<volatile jint*>(align_ptr_down(dest, sizeof(jint)));
size_t offset = pointer_delta(dest, dest_int, 1);
jint cur = *dest_int;
jbyte* cur_as_bytes = reinterpret_cast<jbyte*>(&cur);
// current value may not be what we are looking for, so force it
// to that value so the initial cmpxchg will fail if it is different
cur_as_bytes[offset] = compare_value;
// always execute a real cmpxchg so that we get the required memory
// barriers even on initial failure
do {
// value to swap in matches current value ...
jint new_value = cur;
// ... except for the one jbyte we want to update
reinterpret_cast<jbyte*>(&new_value)[offset] = exchange_value;
jint res = cmpxchg(new_value, dest_int, cur, order);
if (res == cur) break; // success
// at least one jbyte in the jint changed value, so update
// our view of the current jint
cur = res;
// if our jbyte is still as cur we loop and try again
} while (cur_as_bytes[offset] == compare_value);
return cur_as_bytes[offset];
}
关于偏向锁、轻量级锁、自适应自旋和重量级锁
轻量级锁
Java中的监视器锁monitor需要操作系统的互斥量(mutex)和条件变量(condition variable)的支持,进行系统调用涉及到用户态内核态切换,导致开销比较大。在实际项目中观察发现,很大一部分锁都是无竞争的,因此增加了叫做轻量级锁(lightweight locking)的优化方式。Java对象在内存中,分为ObjectHeader和ObjectFields两个部分,Object Header对象头内存放了MarkWord和指向对象所属类的引用, MarkWord根据对象的不同状态存放不同的内容,如GC年龄、锁状态、HashCode等。
轻量级锁的工作方式为:
当对监视器锁对象执行monitorenter操作并且是轻量级加锁时,会在执行的线程上创建一个lock record,lock record保存了原有监视器锁对象的MarkWord,然后使用CAS尝试替换监视器锁的markword部分为这个lock record的引用。如果CAS成功,当前线程就拥有了这个锁;如果CAS失败,说明遇到了竞争其他线程也在获取这个锁,这时就会进行锁膨胀(inflated), 将操作系统的mutex和condition vaiable绑定到这个监视器锁对象上,膨胀成功后,markword指向了一个包含操作系统互斥量和条件变量的数据结构。在释放锁阶段,线程会尝试使用CAS将监视器锁的markword替换为原来的markword,如果替换成功,说明没有遇到竞争;如果失败,则遇到了竞争需要释放锁并且通知其他等待获取锁的线程。
偏向锁
偏向锁在轻量级锁的优化上更进一步,偏向锁认为锁对象不仅在大部分时候都是无竞争的,而且一直是由一个线程获取的。
-XX:+UseBiasedLocking Enables a technique for improving the performance of uncontended synchronization. An object is “biased” toward the thread which first acquires its monitor via a monitorenter bytecode or synchronized method invocation; subsequent monitor-related operations performed by that thread are relatively much faster on multiprocessor machines. Some applications with significant amounts of uncontended synchronization may attain significant speedups with this flag enabled; some applications with certain patterns of locking may see slowdowns, though attempts have been made to minimize the negative impact.
偏向锁的工作方式为:
如果设置了使用偏向锁,则对象markword的后三位为101,在加锁时线程会CAS尝试将mardword替换为自己的线程ID,线程ID可以是其他的能够标识线程的标识。如果CAS成功,则当前线程成为bias owner,之后线程获取锁时发现监视器对象是可偏向的并且线程ID为自己,则不再需要做其他事情。如果CAS失败,则其他线程已经成为了偏向拥有者,当前线程会撤销偏向,升级为轻量级加锁,在安全点(safe point)时,进行markword的替换。
线程实现方式
http://www.oracle.com/technetwork/java/threads-140302.html
Openjdk中的Object.wait()Object.notify()实现
参考: http://www.oracle.com/technetwork/java/biasedlocking-oopsla2006-wp-149958.pdf
原文: https://liuzhengyang.github.io/2017/03/28/volatileandlock/