Java源码示例:org.apache.flink.runtime.state.StateSnapshotContext
示例1
/**
 * Writes the elements currently held by the operator's queue into operator list state.
 *
 * <p>After the superclass snapshot, the list state registered under {@code STATE_NAME} is
 * cleared and refilled with the stream element of every entry returned by
 * {@code queue.values()}, followed by the element of {@code pendingStreamElementQueueEntry}
 * when that field is non-null.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when an
 *     element cannot be added (the list state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    final ListState<StreamElement> queueSnapshot = getOperatorStateBackend()
        .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    queueSnapshot.clear();

    final Collection<StreamElementQueueEntry<?>> queuedEntries = queue.values();

    try {
        for (StreamElementQueueEntry<?> queuedEntry : queuedEntries) {
            final StreamElement element = queuedEntry.getStreamElement();
            queueSnapshot.add(element);
        }

        // An entry that could not be enqueued yet (queue full) is stored after the queued ones.
        if (null != pendingStreamElementQueueEntry) {
            queueSnapshot.add(pendingStreamElementQueueEntry.getStreamElement());
        }
    } catch (Exception failure) {
        // Drop whatever was written so far before reporting the failure.
        queueSnapshot.clear();

        final String reason = "Could not add stream element queue entries to operator state " +
            "backend of operator " + getOperatorName() + '.';
        throw new Exception(reason, failure);
    }
}
示例2
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>{@code trySnapshotFunctionState} is attempted on {@code userFunction}; while it reports
 * {@code false} and the current function is a {@code WrappingFunction}, the wrapped function
 * is tried next. The search stops at the first successful attempt or at the first function
 * that is not a wrapper.
 *
 * @param context snapshot context; must not be null
 * @param backend operator state backend; must not be null
 * @param userFunction outermost function to inspect
 * @throws Exception propagated from {@code trySnapshotFunctionState}
 */
public static void snapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);

    Function candidate = userFunction;
    while (!trySnapshotFunctionState(context, backend, candidate)) {
        if (!(candidate instanceof WrappingFunction)) {
            // Nothing left to unwrap and nothing could be snapshotted.
            return;
        }
        // Peel one wrapper layer and retry with the inner function.
        candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
    }
}
示例3
/**
 * Writes the elements currently held by the operator's queue into operator list state.
 *
 * <p>After the superclass snapshot, the list state registered under {@code STATE_NAME} is
 * cleared and refilled with the stream element of every entry returned by
 * {@code queue.values()}, followed by the element of {@code pendingStreamElementQueueEntry}
 * when that field is non-null.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when an
 *     element cannot be added (the list state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    final ListState<StreamElement> queueSnapshot = getOperatorStateBackend()
        .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    queueSnapshot.clear();

    final Collection<StreamElementQueueEntry<?>> queuedEntries = queue.values();

    try {
        for (StreamElementQueueEntry<?> queuedEntry : queuedEntries) {
            final StreamElement element = queuedEntry.getStreamElement();
            queueSnapshot.add(element);
        }

        // An entry that could not be enqueued yet (queue full) is stored after the queued ones.
        if (null != pendingStreamElementQueueEntry) {
            queueSnapshot.add(pendingStreamElementQueueEntry.getStreamElement());
        }
    } catch (Exception failure) {
        // Drop whatever was written so far before reporting the failure.
        queueSnapshot.clear();

        final String reason = "Could not add stream element queue entries to operator state " +
            "backend of operator " + getOperatorName() + '.';
        throw new Exception(reason, failure);
    }
}
示例4
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>{@code trySnapshotFunctionState} is attempted on {@code userFunction}; while it reports
 * {@code false} and the current function is a {@code WrappingFunction}, the wrapped function
 * is tried next. The search stops at the first successful attempt or at the first function
 * that is not a wrapper.
 *
 * @param context snapshot context; must not be null
 * @param backend operator state backend; must not be null
 * @param userFunction outermost function to inspect
 * @throws Exception propagated from {@code trySnapshotFunctionState}
 */
