# 阻塞队列 - 双锁实现 1

# 回忆为什么加锁

之前在 阻塞队列 - 单锁实现中 我们使用 一把锁 保证了 队列 的线程安全,下面我们继续探讨下看看 这把锁有没有 优化的可能

我们先回忆我们会为什么要加如下这把锁在 offer 方法中呢?

@Override
    public void offer(E e) throws InterruptedException //poll 等待队列非空
    {
        // 加锁 (阻塞时可以随时打断)
        lock.lockInterruptibly();
        ... [省略 代码]

就是因为在 多线程 下 如果我们不加这把锁那么方法内的代码在执行时就有可能受到其它线程的影响产生执行交错的情况,比如说两个线程都来执行 offer 方法的代码 - 如下图:

image-20240120223221812

第一个红色的线程它执行完了 1 array [tail] = e 代码还没来得及执行 4 ++tail 它就切换到了 蓝色线程 去执行了 2 3 人家执行完了 CPU 才切换回去执行红色线程的 4 那这样的指令交错呢就可能导致最终的结果不正确,因此为了避免多线程下产生这种指令交错 我们需要加上锁 这样就可以保证多行代码执行的 原子性

上面讲述就是我们为什么要加锁

# 问题

但是加锁后呢会有新的问题

什么问题呢?

我们 offer 方法呢 它需要加锁 poll 方法也需要加锁 它们使用的是 同一把锁

image-20240120224140926

那么这两个方法在多线程执行时会不会相互影响呢?

比如说有一个 offer 线程它要到队列的尾部添加元素,又有一个 poll 线程 它要到队列的 头部去 取走元素,那么 offer 线程 添加元素的同时 poll 线程能执行吗?

答: 不能执行,因为 offer 线程已经加了锁了,它除非等 offer 线程把代码执行完把锁释放了 poll 线程才能获得锁

所以现在的效果就是 offer 和 poll 方法 它们是相互影响的,一个在执行的时候另一个就得阻塞

其实这样并不合理,因为 offer 线程 它主要就是操作 队列的尾部 ,都是 tail 这个尾指针去读取 尾指针索引处的值 去让尾指针自增,它操作的都是队列的尾部

而 poll 线程它操作的都是 队列的 头部 操作的都是 head 指针,按理来说 它们 应该互不干扰

但是我们现在使用一把锁它们就冲突了,一个执行的时候另一个就得阻塞,那怎么解决这个问题呢,下面就需要使用到双锁来实现了

# 分析

我们将 offer 线程和 poll 线程分别使用不同的锁 当 offer 线程上锁时并不会影响到 poll 线程,所以就有如下代码:

/**
 * 双锁实现
 *
 * @param <E> 元素类型
 */
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueueImpl<E>
{
	 // 定义数组
    private final E[] array;
    // 定义 head 和 tail 指针
    private int head;
    private int tail;
    // 队列中元素个数
    private int size;
    // 创建两个锁 用于保护不同的线程 并且 创建对应的 条件变量让 某个线程不满足某条件后就 阻塞线程进入等待状态
    // 保护尾指针 住要给 offer 方法用
    private ReentrantLock tailLock = new ReentrantLock();
    // 队列满了就不能添加了 那就到 tailWaits 中等待
    Condition tailWaits = tailLock.newCondition();
    // 保护头指针 主要给 poll 方法用
    private ReentrantLock headLock = new ReentrantLock();
    // 队列空了就不能取元素了 那就到 headWaits 中等待
    Condition headWaits = headLock.newCondition();
    public BlockingQueue2(int capacity)
    {
        this.array = (E[]) new Object[capacity];
    }
    private boolean isEmpty()
    {
        return size == 0;
    }
    private boolean isFull()
    {
        return size == array.length;
    }
    @Override
    public String toString()
    {
        return null;
    }
    @Override
    public void offer(E e) throws InterruptedException
    {
    }
    @Override
    public E poll() throws InterruptedException
    {
        return null;
    }
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException
    {
        return false;
    }
    public static void main(String[] args) throws InterruptedException
    {
    }
}

实现了 offer 方法和 poll 方法的代码如下:

/**
 * 双锁实现
 *
 * @param <E> 元素类型
 */
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueueImpl<E>
{
    private final E[] array;
    private int head;
    private int tail;
    private int size;
    // 创建两个锁 用于保护不同的线程 并且 创建对应的 条件变量让 某个线程不满足某条件后就 阻塞线程进入等待状态
    // 保护尾指针 住要给 offer 方法用
    private ReentrantLock tailLock = new ReentrantLock();
    // 队列满了就不能添加了 那就到 tailWaits 中等待
    Condition tailWaits = tailLock.newCondition();
    // 保护头指针 主要给 poll 方法用
    private ReentrantLock headLock = new ReentrantLock();
    // 队列空了就不能取元素了 那就到 headWaits 中等待
    Condition headWaits = headLock.newCondition();
    public BlockingQueue2(int capacity)
    {
        this.array = (E[]) new Object[capacity];
    }
    private boolean isEmpty()
    {
        return size == 0;
    }
    private boolean isFull()
    {
        return size == array.length;
    }
    @Override
    public String toString()
    {
        return null;
    }
    @Override
    public void offer(E e) throws InterruptedException
    {
        // 加锁 (阻塞时可以随时打断)
        tailLock.lockInterruptibly();
        try
        {
            // 队列满就等待
            while(isFull())
                tailWaits.await();
            // 队列不满则将元素添加到尾部
            array[tail] = e;
            if(++tail == array.length)
                tail = 0;
            // 元素个数增加
            size++;
        }finally
        {
            // 释放锁
            tailLock.unlock();
        }
    }
    @Override
    public E poll() throws InterruptedException
    {
        // 加锁 (阻塞时可以随时打断)
        headLock.lockInterruptibly();
        try
        {
            // 1. 队列为空则等待
            // 使用 while 来判断队列是否满了 防止 虚假唤醒
            while(isEmpty())
                headWaits.await();
            // 2. 非空则出队列
            E e = array[head];
            array[head] = null; // help GC
            if(++head == array.length)
                head = 0;
            // 元素个数 自减
            size--;
            return e;
        }finally
        {
            // 释放锁
            headLock.unlock();
        }
    }
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException
    {
        return false;
    }
   
   @Override
    public String toString()
    {
        return "BlockingQueue2{" + "array=" + Arrays.toString(array) + '}';
    }
}

测试代码如下:

public class Testdemo
{
    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue2<String> queue = new BlockingQueue2<>(3);
        queue.offer("任务1");
        new Thread(() ->
        {
            try
            {
                queue.offer("任务2");
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }, "offer").start();
        new Thread(() ->
        {
            try
            {
                System.out.println(queue.poll());
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }, "poll").start();
    }
}

下面我们 debug 查看下 双锁的 线程执行情况是怎么样的:

debug 启动

image-20240121100456104

此时我们可以看到 队列中 存在了 任务 1 元素,因为 在测试的类中 先存入了一个 任务 1

那么 offer 和 poll 之间会互相干扰吗? 先看下 offer 的线程执行情况

image-20240121100648493

进入 offer 方法后 并获取 锁

那么这时 offer 线程 获取了 tailLock 的锁 那么它会不会影响 poll 线程的锁呢?我们看下 Poll 线程的情况

image-20240121100948298

image-20240121101014230

我们接着向下运行进入 poll 方法

image-20240121101047533

它也同时 获取了 headLock 锁 执行下面代码操作,它们之间互不干扰 你放你的 任务 2 我取我的 任务 1

# 现有代码的问题

有什么问题呢?

问题就出现在 offer 和 poll 的 修改 size 那里,修改 size 看去是 一行代码,但实际上它的底层是分成了 三步 才能完成 这个自增或者自减的操作

哪三步呢?如下解释:

  1. 读取成员变量 size 的值
  2. 自增
  3. 结果写回成员变量 size

既然 size++ 和 size- - 底层都是分成了多行代码才能完成的自增和自减操作,那么问题就来了,多线程下这些指令有没有可能出现交错的情况

答:可能的

比如下面举例:

假如 size 初始值为 5,那么 size++ 底层 先是读取了 5

1.  读取成员变量size的值  5
2.  自增
3.  结果写回成员变量size

它还没来得及 进行 +1 操作 这时 CPU 就切换到了 poll 中了而 poll 线程 中的 size- - 也是 先要读取 当前 size 的值

因为 offer 线程没来得及自增 所以 poll 线程读取到的 size 值 还是 5

1.  读取成员变量size的值  5
# 接下来做了自减
2.  自减 4
# 然后把 4 写回到成员变量了
3.  结果写回成员变量size 4

当 CPU 切换到 offer 线程后,由于 offer 线程 size++ 的底层第一步 读取了 size 的值也就是 5 所以它还是从 5 的基础上 + 1 结果写回成员变量为 6

此时呢对照图如下:

image-20240121103621860

这种情况对吗?两个线程一个 加了 一个元素 一个 取了一个元素 按道理 应该还是 5 啊 怎么就多了一个呢?

这种情况显然是不对的,这就说明 有一个计算结果被不正确的覆盖掉了

# 解释这种情况产生的原因:

这是因为 由于 offer 线程使用的是 tailLock 锁,而 poll 线程使用的是 headLock 锁 两个锁不同 可以同时执行,它们都在修改 size 那么这时就是线程不安全的,因为 它们同时上锁执行时 一个正在做 + 的同时 另一个做了 - 这样就会产生指令的交错 导致 错误的结果

那怎么保证 size 的线程安全呢?

下面就讲述一个知识叫 原子变量

# 原子变量

我们使用 原子 整数类 来保证 size 的安全

如下代码改变了 size 成员变量的 方式 改为了 原子变量类型

/**
 * 双锁实现
 *
 * @param <E> 元素类型
 */
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueueImpl<E>
{
    private final E[] array;
    private int head;
    private int tail;
    // 创建 原子变量 保证 size 在 多线程下 的线程安全问题
    private AtomicInteger size = new AtomicInteger();
    // 创建两个锁 用于保护不同的线程 并且 创建对应的 条件变量让 某个线程不满足某条件后就 阻塞线程进入等待状态
    // 保护尾指针 住要给 offer 方法用
    private ReentrantLock tailLock = new ReentrantLock();
    // 队列满了就不能添加了 那就到 tailWaits 中等待
    Condition tailWaits = tailLock.newCondition();
    // 保护头指针 主要给 poll 方法用
    private ReentrantLock headLock = new ReentrantLock();
    // 队列空了就不能取元素了 那就到 headWaits 中等待
    Condition headWaits = headLock.newCondition();
    public BlockingQueue2(int capacity)
    {
        this.array = (E[]) new Object[capacity];
    }
    private boolean isEmpty()
    {
        return size.get() == 0;
    }
    private boolean isFull()
    {
        return size.get() == array.length;
    }
    @Override
    public void offer(E e) throws InterruptedException
    {
        // 加锁 (阻塞时可以随时打断)
        tailLock.lockInterruptibly();
        try
        {
            // 队列满就等待
            while(isFull())
                tailWaits.await();
            // 队列不满则将元素添加到尾部
            array[tail] = e;
            if(++tail == array.length)
                tail = 0;
            // 元素个数增加
            //getAndIncrement 效果就是 size++ 保证 size 的 原子性
            size.getAndIncrement(); // size++;
            /**
             * 1. 读取成员变量 size 的值
             * 2. 自增
             * 3. 结果写回成员变量 size
             */
        }finally
        {
            // 释放锁
            tailLock.unlock();
        }
    }
    @Override
    public E poll() throws InterruptedException
    {
        // 加锁 (阻塞时可以随时打断)
        headLock.lockInterruptibly();
        try
        {
            // 1. 队列为空则等待
            // 使用 while 来判断队列是否满了 防止 虚假唤醒
            while(isEmpty())
                headWaits.await();
            // 2. 非空则出队列
            E e = array[head];
            array[head] = null; // help GC
            if(++head == array.length)
                head = 0;
            // 元素个数 自减
            //getAndDecrement 效果就是 size-- 保证 size 的 原子性
            size.getAndDecrement(); // size--;
            /**
             * 1. 读取成员变量 size 的值
             * 2. 自减
             * 3. 结果写回成员变量 size
             */
            return e;
        }finally
        {
            // 释放锁
            headLock.unlock();
        }
    }
    @Override
    public boolean offer(E e, long timeout) throws InterruptedException
    {
        return false;
    }
    @Override
    public String toString()
    {
        return "BlockingQueue2{" + "array=" + Arrays.toString(array) + '}';
    }
}

# 如何唤醒等待线程

我们先来考虑这么一个场景比如说 一开始队列是空的,现在有一个 poll 线程从队列中获取元素 此时不能获取到因为 队列是 空的,那么就会 进入 等待状态

while(isEmpty())
    headWaits.await(); //poll_1 线程进入等待

当然可能会有多个比如 poll_2 线程也来获取元素发现还是空的就进入了等待

while(isEmpty())
    headWaits.await(); //poll_1 线程进入等待  poll_2 线程进入等待

那么 headWaits 中等待的线程我们什么时候去唤醒它们呢?

得等待队列中有元素了才能唤醒 也就是 offer 线程添加了一个元素了

那么你可能会想到这么写:

@Override
public void offer(E e) throws InterruptedException
{
   // 加锁 (阻塞时可以随时打断)
   tailLock.lockInterruptibly();
   try
   {
      // 队列满就等待
      while(isFull())
         tailWaits.await();
      // 队列不满则将元素添加到尾部
      array[tail] = e;
      if(++tail == array.length)
         tail = 0;
      // 元素个数增加
      //getAndIncrement 效果就是 size++ 保证 size 的 原子性
      size.getAndIncrement(); // size++;
      /**
             * 1. 读取成员变量 size 的值
             * 2. 自增
             * 3. 结果写回成员变量 size
             */
      headWaits.signal();
   }finally
   {
      // 释放锁
      tailLock.unlock();
   }
}

那么这么写 对不对呢

通过如下代码 进行测试看看对不对:

@Test
    public void test()
    {
        BlockingQueue2<String> queue = new BlockingQueue2<>(3);
        new Thread(() ->
        {
            try
            {
                String poll = queue.poll();
                System.out.println("poll1 线程: " + poll);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }, "poll_1").start();
        new Thread(() ->
        {
            try
            {
                String poll = queue.poll();
                System.out.println("poll2 线程: " + poll);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }, "poll_2").start();
        new Thread(() ->
        {
            try
            {
                queue.offer("任务");
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }, "offer").start();
    }

我们在 queue.offer (“任务”) 处打上断点 让它先不进行添加,先让 前面两个 poll 线程进入阻塞,然后我们重点看下 添加一个元素后能不能唤醒 上面两个 poll 其中一个线程

debug 执行后可以看到 headWaits 中加入了两个 poll 线程的阻塞

image-20240121150845350

当我们运行后就会报错如下:

image-20240121150330974

报错位置的代码如下:

offer 方法中的 headWaits.signal 既然报错了

image-20240121150940910

为什么它会出错呢?如下分析:

其实之前的文章提到过,不管是 await (阻塞) 方法还是 signal (唤醒) 方法 ,它们都必须配合锁一起使用,也就是 在调用这两个方法之前必须都加好锁,它俩调用完了后必须解锁否则将来就会报错

你可能会想说,这里报错的这个 signal 已经配合锁一起使用了呀 进入 offer 方法前 执行了 tailLock.lockInterruptibly(); 结束时必须执行 tailLock.unlock(); ,为什么会报错了

注意:我们不光是要让 signal 配合锁一起使用 而且呢 与 signal 一起使用的锁 必须配对

上面 锁 必须配对 是什么意思呢?

就是说 signal 必须是 headLock 这个锁对象一起配对使用,而 tailWaits 才能配对 tailLock 一起使用

到这里就清楚 为什么报错 会在 headWaits.signal 了吧

那这个问题怎么解决呢?

我们只需要在 headWaits.signal 的前后加上 与它配对的锁就行了,为了防止报错 没有释放锁 使用 try finally 进行包裹 headWaits.signal,代码如下:

// 加上 headWaits.signal (); 需要的 headLock 锁
headLock.lockInterruptibly();
try
{
   // 必须 配合 配对的锁一起使用否则会就报错
   headWaits.signal();
}finally
{
   // 释放 headLock 的锁
   headLock.unlock();
}

我们在 headLock.lockInterruptibly (); 代码处打上断点 进行查看当 offer 线程 添加元素时 是哪个 poll 线程 获取到的 offer 添加的元素

image-20240121152559807

当程序走到–释放锁时 此时就会 唤醒 headWaits 中的一个线程去 取元素

image-20240121152650095

打印结果:

image-20240121152904479