Java源码示例:org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle
示例1
/**
* Recovery from a single remote incremental state without rescaling.
*/
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("Unexpected state handle type, " +
"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
", but found " + keyedStateHandle.getClass());
}
}
示例2
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState());
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
operatorIdentifier, backendUID);
if (!instanceRocksDBPath.mkdirs()) {
String errMsg = "Could not create RocksDB data directory: " + instanceBasePath.getAbsolutePath();
LOG.error(errMsg);
throw new IOException(errMsg);
}
restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
openDB();
registerColumnFamilyHandles(stateMetaInfoSnapshots);
}
示例3
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(
restoreStateHandle,
temporaryRestoreInstancePath,
cancelStreamRegistry);
}
// since we transferred all remote state to a local directory, we can use the same code as for
// local recovery.
return new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
}
示例4
/**
* Recovery from a single remote incremental state without rescaling.
*/
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("Unexpected state handle type, " +
"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
", but found " + keyedStateHandle.getClass());
}
}
示例5
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState());
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
operatorIdentifier, backendUID);
if (!instanceRocksDBPath.mkdirs()) {
String errMsg = "Could not create RocksDB data directory: " + instanceBasePath.getAbsolutePath();
LOG.error(errMsg);
throw new IOException(errMsg);
}
restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
openDB();
registerColumnFamilyHandles(stateMetaInfoSnapshots);
}
示例6
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(
restoreStateHandle,
temporaryRestoreInstancePath,
cancelStreamRegistry);
}
// since we transferred all remote state to a local directory, we can use the same code as for
// local recovery.
return new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
}
示例7
/**
* Recovery from a single remote incremental state without rescaling.
*/
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("Unexpected state handle type, " +
"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
", but found " + keyedStateHandle.getClass());
}
}
示例8
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState());
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
operatorIdentifier, backendUID);
if (!instanceRocksDBPath.mkdirs()) {
String errMsg = "Could not create RocksDB data directory: " + instanceBasePath.getAbsolutePath();
LOG.error(errMsg);
throw new IOException(errMsg);
}
restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
openDB();
registerColumnFamilyHandles(stateMetaInfoSnapshots);
}
示例9
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(
restoreStateHandle,
temporaryRestoreInstancePath,
cancelStreamRegistry);
}
// since we transferred all remote state to a local directory, we can use the same code as for
// local recovery.
return new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
}
示例10
@Override
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
boolean completed = false;
// Handle to the meta data file
SnapshotResult<StreamStateHandle> metaStateHandle = null;
// Handles to new sst files since the last completed checkpoint will go here
final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
try {
metaStateHandle = materializeMetaData();
// Sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
"Metadata for job manager was not properly created.");
uploadSstFiles(sstFiles, miscFiles);
synchronized (materializedSstFiles) {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
new IncrementalRemoteKeyedStateHandle(
backendUID,
keyGroupRange,
checkpointId,
sstFiles,
miscFiles,
metaStateHandle.getJobManagerOwnedSnapshot());
final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
final SnapshotResult<KeyedStateHandle> snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
new IncrementalLocalKeyedStateHandle(
backendUID,
checkpointId,
directoryStateHandle,
keyGroupRange,
metaStateHandle.getTaskLocalSnapshot(),
sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
return snapshotResult;
} finally {
if (!completed) {
final List<StateObject> statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
示例11
@Override
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
boolean completed = false;
// Handle to the meta data file
SnapshotResult<StreamStateHandle> metaStateHandle = null;
// Handles to new sst files since the last completed checkpoint will go here
final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
try {
metaStateHandle = materializeMetaData();
// Sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
"Metadata for job manager was not properly created.");
uploadSstFiles(sstFiles, miscFiles);
synchronized (materializedSstFiles) {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
new IncrementalRemoteKeyedStateHandle(
backendUID,
keyGroupRange,
checkpointId,
sstFiles,
miscFiles,
metaStateHandle.getJobManagerOwnedSnapshot());
final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
final SnapshotResult<KeyedStateHandle> snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
new IncrementalLocalKeyedStateHandle(
backendUID,
checkpointId,
directoryStateHandle,
keyGroupRange,
metaStateHandle.getTaskLocalSnapshot(),
sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
return snapshotResult;
} finally {
if (!completed) {
final List<StateObject> statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
示例12
@Override
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
boolean completed = false;
// Handle to the meta data file
SnapshotResult<StreamStateHandle> metaStateHandle = null;
// Handles to new sst files since the last completed checkpoint will go here
final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
try {
metaStateHandle = materializeMetaData();
// Sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
"Metadata for job manager was not properly created.");
uploadSstFiles(sstFiles, miscFiles);
synchronized (materializedSstFiles) {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
new IncrementalRemoteKeyedStateHandle(
backendUID,
keyGroupRange,
checkpointId,
sstFiles,
miscFiles,
metaStateHandle.getJobManagerOwnedSnapshot());
final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
final SnapshotResult<KeyedStateHandle> snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
new IncrementalLocalKeyedStateHandle(
backendUID,
checkpointId,
directoryStateHandle,
keyGroupRange,
metaStateHandle.getTaskLocalSnapshot(),
sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
return snapshotResult;
} finally {
if (!completed) {
final List<StateObject> statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}
}