BlockingQueue Interface In Java学习
一.从“生产者”和”消费者“模型谈起
生产者消费者问题,也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。通过队列能够容易的实现多线程环境下的数据共享.生产者线程把准备好的数据从队尾插入,消费者线程从队头消费数据,以此解决其数据共享.但这是「柏拉图的理想国」,现实不尽是如此.有个前提是,队列长度是有限的.对于若干个生产者和消费者线程,其数据处理速率是不一致的,当生产者生产数据的速率大于消费者消费数据的速率,且经过一段时间,数据积累到快要挤满队列长度时,生产者线程就需要挂起(阻塞),等待消费者处理积累的数据,并由其唤醒;同样,当消费者消费数据速率大于生产者时,两个线程间就需要做相反的操作.
以上的操作,在Doug Lea写的concurrent包发布之前是需要程序员自己去实现的,而他带来了BlockingQueue,一种兼顾了线程安全和效率的接口,最大程度的保留了广大程序员的发量.因为我们站在了巨人的肩膀上,不用再去关心什么时候挂起线程,什么时候唤醒线程.
二.BlockingQueue Interface In Java简介
这是一个图:
Java提供了很几种BlockingQueue的实现,例如LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue等等.其所有方法本质上都是原子的,并使用可重入锁或其他形式的并发控制.(这里膜一下这个留着德王威廉二世的胡子,脸上挂着腼腆笑容的Doug Lea大佬)
根据其实例化方法不同,分为两种类型的BlokingQueue.
- Unbounded Queue(无限队列):阻塞队列的容量将默认设置为Integer.MAX_VALUE.队列将名不副实,它将永远不会阻塞.因为它可能会随着你元素的添加,而增长到很大的规模. 创建方法:
BlockingQueue unboundedQueue = new LinkedBlockingDeque();
- Bounded Queue(有限队列):通过在队列构造函数中传递队列的容量来创建它:
BlockingQueue boundedQueue = new LinkedBlockingDeque(7);
三.读一读BlockingQueue,ArrayBlockingQueue源码
1. BlockingQueue
BlockingQueue继承自Queue接口.包含三个添加元素的方法(add,offer,put)和三个删除元素的方法(take,poll,remove).下面分别介绍以上六个方法.
三种添加元素的方法:
- add:添加指定元素到队列中,如果添加成功则返回true.如果元素由于队列大小限制无法添加,则会抛出IllegalStateException.
- offer:添加指定元素到队列中,如果添加成功返回true,添加失败则返回false.
- put:添加指定元素到队列中,如果队列满了,会阻塞到其不满为止.
三种删除元素的方法
- take:检索并删除队头元素,如果队列为空,会阻塞到有元素删除为止.
- poll:检索并删除队头元素,如果队列为空,会在规定的等待时间内阻塞直到有元素删除为止,若在阻塞时间内被中断,会抛出InterruptedException.
- remove:基于对象找到队列中的元素,并删除.如果元素存在,删除,并返回true;如果指定元素的类和队列不兼容,会抛出ClassCastException.
值得一提的是,队列中不允许有空值,不然会抛空指针异常.
2. ArrayBlockingQueue
ArrayBlockingQueue继承自AbstractQueue,实现了BlockingQueue接口,包含了八个属性:
- items:存储队列元素的数组.
- takeIndex:取数据的索引,用于put,poll,peek或者remove方法.
- putIndex:放数据的索引,用于put,offer或者add方法.
- count:队列中元素的总数.
- lock:可重入锁.
- notEmpty:由lock创建的notEmpty条件对象.
- notFull:由lock创建的notFull条件对象.
- itrs:共享当前活跃的迭代器状态,如果没有则为空.运行队列操作更新迭代器状态.
接下来是关于ArrayBlockingQueue的入队(enqueue)和出队(dequeue)方法.
private void enqueue(E e) {final Object[] items = this.items;items[putIndex] = e;if (++putIndex == items.length) putIndex = 0;count++;notEmpty.signal(); //入队完,notEmpty条件对象的siganl方法进行通知}
private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E e = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); //出队完,调用notFull条件对象的signal方法进行通知 return e;}
ArrayBlockingQueue就是使用一个可重入锁和这个锁创建的两个条件对象进行并发控制.ArrayBlockingQueue直译过来就是数组阻塞队列,它特性就像数组一样,需要指定队列大小,并且指定之后不允许修改.
然后是关于ArrayBlockingQueue的三个添加元素的方法:add,offer,put.
- add
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
add方法调用了offer方法.其实现如下:
public boolean offer(E e) {Objects.requireNonNull(e); //入队元素判空final ReentrantLock lock = this.lock;lock.lock(); //加可重入锁,保证调用offer方法的时候只有一个线程try {if (count == items.length) //数组判满return false;else {enqueue(e); //数组没满的情况下就入队return true;}} finally {lock.unlock(); //入队完了释放锁,让其他线程能够调用offer方法}}
- put
public void put(E e) throws InterruptedException {Objects.requireNonNull(e); //入队元素判空final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //加可重入锁,保证调用offer方法的时候只有一个线程try {while (count == items.length) //数组判满notFull.await(); //如果队列满了,阻塞当前线程并挂起,同时释放锁,调用notFull条件对象的await方法加入到其等待队列里.enqueue(e); //入队} finally {lock.unlock(); //释放锁}}
ArrayBlocking Queue提供的三个元素删除的方法:poll,take,remove.
- poll
public E poll() {final ReentrantLock lock = this.lock;lock.lock(); //加可重入锁,保证调用poll方法时只有一个线程try {return (count == 0) ? null : dequeue();//如果队列里没元素了,返回空;否则,调用dequeue方法出队.} finally {lock.unlock();}}
- take
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //加可重入锁,保证调用take方法的只有一个线程try {while (count == 0) notEmpty.await(); //如果队列为空,阻塞当前线程并挂起,同时释放锁,调用notEmpty条件对象的await方法加入到其等待队列里.return dequeue(); //调用dequeue方法进行出队操作} finally {lock.unlock();}}
- Remove
public boolean remove(Object o) {if (o == null) return false;final ReentrantLock lock = this.lock;lock.lock(); // 加可重入锁,保证调用remove方法的只有一个线程try {if (count > 0) {final Object[] items = this.items;for (int i = takeIndex, end = putIndex,to = (i < end) ? end : items.length;; i = 0, to = end) {for (; i < to; i++)if (o.equals(items[i])) { //待删除元素与数组中的元素一致removeAt(i); //只有才持有锁的时候才能调用removeAt方法删除指定下标的元素 return true;}if (to == end) break;}}return false;} finally {lock.unlock(); //释放锁,让其他线程能够调用remove方法}}
- remove方法调用的removeAt方法
void removeAt(final int removeIndex) {final Object[] items = this.items;if (removeIndex == takeIndex) { //如果要删除的索引刚好是取索引的位置,直接删除取索引指向的元素,然后取索引+1items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0;count--;if (itrs != null) itrs.elementDequeued(); //更新迭代器状态} else { // 若元素不在取索引位置,则移动元素,更新取索引和放索引的位置for (int i = removeIndex, putIndex = this.putIndex;;) {int pred = i;if (++i == items.length) i = 0;if (i == putIndex) {items[pred] = null;this.putIndex = pred;break;}items[pred] = items[i];}count--;if (itrs != null)itrs.removedAt(removeIndex); // 迭代器的方式删除在待删除索引位置的元素}notFull.signal(); // 调用notFull条件对象的signal方法进行通知 }
关于ArraryBlockingQueue的关于新增和删除元素的方法总结如下:
- 关于add方法:内部调用了offer方法.添加成功返回true,队列满了则抛出IllegalStateException.
- 关于offer方法:添加成功返回true,队列满了则返回false.
- 关于put方法:越王勾践式的添加方法.队列满了,会阻塞线程,直到有消费者消费了队列中的元素,才有可能被唤醒,进行元素添加.
- 关于poll方法:队列为空就返回null,否则返回队头元素.
- 关于take方法:越王勾践式的删除方法.队列为空,阻塞线程,直到有生产者往队列中生产了元素,才有被唤醒的可能,进行元素删除.
- 关于remove方法:调用了removeAt方法,删除元素成功则返回true,否则返回false.
- 以上的六个方法均使用了可重入锁,保证操作的原子性.
BlockingQueue Interface In Java学习
一.从“生产者”和”消费者“模型谈起
生产者消费者问题,也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。通过队列能够容易的实现多线程环境下的数据共享.生产者线程把准备好的数据从队尾插入,消费者线程从队头消费数据,以此解决其数据共享.但这是「柏拉图的理想国」,现实不尽是如此.有个前提是,队列长度是有限的.对于若干个生产者和消费者线程,其数据处理速率是不一致的,当生产者生产数据的速率大于消费者消费数据的速率,且经过一段时间,数据积累到快要挤满队列长度时,生产者线程就需要挂起(阻塞),等待消费者处理积累的数据,并由其唤醒;同样,当消费者消费数据速率大于生产者时,两个线程间就需要做相反的操作.
以上的操作,在Doug Lea写的concurrent包发布之前是需要程序员自己去实现的,而他带来了BlockingQueue,一种兼顾了线程安全和效率的接口,最大程度的保留了广大程序员的发量.因为我们站在了巨人的肩膀上,不用再去关心什么时候挂起线程,什么时候唤醒线程.
二.BlockingQueue Interface In Java简介
这是一个图:
Java提供了很几种BlockingQueue的实现,例如LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue等等.其所有方法本质上都是原子的,并使用可重入锁或其他形式的并发控制.(这里膜一下这个留着德王威廉二世的胡子,脸上挂着腼腆笑容的Doug Lea大佬)
根据其实例化方法不同,分为两种类型的BlokingQueue.
- Unbounded Queue(无限队列):阻塞队列的容量将默认设置为Integer.MAX_VALUE.队列将名不副实,它将永远不会阻塞.因为它可能会随着你元素的添加,而增长到很大的规模. 创建方法:
BlockingQueue unboundedQueue = new LinkedBlockingDeque();
- Bounded Queue(有限队列):通过在队列构造函数中传递队列的容量来创建它:
BlockingQueue boundedQueue = new LinkedBlockingDeque(7);
三.读一读BlockingQueue,ArrayBlockingQueue源码
1. BlockingQueue
BlockingQueue继承自Queue接口.包含三个添加元素的方法(add,offer,put)和三个删除元素的方法(take,poll,remove).下面分别介绍以上六个方法.
三种添加元素的方法:
- add:添加指定元素到队列中,如果添加成功则返回true.如果元素由于队列大小限制无法添加,则会抛出IllegalStateException.
- offer:添加指定元素到队列中,如果添加成功返回true,添加失败则返回false.
- put:添加指定元素到队列中,如果队列满了,会阻塞到其不满为止.
三种删除元素的方法
- take:检索并删除队头元素,如果队列为空,会阻塞到有元素删除为止.
- poll:检索并删除队头元素,如果队列为空,会在规定的等待时间内阻塞直到有元素删除为止,若在阻塞时间内被中断,会抛出InterruptedException.
- remove:基于对象找到队列中的元素,并删除.如果元素存在,删除,并返回true;如果指定元素的类和队列不兼容,会抛出ClassCastException.
值得一提的是,队列中不允许有空值,不然会抛空指针异常.
2. ArrayBlockingQueue
ArrayBlockingQueue继承自AbstractQueue,实现了BlockingQueue接口,包含了八个属性:
- items:存储队列元素的数组.
- takeIndex:取数据的索引,用于put,poll,peek或者remove方法.
- putIndex:放数据的索引,用于put,offer或者add方法.
- count:队列中元素的总数.
- lock:可重入锁.
- notEmpty:由lock创建的notEmpty条件对象.
- notFull:由lock创建的notFull条件对象.
- itrs:共享当前活跃的迭代器状态,如果没有则为空.运行队列操作更新迭代器状态.
接下来是关于ArrayBlockingQueue的入队(enqueue)和出队(dequeue)方法.
private void enqueue(E e) {final Object[] items = this.items;items[putIndex] = e;if (++putIndex == items.length) putIndex = 0;count++;notEmpty.signal(); //入队完,notEmpty条件对象的siganl方法进行通知}
private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E e = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); //出队完,调用notFull条件对象的signal方法进行通知 return e;}
ArrayBlockingQueue就是使用一个可重入锁和这个锁创建的两个条件对象进行并发控制.ArrayBlockingQueue直译过来就是数组阻塞队列,它特性就像数组一样,需要指定队列大小,并且指定之后不允许修改.
然后是关于ArrayBlockingQueue的三个添加元素的方法:add,offer,put.
- add
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
add方法调用了offer方法.其实现如下:
public boolean offer(E e) {Objects.requireNonNull(e); //入队元素判空final ReentrantLock lock = this.lock;lock.lock(); //加可重入锁,保证调用offer方法的时候只有一个线程try {if (count == items.length) //数组判满return false;else {enqueue(e); //数组没满的情况下就入队return true;}} finally {lock.unlock(); //入队完了释放锁,让其他线程能够调用offer方法}}
- put
public void put(E e) throws InterruptedException {Objects.requireNonNull(e); //入队元素判空final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //加可重入锁,保证调用offer方法的时候只有一个线程try {while (count == items.length) //数组判满notFull.await(); //如果队列满了,阻塞当前线程并挂起,同时释放锁,调用notFull条件对象的await方法加入到其等待队列里.enqueue(e); //入队} finally {lock.unlock(); //释放锁}}
ArrayBlocking Queue提供的三个元素删除的方法:poll,take,remove.
- poll
public E poll() {final ReentrantLock lock = this.lock;lock.lock(); //加可重入锁,保证调用poll方法时只有一个线程try {return (count == 0) ? null : dequeue();//如果队列里没元素了,返回空;否则,调用dequeue方法出队.} finally {lock.unlock();}}
- take
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //加可重入锁,保证调用take方法的只有一个线程try {while (count == 0) notEmpty.await(); //如果队列为空,阻塞当前线程并挂起,同时释放锁,调用notEmpty条件对象的await方法加入到其等待队列里.return dequeue(); //调用dequeue方法进行出队操作} finally {lock.unlock();}}
- Remove
public boolean remove(Object o) {if (o == null) return false;final ReentrantLock lock = this.lock;lock.lock(); // 加可重入锁,保证调用remove方法的只有一个线程try {if (count > 0) {final Object[] items = this.items;for (int i = takeIndex, end = putIndex,to = (i < end) ? end : items.length;; i = 0, to = end) {for (; i < to; i++)if (o.equals(items[i])) { //待删除元素与数组中的元素一致removeAt(i); //只有才持有锁的时候才能调用removeAt方法删除指定下标的元素 return true;}if (to == end) break;}}return false;} finally {lock.unlock(); //释放锁,让其他线程能够调用remove方法}}
- remove方法调用的removeAt方法
void removeAt(final int removeIndex) {final Object[] items = this.items;if (removeIndex == takeIndex) { //如果要删除的索引刚好是取索引的位置,直接删除取索引指向的元素,然后取索引+1items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0;count--;if (itrs != null) itrs.elementDequeued(); //更新迭代器状态} else { // 若元素不在取索引位置,则移动元素,更新取索引和放索引的位置for (int i = removeIndex, putIndex = this.putIndex;;) {int pred = i;if (++i == items.length) i = 0;if (i == putIndex) {items[pred] = null;this.putIndex = pred;break;}items[pred] = items[i];}count--;if (itrs != null)itrs.removedAt(removeIndex); // 迭代器的方式删除在待删除索引位置的元素}notFull.signal(); // 调用notFull条件对象的signal方法进行通知 }
关于ArraryBlockingQueue的关于新增和删除元素的方法总结如下:
- 关于add方法:内部调用了offer方法.添加成功返回true,队列满了则抛出IllegalStateException.
- 关于offer方法:添加成功返回true,队列满了则返回false.
- 关于put方法:越王勾践式的添加方法.队列满了,会阻塞线程,直到有消费者消费了队列中的元素,才有可能被唤醒,进行元素添加.
- 关于poll方法:队列为空就返回null,否则返回队头元素.
- 关于take方法:越王勾践式的删除方法.队列为空,阻塞线程,直到有生产者往队列中生产了元素,才有被唤醒的可能,进行元素删除.
- 关于remove方法:调用了removeAt方法,删除元素成功则返回true,否则返回false.
- 以上的六个方法均使用了可重入锁,保证操作的原子性.
发布评论