多线程之阻塞队列

目录

阻塞队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • SynchrosQueue

操作方法

  • 插入元素

add/offer()/put

  • 删除/获取元素

remove/poll/take

源码分析


public class BlockingQueueDemo{
    
    // add method 
    /**
    * 将元素插入队列中  
    * @param e
    * @return 
    */
    public boolean add(E e) {
        // 判断队列是否满了 如果没满则进行插入元素
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果计数器和元素列表长度相等 说明队列已满
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        // 当放入元素数量到达队列最大容量时,重置存放index 防止索引越界异常
        if (++putIndex == items.length)
            putIndex = 0;
        
        // 记录队列长度    
        count++;
        
        // 基于condition阻塞队列完成线程的阻塞
        // 唤醒阻塞队列的线程
        notEmpty.signal();
    }
    
    
    // take method
    
    /**
    * 可中断的从数组中取出元素
    * @return 
    * @throws InterruptedException
    */
     public E take() throws InterruptedException {
         final ReentrantLock lock = this.lock;
         lock.lockInterruptibly();
         try {
             // 当数组中没有元素时 进行阻塞等待
             // 基于condition实现阻塞
             while (count == 0)
                 notEmpty.await();
             return dequeue();
         } finally {
             lock.unlock();
         }
     }
     
     /**
     * 从数组中取出元素
     * @return 
     */
     private E dequeue() {
         // assert lock.getHoldCount() == 1;
         // assert items[takeIndex] != null;
         final Object[] items = this.items;
         @SuppressWarnings("unchecked")
         // 从数组中取出元素
         E x = (E) items[takeIndex];
         items[takeIndex] = null;
         // 当取出元素的index为数组长度时 重置index 防止数组索引越界异常
         if (++takeIndex == items.length)
             takeIndex = 0;
         
         count--;
         
         // 清理无效的数据
         if (itrs != null)
             itrs.elementDequeued();
         // 唤醒阻塞的线程
         notFull.signal();
         return x;
     }
     
     
    // remove method
    
    /**
    * 删除无效的元素
    * @param o
    * @return 
    */
    public boolean remove(Object o) {
          if (o == null) return false;
          final Object[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              if (count > 0) {
                  final int putIndex = this.putIndex;
                  int i = takeIndex;
                  do {
                      if (o.equals(items[i])) {
                          removeAt(i);
                          return true;
                      }
                      if (++i == items.length)
                          i = 0;
                  } while (i != putIndex);
              }
              return false;
          } finally {
              lock.unlock();
          }
      }
}

image

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