Java源码示例:org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability
示例1
/**
 * Polls the subpartition view for its next element and wraps it with availability and
 * backlog information.
 *
 * <p>Each successful poll advances {@code sequenceNumber}. If the polled element reports
 * {@code isBuffer()}, one credit is consumed; dropping below zero credits is an illegal state.
 *
 * @return the wrapped buffer, or {@code null} when the view had nothing to hand out
 * @throws IllegalStateException if a buffer was polled although no credit was left
 */
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
    final BufferAndBacklog polled = subpartitionView.getNextBuffer();
    if (polled == null) {
        return null;
    }

    sequenceNumber++;

    // Credits are only decremented for elements that report isBuffer().
    if (polled.buffer().isBuffer()) {
        if (--numCreditsAvailable < 0) {
            throw new IllegalStateException("no credit available");
        }
    }

    return new BufferAndAvailability(polled.buffer(), isAvailable(polled), polled.buffersInBacklog());
}
示例2
/**
 * Fetches the next element from the subpartition view, accounting for sequence number and
 * credits, and returns it with its availability flag and backlog size.
 *
 * @return the next buffer with availability information, or {@code null} if the view
 *     returned {@code null}
 * @throws IllegalStateException if the fetched element reports {@code isBuffer()} and the
 *     credit counter drops below zero
 */
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
    final BufferAndBacklog fetched = subpartitionView.getNextBuffer();
    if (fetched == null) {
        return null;
    }

    sequenceNumber++;

    final boolean consumesCredit = fetched.buffer().isBuffer();
    if (consumesCredit && --numCreditsAvailable < 0) {
        throw new IllegalStateException("no credit available");
    }

    return new BufferAndAvailability(
        fetched.buffer(),
        isAvailable(fetched),
        fetched.buffersInBacklog());
}
示例3
/**
 * Obtains the next buffer or event from this gate.
 *
 * @param blocking passed through to {@code waitAndGetNextData}
 * @return the transformed buffer or event, or an empty optional if all end-of-partition
 *     events were already received or no data could be obtained
 * @throws IllegalStateException if {@code closeFuture} is already done
 */
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    if (hasReceivedAllEndOfPartitionEvents) {
        return Optional.empty();
    }

    if (closeFuture.isDone()) {
        throw new IllegalStateException("Released");
    }

    final Optional<InputWithData<InputChannel, BufferAndAvailability>> polled = waitAndGetNextData(blocking);
    if (polled.isPresent()) {
        final InputWithData<InputChannel, BufferAndAvailability> ready = polled.get();
        return Optional.of(
            transformToBufferOrEvent(ready.data.buffer(), ready.moreAvailable, ready.input));
    }
    return Optional.empty();
}
示例4
/**
 * Takes the next element from the subpartition view and returns it with availability and
 * backlog information.
 *
 * <p>The sequence number is incremented for every non-null element; one credit is spent
 * whenever the element reports {@code isBuffer()}.
 *
 * @return the wrapped element, or {@code null} if the view produced nothing
 * @throws IllegalStateException if spending a credit makes the counter negative
 */