public static void snapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);

    Function candidate = userFunction;
    while (!trySnapshotFunctionState(context, backend, candidate)) {
        if (!(candidate instanceof WrappingFunction)) {
            // Nothing left to unwrap and nothing could be snapshotted.
            return;
        }
        // Peel one wrapper layer and retry with the inner function.
        candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
    }
}
示例5
/**
 * Writes the contents of the operator's queue into operator list state in one batch.
 *
 * <p>After the superclass snapshot, the list state registered under {@code STATE_NAME} is
 * cleared and then filled with {@code queue.values()} through a single {@code addAll} call.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when
 *     the batch cannot be added (the list state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    final ListState<StreamElement> queueSnapshot = getOperatorStateBackend()
        .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    queueSnapshot.clear();

    try {
        queueSnapshot.addAll(queue.values());
    } catch (Exception failure) {
        // Drop whatever was written so far before reporting the failure.
        queueSnapshot.clear();

        final String reason = "Could not add stream element queue entries to operator state " +
            "backend of operator " + getOperatorName() + '.';
        throw new Exception(reason, failure);
    }
}
示例6
/**
 * Snapshots the state of a user function, unwrapping {@code WrappingFunction}s as needed.
 *
 * <p>{@code trySnapshotFunctionState} is attempted on {@code userFunction}; while it reports
 * {@code false} and the current function is a {@code WrappingFunction}, the wrapped function
 * is tried next. The search stops at the first successful attempt or at the first function
 * that is not a wrapper.
 *
 * @param context snapshot context; must not be null
 * @param backend operator state backend; must not be null
 * @param userFunction outermost function to inspect
 * @throws Exception propagated from {@code trySnapshotFunctionState}
 */
public static void snapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);

    Function candidate = userFunction;
    while (!trySnapshotFunctionState(context, backend, candidate)) {
        if (!(candidate instanceof WrappingFunction)) {
            // Nothing left to unwrap and nothing could be snapshotted.
            return;
        }
        // Peel one wrapper layer and retry with the inner function.
        candidate = ((WrappingFunction<?>) candidate).getWrappedFunction();
    }
}
示例7
/**
 * Stores the reader's current splits in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState}, the
 * state is cleared and every split returned by {@code reader.getReaderState()} is added as a
 * separate list element.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when a
 *     split cannot be added (the state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    checkState(checkpointedState != null,
        "The operator state has not been properly initialized.");

    checkpointedState.clear();

    List<TimestampedFileInputSplit> readerState = reader.getReaderState();

    try {
        for (TimestampedFileInputSplit split : readerState) {
            // create a new partition for each entry.
            checkpointedState.add(split);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add timestamped file input splits to operator " +
            "state backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
            getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
    }
}
示例8
/**
 * Stores the operator's pending checkpoints in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState},
 * {@code saveHandleInState} is called with the checkpoint id and timestamp taken from
 * {@code context}. The state is then cleared and every element of {@code pendingCheckpoints}
 * is added as a separate list element.
 *
 * @param context snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if the superclass snapshot or {@code saveHandleInState} fails, or wrapping
 *     the original failure when a pending checkpoint cannot be added (the state is cleared
 *     again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    Preconditions.checkState(this.checkpointedState != null,
        "The operator state has not been properly initialized.");

    saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

    this.checkpointedState.clear();

    try {
        for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
            // create a new partition for each entry.
            this.checkpointedState.add(pendingCheckpoint);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add pending checkpoints to operator state " +
            "backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
    }
}
示例9
/**
 * Attempts to snapshot a single user function, without unwrapping it.
 *
 * <p>A {@code CheckpointedFunction} is asked to snapshot itself with {@code context}. A
 * {@code ListCheckpointed} function is asked for its state list, which replaces the contents
 * of the serializable list state registered under
 * {@code DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME}; a null list leaves that
 * state empty.
 *
 * @param context snapshot context supplying the checkpoint id and timestamp
 * @param backend operator state backend that provides the list state
 * @param userFunction function to inspect
 * @return {@code true} if the function implements one of the two interfaces, else {@code false}
 * @throws Exception propagated from the function's own snapshot, or wrapping the original
 *     failure when an element cannot be added (the list state is cleared before rethrowing)
 */
