Java源码示例:org.apache.flink.runtime.state.StateSnapshotContext

示例1
/**
 * Stores the stream elements this operator currently holds in operator list state.
 *
 * <p>After delegating to the superclass, the list state registered under {@code STATE_NAME}
 * is cleared and refilled with the stream element of every entry returned by
 * {@code queue.values()}, followed by the pending entry's element when one exists.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if an element cannot be added; the state is cleared before rethrowing
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	final ListState<StreamElement> queueState = getOperatorStateBackend()
		.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
	queueState.clear();

	final Collection<StreamElementQueueEntry<?>> inFlightEntries = queue.values();

	try {
		for (StreamElementQueueEntry<?> entry : inFlightEntries) {
			queueState.add(entry.getStreamElement());
		}

		// an entry is pending when the queue had no room for it; it has to be stored too
		if (pendingStreamElementQueueEntry != null) {
			queueState.add(pendingStreamElementQueueEntry.getStreamElement());
		}
	} catch (Exception cause) {
		// do not leave a partially written state behind
		queueState.clear();

		throw new Exception("Could not add stream element queue entries to operator state " +
			"backend of operator " + getOperatorName() + '.', cause);
	}
}
 
示例2
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>The function is offered to {@code trySnapshotFunctionState}. If that returns
 * {@code false} and the function is a {@code WrappingFunction}, the wrapped function is tried
 * next, and so on, until a snapshot succeeds or a non-wrapping function is reached.
 *
 * @param context the snapshot context; must not be null
 * @param backend the operator state backend; must not be null
 * @param userFunction the outermost user function to snapshot
 * @throws Exception if snapshotting a function fails
 */
public static void snapshotFunctionState(
		StateSnapshotContext context,
		OperatorStateBackend backend,
		Function userFunction) throws Exception {

	Preconditions.checkNotNull(context);
	Preconditions.checkNotNull(backend);

	Function candidate = userFunction;

	while (!trySnapshotFunctionState(context, backend, candidate)) {
		if (!(candidate instanceof WrappingFunction)) {
			// nothing left to unwrap, so there is nothing more to snapshot
			return;
		}

		// peel off one wrapper layer and retry with the inner function
		candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
	}
}
 
示例3
/**
 * Stores the stream elements this operator currently holds in operator list state.
 *
 * <p>After delegating to the superclass, the list state registered under {@code STATE_NAME}
 * is cleared and refilled with the stream element of every entry returned by
 * {@code queue.values()}, followed by the pending entry's element when one exists.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if an element cannot be added; the state is cleared before rethrowing
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	final ListState<StreamElement> queueState = getOperatorStateBackend()
		.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
	queueState.clear();

	final Collection<StreamElementQueueEntry<?>> inFlightEntries = queue.values();

	try {
		for (StreamElementQueueEntry<?> entry : inFlightEntries) {
			queueState.add(entry.getStreamElement());
		}

		// an entry is pending when the queue had no room for it; it has to be stored too
		if (pendingStreamElementQueueEntry != null) {
			queueState.add(pendingStreamElementQueueEntry.getStreamElement());
		}
	} catch (Exception cause) {
		// do not leave a partially written state behind
		queueState.clear();

		throw new Exception("Could not add stream element queue entries to operator state " +
			"backend of operator " + getOperatorName() + '.', cause);
	}
}
 
示例4
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>The function is offered to {@code trySnapshotFunctionState}. If that returns
 * {@code false} and the function is a {@code WrappingFunction}, the wrapped function is tried
 * next, and so on, until a snapshot succeeds or a non-wrapping function is reached.
 *
 * @param context the snapshot context; must not be null
 * @param backend the operator state backend; must not be null
 * @param userFunction the outermost user function to snapshot
 * @throws Exception if snapshotting a function fails
 */
public static void snapshotFunctionState(
		StateSnapshotContext context,
		OperatorStateBackend backend,
		Function userFunction) throws Exception {

	Preconditions.checkNotNull(context);
	Preconditions.checkNotNull(backend);

	Function candidate = userFunction;

	while (!trySnapshotFunctionState(context, backend, candidate)) {
		if (!(candidate instanceof WrappingFunction)) {
			// nothing left to unwrap, so there is nothing more to snapshot
			return;
		}

		// peel off one wrapper layer and retry with the inner function
		candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
	}
}
 