@Override
public BufferAndAvailability getNextBuffer() throws IOException {
    final BufferAndBacklog element = subpartitionView.getNextBuffer();
    if (element == null) {
        return null;
    }

    sequenceNumber++;

    if (element.buffer().isBuffer()) {
        if (--numCreditsAvailable < 0) {
            throw new IllegalStateException("no credit available");
        }
    }

    return new BufferAndAvailability(element.buffer(), isAvailable(element), element.buffersInBacklog());
}
示例5
/**
 * Obtains the next buffer or event from this gate.
 *
 * @param blocking passed through to {@code waitAndGetNextData}
 * @return the transformed buffer or event, or an empty optional if all end-of-partition
 *     events were already received or no data could be obtained
 * @throws CancelTaskException if {@code closeFuture} is already done
 */
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    if (hasReceivedAllEndOfPartitionEvents) {
        return Optional.empty();
    }

    if (closeFuture.isDone()) {
        throw new CancelTaskException("Input gate is already closed.");
    }

    final Optional<InputWithData<InputChannel, BufferAndAvailability>> polled = waitAndGetNextData(blocking);
    if (polled.isPresent()) {
        final InputWithData<InputChannel, BufferAndAvailability> ready = polled.get();
        return Optional.of(
            transformToBufferOrEvent(ready.data.buffer(), ready.moreAvailable, ready.input));
    }
    return Optional.empty();
}
示例6
/**
 * Polls the subpartition view and converts the result into a {@code BufferAndAvailability}.
 *
 * <p>The sequence number is advanced once per non-null result.
 *
 * @return the buffer with its more-available flag and backlog, or {@code null} if the view
 *     returned {@code null}
 */
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
    final BufferAndBacklog polled = subpartitionView.getNextBuffer();
    if (polled == null) {
        return null;
    }

    sequenceNumber++;
    return new BufferAndAvailability(
        polled.buffer(),
        polled.isMoreAvailable(),
        polled.buffersInBacklog());
}
示例7
/**
 * Installs the given iterator as the data source of this test gate.
 *
 * <p>Records are serialized lazily, one per request, because the iterator may be unbounded.
 * Once the iterator is exhausted the channel is marked released and an end-of-partition
 * event is returned instead of data.
 *
 * @param iterator source of the records to serve
 * @return this gate
 */
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
    inputIterator = iterator;
    serializer = new SpanningRecordSerializer<T>();

    final BufferAndAvailabilityProvider onDemandProvider = new BufferAndAvailabilityProvider() {
        // True while 'reuse' holds a record that has been read but not yet served.
        private boolean recordPending = inputIterator.next(reuse) != null;

        @Override
        public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
            if (!recordPending) {
                inputChannel.setReleased();
                return Optional.of(new BufferAndAvailability(
                    EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
                    false,
                    0));
            }

            serializer.serializeRecord(reuse);
            BufferBuilder builder = createBufferBuilder(bufferSize);
            serializer.copyToBufferBuilder(builder);

            // Read ahead only after the current record was serialized; 'reuse' is overwritten.
            recordPending = inputIterator.next(reuse) != null;

            return Optional.of(new BufferAndAvailability(buildSingleBuffer(builder), true, 0));
        }
    };

    inputChannel.addBufferAndAvailability(onDemandProvider);
    inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
    return this;
}
示例8
/**
 * Retrieves the next element from the subpartition view, if any, and wraps it.
 *
 * <p>A non-null element increments {@code sequenceNumber}.
 *
 * @return the element's buffer, more-available flag and backlog size, or {@code null} when
 *     the view returned {@code null}
 */
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
    final BufferAndBacklog element = subpartitionView.getNextBuffer();
    if (element == null) {
        return null;
    }

    sequenceNumber++;
    return new BufferAndAvailability(element.buffer(), element.isMoreAvailable(), element.buffersInBacklog());
}
示例9
/**
 * Repeatedly picks a channel and polls it until one yields data or no channel is available.
 *
 * <p>A channel that reports more data is re-enqueued at the tail of
 * {@code inputChannelsWithData}. If that queue is empty after the poll, the gate's
 * availability is reset.
 *
 * @param blocking passed through to {@code getChannel}
 * @return the polled data with its channel and a flag telling whether further channels are
 *     queued, or an empty optional if {@code getChannel} returned none
 */
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
        throws IOException, InterruptedException {
    while (true) {
        final Optional<InputChannel> candidate = getChannel(blocking);
        if (!candidate.isPresent()) {
            return Optional.empty();
        }
        final InputChannel channel = candidate.get();

        // The channel is polled outside the lock on purpose: notifications triggered by the
        // poll could otherwise deadlock against it.
        final Optional<BufferAndAvailability> polled = channel.getNextBuffer();

        synchronized (inputChannelsWithData) {
            final boolean gotData = polled.isPresent();

            if (gotData && polled.get().moreAvailable()) {
                // Appending at the tail keeps other channels from starving.
                inputChannelsWithData.add(channel);
                enqueuedInputChannelsWithData.set(channel.getChannelIndex());
            }

            if (inputChannelsWithData.isEmpty()) {
                resetIsAvailable();
            }

            if (gotData) {
                return Optional.of(new InputWithData<>(
                    channel,
                    polled.get(),
                    !inputChannelsWithData.isEmpty()));
            }
        }
    }
}
示例10
/**
 * Wires the given iterator into this test gate's single input channel.
 *
 * <p>Since the iterator can be infinite, each record is serialized only when a buffer is
 * requested. After the last record the channel is flagged as released and an
 * end-of-partition event is served.
 *
 * @param iterator source of the records to serve
 * @return this gate
 */
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
    inputIterator = iterator;
    serializer = new SpanningRecordSerializer<T>();

    final BufferAndAvailabilityProvider lazyProvider = new BufferAndAvailabilityProvider() {
        // Tracks whether 'reuse' currently carries an unserved record.
        private boolean moreRecords = inputIterator.next(reuse) != null;

        @Override
        public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
            if (!moreRecords) {
                inputChannel.setReleased();
                return Optional.of(new BufferAndAvailability(
                    EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
                    false,
                    0));
            }

            serializer.serializeRecord(reuse);
            BufferBuilder target = createBufferBuilder(bufferSize);
            serializer.copyToBufferBuilder(target);

            // Advance the iterator only now; doing it earlier would clobber 'reuse'.
            moreRecords = inputIterator.next(reuse) != null;

            return Optional.of(new BufferAndAvailability(buildSingleBuffer(target), true, 0));
        }
    };

    inputChannel.addBufferAndAvailability(lazyProvider);
    inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
    return this;
}
示例11
/**
 * Repeatedly picks a channel and polls it until one yields data or no channel is available.
 *
 * <p>A channel that reports more data is re-enqueued at the tail of
 * {@code inputChannelsWithData}. If that queue is empty after the poll, the availability
 * helper is reset to unavailable.
 *
 * @param blocking passed through to {@code getChannel}
 * @return the polled data with its channel and a flag telling whether further channels are
 *     queued, or an empty optional if {@code getChannel} returned none
 */
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
        throws IOException, InterruptedException {
    while (true) {
        final Optional<InputChannel> candidate = getChannel(blocking);
        if (!candidate.isPresent()) {
            return Optional.empty();
        }
        final InputChannel channel = candidate.get();

        // The channel is polled outside the lock on purpose: notifications triggered by the
        // poll could otherwise deadlock against it.
        final Optional<BufferAndAvailability> polled = channel.getNextBuffer();

        synchronized (inputChannelsWithData) {
            final boolean gotData = polled.isPresent();

            if (gotData && polled.get().moreAvailable()) {
                // Appending at the tail keeps other channels from starving.
                inputChannelsWithData.add(channel);
                enqueuedInputChannelsWithData.set(channel.getChannelIndex());
            }

            if (inputChannelsWithData.isEmpty()) {
                availabilityHelper.resetUnavailable();
            }

            if (gotData) {
                return Optional.of(new InputWithData<>(
                    channel,
                    polled.get(),
                    !inputChannelsWithData.isEmpty()));
            }
        }
    }
}
示例12
/**
 * Builds a task that drains recovered state buffers from the channel and checks their content.
 *
 * <p>The task polls until {@code totalStates} buffers were verified. With
 * {@code verifyRelease} set, it stops early once the channel reports being released, and
 * failures raised while polling are tolerated only if the channel is released by then.
 *
 * @param inputChannel channel to drain
 * @param totalStates number of buffers to verify before the task finishes
 * @param states expected content handed to the content assertion
 * @param verifyRelease whether a released channel is an accepted way for the task to end
 * @return the task; it always yields {@code null}
 */