private static boolean trySnapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).snapshotState(context);
        return true;
    }

    if (!(userFunction instanceof ListCheckpointed)) {
        return false;
    }

    @SuppressWarnings("unchecked")
    final ListCheckpointed<Serializable> listCheckpointed =
        (ListCheckpointed<Serializable>) userFunction;

    final List<Serializable> functionState = listCheckpointed.snapshotState(
        context.getCheckpointId(), context.getCheckpointTimestamp());

    final ListState<Serializable> target =
        backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    target.clear();

    if (functionState == null) {
        // The function reported no state; the cleared list is the snapshot.
        return true;
    }

    try {
        for (Serializable partition : functionState) {
            target.add(partition);
        }
    } catch (Exception failure) {
        // Drop whatever was written so far before reporting the failure.
        target.clear();

        final String reason = "Could not write partitionable state to operator " +
            "state backend.";
        throw new Exception(reason, failure);
    }

    return true;
}
示例10
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from the list state operations
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final ListState<Integer> fixedValues = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

    fixedValues.clear();
    fixedValues.add(42);
    fixedValues.add(4711);

    // Counted only after both values were written successfully.
    numberSnapshotCalls++;
}
示例11
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and then
 * writes a single byte to the stream.
 *
 * @param context snapshot context that provides the raw operator state output
 * @throws Exception propagated from obtaining the stream or from the write
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

    // Signal that the snapshot has been entered before touching the stream.
    IN_CHECKPOINT_LATCH.trigger();

    // Per the original note, this write is expected to block ("this should lock").
    rawOutput.write(1);
}
示例12
/**
 * Replaces {@code bufferState} with the current contents of {@code inputBuffer}.
 *
 * <p>After the superclass snapshot, the state is cleared, every key/value pair of
 * {@code inputBuffer} is converted to a {@code Tuple2}, and the resulting list is written with
 * one {@code addAll} call.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception propagated from the superclass snapshot or the state operations
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    // Start from an empty state so stale entries never survive a snapshot.
    bufferState.clear();

    final List<Tuple2<BaseRow, Long>> bufferedPairs = new ArrayList<>(inputBuffer.size());
    inputBuffer.forEach((row, count) -> bufferedPairs.add(Tuple2.of(row, count)));

    // Write everything in a single batch.
    bufferState.addAll(bufferedPairs);
}
示例13
/**
 * Stores the reader's current splits in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState}, the
 * state is cleared and every split returned by {@code reader.getReaderState()} is added as a
 * separate list element.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when a
 *     split cannot be added (the state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    checkState(checkpointedState != null,
        "The operator state has not been properly initialized.");

    checkpointedState.clear();

    List<TimestampedFileInputSplit> readerState = reader.getReaderState();

    try {
        for (TimestampedFileInputSplit split : readerState) {
            // create a new partition for each entry.
            checkpointedState.add(split);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add timestamped file input splits to operator " +
            "state backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
            getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
    }
}
示例14
/**
 * Stores the operator's pending checkpoints in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState},
 * {@code saveHandleInState} is called with the checkpoint id and timestamp taken from
 * {@code context}. The state is then cleared and every element of {@code pendingCheckpoints}
 * is added as a separate list element.
 *
 * @param context snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if the superclass snapshot or {@code saveHandleInState} fails, or wrapping
 *     the original failure when a pending checkpoint cannot be added (the state is cleared
 *     again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    Preconditions.checkState(this.checkpointedState != null,
        "The operator state has not been properly initialized.");

    saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

    this.checkpointedState.clear();

    try {
        for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
            // create a new partition for each entry.
            this.checkpointedState.add(pendingCheckpoint);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add pending checkpoints to operator state " +
            "backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
    }
}
示例15
/**
 * Attempts to snapshot a single user function, without unwrapping it.
 *
 * <p>A {@code CheckpointedFunction} is asked to snapshot itself with {@code context}. A
 * {@code ListCheckpointed} function is asked for its state list, which replaces the contents
 * of the serializable list state registered under
 * {@code DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME}; a null list leaves that
 * state empty.
 *
 * @param context snapshot context supplying the checkpoint id and timestamp
 * @param backend operator state backend that provides the list state
 * @param userFunction function to inspect
 * @return {@code true} if the function implements one of the two interfaces, else {@code false}
 * @throws Exception propagated from the function's own snapshot, or wrapping the original
 *     failure when an element cannot be added (the list state is cleared before rethrowing)
 */