示例5
/**
 * Stores the contents of {@code queue} in operator list state.
 *
 * <p>After delegating to the superclass, the list state registered under {@code STATE_NAME}
 * is cleared and then filled in one call with everything returned by {@code queue.values()}.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if the values cannot be added; the state is cleared before rethrowing
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	final ListState<StreamElement> queueState = getOperatorStateBackend()
		.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
	queueState.clear();

	try {
		queueState.addAll(queue.values());
	} catch (Exception cause) {
		// do not leave a partially written state behind
		queueState.clear();

		throw new Exception("Could not add stream element queue entries to operator state " +
			"backend of operator " + getOperatorName() + '.', cause);
	}
}
 
示例6
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>The function is offered to {@code trySnapshotFunctionState}. If that returns
 * {@code false} and the function is a {@code WrappingFunction}, the wrapped function is tried
 * next, and so on, until a snapshot succeeds or a non-wrapping function is reached.
 *
 * @param context the snapshot context; must not be null
 * @param backend the operator state backend; must not be null
 * @param userFunction the outermost user function to snapshot
 * @throws Exception if snapshotting a function fails
 */
public static void snapshotFunctionState(
		StateSnapshotContext context,
		OperatorStateBackend backend,
		Function userFunction) throws Exception {

	Preconditions.checkNotNull(context);
	Preconditions.checkNotNull(backend);

	Function candidate = userFunction;

	while (!trySnapshotFunctionState(context, backend, candidate)) {
		if (!(candidate instanceof WrappingFunction)) {
			// nothing left to unwrap, so there is nothing more to snapshot
			return;
		}

		// peel off one wrapper layer and retry with the inner function
		candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
	}
}
 
示例7
/**
 * Snapshots the reader's current splits into the operator's checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code checkpointedState} is cleared and refilled
 * with every split returned by {@code reader.getReaderState()}, one state entry per split.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if a split cannot be added (the state is cleared before rethrowing), or
 *     if the {@code checkState} precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	checkState(checkpointedState != null,
		"The operator state has not been properly initialized.");

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

	checkpointedState.clear();

	List<TimestampedFileInputSplit> readerState = reader.getReaderState();

	try {
		for (TimestampedFileInputSplit split : readerState) {
			// create a new partition for each entry.
			checkpointedState.add(split);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		checkpointedState.clear();

		// message fixed: the original text contained a duplicated "to"
		throw new Exception("Could not add timestamped file input splits to operator " +
			"state backend of operator " + getOperatorName() + '.', e);
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
			getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
	}
}
 
示例8
/**
 * Snapshots the pending checkpoints of this operator into its checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code saveHandleInState} is called with the
 * checkpoint id and timestamp from the context. {@code checkpointedState} is then cleared and
 * refilled with every element of {@code pendingCheckpoints}, one state entry each.
 *
 * @param context the snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if a pending checkpoint cannot be added (the state is cleared before
 *     rethrowing), or if the state precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	Preconditions.checkState(this.checkpointedState != null,
		"The operator state has not been properly initialized.");

	saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

	this.checkpointedState.clear();

	try {
		for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
			// create a new partition for each entry.
			this.checkpointedState.add(pendingCheckpoint);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		this.checkpointedState.clear();

		// message fixed: the original text misspelled "pending" as "panding"
		throw new Exception("Could not add pending checkpoints to operator state " +
			"backend of operator " + getOperatorName() + '.', e);
	}

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
	}
}
 
示例9
/**
 * Attempts to snapshot a single user function, without unwrapping it.
 *
 * <p>A {@code CheckpointedFunction} is asked to snapshot itself through the context. A
 * {@code ListCheckpointed} function returns a list which is written, element by element, into
 * the backend's serializable list state named
 * {@code DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME}; that state is cleared
 * first, and a {@code null} list leaves it empty.
 *
 * @param context the snapshot context
 * @param backend the operator state backend used for {@code ListCheckpointed} functions
 * @param userFunction the function to snapshot
 * @return {@code true} if the function implements one of the two checkpointing interfaces,
 *     {@code false} otherwise
 * @throws Exception if the function's snapshot call fails or the list cannot be written
 */