private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) {
    return () -> {
        int verified = 0;
        while (verified < totalStates) {
            if (verifyRelease && inputChannel.isReleased()) {
                break;
            }

            if (inputChannel.getNumberOfQueuedBuffers() != 0) {
                try {
                    Optional<BufferAndAvailability> polled = inputChannel.getNextBuffer();
                    if (polled.isPresent()) {
                        Buffer buffer = polled.get().buffer();
                        BufferBuilderAndConsumerTest.assertContent(buffer, null, states);
                        buffer.recycleBuffer();
                        verified++;
                    }
                } catch (Throwable t) {
                    // A failure is acceptable only when a release was expected and has happened.
                    if (!verifyRelease || !inputChannel.isReleased()) {
                        throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
                    }
                }
            } else {
                // Nothing queued yet; back off briefly before checking again.
                Thread.sleep(1);
            }
        }
        return null;
    };
}
示例13
/**
 * Installs the given iterator as the data source of this test gate.
 *
 * <p>Records are serialized on demand because the iterator may never end. For each record a
 * buffer consumer is created from a new buffer builder before the record is copied in, and
 * the served buffer is built from that consumer. When the iterator is exhausted the channel
 * is marked released and an end-of-partition event is served.
 *
 * @param iterator source of the records to serve
 * @return this gate
 */
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
    inputIterator = iterator;
    serializer = new SpanningRecordSerializer<T>();

    final BufferAndAvailabilityProvider onDemandProvider = new BufferAndAvailabilityProvider() {
        // True while 'reuse' holds a record that has been read but not yet served.
        private boolean recordPending = inputIterator.next(reuse) != null;

        @Override
        public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
            if (!recordPending) {
                inputChannel.setReleased();
                return Optional.of(new BufferAndAvailability(
                    EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
                    false,
                    0));
            }

            serializer.serializeRecord(reuse);
            BufferBuilder builder = createBufferBuilder(bufferSize);
            BufferConsumer consumer = builder.createBufferConsumer();
            serializer.copyToBufferBuilder(builder);

            // Read ahead only after the current record was serialized; 'reuse' is overwritten.
            recordPending = inputIterator.next(reuse) != null;

            return Optional.of(new BufferAndAvailability(consumer.build(), true, 0));
        }
    };

    inputChannel.addBufferAndAvailability(onDemandProvider);
    inputGate.setInputChannels(inputChannel);
    return this;
}
示例14
/**
 * Takes the next buffer or event from any input channel that has data queued.
 *
 * <p>Returns an empty optional right away once all end-of-partition events were received.
 * Otherwise a channel is dequeued under the {@code inputChannelsWithData} lock and polled
 * outside of it; this repeats until a channel actually yields a result. Events are
 * deserialized, and an {@code EndOfPartitionEvent} additionally marks the channel as finished
 * and releases its resources.
 *
 * @param blocking if {@code true}, waits on {@code inputChannelsWithData} while no channel is
 *     queued; if {@code false}, returns an empty optional in that situation
 * @return the next buffer or event with its channel index and a more-available flag, or an
 *     empty optional as described above
 * @throws IllegalStateException if the gate is released (message {@code "Released"})
 */
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
if (hasReceivedAllEndOfPartitionEvents) {
return Optional.empty();
}
if (isReleased) {
throw new IllegalStateException("Released");
}
requestPartitions();
InputChannel currentChannel;
boolean moreAvailable;
Optional<BufferAndAvailability> result = Optional.empty();
// Keep dequeuing channels until one of them returns a result.
do {
synchronized (inputChannelsWithData) {
while (inputChannelsWithData.size() == 0) {
// Re-checked on every iteration, i.e. also after each wake-up from wait().
if (isReleased) {
throw new IllegalStateException("Released");
}
if (blocking) {
inputChannelsWithData.wait();
}
else {
return Optional.empty();
}
}
currentChannel = inputChannelsWithData.remove();
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
// Snapshot taken under the lock: are other channels still queued?
moreAvailable = !inputChannelsWithData.isEmpty();
}
// The channel itself is polled after the lock has been released.
result = currentChannel.getNextBuffer();
} while (!result.isPresent());
// this channel was now removed from the non-empty channels queue
// we re-add it in case it has more data, because in that case no "non-empty" notification
// will come for that channel
if (result.get().moreAvailable()) {
queueChannel(currentChannel);
moreAvailable = true;
}
final Buffer buffer = result.get().buffer();
if (buffer.isBuffer()) {
return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
}
else {
// Not a data buffer: deserialize the contained event.
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
if (event.getClass() == EndOfPartitionEvent.class) {
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
// Because of race condition between:
// 1. releasing inputChannelsWithData lock in this method and reaching this place
// 2. empty data notification that re-enqueues a channel
// we can end up with moreAvailable flag set to true, while we expect no more data.
checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
moreAvailable = false;
hasReceivedAllEndOfPartitionEvents = true;
}
// The channel has delivered its end-of-partition event; notify and release it.
currentChannel.notifySubpartitionConsumed();
currentChannel.releaseAllResources();
}
return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
}
}
示例15
/**
 * Creates one test input channel per index, each backed by its own input queue, and
 * registers it with the input gate.
 *
 * <p>Every channel gets a provider that polls its queue on request: a stream-end entry
 * releases the channel and yields an end-of-partition event, a stream record is serialized
 * into a new buffer, any other event is serialized as is, and an empty queue yields an
 * empty optional.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
    for (int i = 0; i < numInputChannels; i++) {
        final int channelIndex = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

        inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
        inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider answer = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> queue = inputQueues[channelIndex];
            final InputValue<Object> head;
            final boolean hasMore;
            // Poll and emptiness check happen under one lock so they form a consistent pair.
            synchronized (queue) {
                head = queue.poll();
                hasMore = !queue.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[channelIndex].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), hasMore, 0));
            }
            if (head.isStreamRecord()) {
                Object record = head.getStreamRecord();
                delegate.setInstance(record);
                recordSerializer.serializeRecord(delegate);
                BufferBuilder builder = createBufferBuilder(bufferSize);
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();
                return Optional.of(new BufferAndAvailability(buildSingleBuffer(builder), hasMore, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), hasMore, 0));
            }
            return Optional.empty();
        };

        inputChannels[channelIndex].addBufferAndAvailability(answer);
        inputGate.setInputChannel(new IntermediateResultPartitionID(),
            inputChannels[channelIndex]);
    }
}
示例16
/**
 * Creates one test input channel per index, each backed by its own input queue, and
 * registers it with the input gate.
 *
 * <p>Every channel gets a provider that polls its queue on request: a stream-end entry
 * releases the channel and yields an end-of-partition event, a stream record is serialized
 * into a new buffer, any other event is serialized as is (releasing the channel first if it
 * is an {@code EndOfPartitionEvent}), and an empty queue yields an empty optional.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {
    for (int i = 0; i < numInputChannels; i++) {
        final int channelIndex = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

        inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
        inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider answer = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> queue = inputQueues[channelIndex];
            final InputValue<Object> head;
            final boolean hasMore;
            // Poll and emptiness check happen under one lock so they form a consistent pair.
            synchronized (queue) {
                head = queue.poll();
                hasMore = !queue.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[channelIndex].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), hasMore, 0));
            }
            if (head.isStreamRecord()) {
                Object record = head.getStreamRecord();
                delegate.setInstance(record);
                recordSerializer.serializeRecord(delegate);
                BufferBuilder builder = createBufferBuilder(bufferSize);
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();
                return Optional.of(new BufferAndAvailability(buildSingleBuffer(builder), hasMore, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                // An explicitly queued end-of-partition event releases the channel as well.
                if (event instanceof EndOfPartitionEvent) {
                    inputChannels[channelIndex].setReleased();
                }
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), hasMore, 0));
            }
            return Optional.empty();
        };

        inputChannels[channelIndex].addBufferAndAvailability(answer);
        inputGate.setInputChannel(new IntermediateResultPartitionID(),
            inputChannels[channelIndex]);
    }
}
示例17
/**
 * Creates one test input channel per index, each backed by its own input queue, and hands
 * the complete channel array to the input gate once all channels exist.
 *
 * <p>Every channel gets a provider that polls its queue on request: a stream-end entry
 * releases the channel and yields an end-of-partition event, a stream record is serialized
 * into a new buffer built via a buffer consumer, any other event is serialized as is
 * (releasing the channel first if it is an {@code EndOfPartitionEvent}), and an empty queue
 * yields an empty optional.
 */