private static boolean trySnapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).snapshotState(context);
        return true;
    }

    if (!(userFunction instanceof ListCheckpointed)) {
        return false;
    }

    @SuppressWarnings("unchecked")
    final ListCheckpointed<Serializable> listCheckpointed =
        (ListCheckpointed<Serializable>) userFunction;

    final List<Serializable> functionState = listCheckpointed.snapshotState(
        context.getCheckpointId(), context.getCheckpointTimestamp());

    final ListState<Serializable> target =
        backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    target.clear();

    if (functionState == null) {
        // The function reported no state; the cleared list is the snapshot.
        return true;
    }

    try {
        for (Serializable partition : functionState) {
            target.add(partition);
        }
    } catch (Exception failure) {
        // Drop whatever was written so far before reporting the failure.
        target.clear();

        final String reason = "Could not write partitionable state to operator " +
            "state backend.";
        throw new Exception(reason, failure);
    }

    return true;
}
示例16
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from the list state operations
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final ListState<Integer> fixedValues = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

    fixedValues.clear();
    fixedValues.add(42);
    fixedValues.add(4711);

    // Counted only after both values were written successfully.
    numberSnapshotCalls++;
}
示例17
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and then
 * writes a single byte to the stream.
 *
 * @param context snapshot context that provides the raw operator state output
 * @throws Exception propagated from obtaining the stream or from the write
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

    // Signal that the snapshot has been entered before touching the stream.
    IN_CHECKPOINT_LATCH.trigger();

    // Per the original note, this write is expected to block ("this should lock").
    rawOutput.write(1);
}
示例18
/**
 * Finishes any bundle that is still open, then runs the superclass snapshot.
 *
 * <p>Steps, in order: report the snapshot start to {@code checkpointStats} when that field is
 * non-null; when {@code requiresStableInput} is set, pass the checkpoint id to
 * {@code bufferingDoFnRunner.checkpoint}; open the output manager's buffer, call
 * {@code invokeFinishBundle()} until {@code bundleStarted} is false, and close the buffer;
 * finally call {@code super.snapshotState(context)}.
 *
 * <p>An {@link Exception} raised between opening and closing the buffer is rethrown wrapped
 * in an {@link Error}; in that case {@code closeBuffer()} and the superclass snapshot are not
 * reached.
 *
 * @param context snapshot context supplying the checkpoint id, forwarded to the superclass
 * @throws Exception propagated from the calls outside the try block
 */
