# 阻塞队列 - 单锁实现 3 - 完整实现
阻塞队列的 1 篇和 2 篇 都看完了 下面就是完整的实现代码了
先编写一个 阻塞队列接口 如下:
/** | |
目前队列存在的问题 | |
<ol> | |
<li > 很多场景要求 & lt;b > 分离 & lt;/b > 生产者、消费者两个 & lt;b > 角色 & lt;/b>、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题 & lt;/li> | |
<li > 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试 & lt;/li> | |
<li > 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试 & lt;/li> | |
</ol> | |
解决方法 | |
<ol> | |
<li > 用锁保证线程安全 & lt;/li> | |
<li > 用条件变量让 poll 或 offer 线程进入 & lt;b > 等待 & lt;/b > 状态,而不是不断循环尝试,让 CPU 空转 & lt;/li> | |
</ol> | |
*/ | |
public interface BlockingQueueImpl<E> { // 阻塞队列 | |
void offer(E e) throws InterruptedException; | |
boolean offer(E e, long timeout) throws InterruptedException; | |
E poll() throws InterruptedException; | |
} |
我们先来讲解一下 阻塞队列接口中的 offer 和 poll 方法
首先它们内部都会用到 条件变量 ,条件变量会 抛出 检查异常所以 函数声明 中 都进行了 throws 抛出异常
offer:使用了 void 返回值 类型,因为 如果 队列满了就让当前线程阻塞不需要返回状态了
实现代码:
public class BlockingQueue<E> implements BlockingQueueImpl<E> | |
{ | |
// 定义数组 | |
private final E[] array; | |
// 定义 head 和 tail 指针 | |
private int head; | |
private int tail; | |
// 队列中元素个数 | |
private int size; | |
// 保证 队列的 线程安全 创建一个 锁对象 | |
private ReentrantLock lock = new ReentrantLock(); | |
/** | |
* 两个条件变量 | |
* 在我们将来 offer 以及 poll 这两个方法 暂时没有办法向队列添加 / 取出元素时让线程进入阻塞状态 | |
*/ | |
// 配合 poll 使用,在队列为空时 poll 方法没办法继续执行了,那么就放到 headWaits 中等待 | |
Condition headWaits = lock.newCondition(); | |
// 配合 offer 使用,在队列满时 offer 方法没办法继续执行了,那么就放到 tailWaits 中等待 | |
Condition tailWaits = lock.newCondition(); | |
// 通过构造函数 给数组 赋 初始值 | |
public BlockingQueue(int capacity) | |
{ | |
array = (E[]) new Object[capacity]; | |
} | |
// 判断 队列是否为空 | |
private boolean isEmpty() | |
{ | |
return size == 0; | |
} | |
// 判断 队列是否满了 | |
private boolean isFull() | |
{ | |
return size == array.length; | |
} | |
@Override | |
public void offer(E e) throws InterruptedException //poll 等待队列非空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
tailWaits.wait(); | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
@Override | |
public boolean offer(E e, long timeout) throws InterruptedException | |
{ | |
return false; | |
} | |
@Override | |
public E poll() throws InterruptedException //offer 等待线程为空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 判断队列是否为空,如果队列为空就将当前取出元素的线程进行阻塞 | |
while(isEmpty()) | |
{ | |
headWaits.wait(); | |
} | |
// 取出 head 处的元素 | |
E e = array[head]; | |
// 将当前取出的元素位置赋值为 null 让 GC 垃圾回收机制进行处理 | |
array[head] = null; // help GC | |
// 判断 head 是否指向了 数组长度,如果是那么就重置 head | |
if(++head == array.length) | |
head = 0; | |
// 每次取出元素就将 元素个数 自减 | |
size--; | |
// 取出元素后就表示 队列中被取出一个元素 那么此时队列肯定是不满了 那么就唤醒 tailWaits 中的阻塞对象让其进行添加元素 | |
tailWaits.signal(); | |
// 返回 取出的元素 | |
return e; | |
} | |
finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
} |
上面代码中确实是实现了 对于 队列的线程安全的需求了,但是呢 如果 offer 方法 线程等待后迟迟没有人 唤醒它呢 ,所以我们要实现一个可以给等待加上时间上限的 offer 方法
代码如下:
public class BlockingQueue<E> implements BlockingQueueImpl<E> | |
{ | |
// 定义数组 | |
private final E[] array; | |
// 定义 head 和 tail 指针 | |
private int head; | |
private int tail; | |
// 队列中元素个数 | |
private int size; | |
// 保证 队列的 线程安全 创建一个 锁对象 | |
private ReentrantLock lock = new ReentrantLock(); | |
/** | |
* 两个条件变量 | |
* 在我们将来 offer 以及 poll 这两个方法 暂时没有办法向队列添加 / 取出元素时让线程进入阻塞状态 | |
*/ | |
// 配合 poll 使用,在队列为空时 poll 方法没办法继续执行了,那么就放到 headWaits 中等待 | |
Condition headWaits = lock.newCondition(); | |
// 配合 offer 使用,在队列满时 offer 方法没办法继续执行了,那么就放到 tailWaits 中等待 | |
Condition tailWaits = lock.newCondition(); | |
// 通过构造函数 给数组 赋 初始值 | |
public BlockingQueue(int capacity) | |
{ | |
array = (E[]) new Object[capacity]; | |
} | |
// 判断 队列是否为空 | |
private boolean isEmpty() | |
{ | |
return size == 0; | |
} | |
// 判断 队列是否满了 | |
private boolean isFull() | |
{ | |
return size == array.length; | |
} | |
@Override | |
public void offer(E e) throws InterruptedException //poll 等待队列非空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
tailWaits.wait(); | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
// 在上面原有的 offer 的基础上多了一个选择 | |
/** | |
* 原有的 offer 方法当 队列 满了之后就会进入等待,那么问题就来了 它会等多久呢? | |
* 它需要等到有其它的线程来唤醒它 比如说 下面的 poll 线程 | |
* 如果我不想等这么久,比如 迟迟等不到 有线程来 唤醒,我想给它加一个等待的时间上限呢? | |
* 这时我们就需要用到 下面新的 offer 方法了 | |
* @param e 要添加的元素 | |
* @param timeout 等待的时间上限 (超时 时间) | |
* @return 如果超时没有添加成功就返回 false 添加成功返回 true | |
* @throws InterruptedException | |
*/ | |
@Override | |
public boolean offer(E e, long timeout) throws InterruptedException | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 将毫秒转换为纳秒 方便 传值 | |
// 因为下面的 awaitNanos 需要的是 纳秒 而我们传入 纳秒是不方便的 所以我们使用上面的 API 将用户传入的毫秒转换为纳秒使用 | |
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout); | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
//awaitNanos 接收一个 时间上限的等待 | |
// 时间单位是什么呢? 就是 它本身后半段语句 Nanos 就是 纳秒的意思 | |
// 纳秒使用呢 用户传入是不方便的,我们通过上面的 TimeUnit.MILLISECONDS.toNanos 来将 用户传入的毫秒转换为纳秒使用 | |
tailWaits.awaitNanos(nanos); | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
return false; | |
} | |
@Override | |
public E poll() throws InterruptedException //offer 等待线程为空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 判断队列是否为空,如果队列为空就将当前取出元素的线程进行阻塞 | |
while(isEmpty()) | |
{ | |
headWaits.wait(); | |
} | |
// 取出 head 处的元素 | |
E e = array[head]; | |
// 将当前取出的元素位置赋值为 null 让 GC 垃圾回收机制进行处理 | |
array[head] = null; // help GC | |
// 判断 head 是否指向了 数组长度,如果是那么就重置 head | |
if(++head == array.length) | |
head = 0; | |
// 每次取出元素就将 元素个数 自减 | |
size--; | |
// 取出元素后就表示 队列中被取出一个元素 那么此时队列肯定是不满了 那么就唤醒 tailWaits 中的阻塞对象让其进行添加元素 | |
tailWaits.signal(); | |
// 返回 取出的元素 | |
return e; | |
} | |
finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
} |
注意:我们使用的 给等待加上时间上限的 offer 方法是有问题的,它有 虚假唤醒的问题。
比如 timeout 参数 用户传入了 时间上限为 5 秒,5 秒之内如果队列还是满的那么应该返回 false 表示没有添加成功
我们考虑虚假唤醒 比如时间已经过去了 1 秒了,过去 1 秒之后 poll 线程 取走一个元素此时 队列布满了 poll 线程把 tailWaits 中等待的线程唤醒,然后到了 tailWaits.awaitNanos (nanos); 唤醒,但是这次唤醒是个虚假唤醒 也就是并发还有另外一个 offer 线程,人家又把队列放满了 当 while (isFull) 再次循环的时候 isFull 还是 true 那 还得接着等待,正常 等待是 从 4 秒开始等待就行了,因为已经过去了 1 秒了,但是按照现在的代码它是从参数传过来的时间值,它是每次循环的时候都会等待 5 秒,所以我们需要对等待时间做一个调整
上面的问题其实很好解决,因为 人家 API 已经帮我们考虑好了这个 tailWaits.awaitNanos (nanos) 会有一个返回值,这个返回值就代表等待的剩余时间,比如说已经 等了 1 秒 剩下还有 4 秒,那么返回值就是 剩下的 4 秒时间
所以我们只需要判断 nanos 时间是否 等完了 也就是 是否小于等于 0 了 表示没有剩余时间了,这个队列还是满的 那么就放弃等待 返回 false 表示添加失败了
对应代码:
//nanos 小于等于表示没有等待时间了 超时了 返回 false 表示 添加失败了 | |
if(nanos <= 0) | |
return false; |
如果 nanos 是大于 0 的说明 遭遇了虚假唤醒,还有一些时间需要继续等待 那么我就继续等 上次更新后的 nanos 时间
对应代码:
// 每次都将 nanos 时间更新为 当前剩余的时间 | |
nanos = tailWaits.awaitNanos(nanos); |
如果 队列不满 并且 元素添加 成功了 那么就返回 true 表示添加元素成功了
offer 方法完整代码如下:
// 在上面原有的 offer 的基础上多了一个选择 | |
/** | |
* 原有的 offer 方法当 队列 满了之后就会进入等待,那么问题就来了 它会等多久呢? | |
* 它需要等到有其它的线程来唤醒它 比如说 下面的 poll 线程 | |
* 如果我不想等这么久,比如 迟迟等不到 有线程来 唤醒,我想给它加一个等待的时间上限呢? | |
* 这时我们就需要用到 下面新的 offer 方法了 | |
* @param e 要添加的元素 | |
* @param timeout 等待的时间上限 (超时 时间) | |
* @return 如果超时没有添加成功就返回 false 添加成功返回 true | |
* @throws InterruptedException | |
*/ | |
@Override | |
public boolean offer(E e, long timeout) throws InterruptedException | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 将毫秒转换为纳秒 方便 传值 | |
// 因为下面的 awaitNanos 需要的是 纳秒 而我们传入 纳秒是不方便的 所以我们使用上面的 API 将用户传入的毫秒转换为纳秒使用 | |
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout); | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
{ | |
//awaitNanos 接收一个 时间上限的等待 | |
// 时间单位是什么呢? 就是 它本身后半段语句 Nanos 就是 纳秒的意思 | |
// 纳秒使用呢 用户传入是不方便的,我们通过上面的 TimeUnit.MILLISECONDS.toNanos 来将 用户传入的毫秒转换为纳秒使用 | |
//nanos 小于等于表示没有等待时间了 超时了 返回 false 表示 添加失败了 | |
if(nanos <= 0) | |
return false; | |
// 每次都将 nanos 时间更新为 当前剩余的时间 | |
nanos = tailWaits.awaitNanos(nanos); | |
} | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
return true; | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} |
完整的代码:
public class BlockingQueue<E> implements BlockingQueueImpl<E> | |
{ | |
// 定义数组 | |
private final E[] array; | |
// 定义 head 和 tail 指针 | |
private int head; | |
private int tail; | |
// 队列中元素个数 | |
private int size; | |
// 保证 队列的 线程安全 创建一个 锁对象 | |
private ReentrantLock lock = new ReentrantLock(); | |
/** | |
* 两个条件变量 | |
* 在我们将来 offer 以及 poll 这两个方法 暂时没有办法向队列添加 / 取出元素时让线程进入阻塞状态 | |
*/ | |
// 配合 poll 使用,在队列为空时 poll 方法没办法继续执行了,那么就放到 headWaits 中等待 | |
Condition headWaits = lock.newCondition(); | |
// 配合 offer 使用,在队列满时 offer 方法没办法继续执行了,那么就放到 tailWaits 中等待 | |
Condition tailWaits = lock.newCondition(); | |
// 通过构造函数 给数组 赋 初始值 | |
public BlockingQueue(int capacity) | |
{ | |
array = (E[]) new Object[capacity]; | |
} | |
// 判断 队列是否为空 | |
private boolean isEmpty() | |
{ | |
return size == 0; | |
} | |
// 判断 队列是否满了 | |
private boolean isFull() | |
{ | |
return size == array.length; | |
} | |
@Override | |
public void offer(E e) throws InterruptedException //poll 等待队列非空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
tailWaits.wait(); | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
// 在上面原有的 offer 的基础上多了一个选择 | |
/** | |
* 原有的 offer 方法当 队列 满了之后就会进入等待,那么问题就来了 它会等多久呢? | |
* 它需要等到有其它的线程来唤醒它 比如说 下面的 poll 线程 | |
* 如果我不想等这么久,比如 迟迟等不到 有线程来 唤醒,我想给它加一个等待的时间上限呢? | |
* 这时我们就需要用到 下面新的 offer 方法了 | |
* @param e 要添加的元素 | |
* @param timeout 等待的时间上限 (超时 时间) | |
* @return 如果超时没有添加成功就返回 false 添加成功返回 true | |
* @throws InterruptedException | |
*/ | |
@Override | |
public boolean offer(E e, long timeout) throws InterruptedException | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 将毫秒转换为纳秒 方便 传值 | |
// 因为下面的 awaitNanos 需要的是 纳秒 而我们传入 纳秒是不方便的 所以我们使用上面的 API 将用户传入的毫秒转换为纳秒使用 | |
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout); | |
// 使用 while 来进行 重复检查 队列是否满了,如果队列满了就阻塞线程 | |
while(isFull()) | |
{ | |
//awaitNanos 接收一个 时间上限的等待 | |
// 时间单位是什么呢? 就是 它本身后半段语句 Nanos 就是 纳秒的意思 | |
// 纳秒使用呢 用户传入是不方便的,我们通过上面的 TimeUnit.MILLISECONDS.toNanos 来将 用户传入的毫秒转换为纳秒使用 | |
//nanos 小于等于表示没有等待时间了 超时了 返回 false 表示 添加失败了 | |
if(nanos <= 0) | |
return false; | |
// 每次都将 nanos 时间更新为 当前剩余的时间 | |
nanos = tailWaits.awaitNanos(nanos); | |
} | |
// 将 要添加的元素 添加到 array [tail] 位置 | |
array[tail] = e; | |
// 判断 tail 是否 指向了 数组长度 也就是是否满了 如果满了 就将 tail 重置为 0 | |
if(++tail == array.length) | |
tail = 0; | |
// 自增 增加 队列中元素个数 (记录) | |
size++; | |
// 由于 队列中 添加了一个元素 队列部位空了 就唤醒 headWaits 中的阻塞线程让其进行取出元素 | |
headWaits.signal(); | |
return true; | |
}finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
@Override | |
public E poll() throws InterruptedException //offer 等待线程为空 | |
{ | |
// 加锁 (阻塞时可以随时打断) | |
lock.lockInterruptibly(); | |
try | |
{ | |
// 判断队列是否为空,如果队列为空就将当前取出元素的线程进行阻塞 | |
while(isEmpty()) | |
{ | |
headWaits.wait(); | |
} | |
// 取出 head 处的元素 | |
E e = array[head]; | |
// 将当前取出的元素位置赋值为 null 让 GC 垃圾回收机制进行处理 | |
array[head] = null; // help GC | |
// 判断 head 是否指向了 数组长度,如果是那么就重置 head | |
if(++head == array.length) | |
head = 0; | |
// 每次取出元素就将 元素个数 自减 | |
size--; | |
// 取出元素后就表示 队列中被取出一个元素 那么此时队列肯定是不满了 那么就唤醒 tailWaits 中的阻塞对象让其进行添加元素 | |
tailWaits.signal(); | |
// 返回 取出的元素 | |
return e; | |
} | |
finally | |
{ | |
// 释放锁 | |
lock.unlock(); | |
} | |
} | |
@Override | |
public String toString() | |
{ | |
return "BlockingQueue{" + "array=" + Arrays.toString(array) + '}'; | |
} | |
} |
测试代码:
public class TestBlockingQueue1 { | |
public static void main(String[] args) throws InterruptedException { | |
BlockingQueue<String> queue = new BlockingQueue<>(3); | |
Thread t1 = new Thread(()->{ | |
try { | |
System.out.println(System.currentTimeMillis() + " begin"); | |
queue.offer("任务1"); | |
System.out.println(queue); | |
queue.offer("任务2"); | |
System.out.println(queue); | |
queue.offer("任务3"); | |
System.out.println(queue); | |
queue.offer("任务4", 5000); | |
System.out.println(queue); | |
System.out.println(System.currentTimeMillis() + " end"); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
},"生产者"); | |
t1.start(); | |
Thread.sleep(2000); | |
queue.poll(); | |
} | |
} |
打印结果:
1705760500582 begin
BlockingQueue{array=[任务1, null, null]}
BlockingQueue{array=[任务1, 任务2, null]}
BlockingQueue{array=[任务1, 任务2, 任务3]}
# 等待2秒后 poll 线程从队列中取出了一个元素 offer(e, timeout) 方法得以执行 添加了 任务4
BlockingQueue{array=[任务4, 任务2, 任务3]}
1705760502589 end