并发编程之:BlockingQueue
大家好,我是小黑,一个在互联网苟且偷生的农民工。
队列
学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时使用的List,Set这些数据结构相比有点特殊,它的特殊之处在于它只允许在队列的头部(Head)进行删除操作,在尾部(Tail)进行插入操作,这种方式的队列我们称之为先进先出队列(FIFO)。
在JDK1.5
中推出了队列这一数据结构的具体实现,接口Queue是对于队列的定义,并有一些列具有特殊功能的队列实现。
在Queue接口中定义了队列的如下方法:
其中add(E)
并非Queue接口新定义,而是从Collection接口继承而来的。
阻塞队列
BlockingQueue接口也是在JDK1.5
中推出,存放在java.util.concurrent
包中,继承自Queue,所以在BlockingQueue中有Queue的所有方法。
从名字就可以看出BlockingQueue是一种阻塞队列,它支持在检索元素时如果队列为空可以一直阻塞等待直到有元素可以获取,同样在添加元素时如果队列已满会阻塞等待队列中有空闲的存储空间。
BlockingQueue的方法可以归纳为四类:
- 在操作时如不能立即满足,会直接抛出异常
- 在操作时如不能立即满足,则返回特殊的值,如插入、移除方法会返回false,检查方法会返回null
- 在操作时如不能立即满足,则会阻塞等待,直到操作成功
- 在操作时如不能立即满足,则会阻塞等待给定的时间长度,时间到达后如果还不能满足则返回null
这四类方法总结如下。
因为在BlockingQueue的一些方法中,会通过null表示某种操作的失败,所以不允许在BlockingQueue中存放null值元素,会在操作时抛出NullPointerExection异常。
BlockingQueue因为是一个容器嘛,所以它也有容量的限制,在具体实现类中有可以设置容量的实现类,也有不可以设置容量的实现类,不能设置容量的实现类容量默认为Integer.MAX_VALUE
。
BlockingQueue是定义在java.util.concurrent
包中,那么它在并发情况下到底是不是线程安全的呢?
在JDK提供的BlockingQueue的具体实现类中,上面表格中的方法实现都是线程安全的,在内部都使用了锁或者其他形式的并发控制保证操作的原子性。
但是有一点要注意,就是一些批量处理的方法例如addAll
、containsAll
、retainAll
和removeAll
这些方法并不一定是线程安全的,使用时注意。
说完BlockingQueue接口我们接下来看看它都有哪些具体的实现呢?以及在它们内部是如何做到线程安全和阻塞的呢?
ArrayBlockingQueue
ArrayBlockingQueue是一个底层由数组支持额有界阻塞队列。
重要属性
先来看看ArrayBlockingQueue中都有哪些属性。
// 存放元素的数组
final Object[] items;
// 用来记录取元素的下标,用于下一次在take,poll,remove,peek方法中使用
int takeIndex;
// 用来记录添加元素的下标,用于下一次put,offer,add等方法使用
int putIndex;
// 记录队列中元素数量
int count;
// 用于控制并发访问时保证线程安全的锁
final ReentrantLock lock;
// 用于队列空时阻塞和唤醒等待线程的条件
private final Condition notEmpty;
// 用于队列满时阻塞和唤醒等待线程的条件
private final Condition notFull;
我们通过这些队列中的属性基本可以知道ArrayBlockingQueue中都有哪些重要信息,可以看出ArrayBlockingQueue就是使用Object[]
来存放元素的。
那么应该如何创建一个ArrayBlockingQueue呢?
构造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
默认的构造方法需要传入一个int类型的capacity
表示该队列的容量。在该构造方法中会调用另一个构造方法,传入一个默认值false。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
从这个方法我们看出传入的false表示会在内部用于创建一个ReentrantLock对象,我们都知道ReentrantLock支持公平和非公平的实现,我们猜想一下,这里的这个fair值是不是表示该阻塞队列对于阻塞排队的线程支持公平和非公平的策略呢?这里先卖个关子,在后面的方法中我们具体说。
除了这两种创建的方式,ArrayBlockingQueue还支持传入一个Collection集合。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 先创建一个ArrayBlockingQueue实例
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
// 循环将collection中的元素放入queue中
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
// 如果collection的元素个数超出queue的容量大小,会抛出异常
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
添加元素
先来看看添加一个新元素到ArrayBlockingQueue
是如何实现的,怎样保证线程安全的。
add(e)
public boolean add(E e) {
// 调用父类中的add(e)方法
return super.add(e);
}
public boolean add(E e) {
// 这里会直接调用offer(e)方法,如果offer方法返回false,则直接抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add方法的实现逻辑本质上是对offer方法套了一层壳,如果offer方法返回false时,抛出异常。所以我们直接看offer方法的实现就好。
offer(e)
public boolean offer(E e) {
// 这里先判断空,如果e为空会抛出空指针异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,保证入队操作的原子性
lock.lock();
try {
// 队列满时直接返回false
if (count == items.length)
return false;
else {
// 元素入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
可以看到offer方法的逻辑还是比较简单的,先检查入参不能为空,然后加锁保证入队操作的原子性,在获取锁成功后入队,如果队列已满则直接返回false,所以offer方法并不会阻塞。
put(e)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 可被中断方式获取锁
lock.lockInterruptibly();
try {
while (count == items.length)
// 队列满时会阻塞
notFull.await();
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}
put方法和offer方法唯一的区别,就是会在队列满的时候使用Condition条件对象notFull
阻塞等待。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 入队成功,唤醒等待的移除元素操作线程
notEmpty.signal();
}
在enqueue方法中才会完成对队列中的数组元素的赋值动作,完成之后唤醒阻塞等待的移除元素操作线程。
offer(e,time,unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
// 加锁之前先获取需要等待的时间值
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
// 时间小于等于0时,返回false
if (nanos <= 0)
return false;
// 阻塞等待指定时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
offer(e,time,unit)
方法与offer(e)方法相比,主要时多了一个等待时间,会在时间到达时如果没有空间添加元素返回false。
移除元素
ArrayBlockingQueue中移除元素的方法主要有remove(),poll(),take(),poll(time,unit)四个。这几个方法的实现逻辑都比较简单,这里不在单独贴代码 。我们来看一下阻塞方法take()
的实现即可。
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
while (count == 0)
// 如果元素数量==0,表示队列中为空,则阻塞等待
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 取出元素之后,唤醒其他等待线程。
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表结构的阻塞队列,可以在创建时指定边界大小,也可以不指定,在不指定边界时容量为
Integer.MAX_VALUE
。
重要属性
我们先来看看在LinkedBlockingQueue
中都有哪些重要的属性。
// 内部类Node节点,用来存放链表中的元素
static class Node<E> {
// 节点元素
E item;
// 当前节点的下一个节点,如果为空表示没有下一个节点
Node<E> next;
Node(E x) { item = x; }
}
// 队列的容量
private final int capacity;
// 队列中元素的数量
private final AtomicInteger count = new AtomicInteger();
// 头节点
transient Node<E> head;
// 最后一个节点
private transient Node<E> last;
// 获取元素时控制线程安全的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 添加元素时控制线程安全的锁
private final ReentrantLock putLock = new ReentrantLock();
// 控制消费者的条件
private final Condition notEmpty = takeLock.newCondition();
// 控制生产者的条件
private final Condition notFull = putLock.newCondition();
在LinkedBlockingQueue
中使用Node
来存放元素,和指向下一个节点的链表指针。
构造方法
在LinkedBlockingQueue
的构造方法中,会创建一个创建一个不存放元素的Node
对象赋值给head
和last
。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 创建一个不存放元素的Node对象赋值给head和last
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
// 入队
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
添加元素
offer(e)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
// 使用putLock加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 入队
enqueue(node);
// 数量+1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 唤醒一个生产者线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 唤醒消费者线程
signalNotEmpty();
// 入队失败情况会返回false
return c >= 0;
}
对于链表结构的LinkedBlockingQueue
来说,入队操作要简单很多,只需要将node节点挂在最后一个节点last的next,然后将自己赋值给last。
private void enqueue(Node<E> node) {
last = last.next = node;
}
put(e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 使用putLock加锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 如果队列容量已使用完则阻塞
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
对比结果也和我们最开始的方法汇总表格一样,offer(e)
方法会在入队时如果队列已满直接返回false,而put(e)
会一直阻塞等待,知道入队成功。
add(e)
方法和offer(e,time,unit)
方法实现逻辑上没有特殊之处,这里不再放源码。
移除元素
poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 使用takeLock加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 还有元素时唤醒一个生产者线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
// 唤醒生产者线程
signalNotFull();
return x;
}
poll()方法会在元素出队时如果没有元素则直接返回null。
// 出队方法
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 使用takeLock加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//阻塞等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 还有元素时唤醒一个消费者线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
// 唤醒生产者线程
signalNotFull();
return x;
}
同样,take方法会在没有元素时一直等待。
对比
我们来对比一下ArrayBlockingQueue和LinkedBlockingQueue都有哪些区别。
- ArrayBlockingQueue基于数组实现,LinkedBlockingQueue基于链表实现
- ArrayBlockingQueue在添加和移除元素的操作中共用一把锁,LinkedBlockingQueue使用
takeLock
和putLock
两把锁 - ArrayBlockingQueue在添加和移除元素时直接使用元素的类型处理,LinkedBlockingQueue需要转成Node对象
- ArrayBlockingQueue创建时必须指定容量,LinkedBlockingQueue可以不指定,默认容量为
Integer.MAX_VALUE
由于LinkedBlockingQueue使用两把锁将入队操作和出队操作分离,这会大大提高队列的吞吐量,在高并发情况下生产者和消费者可以并行处理,提高并发性能。
但是LinkedBlockingQueue默认是无界队列,要小心内存溢出风险,所以最好在创建时指定容量大小。
BlockingQueue接口的实现类除了本期介绍的这两种,还有PriorityBlockingQueue
,SynchronousQueue
,LinkedBlockingDeque
等,每一个都有它独特的特性和使用场景,后面我们再单独深入解析。
好的,本期内容就到这里,我们下期见,关注我的公众号【小黑说Java】,更多干货内容。