阻塞队列
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- SynchrosQueue
操作方法
- 插入元素
add/offer()/put
- 删除/获取元素
remove/poll/take
源码分析
public class BlockingQueueDemo{
// add method
/**
* 将元素插入队列中
* @param e
* @return
*/
public boolean add(E e) {
// 判断队列是否满了 如果没满则进行插入元素
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果计数器和元素列表长度相等 说明队列已满
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 当放入元素数量到达队列最大容量时,重置存放index 防止索引越界异常
if (++putIndex == items.length)
putIndex = 0;
// 记录队列长度
count++;
// 基于condition阻塞队列完成线程的阻塞
// 唤醒阻塞队列的线程
notEmpty.signal();
}
// take method
/**
* 可中断的从数组中取出元素
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 当数组中没有元素时 进行阻塞等待
// 基于condition实现阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* 从数组中取出元素
* @return
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 从数组中取出元素
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 当取出元素的index为数组长度时 重置index 防止数组索引越界异常
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 清理无效的数据
if (itrs != null)
itrs.elementDequeued();
// 唤醒阻塞的线程
notFull.signal();
return x;
}
// remove method
/**
* 删除无效的元素
* @param o
* @return
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
}