Java源码示例:org.apache.flink.util.CloseableIterable
示例1
public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception {
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
streamOperator.initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, closeableRegistry);
closeFromRegistry(keyedStateInputs, closeableRegistry);
}
}
示例2
protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw operator state, but found state alternative.");
if (rawOperatorState != null) {
return new CloseableIterable<StatePartitionStreamProvider>() {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Nonnull
@Override
public Iterator<StatePartitionStreamProvider> iterator() {
return new OperatorStateStreamIterator(
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
rawOperatorState.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例3
protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw keyed state, but found state alternative.");
if (rawKeyedState != null) {
Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState);
final CloseableRegistry closeableRegistry = new CloseableRegistry();
return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Override
public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例4
StreamOperatorStateContextImpl(
boolean restored,
OperatorStateBackend operatorStateBackend,
AbstractKeyedStateBackend<?> keyedStateBackend,
InternalTimeServiceManager<?> internalTimeServiceManager,
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
this.restored = restored;
this.operatorStateBackend = operatorStateBackend;
this.keyedStateBackend = keyedStateBackend;
this.internalTimeServiceManager = internalTimeServiceManager;
this.rawOperatorStateInputs = rawOperatorStateInputs;
this.rawKeyedStateInputs = rawKeyedStateInputs;
}
示例5
@Override
public void initializeState(StateInitializationContext context) throws Exception {
keyedStateBackend = (AbstractKeyedStateBackend<?>) getKeyedStateBackend();
operatorStateBackend = getOperatorStateBackend();
rawOperatorStateInputs =
(CloseableIterable<StatePartitionStreamProvider>) context.getRawOperatorStateInputs();
rawKeyedStateInputs =
(CloseableIterable<KeyGroupStatePartitionStreamProvider>) context.getRawKeyedStateInputs();
super.initializeState(context);
}
示例6
protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw operator state, but found state alternative.");
if (rawOperatorState != null) {
return new CloseableIterable<StatePartitionStreamProvider>() {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Nonnull
@Override
public Iterator<StatePartitionStreamProvider> iterator() {
return new OperatorStateStreamIterator(
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
rawOperatorState.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例7
protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw keyed state, but found state alternative.");
if (rawKeyedState != null) {
Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState);
final CloseableRegistry closeableRegistry = new CloseableRegistry();
return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Override
public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例8
StreamOperatorStateContextImpl(
boolean restored,
OperatorStateBackend operatorStateBackend,
AbstractKeyedStateBackend<?> keyedStateBackend,
InternalTimeServiceManager<?> internalTimeServiceManager,
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
this.restored = restored;
this.operatorStateBackend = operatorStateBackend;
this.keyedStateBackend = keyedStateBackend;
this.internalTimeServiceManager = internalTimeServiceManager;
this.rawOperatorStateInputs = rawOperatorStateInputs;
this.rawKeyedStateInputs = rawKeyedStateInputs;
}
示例9
@Override
public void initializeState(StateInitializationContext context) throws Exception {
keyedStateBackend = (AbstractKeyedStateBackend<?>) getKeyedStateBackend();
operatorStateBackend = getOperatorStateBackend();
rawOperatorStateInputs =
(CloseableIterable<StatePartitionStreamProvider>) context.getRawOperatorStateInputs();
rawKeyedStateInputs =
(CloseableIterable<KeyGroupStatePartitionStreamProvider>) context.getRawKeyedStateInputs();
super.initializeState(context);
}
示例10
protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw operator state, but found state alternative.");
if (rawOperatorState != null) {
return new CloseableIterable<StatePartitionStreamProvider>() {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Nonnull
@Override
public Iterator<StatePartitionStreamProvider> iterator() {
return new OperatorStateStreamIterator(
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
rawOperatorState.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例11
protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) {
if (restoreStateAlternatives.hasNext()) {
Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next();
// TODO currently this does not support local state recovery, so we expect there is only one handle.
Preconditions.checkState(
!restoreStateAlternatives.hasNext(),
"Local recovery is currently not implemented for raw keyed state, but found state alternative.");
if (rawKeyedState != null) {
Collection<KeyGroupsStateHandle> keyGroupsStateHandles = transform(rawKeyedState);
final CloseableRegistry closeableRegistry = new CloseableRegistry();
return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
@Override
public void close() throws IOException {
closeableRegistry.close();
}
@Override
public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
}
};
}
}
return CloseableIterable.empty();
}
示例12
StreamOperatorStateContextImpl(
boolean restored,
OperatorStateBackend operatorStateBackend,
AbstractKeyedStateBackend<?> keyedStateBackend,
InternalTimeServiceManager<?> internalTimeServiceManager,
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
this.restored = restored;
this.operatorStateBackend = operatorStateBackend;
this.keyedStateBackend = keyedStateBackend;
this.internalTimeServiceManager = internalTimeServiceManager;
this.rawOperatorStateInputs = rawOperatorStateInputs;
this.rawKeyedStateInputs = rawKeyedStateInputs;
}
示例13
@Override
public void initializeState(StateInitializationContext controller) throws Exception {
keyedStateBackend = (AbstractKeyedStateBackend<?>) getKeyedStateBackend();
operatorStateBackend = getOperatorStateBackend();
rawOperatorStateInputs =
(CloseableIterable<StatePartitionStreamProvider>) controller.getRawOperatorStateInputs();
rawKeyedStateInputs =
(CloseableIterable<KeyGroupStatePartitionStreamProvider>) controller.getRawKeyedStateInputs();
super.initializeState(controller);
}
示例14
@Override
public StreamOperatorStateContext streamOperatorStateContext(
@Nonnull OperatorID operatorID,
@Nonnull String operatorClassName,
@Nonnull KeyContext keyContext,
@Nullable TypeSerializer<?> keySerializer,
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup) throws Exception {
TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
new OperatorSubtaskDescriptionText(
operatorID,
operatorClassName,
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
final String operatorIdentifierText = operatorSubtaskDescription.toString();
final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
taskStateManager.prioritizedOperatorState(operatorID);
AbstractKeyedStateBackend<?> keyedStatedBackend = null;
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
try {
// -------------- Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup);
// -------------- Operator State Backend --------------
operatorStateBackend = operatorStateBackend(
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry);
// -------------- Raw State Streams --------------
rawKeyedStateInputs = rawKeyedStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
} catch (Exception ex) {
// cleanup if something went wrong before results got published.
if (keyedStatedBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
IOUtils.closeQuietly(keyedStatedBackend);
}
// release resource (e.g native resource)
keyedStatedBackend.dispose();
}
if (operatorStateBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
IOUtils.closeQuietly(operatorStateBackend);
}
operatorStateBackend.dispose();
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
IOUtils.closeQuietly(rawKeyedStateInputs);
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
IOUtils.closeQuietly(rawOperatorStateInputs);
}
throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
}
示例15
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return rawOperatorStateInputs;
}
示例16
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return rawKeyedStateInputs;
}
示例17
@Override
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask =
Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics);
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
if (keyedStateBackend != null) {
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
timeServiceManager = context.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
}
}
示例18
@Test
public void testNoRestore() throws Exception {
MemoryStateBackend stateBackend = spy(new MemoryStateBackend(1024));
// No job manager provided state to restore
StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager(stateBackend, null, true);
OperatorID operatorID = new OperatorID(47L, 11L);
AbstractStreamOperator<?> streamOperator = mock(AbstractStreamOperator.class);
when(streamOperator.getOperatorID()).thenReturn(operatorID);
TypeSerializer<?> typeSerializer = new IntSerializer();
CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(
streamOperator.getOperatorID(),
streamOperator.getClass().getSimpleName(),
streamOperator,
typeSerializer,
closeableRegistry,
new UnregisteredMetricsGroup());
OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
InternalTimeServiceManager<?> timeServiceManager = stateContext.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = stateContext.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = stateContext.rawOperatorStateInputs();
Assert.assertEquals(false, stateContext.isRestored());
Assert.assertNotNull(operatorStateBackend);
Assert.assertNotNull(keyedStateBackend);
Assert.assertNotNull(timeServiceManager);
Assert.assertNotNull(keyedStateInputs);
Assert.assertNotNull(operatorStateInputs);
checkCloseablesRegistered(
closeableRegistry,
operatorStateBackend,
keyedStateBackend,
keyedStateInputs,
operatorStateInputs);
Assert.assertFalse(keyedStateInputs.iterator().hasNext());
Assert.assertFalse(operatorStateInputs.iterator().hasNext());
}
示例19
@Override
public StreamTaskStateInitializer createStreamTaskStateInitializer() {
final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(
operatorID,
operatorClassName,
keyContext,
keySerializer,
closeableRegistry,
metricGroup);
return new StreamOperatorStateContext() {
@Override
public boolean isRestored() {
return context.isRestored();
}
@Override
public OperatorStateBackend operatorStateBackend() {
return context.operatorStateBackend();
}
@Override
public AbstractKeyedStateBackend<?> keyedStateBackend() {
return context.keyedStateBackend();
}
@Override
public InternalTimeServiceManager<?> internalTimerServiceManager() {
InternalTimeServiceManager<?> timeServiceManager = context.internalTimerServiceManager();
return timeServiceManager != null ? spy(timeServiceManager) : null;
}
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return replaceWithSpy(context.rawOperatorStateInputs());
}
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return replaceWithSpy(context.rawKeyedStateInputs());
}
public <T extends Closeable> T replaceWithSpy(T closeable) {
T spyCloseable = spy(closeable);
if (closeableRegistry.unregisterCloseable(closeable)) {
try {
closeableRegistry.registerCloseable(spyCloseable);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return spyCloseable;
}
};
};
}
示例20
@Override
public StreamOperatorStateContext streamOperatorStateContext(
@Nonnull OperatorID operatorID,
@Nonnull String operatorClassName,
@Nonnull KeyContext keyContext,
@Nullable TypeSerializer<?> keySerializer,
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup) throws Exception {
TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
new OperatorSubtaskDescriptionText(
operatorID,
operatorClassName,
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
final String operatorIdentifierText = operatorSubtaskDescription.toString();
final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
taskStateManager.prioritizedOperatorState(operatorID);
AbstractKeyedStateBackend<?> keyedStatedBackend = null;
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
try {
// -------------- Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup);
// -------------- Operator State Backend --------------
operatorStateBackend = operatorStateBackend(
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry);
// -------------- Raw State Streams --------------
rawKeyedStateInputs = rawKeyedStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
} catch (Exception ex) {
// cleanup if something went wrong before results got published.
if (keyedStatedBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
IOUtils.closeQuietly(keyedStatedBackend);
}
// release resource (e.g native resource)
keyedStatedBackend.dispose();
}
if (operatorStateBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
IOUtils.closeQuietly(operatorStateBackend);
}
operatorStateBackend.dispose();
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
IOUtils.closeQuietly(rawKeyedStateInputs);
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
IOUtils.closeQuietly(rawOperatorStateInputs);
}
throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
}
示例21
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return rawOperatorStateInputs;
}
示例22
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return rawKeyedStateInputs;
}
示例23
@Override
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask =
Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics);
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
if (keyedStateBackend != null) {
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
timeServiceManager = context.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
}
}
示例24
@Test
public void testNoRestore() throws Exception {
MemoryStateBackend stateBackend = spy(new MemoryStateBackend(1024));
// No job manager provided state to restore
StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager(stateBackend, null, true);
OperatorID operatorID = new OperatorID(47L, 11L);
AbstractStreamOperator<?> streamOperator = mock(AbstractStreamOperator.class);
when(streamOperator.getOperatorID()).thenReturn(operatorID);
TypeSerializer<?> typeSerializer = new IntSerializer();
CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(
streamOperator.getOperatorID(),
streamOperator.getClass().getSimpleName(),
streamOperator,
typeSerializer,
closeableRegistry,
new UnregisteredMetricsGroup());
OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
InternalTimeServiceManager<?> timeServiceManager = stateContext.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = stateContext.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = stateContext.rawOperatorStateInputs();
Assert.assertEquals(false, stateContext.isRestored());
Assert.assertNotNull(operatorStateBackend);
Assert.assertNotNull(keyedStateBackend);
Assert.assertNotNull(timeServiceManager);
Assert.assertNotNull(keyedStateInputs);
Assert.assertNotNull(operatorStateInputs);
checkCloseablesRegistered(
closeableRegistry,
operatorStateBackend,
keyedStateBackend,
keyedStateInputs,
operatorStateInputs);
Assert.assertFalse(keyedStateInputs.iterator().hasNext());
Assert.assertFalse(operatorStateInputs.iterator().hasNext());
}
示例25
@Override
public StreamTaskStateInitializer createStreamTaskStateInitializer() {
final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(
operatorID,
operatorClassName,
keyContext,
keySerializer,
closeableRegistry,
metricGroup);
return new StreamOperatorStateContext() {
@Override
public boolean isRestored() {
return context.isRestored();
}
@Override
public OperatorStateBackend operatorStateBackend() {
return context.operatorStateBackend();
}
@Override
public AbstractKeyedStateBackend<?> keyedStateBackend() {
return context.keyedStateBackend();
}
@Override
public InternalTimeServiceManager<?> internalTimerServiceManager() {
InternalTimeServiceManager<?> timeServiceManager = context.internalTimerServiceManager();
return timeServiceManager != null ? spy(timeServiceManager) : null;
}
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return replaceWithSpy(context.rawOperatorStateInputs());
}
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return replaceWithSpy(context.rawKeyedStateInputs());
}
public <T extends Closeable> T replaceWithSpy(T closeable) {
T spyCloseable = spy(closeable);
if (closeableRegistry.unregisterCloseable(closeable)) {
try {
closeableRegistry.registerCloseable(spyCloseable);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return spyCloseable;
}
};
};
}
示例26
@Override
public StreamOperatorStateContext streamOperatorStateContext(
@Nonnull OperatorID operatorID,
@Nonnull String operatorClassName,
@Nonnull ProcessingTimeService processingTimeService,
@Nonnull KeyContext keyContext,
@Nullable TypeSerializer<?> keySerializer,
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup) throws Exception {
TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
new OperatorSubtaskDescriptionText(
operatorID,
operatorClassName,
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
final String operatorIdentifierText = operatorSubtaskDescription.toString();
final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
taskStateManager.prioritizedOperatorState(operatorID);
AbstractKeyedStateBackend<?> keyedStatedBackend = null;
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
try {
// -------------- Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup);
// -------------- Operator State Backend --------------
operatorStateBackend = operatorStateBackend(
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry);
// -------------- Raw State Streams --------------
rawKeyedStateInputs = rawKeyedStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, processingTimeService, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
} catch (Exception ex) {
// cleanup if something went wrong before results got published.
if (keyedStatedBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
IOUtils.closeQuietly(keyedStatedBackend);
}
// release resource (e.g native resource)
keyedStatedBackend.dispose();
}
if (operatorStateBackend != null) {
if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
IOUtils.closeQuietly(operatorStateBackend);
}
operatorStateBackend.dispose();
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
IOUtils.closeQuietly(rawKeyedStateInputs);
}
if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
IOUtils.closeQuietly(rawOperatorStateInputs);
}
throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
}
示例27
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return rawOperatorStateInputs;
}
示例28
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return rawKeyedStateInputs;
}
示例29
@Test
public void testNoRestore() throws Exception {
MemoryStateBackend stateBackend = spy(new MemoryStateBackend(1024));
// No job manager provided state to restore
StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager(stateBackend, null, true);
OperatorID operatorID = new OperatorID(47L, 11L);
AbstractStreamOperator<?> streamOperator = mock(AbstractStreamOperator.class);
when(streamOperator.getOperatorID()).thenReturn(operatorID);
TypeSerializer<?> typeSerializer = new IntSerializer();
CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamOperatorStateContext stateContext = streamTaskStateManager.streamOperatorStateContext(
streamOperator.getOperatorID(),
streamOperator.getClass().getSimpleName(),
new TestProcessingTimeService(),
streamOperator,
typeSerializer,
closeableRegistry,
new UnregisteredMetricsGroup());
OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
InternalTimeServiceManager<?> timeServiceManager = stateContext.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = stateContext.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = stateContext.rawOperatorStateInputs();
Assert.assertEquals(false, stateContext.isRestored());
Assert.assertNotNull(operatorStateBackend);
Assert.assertNotNull(keyedStateBackend);
Assert.assertNotNull(timeServiceManager);
Assert.assertNotNull(keyedStateInputs);
Assert.assertNotNull(operatorStateInputs);
checkCloseablesRegistered(
closeableRegistry,
operatorStateBackend,
keyedStateBackend,
keyedStateInputs,
operatorStateInputs);
Assert.assertFalse(keyedStateInputs.iterator().hasNext());
Assert.assertFalse(operatorStateInputs.iterator().hasNext());
}
示例30
@Override
public StreamTaskStateInitializer createStreamTaskStateInitializer() {
final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
final StreamOperatorStateContext controller = streamTaskStateManager.streamOperatorStateContext(
operatorID,
operatorClassName,
processingTimeService,
keyContext,
keySerializer,
closeableRegistry,
metricGroup);
return new StreamOperatorStateContext() {
@Override
public boolean isRestored() {
return controller.isRestored();
}
@Override
public OperatorStateBackend operatorStateBackend() {
return controller.operatorStateBackend();
}
@Override
public AbstractKeyedStateBackend<?> keyedStateBackend() {
return controller.keyedStateBackend();
}
@Override
public InternalTimeServiceManager<?> internalTimerServiceManager() {
InternalTimeServiceManager<?> timeServiceManager = controller.internalTimerServiceManager();
return timeServiceManager != null ? spy(timeServiceManager) : null;
}
@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return replaceWithSpy(controller.rawOperatorStateInputs());
}
@Override
public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
return replaceWithSpy(controller.rawKeyedStateInputs());
}
public <T extends Closeable> T replaceWithSpy(T closeable) {
T spyCloseable = spy(closeable);
if (closeableRegistry.unregisterCloseable(closeable)) {
try {
closeableRegistry.registerCloseable(spyCloseable);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return spyCloseable;
}
};
};
}