我实现了我的自定义BlockingQueue
这是我的实现:
public class CustomBlockingQueue<T> implements BlockingQueue<T> {
private final T[] table;
private final int capacity;
private int head = 0;
private int tail = 0;
private volatile int size;
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
@SuppressWarnings("unchecked")
public CustomBlockingQueue(final int capacity) {
this.capacity = capacity;
this.table = (T[]) new Object[this.capacity];
size = 0;
}
@Override
public void add(final T item) throws InterruptedException {
lock.lock();
try {
while (size >= table.length) {
notFull.await();
}
if (tail == table.length) {
tail = 0;
}
table[tail] = item;
size++;
tail++;
if (size == 1) {
notEmpty.signalAll();
}
} finally {
lock.unlock();
}
}
@Override
public T poll() throws InterruptedException {
lock.lock();
try {
while (size == 0) {
notEmpty.await();
}
if (head == table.length) {
head = 0;
}
final T result = table[head];
table[head] = null;
size--;
head++;
if (size == capacity - 1) {
notFull.signalAll();
}
return result;
} finally {
lock.unlock();
}
}
@Override
public int size() {
return size;
}
}
我的实现基于数组。
我不要求你审查代码,但帮助我澄清我和Java之间的区别。
在我的代码中,我在if
子句中做了notAir. signalAll()
或notFull.signalAll()
,但是java.util.concurrent
在每种情况下都简单地调用信号()
?
即使没有必要,每次都通知另一个线程的原因是什么?
如果一个线程在可以读取或添加到队列之前一直处于阻塞状态,那么它的最佳位置是在适用条件的等待集中。这样它就不会主动竞争锁,也不会进行上下文切换。
如果只有一个项目被添加到队列中,我们希望只向一个消费者发出信号。我们不想向更多的消费者发出信号,因为系统必须管理并向所有无法取得进展的线程提供时间片,这会增加更多的工作。
这就是为什么ArrayBlockingQueue在每个项目入队或出队时一次发出一个信号,以避免不必要的唤醒。在您的实现中,waitset中的每个人都会在转换时被唤醒(从空到非空,或从满到不满),无论这些线程中有多少个能够完成他们的工作。
随着更多的线程同时访问,这一点变得更加重要。想象一个系统有100个线程等待从队列中消费某些东西,但每10秒只添加一个项目。最好不要为了让99个线程必须返回而从等待集中踢出100个线程。