前阵看见个面试题
请实现一个泛型类 BlockingQueue:
- 构造函数里指定队列容量
- void put (T item) 队列满了会阻塞,直到队列有空间
- T take (),队列空时会阻塞,直到队列有元素
我寻思做做看,结果写出来的答案我自己都看不下去,那干脆看看 Java 里面是咋实现的。
我的实现
先看看我写的玩意是啥德行吧。实话说这玩意看着都不像能正常工作的样子(虽然实际上它还真能用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class BlockingQueue<T> { private final Object[] items; private final int maxSize; private volatile int count;
public BlockingQueue(final int size) { this.items = new Object[size]; this.maxSize = size; }
public synchronized void put(final T item) throws InterruptedException { while (this.count >= maxSize) { wait(); }
while (true) { if (this.count < maxSize) { notifyAll(); break; } }
synchronized (this) { items[count] = item; count++; } }
public synchronized T take() throws InterruptedException { while (count == 0) { wait(); }
while (true) { if (count > 0) { notifyAll(); break;
} }
final T item = (T) items[0];
for (int i = 1; i < count; i++) { items[i - 1] = items[i]; } count--;
return item; } }
|
这段代码主要有俩问题,一个是这些 while
循环看着很不爽;另一个是在取出元素后逐个将元素左移的操作效率非常的低;而且满天飞的 wait()
和 notifyAll()
也会影响代码的可读性。
那么接下来看看这段代码怎么优化成接近 Java 自己的实现。
优化阻塞
BlockingQueue 的一个机制是,在队列空时取元素的操作会被阻塞,而在队列满时放元素的操作会被阻塞。在 Java 的 ArrayBlockingQueue
中,它并没有使用 synchronized
加锁,而是用了 ReentrantLock
对象。此外对于阻塞线程和唤醒线程操作,它也没有用 wait()
和 notifyAll()
,而是通过两个 Condition
对象 notEmpty
和 notFull
实现。
所以改进方案就是,去掉方法定义中的 synchronized
关键字,换成用 ReentrantLock
加锁解锁;去掉 wait()
和 notifyAll()
,改成用 notEmpty
和 notFull
管理状态以及负责唤醒,同时可以借助这两个变量的语义增强代码的可读性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class BlockingQueue<T> { private final Object[] items; private final int maxSize; private int count = 0;
private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public BlockingQueue(final int size) { this.items = new Object[size]; this.maxSize = size;
this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); }
public void put(final T item) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();
try { while (count >= maxSize) { notFull.await(); }
items[count] = item; count++;
notEmpty.signal(); } finally { lock.unlock(); } }
public T take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();
try { while (count == 0) { notEmpty.await(); }
final T item = (T) items[0]; for (int i = 1; i < count; i++) { items[i - 1] = items[i]; } count--;
notFull.signal();
return item; } finally { notFull.signal(); }
} }
|
优化取出元素后更新队列内容
如果这是一个非常大的队列,那么用一个 for 循环将所有元素往左移来更新队列内容的方式会消耗巨量的时间,显然它的效率是非常低的。相比于每次取出元素后都重排数组中的元素,Java 中 ArrayBlockingQueue
则是利用循环数组的思路,通过两个指针来指示当前该从什么位置取或该向什么位置存(这不是经典的双指针玩法么,前段时间刷 LeetCode 还做过呢,咋就没想起来),这样每次存取元素后,只需要更新指针指向的位置就行,效率可想而知非常高。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class BlockingQueue<T> { private final Object[] items; private int count = 0; private int takeIndex = 0; private int putIndex = 0;
private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public BlockingQueue(final int size) { this.items = new Object[size];
this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); }
public void put(final T item) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();
try { while (count >= items.length) { notFull.await(); }
items[putIndex] = item; count++;
if (++putIndex == items.length) { putIndex = 0; }
notEmpty.signal(); } finally { lock.unlock(); } }
public T take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();
try { while (count == 0) { notEmpty.await(); }
final T item = (T) items[takeIndex]; items[takeIndex] = null; count--;
if (++takeIndex == items.length) { takeIndex = 0; }
notFull.signal();
return item; } finally { notFull.signal(); } } }
|