研究了PriorityQueue原理,知道JDK源码怎么实现的优先队列,这次是要搞清PriorityBlockingQueue
阻塞优先队列是如何实现的;
本文从PriorityBlockingQueue的概念,结构,参数,源码解析(offer,poll,remove,add,grow),性能,线程安全性,使用场景,常见问题
8个方面进行分析。
关键点:与PriorityQueue一样的排序规则,无界队列,实现Queue,Collection,Iterator接口、不允许null键/值、提供阻塞操作、线程安全、不保证队列内元素的顺序;
概念
An unbounded BlockingQueue blocking queue that uses the same ordering rules as class PriorityQueue
and supplies blocking retrieval operations
.
- While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).
- This class does not permit null elements.
- A priority queue relying on
Comparable natural ordering
also does not permit insertion ofnon-comparable
objects (doing so results in ClassCastException). - This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
- The Iterator provided in method iterator() is
not guaranteed
to traverse the elements of the PriorityBlockingQueue in any particularorder
. If you need ordered traversal, consider usingArrays.sort(pq.toArray())
. Also, method drainTo can be used to remove some or all elements in priority order and place them in another collection. - Operations on this class make
no guarantees
about theordering
of elements withequal priority
.
If you need to enforce an ordering, you can define custom classes or comparators that use a secondary key to break ties in primary priority values.
For example, here is a class that applies first-in-first-out
tie-breaking to comparable elements. To use it, you would insert a new FIFOEntry(anEntry)
instead of a plain entry object.
1 | class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { |
- This class is a member of the Java Collections Framework
结构
基于二叉堆实现,参考JDK-PriorityQueue原理
PriorityQueue的类关系
PriorityQueue的类成员
参数
int.initialCapacity
:初始化容量,默认为11
;Comparator.comparator
:用于队列中元素排序;int.size
:记录队列中元素个数;ReentrantLock.lock
:用于所有public方法操作的加锁;Condition.notEmpty
:用于阻塞对空队列的操作;int.allocationSpinLock
: 队列扩容时用于CAS;PriorityQueue.queue
:用PriorityQueue进行序列化和反序列化;- 构造函数:新建1个空的队列;
1 | public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) { |
源码解析
- The implementation uses an
array-based binary heap
, with public operations protected with asingle lock
.- However, allocation during resizing uses a simple
spinlock
(used only while not holding main lock) in order to allow takes to operate concurrently with allocation.
This avoids repeated postponement of waiting consumers and consequent element build-up.- The need to back away from lock during allocation makes it impossible to simply wrap delegated
java.util.PriorityQueue
operations within a lock, as was done in a previous version of this class.- To maintain interoperability, a plain PriorityQueue is still used during serialization, which maintains compatibility at the expense of transiently doubling overhead.
heapify
参考PriorityQueue
从最后一个父节点开始siftdown,直到根节点
add/put/offer
1 | /** |
siftUp参考JDK-PriorityQueue原理
tryGrow
扩容要点
- lock是全局锁,如果在扩容时加锁会导致其他线程出队时会阻塞;
- 而队列很大时,扩容操作(arraycopy)是比较费时的,如果此时占用锁,那么其他线程在这个时候是不能进行出队操作,这样会
降低并发处理能力
; - 所以为了更好的性能,扩容时先释放锁;
- 但是释放锁后,会导致多个线程同时进行扩容,此时用spinLock以
CAS
控制只有1个线程可以执行扩容,其他CAS失败的则跳过(newArray=null); - CAS失败的线程调用
Thread.yield()
让出CPU时间,目的是让让CAS成功的线程扩容后优先调用lock.lock重新获取锁,但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁; - 在扩容时,若其他线程在执行了出队操作,直接cop扩容会导致copy的不是最新的数据,所以此时要加锁后再copy;
- 在加锁时,如果其他线程执行出了/如队操作,队列发生了变化(queue != array),当前扩容操作要取消;如果成功加锁且队列没发生改变,则可执行扩容操作;
关键点:解全局锁,CAS乐观锁申请数组大小,扩容前恢复加锁
1 | /** |
take/poll
take阻塞出队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//上锁,可中断
lock.lockInterruptibly();
E result;
try {
//阻塞直到队列返回结果
while ((result = dequeue()) == null) {
//阻塞等待恢复信号
notEmpty.await();
}
} finally {
//解锁
lock.unlock();
}
return result;
}poll阻塞出队(设置超时时间)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//阻塞直到队列返回结果,或者等待超时
while ((result = dequeue()) == null && nanos > 0) {
//阻塞等待恢复信号(超时时间)
nanos = notEmpty.awaitNanos(nanos);
}
} finally {
lock.unlock();
}
return result;
}
dequeue出队操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 /**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
//没元素返回空
if (n < 0) {
return null;
} else {
//拿出队头元素,用于返回
Object[] array = queue;
E result = (E) array[0];
//将队尾元素放到队头,并从队头开始执行siftDown
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
siftDownComparable(0, x, array, n);
} else {
siftDownUsingComparator(0, x, array, n, cmp);
}
size = n;
return result;
}
}
/**
* Inserts item x at position k, maintaining heap invariant by demoting x down the tree repeatedly
* until it is less than or equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n heap size
*/
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
if (n > 0) {
//拿出父节点值
Comparable<? super T> key = (Comparable<? super T>) x;
//存在叶子节点时 loop while a non-leaf
int half = n >>> 1;
while (k < half) {
// assume left child is least
int child = (k << 1) + 1;
//最小值
Object c = array[child];
int right = child + 1;
//存在right且right比left大
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0) {
c = array[child = right];
}
//父节点比最小值小,不需要交换,终止loop
if (key.compareTo((T) c) <= 0) {
break;
}
//父节点值以最小值替换
array[k] = c;
//父节点移到最小值位置
k = child;
}
//父节点值,最终赋给交换n轮后的叶子节点
array[k] = key;
}
}关于ReentrantLock.lockInterruptibly
- 获取并持有锁直到当前线程未被中断。
- 获取该锁并立即返回(如果该锁没有被另一个线程持有),将锁的保持计数设置为 1。
- 如果该锁已被另一个线程持有,则当前线程不可被调度(即阻塞状态,CPU不会给该线程分配时间片)直到
- 当前线程获取到该锁;
- 其他线程中断当前线程;
- 如果该锁被当前线程持有,则将锁的持有计数设置为1,
- 如果当前线程在进入此方法时已经设置了该线程的中断状态;或者在等待获取锁的同时被中断。则抛出
InterruptedException
异常,同时清除当前线程的中断状态;
- 在此实现中,因为
lockInterruptibly
方法是一个显式中断
点,所以要优先响应中断
,而不是响应锁的普通获取或重入获取;注意程序要响应中断还是比较expensive的,有时候甚至imposibble,所以如果线程支持中断,一定要声明清楚!
jdk8-lock-lockInterruptibly
关于Condition.await
await方法的调用将导致当前线程等待直到signalled或interrupted调用时才恢复;
condition关联的lock会被原子释放,当前线程将不可调度直到以下4种情况触发:
- 其他线程调用当前condition的signal()方法,并且当前线程正好被唤醒;
- 其他线程调用当前condition的signalAll()方法;
- 其他线程终止当前线程, and interruption of thread suspension is supported;
spurious wakeup
发生;
在所有情况中,在当前method能返回前,当前线程必须重新获取condition关联的锁;
在线程返回时await会保证一直持有condition关联的锁;
remove
加锁后,删除节点1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);
if (i == -1) {
return false;
}
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
// removed last element
if (n == i) {
array[i] = null;
} else {
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
//从删除节点开始下沉
siftDownComparable(i, moved, array, n);
} else {
siftDownUsingComparator(i, moved, array, n, cmp);
}
//siftDown后,若元素没有改变,可能是因为要删除的结点和堆尾结点是跨子树,或者要删除的结点是叶子结点
if (array[i] == moved) {
if (cmp == null) {
siftUpComparable(i, moved, array);
} else {
siftUpUsingComparator(i, moved, array, cmp);
}
}
}
size = n;
}
从当前节点上浮到根节点
1
2
3
4
5
6
7
8
9
10
11
12
13 private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0) {
break;
}
array[k] = e;
k = parent;
}
array[k] = key;
}peek
加锁后,返回堆顶节点
1
2
3
4
5
6
7
8
9
10
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}size
加锁后,返回堆大小
1
2
3
4
5
6
7
8
9
10
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}contains
加锁后,返回堆大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean contains(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(o) != -1;
} finally {
lock.unlock();
}
}
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++) {
if (o.equals(array[i])) {
return i;
}
}
}
return -1;
}性能
- O(1):peek
- O(n):heapify
- O(nlog(n)):put,remove
线程安全性
PriorityBlockingQueue中的锁
ReentrantLock
:重入锁,对queue的所有public操作加锁;Condition
:竞态条件,如果队列为空,take/poll时await
阻塞,offer时signal
取消阻塞;Unsafe
:扩容时,以compareAndSwapInt
执行CAS操作
关于UnSafe
Unsafe
是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源
、自主管理内存资源
等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。- 由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
关于CAS
- 什么是CAS? 即比较并替换,实现并发算法时常用到的一种技术。CAS操作包含三个操作数——内存位置、预期原值及新值。
- 执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何操作。我们都知道,CAS是一条CPU的原子指令(
cmpxchg
指令),不会造成所谓的数据不一致问题,Unsafe
提供的CAS方法(如compareAndSwap
XXX)底层实现即为CPU指令cmpxchg。- CAS在
java.util.concurrent.atomic
相关类、Java AQS
、CurrentHashMap
等实现上有非常广泛的应用。
锁的定义
1 | /** |
使用场景
常见问题
PriorityBlockingQueue中用到了那些锁?
- CAS
- ReentranLock
PriorityBlockingQueue中的Blocking体现在哪些操作?
- read:take
- write:grow,offer