@Override
public final void snapshotState(StateSnapshotContext context) throws Exception {
if (checkpointStats != null) {
checkpointStats.snapshotStart(context.getCheckpointId());
}
if (requiresStableInput) {
// We notify the BufferingDoFnRunner to associate buffered state with this
// snapshot id and start a new buffer for elements arriving after this snapshot.
bufferingDoFnRunner.checkpoint(context.getCheckpointId());
}
try {
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted) {
invokeFinishBundle();
}
outputManager.closeBuffer();
} catch (Exception e) {
// https://jira.apache.org/jira/browse/FLINK-14653
// Any regular exception during checkpointing will be tolerated by Flink because those
// typically do not affect the execution flow. We need to fail hard here because errors
// in bundle execution are application errors which are not related to checkpointing.
throw new Error("Checkpointing failed because bundle failed to finalize.", e);
}
// The superclass snapshot runs only after the bundle has been finished successfully.
super.snapshotState(context);
}
示例19
/**
 * Replaces {@code bufferState} with the current contents of {@code inputBuffer}.
 *
 * <p>After the superclass snapshot, the state is cleared, every key/value pair of
 * {@code inputBuffer} is converted to a {@code Tuple2}, and the resulting list is written with
 * one {@code addAll} call.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception propagated from the superclass snapshot or the state operations
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    // Start from an empty state so stale entries never survive a snapshot.
    bufferState.clear();

    final List<Tuple2<RowData, Long>> bufferedPairs = new ArrayList<>(inputBuffer.size());
    inputBuffer.forEach((row, count) -> bufferedPairs.add(Tuple2.of(row, count)));

    // Write everything in a single batch.
    bufferState.addAll(bufferedPairs);
}
示例20
/**
 * Waits on {@code DeclineSink.waitLatch} when the checkpoint id equals
 * {@code DECLINE_CHECKPOINT_ID}, then runs the superclass snapshot.
 *
 * @param context snapshot context supplying the checkpoint id, forwarded to the superclass
 * @throws Exception propagated from awaiting the latch or from the superclass snapshot
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final boolean isDeclineCheckpoint = context.getCheckpointId() == DECLINE_CHECKPOINT_ID;

    if (isDeclineCheckpoint) {
        // Hold this snapshot until the latch is released.
        DeclineSink.waitLatch.await();
    }

    super.snapshotState(context);
}
示例21
/**
 * Stores the current reader state in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState}, the
 * state is cleared and every element returned by {@code getReaderState()} is added as a
 * separate list element.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception if the superclass snapshot fails, or wrapping the original failure when a
 *     split cannot be added (the state is cleared again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    checkState(checkpointedState != null,
        "The operator state has not been properly initialized.");

    checkpointedState.clear();

    List<T> readerState = getReaderState();

    try {
        for (T split : readerState) {
            checkpointedState.add(split);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add timestamped file input splits to operator " +
            "state backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
            getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
    }
}
示例22
/**
 * Stores the operator's pending checkpoints in {@code checkpointedState}.
 *
 * <p>After the superclass snapshot and a non-null check on {@code checkpointedState},
 * {@code saveHandleInState} is called with the checkpoint id and timestamp taken from
 * {@code context}. The state is then cleared and every element of {@code pendingCheckpoints}
 * is added as a separate list element.
 *
 * @param context snapshot context supplying the checkpoint id and timestamp
 * @throws Exception if the superclass snapshot or {@code saveHandleInState} fails, or wrapping
 *     the original failure when a pending checkpoint cannot be added (the state is cleared
 *     again before rethrowing)
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    Preconditions.checkState(this.checkpointedState != null,
        "The operator state has not been properly initialized.");

    saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());

    this.checkpointedState.clear();

    try {
        for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
            // create a new partition for each entry.
            this.checkpointedState.add(pendingCheckpoint);
        }
    } catch (Exception e) {
        // Do not leave a partially written list behind.
        checkpointedState.clear();

        throw new Exception("Could not add pending checkpoints to operator state " +
            "backend of operator " + getOperatorName() + '.', e);
    }

    if (LOG.isDebugEnabled()) {
        // The subtask index is only needed for this message, so it is resolved here.
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
    }
}
示例23
/**
 * Replaces the list state registered under {@code TEST_DESCRIPTOR} with the two fixed values
 * 42 and 4711, then increments {@code numberSnapshotCalls}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from the list state operations
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final ListState<Integer> fixedValues = getOperatorStateBackend().getListState(TEST_DESCRIPTOR);

    fixedValues.clear();
    fixedValues.add(42);
    fixedValues.add(4711);

    // Counted only after both values were written successfully.
    numberSnapshotCalls++;
}
示例24
/**
 * Obtains the raw operator state output stream, triggers {@code IN_CHECKPOINT_LATCH}, and then
 * writes a single byte to the stream.
 *
 * @param context snapshot context that provides the raw operator state output
 * @throws Exception propagated from obtaining the stream or from the write
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

    // Signal that the snapshot has been entered before touching the stream.
    IN_CHECKPOINT_LATCH.trigger();

    // Per the original note, this write is expected to block ("this should lock").
    rawOutput.write(1);
}
示例25
/**
 * Records the current {@code counter} value at snapshot time.
 *
 * <p>Passes {@code counter} to {@code counterState.update(...)} and then copies it into
 * {@code snapshotOutData}. This body does not call {@code super.snapshotState}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from {@code counterState.update}
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
counterState.update(counter);
// Assigned only after the state update returned without throwing.
snapshotOutData = counter;
}
示例26
/**
 * Runs the superclass snapshot, then delegates to
 * {@code StreamingFunctionUtils.snapshotFunctionState} with this operator's state backend and
 * {@code userFunction}.
 *
 * @param context snapshot context, forwarded to both calls
 * @throws Exception propagated from either call
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
示例27
/**
 * Appends the marker {@code "OPERATOR::snapshotState"} to {@code ACTUAL_ORDER_TRACKING} and
 * then runs the superclass snapshot.
 *
 * @param context snapshot context, forwarded to the superclass
 * @throws Exception propagated from the superclass snapshot
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// Marker is recorded before delegating; presumably used to verify call order (confirm against the test).
ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
super.snapshotState(context);
}
示例28
/**
 * Adds the current {@code counter} value to {@code counterState}.
 *
 * <p>This body does not clear the state first and does not call {@code super.snapshotState}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from {@code counterState.add}
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
counterState.add(counter);
}
示例29
/**
 * Snapshot hook with an empty body: nothing is written and {@code context} is not read.
 *
 * @param context snapshot context; unused
 * @throws Exception declared by the signature; this body itself throws nothing
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
}
示例30
/**
 * Records the current {@code counter} value at snapshot time.
 *
 * <p>Passes {@code counter} to {@code counterState.update(...)} and then copies it into
 * {@code snapshotOutData}. This body does not call {@code super.snapshotState}.
 *
 * @param context snapshot context; not read by this implementation
 * @throws Exception propagated from {@code counterState.update}
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
counterState.update(counter);
// Assigned only after the state update returned without throwing.
snapshotOutData = counter;
}