阅读完需:约 10 分钟
PriorityBlockingQueue介绍
带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
它的源码相对比较简单,本节将介绍其核心源码部分。
我们来看看它有哪些属性:
// 构造方法中,如果不指定大小的话,默认大小为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 这个就是存放数据的数组
private transient Object[] queue;
// 队列当前大小
private transient int size;
// 大小比较器,如果按照自然序排序,那么此属性可设置为 null
private transient Comparator<? super E> comparator;
// 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
private final ReentrantLock lock;
// 这个很好理解,其实例由上面的 lock 属性创建
private final Condition notEmpty;
// 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;
// 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
private PriorityQueue q;
此类实现了 Collection 和 Iterator 接口中的所有接口方法,对其对象进行迭代并遍历时,不能保证有序性。如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。
PriorityBlockingQueue 提供了 drainTo 方法用于将部分或全部元素有序地填充(准确说是转移,会删除原队列中的元素)到另一个集合中。还有一个需要说明的是,如果两个对象的优先级相同(compare 方法返回 0),此队列并不保证它们之间的顺序。
PriorityBlockingQueue 使用了基于数组的二叉堆来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。
二叉堆:一颗完全二叉树,它非常适合用数组进行存储,对于数组中的元素 a[i]
,其左子节点为 a[2*i+1]
,其右子节点为 a[2*i + 2]
,其父节点为 a[(i-1)/2]
,其堆序性质为,每个节点的值都小于其左右子节点的值。二叉堆中最小的值就是根节点,但是删除根节点是比较麻烦的,因为需要调整树。
简单用个图解释一下二叉堆,这种数据结构的优点是一目了然的,最小的元素一定是根元素,它是一棵满的树,除了最后一层,最后一层的节点从左到右紧密排列。
下面开始 PriorityBlockingQueue 的源码分析,首先我们来看看构造方法:
// 默认构造方法,采用默认值(11)来进行初始化
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定数组的初始大小
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 指定比较器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
// 在构造方法中就先填充指定的集合中的元素
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
//
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
接下来,我们来看看其内部的自动扩容实现:
private void tryGrow(Object[] array, int oldCap) {
// 这边做了释放锁的操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 用 CAS 操作将 allocationSpinLock 由 0 变为 1,也算是获取锁
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果节点个数小于 64,那么增加的 oldCap + 2 的容量
// 如果节点数大于等于 64,那么增加 oldCap 的一半
// 所以节点数较小时,增长得快一些
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
// 这里有可能溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果 queue != array,那么说明有其他线程给 queue 分配了其他的空间
if (newCap > oldCap && queue == array)
// 分配一个新的大数组
newArray = new Object[newCap];
} finally {
// 重置,也就是释放锁
allocationSpinLock = 0;
}
}
// 如果有其他的线程也在做扩容的操作
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 重新获取锁
lock.lock();
// 将原来数组中的元素复制到新分配的大数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
扩容方法对并发的控制也非常的巧妙,释放了原来的独占锁 lock,这样的话,扩容操作和读操作可以同时进行,提高吞吐量。
下面,我们来分析下写操作 put 方法和读操作 take 方法。
public void put(E e) {
// 直接调用 offer 方法,因为前面我们也说了,在这里,put 方法不会阻塞
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 首先获取到独占锁
lock.lock();
int n, cap;
Object[] array;
// 如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 节点添加到二叉堆中
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 更新 size
size = n + 1;
// 唤醒等待的读线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
对于二叉堆而言,插入一个节点是简单的,插入的节点如果比父节点小,交换它们,然后继续和父节点比较。
// 这个方法就是将数据 x 插入到数组 array 的位置 k 处,然后再调整树
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 二叉堆中 a[k] 节点的父节点位置
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
我们用图来示意一下,我们接下来要将 11 插入到队列中,看看 siftUp 是怎么操作的。
我们再看看 take 方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 独占锁
lock.lockInterruptibly();
E result;
try {
// dequeue 出队
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 队头,用于返回
E result = (E) array[0];
// 队尾元素先取出
E x = (E) array[n];
// 队尾置空
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
dequeue 方法返回队头,并调整二叉堆的树,调用这个方法必须先获取独占锁。
废话不多说,出队是非常简单的,因为队头就是最小的元素,对应的是数组的第一个元素。难点是队头出队后,需要调整树。
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
// 这里得到的 half 肯定是非叶节点
// a[n] 是最后一个元素,其父节点是 a[(n-1)/2]。所以 n >>> 1 代表的节点肯定不是叶子节点
// 下面,我们结合图来一行行分析,这样比较直观简单
// 此时 k 为 0, x 为 17,n 为 9
int half = n >>> 1; // 得到 half = 4
while (k < half) {
// 先取左子节点
int child = (k << 1) + 1; // 得到 child = 1
Object c = array[child]; // c = 12
int right = child + 1; // right = 2
// 如果右子节点存在,而且比左子节点小
// 此时 array[right] = 20,所以条件不满足
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key = 17, c = 12,所以条件不满足
if (key.compareTo((T) c) <= 0)
break;
// 把 12 填充到根节点
array[k] = c;
// k 赋值后为 1
k = child;
// 一轮过后,我们发现,12 左边的子树和刚刚的差不多,都是缺少根节点,接下来处理就简单了
}
array[k] = key;
}
}
记住二叉堆是一棵完全二叉树,那么根节点 10 拿掉后,最后面的元素 17 必须找到合适的地方放置。首先,17 和 10 不能直接交换,那么先将根节点 10 的左右子节点中较小的节点往上滑,即 12 往上滑,然后原来 12 留下了一个空节点,然后再把这个空节点的较小的子节点往上滑,即 13 往上滑,最后,留出了位子,17 补上即可。
我稍微调整下这个树,以便读者能更明白:
好了, PriorityBlockingQueue 我们也说完了。