private static boolean trySnapshotFunctionState(
		StateSnapshotContext context,
		OperatorStateBackend backend,
		Function userFunction) throws Exception {

	if (userFunction instanceof CheckpointedFunction) {
		CheckpointedFunction checkpointed = (CheckpointedFunction) userFunction;
		checkpointed.snapshotState(context);
		return true;
	}

	if (!(userFunction instanceof ListCheckpointed)) {
		return false;
	}

	@SuppressWarnings("unchecked")
	ListCheckpointed<Serializable> listCheckpointed = (ListCheckpointed<Serializable>) userFunction;
	List<Serializable> snapshot =
		listCheckpointed.snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

	ListState<Serializable> operatorState =
		backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
	operatorState.clear();

	if (snapshot == null) {
		// the function reported no state; the cleared list state is the result
		return true;
	}

	try {
		for (Serializable partition : snapshot) {
			operatorState.add(partition);
		}
	} catch (Exception cause) {
		// do not leave a partially written state behind
		operatorState.clear();

		throw new Exception("Could not write partitionable state to operator " +
			"state backend.", cause);
	}

	return true;
}
 
示例10
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if the list state cannot be obtained or written
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final ListState<Integer> testState = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

	testState.clear();
	testState.add(42);
	testState.add(4711);

	numberSnapshotCalls++;
}
 
示例11
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and
 * then writes a single byte to the stream.
 *
 * @param context the snapshot context providing the raw operator state output
 * @throws Exception if obtaining the stream or writing to it fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

	// signal that the snapshot is now in progress
	IN_CHECKPOINT_LATCH.trigger();

	// this write is expected to block
	rawOutput.write(1);
}
 
示例12
/**
 * Copies the contents of {@code inputBuffer} into {@code bufferState}.
 *
 * <p>After delegating to the superclass, the state is cleared, every key/value pair of the
 * buffer is converted into a {@code Tuple2}, and all tuples are added in a single call.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if clearing or writing the state fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	// start from an empty state so only the current buffer content is stored
	bufferState.clear();

	final List<Tuple2<BaseRow, Long>> bufferedPairs = new ArrayList<>(inputBuffer.size());
	inputBuffer.forEach((bufferKey, bufferValue) -> {
		bufferedPairs.add(Tuple2.of(bufferKey, bufferValue));
	});

	// write everything with one call instead of one call per pair
	bufferState.addAll(bufferedPairs);
}
 
示例13
/**
 * Snapshots the reader's current splits into the operator's checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code checkpointedState} is cleared and refilled
 * with every split returned by {@code reader.getReaderState()}, one state entry per split.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if a split cannot be added (the state is cleared before rethrowing), or
 *     if the {@code checkState} precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	checkState(checkpointedState != null,
		"The operator state has not been properly initialized.");

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

	checkpointedState.clear();

	List<TimestampedFileInputSplit> readerState = reader.getReaderState();

	try {
		for (TimestampedFileInputSplit split : readerState) {
			// create a new partition for each entry.
			checkpointedState.add(split);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		checkpointedState.clear();

		// message fixed: the original text contained a duplicated "to"
		throw new Exception("Could not add timestamped file input splits to operator " +
			"state backend of operator " + getOperatorName() + '.', e);
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
			getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
	}
}
 
示例14
/**
 * Snapshots the pending checkpoints of this operator into its checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code saveHandleInState} is called with the
 * checkpoint id and timestamp from the context. {@code checkpointedState} is then cleared and
 * refilled with every element of {@code pendingCheckpoints}, one state entry each.
 *
 * @param context the snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if a pending checkpoint cannot be added (the state is cleared before
 *     rethrowing), or if the state precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	Preconditions.checkState(this.checkpointedState != null,
		"The operator state has not been properly initialized.");

	saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

	this.checkpointedState.clear();

	try {
		for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
			// create a new partition for each entry.
			this.checkpointedState.add(pendingCheckpoint);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		this.checkpointedState.clear();

		// message fixed: the original text misspelled "pending" as "panding"
		throw new Exception("Could not add pending checkpoints to operator state " +
			"backend of operator " + getOperatorName() + '.', e);
	}

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
	}
}
 
示例15
/**
 * Attempts to snapshot a single user function, without unwrapping it.
 *
 * <p>A {@code CheckpointedFunction} is asked to snapshot itself through the context. A
 * {@code ListCheckpointed} function returns a list which is written, element by element, into
 * the backend's serializable list state named
 * {@code DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME}; that state is cleared
 * first, and a {@code null} list leaves it empty.
 *
 * @param context the snapshot context
 * @param backend the operator state backend used for {@code ListCheckpointed} functions
 * @param userFunction the function to snapshot
 * @return {@code true} if the function implements one of the two checkpointing interfaces,
 *     {@code false} otherwise
 * @throws Exception if the function's snapshot call fails or the list cannot be written
 */