@SuppressWarnings("unchecked")
private void setupInputChannels() {
    for (int i = 0; i < numInputChannels; i++) {
        final int channelIndex = i;
        final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
        final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
            new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));

        inputQueues[channelIndex] = new ConcurrentLinkedQueue<>();
        inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

        final BufferAndAvailabilityProvider answer = () -> {
            final ConcurrentLinkedQueue<InputValue<Object>> queue = inputQueues[channelIndex];
            final InputValue<Object> head;
            final boolean hasMore;
            // Poll and emptiness check happen under one lock so they form a consistent pair.
            synchronized (queue) {
                head = queue.poll();
                hasMore = !queue.isEmpty();
            }

            if (head == null) {
                return Optional.empty();
            }
            if (head.isStreamEnd()) {
                inputChannels[channelIndex].setReleased();
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), hasMore, 0));
            }
            if (head.isStreamRecord()) {
                Object record = head.getStreamRecord();
                delegate.setInstance(record);
                recordSerializer.serializeRecord(delegate);
                BufferBuilder builder = createBufferBuilder(bufferSize);
                // The consumer is created before any data is copied into the builder.
                BufferConsumer consumer = builder.createBufferConsumer();
                recordSerializer.copyToBufferBuilder(builder);
                builder.finish();
                return Optional.of(new BufferAndAvailability(consumer.build(), hasMore, 0));
            }
            if (head.isEvent()) {
                AbstractEvent event = head.getEvent();
                // An explicitly queued end-of-partition event releases the channel as well.
                if (event instanceof EndOfPartitionEvent) {
                    inputChannels[channelIndex].setReleased();
                }
                return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), hasMore, 0));
            }
            return Optional.empty();
        };

        inputChannels[channelIndex].addBufferAndAvailability(answer);
    }
    inputGate.setInputChannels(inputChannels);
}
示例18
/**
 * Retrieves the next {@code BufferAndAvailability} from this source.
 *
 * <p>NOTE(review): the declaration does not say whether {@code null} is a legal result when
 * nothing is available - confirm against the implementations.
 *
 * @return the next buffer together with its availability information
 * @throws IOException declared by the contract; the concrete cause depends on the implementation
 * @throws InterruptedException declared by the contract; the concrete cause depends on the implementation
 */
BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;
示例19
/**
 * Retrieves the next {@code BufferAndAvailability} from this source.
 *
 * <p>NOTE(review): the declaration does not say whether {@code null} is a legal result when
 * nothing is available - confirm against the implementations.
 *
 * @return the next buffer together with its availability information
 * @throws IOException declared by the contract; the concrete cause depends on the implementation
 * @throws InterruptedException declared by the contract; the concrete cause depends on the implementation
 */
BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;
示例20
/**
 * Retrieves the next {@code BufferAndAvailability} from this source.
 *
 * <p>NOTE(review): the declaration does not say whether {@code null} is a legal result when
 * nothing is available - confirm against the implementations.
 *
 * @return the next buffer together with its availability information
 * @throws IOException declared by the contract; the concrete cause depends on the implementation
 */
BufferAndAvailability getNextBuffer() throws IOException;