# 阻塞队列 - 双锁实现 3 - 最后阶段

到此 我们还没有讲述完 双锁实现 因为 还有一段 最为 精华 的代码没有加上,是什么呢?

我们当时为什么使用 双锁来实现?

使用双锁 它的性能 要比使用 单锁 高,因为 两个线程之间是可以同时 获取自己的锁 然后去同时工作 而不用等对方

我们接下来的优化就是 减少 唤醒时 加锁的次数 也就是 offer 方法 中唤醒时 我尽量要把它的加锁次数 降到最低 类似的 poll 也是

我们考虑如下情况:

假设来了 3 个 poll 线程 poll_1 poll_2 poll_3 它们要去获取元素但是队列是空的,如下代码成立

while(isEmpty())
    headWaits.await();

它会都进入了 阻塞

而后 又来了 3 个 offer 线程,每个 offer 线程都会向队列去添加一个元素

如果是按照原来的方式就是 offer_1 添加元素后 拿到 headLock 锁 然后 唤醒 poll_1 线程 然后释放锁 让 poll_1 去 取元素

offer_2 做类似的操作 offer_3 也做类似的操作

那么按照原来的方式呢 3 个 offer 线程 加了 3 次锁 做了 3 次 唤醒操作,但是我们现在想要 减少 offer 线程 使用 headLock 锁的机会 (次数) 给它使用 haedLock 锁的次数尽量降低,所以我们想出了这样一个策略 如下:

offer 线程有 3 个 我们不让 3 个 offer 线程都去执行 headLock 加锁 ,headWaits.signal 唤醒操作,我们只让其中一个线程做这件事儿,比如说 offer_1 它去执行 headLock 加锁,headWaits.signal 唤醒 。其它两个线程 offer_2 和 offer_3 它们只做添加元素操作就行了它们不用去做加锁唤醒,那么它只能 唤醒 poll_1 这时 我们再让 poll_1 去唤醒 poll_2 让 poll_2 呢 再去唤醒 poll_3 这样就可以了。

也就是让 offer 线程 它只做一次 唤醒 剩余唤醒的活儿 就交给 poll 线程这边来完成了,这个思想我们称之为 级联通知

# 实现代码如下:

在如下代码中 的 offer 和 poll 方法中 分别定义了 c 变量 用于表示 是否要唤醒 poll 的 阻塞线程

offer 中的 c 变量的解释:当我们第一次添加元素时那么 肯定 是先由于 队列是空的 poll 线程才会被阻塞 所以 我们用 c 变量来 判断 如果 size 的值也就是 元素个数为 0 那么就赋值给 c 变量 下面进行判断 如果 c == 0 说明我们需要去唤醒一下 poll 线程 剩下的 offer 线程 size 肯定就不是 0 了 那么就不需要去唤醒 poll 线程,就只做添加元素操作即可

poll 中 c 变量的解释:通过 size 赋值给 c 判断剩余的元素还有多少 如果 元素个数 大于 1 那么就自己唤醒 poll 线程中阻塞的线程即可

如下是完整的代码:

/**
 * 双锁实现
 *
 * @param <E> 元素类型
 */
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueueImpl<E>
{
    private final E[] array;
    private int head;
    private int tail;
    // 创建 原子变量 保证 size 在 多线程下 的线程安全问题
    private AtomicInteger size = new AtomicInteger();
    // 创建两个锁 用于保护不同的线程 并且 创建对应的 条件变量让 某个线程不满足某条件后就 阻塞线程进入等待状态
    // 保护尾指针 住要给 offer 方法用
    private ReentrantLock tailLock = new ReentrantLock();
    // 队列满了就不能添加了 那就到 tailWaits 中等待
    Condition tailWaits = tailLock.newCondition();
    // 保护头指针 主要给 poll 方法用
    private ReentrantLock headLock = new ReentrantLock();
    // 队列空了就不能取元素了 那就到 headWaits 中等待
    Condition headWaits = headLock.newCondition();
    public BlockingQueue2(int capacity)
    {
        this.array = (E[]) new Object[capacity];
    }
    private boolean isEmpty()
    {
        return size.get() == 0;
    }
    private boolean isFull()
    {
        return size.get() == array.length;
    }
    @Override
    public void offer(E e) throws InterruptedException
    {
        // 加锁 (阻塞时可以随时打断)
        tailLock.lockInterruptibly();
        // 添加前的元素个数 通过这个变量 我们 判断是否 让某一个线程去 唤醒 poll 线程而不是都去唤醒
        int c;
        try
        {
            // 队列满就等待
            while(isFull())
                tailWaits.await();
            // 队列不满则将元素添加到尾部
            array[tail] = e;
            if(++tail == array.length)
                tail = 0;
            // 元素个数增加
            //getAndIncrement 效果就是 size++ 保证 size 的 原子性
            c = size.getAndIncrement(); // size++;
            /**
             * 1. 读取成员变量 size 的值
             * 2. 自增
             * 3. 结果写回成员变量 size
             */
            // 判断如果 c + 1 小于 数组长度说明还有 添加空间 那么就 级联唤醒 offer 线程
            if(c + 1 < array.length)
                tailWaits.signal();
        }finally
        {
            // 释放锁
            tailLock.unlock();
        }
        // 如果从 0 变为非空,由 offer 这边唤醒等待非空的 poll 线程
        //    0 -> 1  1 -> 2  2 -> 3
        if(c == 0)
        {
            // 加上 headWaits.signal (); 需要的 headLock 锁
            headLock.lockInterruptibly();
            try
            {
                // 必须 配合 配对的锁一起使用否则会就报错
                headWaits.signal();
            }finally
            {
                // 释放 headLock 的锁
                headLock.unlock();
            }
        }
    }
    @Override
    public E poll() throws InterruptedException
    {
        E e;
        // 加锁 (阻塞时可以随时打断)
        headLock.lockInterruptibly();
        // 取走前的元素个数
        int c;
        try
        {
            // 1. 队列为空则等待
            // 使用 while 来判断队列是否满了 防止 虚假唤醒
            while(isEmpty())
                headWaits.await();
            // 2. 非空则出队列
            e = array[head];
            array[head] = null; // help GC
            if(++head == array.length)
                head = 0;
            // 元素个数 自减
            //getAndDecrement 效果就是 size-- 保证 size 的 原子性
            c = size.getAndDecrement(); // size--;
            /**
             * 1. 读取成员变量 size 的值
             * 2. 自减
             * 3. 结果写回成员变量 size
             */
            // 判读 如果 c 也就是元素个数 不小于 1 说明还有元素可以取 那么就 级联唤醒 poll 线程
            if(c > 1)
                headWaits.signal();
        }finally
        {
            // 释放锁
            headLock.unlock();
        }
        // 队列从满 -> 不满时 由 poll 唤醒等待不满的 offer 线程
        if(c == array.length)
        {
            // 加上 tailWaits.signal (); 需要的 tailLock 锁
            tailLock.lockInterruptibly();
            try
            {
                // 必须 配合 配对的锁一起使用否则会就报错
                tailWaits.signal();
            }finally
            {
                // 释放 headLock 的锁
                tailLock.unlock();
            }
        }
        return e;
    }
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException
    {
        return false;
    }
    @Override
    public String toString()
    {
        return "BlockingQueue2{" + "array=" + Arrays.toString(array) + '}';
    }
    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue2<String> queue = new BlockingQueue2<>(3);
        queue.offer("元素1");
        queue.offer("元素2");
        new Thread(()->{
            try {
                queue.offer("元素3");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "offer").start();
        new Thread(()->{
            try {
                queue.poll();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "poll").start();
    }
}