private static boolean trySnapshotFunctionState(
		StateSnapshotContext context,
		OperatorStateBackend backend,
		Function userFunction) throws Exception {

	if (userFunction instanceof CheckpointedFunction) {
		CheckpointedFunction checkpointed = (CheckpointedFunction) userFunction;
		checkpointed.snapshotState(context);
		return true;
	}

	if (!(userFunction instanceof ListCheckpointed)) {
		return false;
	}

	@SuppressWarnings("unchecked")
	ListCheckpointed<Serializable> listCheckpointed = (ListCheckpointed<Serializable>) userFunction;
	List<Serializable> snapshot =
		listCheckpointed.snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

	ListState<Serializable> operatorState =
		backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
	operatorState.clear();

	if (snapshot == null) {
		// the function reported no state; the cleared list state is the result
		return true;
	}

	try {
		for (Serializable partition : snapshot) {
			operatorState.add(partition);
		}
	} catch (Exception cause) {
		// do not leave a partially written state behind
		operatorState.clear();

		throw new Exception("Could not write partitionable state to operator " +
			"state backend.", cause);
	}

	return true;
}
 
示例16
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if the list state cannot be obtained or written
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final ListState<Integer> testState = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

	testState.clear();
	testState.add(42);
	testState.add(4711);

	numberSnapshotCalls++;
}
 
示例17
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and
 * then writes a single byte to the stream.
 *
 * @param context the snapshot context providing the raw operator state output
 * @throws Exception if obtaining the stream or writing to it fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

	// signal that the snapshot is now in progress
	IN_CHECKPOINT_LATCH.trigger();

	// this write is expected to block
	rawOutput.write(1);
}
 
示例18
/**
 * Finishes any started bundle before the state of this operator is snapshotted.
 *
 * <p>Steps, in order: notify {@code checkpointStats} (when present) that the snapshot for
 * this checkpoint id starts; when {@code requiresStableInput} is set, checkpoint the
 * {@code bufferingDoFnRunner} with the checkpoint id; open the output buffer, call
 * {@code invokeFinishBundle()} until {@code bundleStarted} is false, and close the buffer;
 * finally delegate to the superclass.
 *
 * @param context the snapshot context supplying the checkpoint id
 * @throws Exception if the superclass snapshot or one of the preparatory calls fails
 * @throws Error if finishing the bundle throws an exception; see the comment in the catch block
 */
@Override
public final void snapshotState(StateSnapshotContext context) throws Exception {
  // statistics are optional; only report when a collector is configured
  if (checkpointStats != null) {
    checkpointStats.snapshotStart(context.getCheckpointId());
  }

  if (requiresStableInput) {
    // We notify the BufferingDoFnRunner to associate buffered state with this
    // snapshot id and start a new buffer for elements arriving after this snapshot.
    bufferingDoFnRunner.checkpoint(context.getCheckpointId());
  }

  try {
    outputManager.openBuffer();
    // Ensure that no new bundle gets started as part of finishing a bundle
    while (bundleStarted) {
      invokeFinishBundle();
    }
    outputManager.closeBuffer();
  } catch (Exception e) {
    // https://jira.apache.org/jira/browse/FLINK-14653
    // Any regular exception during checkpointing will be tolerated by Flink because those
    // typically do not affect the execution flow. We need to fail hard here because errors
    // in bundle execution are application errors which are not related to checkpointing.
    throw new Error("Checkpointing failed because bundle failed to finalize.", e);
  }

  // the superclass snapshot runs only after the bundle has been finished
  super.snapshotState(context);
}
 
示例19
/**
 * Copies the contents of {@code inputBuffer} into {@code bufferState}.
 *
 * <p>After delegating to the superclass, the state is cleared, every key/value pair of the
 * buffer is converted into a {@code Tuple2}, and all tuples are added in a single call.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if clearing or writing the state fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	// start from an empty state so only the current buffer content is stored
	bufferState.clear();

	final List<Tuple2<RowData, Long>> bufferedPairs = new ArrayList<>(inputBuffer.size());
	inputBuffer.forEach((bufferKey, bufferValue) -> {
		bufferedPairs.add(Tuple2.of(bufferKey, bufferValue));
	});

	// write everything with one call instead of one call per pair
	bufferState.addAll(bufferedPairs);
}
 
示例20
/**
 * Delegates to the superclass snapshot, but first waits on {@code DeclineSink.waitLatch} when
 * the checkpoint id equals {@code DECLINE_CHECKPOINT_ID}.
 *
 * @param context the snapshot context supplying the checkpoint id
 * @throws Exception if waiting on the latch or the superclass snapshot fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final boolean isDeclineCheckpoint = context.getCheckpointId() == DECLINE_CHECKPOINT_ID;

	if (isDeclineCheckpoint) {
		// hold this snapshot until the latch is released
		DeclineSink.waitLatch.await();
	}

	super.snapshotState(context);
}
 
示例21
/**
 * Snapshots the current reader state into the operator's checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code checkpointedState} is cleared and refilled
 * with every element returned by {@code getReaderState()}, one state entry per element.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if an element cannot be added (the state is cleared before rethrowing), or
 *     if the {@code checkState} precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	checkState(checkpointedState != null,
		"The operator state has not been properly initialized.");

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

	checkpointedState.clear();

	List<T> readerState = getReaderState();

	try {
		for (T split : readerState) {
			checkpointedState.add(split);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		checkpointedState.clear();

		// message fixed: the original text contained a duplicated "to"
		throw new Exception("Could not add timestamped file input splits to operator " +
				"state backend of operator " + getOperatorName() + '.', e);
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
				getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
	}
}
 
示例22
/**
 * Snapshots the pending checkpoints of this operator into its checkpointed list state.
 *
 * <p>After delegating to the superclass, {@code saveHandleInState} is called with the
 * checkpoint id and timestamp from the context. {@code checkpointedState} is then cleared and
 * refilled with every element of {@code pendingCheckpoints}, one state entry each.
 *
 * @param context the snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if a pending checkpoint cannot be added (the state is cleared before
 *     rethrowing), or if the state precondition fails because {@code checkpointedState} is null
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);

	Preconditions.checkState(this.checkpointedState != null,
		"The operator state has not been properly initialized.");

	saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

	this.checkpointedState.clear();

	try {
		for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
			// create a new partition for each entry.
			this.checkpointedState.add(pendingCheckpoint);
		}
	} catch (Exception e) {
		// do not leave a partially written state behind
		this.checkpointedState.clear();

		// message fixed: the original text misspelled "pending" as "panding"
		throw new Exception("Could not add pending checkpoints to operator state " +
			"backend of operator " + getOperatorName() + '.', e);
	}

	int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
	if (LOG.isDebugEnabled()) {
		LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
	}
}
 
示例23
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if the list state cannot be obtained or written
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final ListState<Integer> testState = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

	testState.clear();
	testState.add(42);
	testState.add(4711);

	numberSnapshotCalls++;
}
 
示例24
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and
 * then writes a single byte to the stream.
 *
 * @param context the snapshot context providing the raw operator state output
 * @throws Exception if obtaining the stream or writing to it fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

	// signal that the snapshot is now in progress
	IN_CHECKPOINT_LATCH.trigger();

	// this write is expected to block
	rawOutput.write(1);
}
 
示例25
/**
 * Stores the current value of {@code counter} in {@code counterState} and also copies it to
 * {@code snapshotOutData}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if updating the state fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	counterState.update(counter);
	// remember which value went into this snapshot
	snapshotOutData = counter;
}
 
示例26
/**
 * Runs the superclass snapshot and then snapshots {@code userFunction} through
 * {@code StreamingFunctionUtils.snapshotFunctionState}, using this operator's state backend.
 *
 * @param context the snapshot context, passed to both calls
 * @throws Exception if either snapshot call fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	super.snapshotState(context);
	StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
 
示例27
/**
 * Appends the marker {@code "OPERATOR::snapshotState"} to {@code ACTUAL_ORDER_TRACKING} and
 * then delegates to the superclass.
 *
 * @param context the snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	// the marker is recorded before the superclass runs
	ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
	super.snapshotState(context);
}
 
示例28
/**
 * Adds the current value of {@code counter} to {@code counterState}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if adding to the state fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	counterState.add(counter);
}
 
示例29
/**
 * No-op implementation: nothing is written when a snapshot is taken.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception never thrown by this implementation; declared by the overridden method
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
}
 
示例30
/**
 * Stores the current value of {@code counter} in {@code counterState} and also copies it to
 * {@code snapshotOutData}.
 *
 * @param context the snapshot context (not used here)
 * @throws Exception if updating the state fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
	counterState.update(counter);
	// remember which value went into this snapshot
	snapshotOutData = counter;
}