Java源码示例:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
示例1
/**
 * Tests that {@link PartitionRequestServerHandler} responds with an {@link ErrorResponse} wrapping a
 * {@link PartitionNotFoundException} after receiving a {@link PartitionRequest} for a freshly
 * created {@link ResultPartitionID}, and that the exception carries that same partition id.
 *
 * <p>The handler is wired to a newly created {@link ResultPartitionManager}, so nothing has been
 * registered with it by this test.
 */
@Test
public void testResponsePartitionNotFoundException() {
    // NOTE(review): the meaning of the trailing boolean constructor argument is not visible here.
    final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
        new ResultPartitionManager(),
        new TaskEventDispatcher(),
        new PartitionRequestQueue(),
        true);
    final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
    final ResultPartitionID partitionId = new ResultPartitionID();

    // Write the message of partition request to server
    channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
    channel.runPendingTasks();

    // Read the response message after handling partition request
    final Object msg = channel.readOutbound();
    assertThat(msg, instanceOf(ErrorResponse.class));

    final ErrorResponse err = (ErrorResponse) msg;
    assertThat(err.cause, instanceOf(PartitionNotFoundException.class));

    // Actual value goes first, expected value inside the matcher, so that a failure
    // message reports "expected"/"was" the right way round.
    final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
    assertThat(actualPartitionId, is(partitionId));
}
示例2
/**
 * Tests that after {@link RemoteInputChannel#retriggerSubpartitionRequest(int)} has been called
 * with a backoff of 0, checking the channel for errors throws a
 * {@link PartitionNotFoundException} that carries the channel's own partition id.
 */
@Test
public void testPartitionNotFoundExceptionWhileRetriggeringRequest() throws Exception {
    final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(
        createSingleInputGate(1), 0, new TestingConnectionManager());

    // Request partition to initialize client to avoid illegal state after retriggering partition
    inputChannel.requestSubpartition(0);

    // The default backoff is 0 then it would set PartitionNotFoundException on this channel
    inputChannel.retriggerSubpartitionRequest(0);

    try {
        inputChannel.checkError();
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected (the channel's id) in the matcher.
        assertThat(notFound.getPartitionId(), is(inputChannel.getPartitionId()));
    }
}
示例3
/**
 * Tests that if the {@link PartitionNotFoundException} is set onto one {@link InputChannel},
 * then it would be thrown directly via {@link SingleInputGate#getNext()}. So we
 * could confirm the {@link SingleInputGate} would not swallow or transform the original exception.
 */
@Test
public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
    final ResultPartitionID partitionId = localChannel.getPartitionId();

    // Register the channel with the gate, then put the error on the channel before reading.
    inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
    localChannel.setError(new PartitionNotFoundException(partitionId));

    try {
        inputGate.getNext();
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected in the matcher.
        assertThat(notFound.getPartitionId(), is(partitionId));
    }
}
示例4
/**
 * Tests that {@link PartitionRequestServerHandler} responds with an {@link ErrorResponse} wrapping a
 * {@link PartitionNotFoundException} after receiving a {@link PartitionRequest} for a freshly
 * created {@link ResultPartitionID}, and that the exception carries that same partition id.
 *
 * <p>The handler is wired to a newly created {@link ResultPartitionManager}, so nothing has been
 * registered with it by this test.
 */
@Test
public void testResponsePartitionNotFoundException() {
    final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
        new ResultPartitionManager(),
        new TaskEventDispatcher(),
        new PartitionRequestQueue());
    final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
    final ResultPartitionID partitionId = new ResultPartitionID();

    // Write the message of partition request to server
    channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
    channel.runPendingTasks();

    // Read the response message after handling partition request
    final Object msg = channel.readOutbound();
    assertThat(msg, instanceOf(ErrorResponse.class));

    final ErrorResponse err = (ErrorResponse) msg;
    assertThat(err.cause, instanceOf(PartitionNotFoundException.class));

    // Actual value goes first, expected value inside the matcher, so that a failure
    // message reports "expected"/"was" the right way round.
    final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
    assertThat(actualPartitionId, is(partitionId));
}
示例5
/**
 * Tests that after {@link RemoteInputChannel#retriggerSubpartitionRequest(int)} has been called
 * with a backoff of 0, checking the channel for errors throws a
 * {@link PartitionNotFoundException} that carries the channel's own partition id.
 */
@Test
public void testPartitionNotFoundExceptionWhileRetriggeringRequest() throws Exception {
    final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(
        createSingleInputGate(1), 0, new TestingConnectionManager());

    // Request partition to initialize client to avoid illegal state after retriggering partition
    inputChannel.requestSubpartition(0);

    // The default backoff is 0 then it would set PartitionNotFoundException on this channel
    inputChannel.retriggerSubpartitionRequest(0);

    try {
        inputChannel.checkError();
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected (the channel's id) in the matcher.
        assertThat(notFound.getPartitionId(), is(inputChannel.getPartitionId()));
    }
}
示例6
/**
 * Tests that if the {@link PartitionNotFoundException} is set onto one {@link InputChannel},
 * then it would be thrown directly via {@link SingleInputGate#getNext()}. So we
 * could confirm the {@link SingleInputGate} would not swallow or transform the original exception.
 */
@Test
public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
    final ResultPartitionID partitionId = localChannel.getPartitionId();

    // Hand the channel to the gate, then put the error on the channel before reading.
    inputGate.setInputChannels(localChannel);
    localChannel.setError(new PartitionNotFoundException(partitionId));

    try {
        inputGate.getNext();
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected in the matcher.
        assertThat(notFound.getPartitionId(), is(partitionId));
    }
}
示例7
/**
 * Checks that {@link PartitionRequestClientHandler} invokes
 * {@link RemoteInputChannel#onFailedPartitionRequest()} exactly once on a registered channel
 * when it reads an {@link ErrorResponse} that wraps a {@link PartitionNotFoundException} and
 * carries that channel's id.
 */
@Test
public void testReceivePartitionNotFoundException() throws Exception {
    // Mocked remote input channel with only the calls stubbed that this test sets up
    final RemoteInputChannel remoteChannel = mock(RemoteInputChannel.class);
    final BufferProvider buffers = mock(BufferProvider.class);
    when(buffers.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
    when(remoteChannel.getInputChannelId()).thenReturn(new InputChannelID());
    when(remoteChannel.getBufferProvider()).thenReturn(buffers);

    // Mocked Netty context that hands out a mocked channel
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Channel nettyChannel = mock(Channel.class);
    when(context.channel()).thenReturn(nettyChannel);

    final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
    handler.addInputChannel(remoteChannel);

    // Error message addressed to the mocked channel's id
    final ErrorResponse notFoundResponse = new ErrorResponse(
        new PartitionNotFoundException(new ResultPartitionID()),
        remoteChannel.getInputChannelId());

    handler.channelActive(context);
    handler.channelRead(context, notFoundResponse);

    verify(remoteChannel, times(1)).onFailedPartitionRequest();
}
示例8
/**
 * Checks that {@link CreditBasedPartitionRequestClientHandler} invokes
 * {@link RemoteInputChannel#onFailedPartitionRequest()} exactly once on a registered channel
 * when it reads an {@link ErrorResponse} that wraps a {@link PartitionNotFoundException} and
 * carries that channel's id.
 */
@Test
public void testReceivePartitionNotFoundException() throws Exception {
    // Mocked remote input channel with only the calls stubbed that this test sets up
    final RemoteInputChannel remoteChannel = mock(RemoteInputChannel.class);
    final BufferProvider buffers = mock(BufferProvider.class);
    when(buffers.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
    when(remoteChannel.getInputChannelId()).thenReturn(new InputChannelID());
    when(remoteChannel.getBufferProvider()).thenReturn(buffers);

    // Mocked Netty context that hands out a mocked channel
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Channel nettyChannel = mock(Channel.class);
    when(context.channel()).thenReturn(nettyChannel);

    final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
    handler.addInputChannel(remoteChannel);

    // Error message addressed to the mocked channel's id
    final ErrorResponse notFoundResponse = new ErrorResponse(
        new PartitionNotFoundException(new ResultPartitionID()),
        remoteChannel.getInputChannelId());

    handler.channelActive(context);
    handler.channelRead(context, notFoundResponse);

    verify(remoteChannel, times(1)).onFailedPartitionRequest();
}
示例9
/**
 * Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
 *
 * <p>A receiver task is submitted whose input is described by a remote shuffle descriptor built
 * for a freshly obtained data port. The test blocks until the task has been reported as
 * {@code RUNNING} and then as {@code FAILED}, and finally checks the recorded failure cause.
 */
@Test(timeout = 10000L)
public void testRemotePartitionNotFound() throws Exception {
    final CompletableFuture<Void> runningSignal = new CompletableFuture<>();
    final CompletableFuture<Void> failedSignal = new CompletableFuture<>();

    final int port = NetUtils.getAvailablePort();

    // Request backoff bounds: initial 100, maximum 200
    Configuration networkConfig = new Configuration();
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, port);
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

    // Remote location (on the same TM though) for the partition
    NettyShuffleDescriptor remoteDescriptor =
        NettyShuffleDescriptorBuilder.newBuilder().setDataPort(port).buildRemote();
    TaskDeploymentDescriptor deployment = createReceiver(remoteDescriptor);
    ExecutionAttemptID attemptId = deployment.getExecutionAttemptId();

    try (TaskSubmissionTestEnvironment env =
            new TaskSubmissionTestEnvironment.Builder(jobId)
                .setSlotSize(2)
                .addTaskManagerActionListener(attemptId, ExecutionState.RUNNING, runningSignal)
                .addTaskManagerActionListener(attemptId, ExecutionState.FAILED, failedSignal)
                .setConfiguration(networkConfig)
                .setLocalCommunication(false)
                .useRealNonMockShuffleEnvironment()
                .build()) {
        TaskExecutorGateway gateway = env.getTaskExecutorGateway();
        TaskSlotTable slots = env.getTaskSlotTable();

        slots.allocateSlot(0, jobId, deployment.getAllocationId(), Time.seconds(60));
        gateway.submitTask(deployment, env.getJobMasterId(), timeout).get();

        // Wait for both state transitions before inspecting the task
        runningSignal.get();
        failedSignal.get();

        assertThat(slots.getTask(attemptId).getFailureCause(), instanceOf(PartitionNotFoundException.class));
    }
}
示例10
/**
 * Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 *
 * <p>The shuffle descriptor is created for {@code producerLocation}, and the same resource id is
 * set on the test environment (presumably this is what makes the partition count as local —
 * confirm against {@code createRemoteWithIdAndLocation}). The test blocks until the task has
 * been reported as {@code RUNNING} and then as {@code FAILED}, and finally checks both the
 * execution state and the recorded failure cause.
 */
@Test(timeout = 10000L)
public void testLocalPartitionNotFound() throws Exception {
    ResourceID producerLocation = ResourceID.generate();
    NettyShuffleDescriptor shuffleDescriptor =
        createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
    TaskDeploymentDescriptor tdd = createReceiver(shuffleDescriptor);
    ExecutionAttemptID eid = tdd.getExecutionAttemptId();

    // Request backoff bounds: initial 100, maximum 200
    Configuration config = new Configuration();
    config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
    config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

    final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
    final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();

    try (TaskSubmissionTestEnvironment env =
            new TaskSubmissionTestEnvironment.Builder(jobId)
                .setResourceID(producerLocation)
                .setSlotSize(1)
                .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
                .setConfiguration(config)
                .useRealNonMockShuffleEnvironment()
                .build()) {
        TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
        TaskSlotTable taskSlotTable = env.getTaskSlotTable();

        taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
        tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();

        // Wait for both state transitions before inspecting the task
        taskRunningFuture.get();
        taskFailedFuture.get();

        // JUnit's assertSame takes (expected, actual); keep that order so a failure
        // message labels the two values correctly.
        assertSame(ExecutionState.FAILED, taskSlotTable.getTask(eid).getExecutionState());
        assertThat(taskSlotTable.getTask(eid).getFailureCause(), instanceOf(PartitionNotFoundException.class));
    }
}
示例11
/**
 * Checks that {@link PartitionRequestClientHandler} invokes
 * {@link RemoteInputChannel#onFailedPartitionRequest()} exactly once on a registered channel
 * when it reads an {@link ErrorResponse} that wraps a {@link PartitionNotFoundException} and
 * carries that channel's id.
 */
@Test
public void testReceivePartitionNotFoundException() throws Exception {
    // Mocked remote input channel with only the calls stubbed that this test sets up
    final RemoteInputChannel remoteChannel = mock(RemoteInputChannel.class);
    final BufferProvider buffers = mock(BufferProvider.class);
    when(buffers.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
    when(remoteChannel.getInputChannelId()).thenReturn(new InputChannelID());
    when(remoteChannel.getBufferProvider()).thenReturn(buffers);

    // Mocked Netty context that hands out a mocked channel
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Channel nettyChannel = mock(Channel.class);
    when(context.channel()).thenReturn(nettyChannel);

    final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
    handler.addInputChannel(remoteChannel);

    // Error message addressed to the mocked channel's id
    final ErrorResponse notFoundResponse = new ErrorResponse(
        new PartitionNotFoundException(new ResultPartitionID()),
        remoteChannel.getInputChannelId());

    handler.channelActive(context);
    handler.channelRead(context, notFoundResponse);

    verify(remoteChannel, times(1)).onFailedPartitionRequest();
}
示例12
/**
 * Checks that {@link CreditBasedPartitionRequestClientHandler} invokes
 * {@link RemoteInputChannel#onFailedPartitionRequest()} exactly once on a registered channel
 * when it reads an {@link ErrorResponse} that wraps a {@link PartitionNotFoundException} and
 * carries that channel's id.
 */
@Test
public void testReceivePartitionNotFoundException() throws Exception {
    // Mocked remote input channel with only the calls stubbed that this test sets up
    final RemoteInputChannel remoteChannel = mock(RemoteInputChannel.class);
    final BufferProvider buffers = mock(BufferProvider.class);
    when(buffers.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
    when(remoteChannel.getInputChannelId()).thenReturn(new InputChannelID());
    when(remoteChannel.getBufferProvider()).thenReturn(buffers);

    // Mocked Netty context that hands out a mocked channel
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Channel nettyChannel = mock(Channel.class);
    when(context.channel()).thenReturn(nettyChannel);

    final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
    handler.addInputChannel(remoteChannel);

    // Error message addressed to the mocked channel's id
    final ErrorResponse notFoundResponse = new ErrorResponse(
        new PartitionNotFoundException(new ResultPartitionID()),
        remoteChannel.getInputChannelId());

    handler.channelActive(context);
    handler.channelRead(context, notFoundResponse);

    verify(remoteChannel, times(1)).onFailedPartitionRequest();
}
示例13
/**
 * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
 * if the result partition was not registered in {@link ResultPartitionManager} and no backoff.
 *
 * <p>The channel is created against a new, empty {@link ResultPartitionManager}; the thrown
 * exception must carry the channel's own partition id.
 */
@Test
public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());

    try {
        localChannel.requestSubpartition(0);
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected (the channel's id) in the matcher.
        assertThat(notFound.getPartitionId(), Matchers.is(localChannel.getPartitionId()));
    }
}
示例14
/**
 * Tests that {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} would throw
 * {@link PartitionNotFoundException} which is set onto the input channel then.
 *
 * <p>The {@link Timer} passed in overrides {@code schedule} to run the task immediately instead
 * of deferring it, and then checks the channel's error state right away.
 *
 * <p>NOTE(review): all assertions live inside the overridden {@code schedule}; if
 * {@code retriggerSubpartitionRequest} ever stops calling it, this test passes vacuously.
 */
@Test
public void testChannelErrorWhileRetriggeringRequest() {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());

    final Timer timer = new Timer(true) {
        @Override
        public void schedule(TimerTask task, long delay) {
            // Run inline so the outcome can be checked synchronously; the delay is ignored.
            task.run();

            try {
                localChannel.checkError();
                fail("Should throw a PartitionNotFoundException.");
            } catch (PartitionNotFoundException notFound) {
                // Actual (id carried by the exception) first, expected in the matcher.
                assertThat(notFound.getPartitionId(), Matchers.is(localChannel.partitionId));
            } catch (IOException ex) {
                // Any other I/O error is a test failure; keep it as the cause for diagnosis.
                throw new AssertionError("Should throw a PartitionNotFoundException.", ex);
            }
        }
    };

    try {
        localChannel.retriggerSubpartitionRequest(timer, 0);
    } finally {
        // Always release the timer created above, even when an assertion fails.
        timer.cancel();
    }
}
示例15
/**
 * Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
 *
 * <p>A receiver task is submitted whose input is described by a remote shuffle descriptor built
 * for a freshly obtained data port. The test blocks until the task has been reported as
 * {@code RUNNING} and then as {@code FAILED}, and finally checks the recorded failure cause.
 */
@Test(timeout = TEST_TIMEOUT)
public void testRemotePartitionNotFound() throws Exception {
    final CompletableFuture<Void> runningSignal = new CompletableFuture<>();
    final CompletableFuture<Void> failedSignal = new CompletableFuture<>();

    final int port = NetUtils.getAvailablePort();

    // Request backoff bounds: initial 100, maximum 200
    Configuration networkConfig = new Configuration();
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, port);
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
    networkConfig.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

    // Remote location (on the same TM though) for the partition
    NettyShuffleDescriptor remoteDescriptor =
        NettyShuffleDescriptorBuilder.newBuilder().setDataPort(port).buildRemote();
    TaskDeploymentDescriptor deployment = createReceiver(remoteDescriptor);
    ExecutionAttemptID attemptId = deployment.getExecutionAttemptId();

    try (TaskSubmissionTestEnvironment env =
            new TaskSubmissionTestEnvironment.Builder(jobId)
                .setSlotSize(2)
                .addTaskManagerActionListener(attemptId, ExecutionState.RUNNING, runningSignal)
                .addTaskManagerActionListener(attemptId, ExecutionState.FAILED, failedSignal)
                .setConfiguration(networkConfig)
                .setLocalCommunication(false)
                .useRealNonMockShuffleEnvironment()
                .build()) {
        TaskExecutorGateway gateway = env.getTaskExecutorGateway();
        TaskSlotTable<Task> slots = env.getTaskSlotTable();

        slots.allocateSlot(0, jobId, deployment.getAllocationId(), Time.seconds(60));
        gateway.submitTask(deployment, env.getJobMasterId(), timeout).get();

        // Wait for both state transitions before inspecting the task
        runningSignal.get();
        failedSignal.get();

        assertThat(slots.getTask(attemptId).getFailureCause(), instanceOf(PartitionNotFoundException.class));
    }
}
示例16
/**
 * Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 *
 * <p>The shuffle descriptor is created for {@code producerLocation}, and the same resource id is
 * set on the test environment (presumably this is what makes the partition count as local —
 * confirm against {@code createRemoteWithIdAndLocation}). The test blocks until the task has
 * been reported as {@code RUNNING} and then as {@code FAILED}, and finally checks both the
 * execution state and the recorded failure cause.
 */
@Test(timeout = TEST_TIMEOUT)
public void testLocalPartitionNotFound() throws Exception {
    ResourceID producerLocation = ResourceID.generate();
    NettyShuffleDescriptor shuffleDescriptor =
        createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
    TaskDeploymentDescriptor tdd = createReceiver(shuffleDescriptor);
    ExecutionAttemptID eid = tdd.getExecutionAttemptId();

    // Request backoff bounds: initial 100, maximum 200
    Configuration config = new Configuration();
    config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
    config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

    final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
    final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();

    try (TaskSubmissionTestEnvironment env =
            new TaskSubmissionTestEnvironment.Builder(jobId)
                .setResourceID(producerLocation)
                .setSlotSize(1)
                .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
                .setConfiguration(config)
                .useRealNonMockShuffleEnvironment()
                .build()) {
        TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
        TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();

        taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
        tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();

        // Wait for both state transitions before inspecting the task
        taskRunningFuture.get();
        taskFailedFuture.get();

        // JUnit's assertSame takes (expected, actual); keep that order so a failure
        // message labels the two values correctly.
        assertSame(ExecutionState.FAILED, taskSlotTable.getTask(eid).getExecutionState());
        assertThat(taskSlotTable.getTask(eid).getFailureCause(), instanceOf(PartitionNotFoundException.class));
    }
}
示例17
/**
 * Checks that {@link CreditBasedPartitionRequestClientHandler} invokes
 * {@link RemoteInputChannel#onFailedPartitionRequest()} exactly once on a registered channel
 * when it reads an {@link ErrorResponse} that wraps a {@link PartitionNotFoundException} and
 * carries that channel's id.
 */
@Test
public void testReceivePartitionNotFoundException() throws Exception {
    // Mocked remote input channel with only the calls stubbed that this test sets up
    final RemoteInputChannel remoteChannel = mock(RemoteInputChannel.class);
    final BufferProvider buffers = mock(BufferProvider.class);
    when(buffers.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
    when(remoteChannel.getInputChannelId()).thenReturn(new InputChannelID());
    when(remoteChannel.getBufferProvider()).thenReturn(buffers);

    // Mocked Netty context that hands out a mocked channel
    final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    final Channel nettyChannel = mock(Channel.class);
    when(context.channel()).thenReturn(nettyChannel);

    final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
    handler.addInputChannel(remoteChannel);

    // Error message addressed to the mocked channel's id
    final ErrorResponse notFoundResponse = new ErrorResponse(
        new PartitionNotFoundException(new ResultPartitionID()),
        remoteChannel.getInputChannelId());

    handler.channelActive(context);
    handler.channelRead(context, notFoundResponse);

    verify(remoteChannel, times(1)).onFailedPartitionRequest();
}
示例18
/**
 * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
 * if the result partition was not registered in {@link ResultPartitionManager} and no backoff.
 *
 * <p>The channel is created against a new, empty {@link ResultPartitionManager}; the thrown
 * exception must carry the channel's own partition id.
 */
@Test
public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());

    try {
        localChannel.requestSubpartition(0);
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Actual (id carried by the exception) first, expected (the channel's id) in the matcher.
        assertThat(notFound.getPartitionId(), Matchers.is(localChannel.getPartitionId()));
    }
}
示例19
/**
 * Tests that {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} would throw
 * {@link PartitionNotFoundException} which is set onto the input channel then.
 *
 * <p>The {@link Timer} passed in overrides {@code schedule} to run the task immediately instead
 * of deferring it, and then checks the channel's error state right away.
 *
 * <p>NOTE(review): all assertions live inside the overridden {@code schedule}; if
 * {@code retriggerSubpartitionRequest} ever stops calling it, this test passes vacuously.
 */
@Test
public void testChannelErrorWhileRetriggeringRequest() {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());

    final Timer timer = new Timer(true) {
        @Override
        public void schedule(TimerTask task, long delay) {
            // Run inline so the outcome can be checked synchronously; the delay is ignored.
            task.run();

            try {
                localChannel.checkError();
                fail("Should throw a PartitionNotFoundException.");
            } catch (PartitionNotFoundException notFound) {
                // Actual (id carried by the exception) first, expected in the matcher.
                assertThat(notFound.getPartitionId(), Matchers.is(localChannel.partitionId));
            } catch (IOException ex) {
                // Any other I/O error is a test failure; keep it as the cause for diagnosis.
                throw new AssertionError("Should throw a PartitionNotFoundException.", ex);
            }
        }
    };

    try {
        localChannel.retriggerSubpartitionRequest(timer, 0);
    } finally {
        // Always release the timer created above, even when an assertion fails.
        timer.cancel();
    }
}
示例20
/**
 * Records a {@link PartitionNotFoundException} for this channel's {@code partitionId} by
 * passing it to {@code setError}.
 */
private void failPartitionRequest() {
    final PartitionNotFoundException notFound = new PartitionNotFoundException(partitionId);
    setError(notFound);
}
示例21
/**
 * Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 *
 * <p>A job manager actor and a task manager are started, a receiver task with a single local
 * input channel for a freshly created partition id is submitted, and the test then expects a
 * {@link TaskExecutionState} message reporting {@code FAILED} with a
 * {@link PartitionNotFoundException} as the error. Both actors are stopped in the
 * {@code finally} block.
 */
@Test
public void testLocalPartitionNotFound() throws Exception {
    new JavaTestKit(system){{
        ActorGateway jobManager = null;
        ActorGateway taskManager = null;

        final ActorGateway testActorGateway = new AkkaActorGateway(
            getTestActor(),
            LEADER_SESSION_ID);

        try {
            final IntermediateDataSetID resultId = new IntermediateDataSetID();

            // Create the JM
            ActorRef jm = system.actorOf(Props.create(
                new SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, getTestActor())));

            jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);

            highAvailabilityServices.setJobMasterLeaderRetriever(
                HighAvailabilityServices.DEFAULT_JOB_ID,
                new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

            // Request backoff bounds: initial 100, maximum 200
            final Configuration config = new Configuration();
            config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
            config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

            taskManager = TestingUtils.createTaskManager(
                system,
                highAvailabilityServices,
                config,
                true,
                true);

            // ---------------------------------------------------------------------------------

            final ActorGateway tm = taskManager;

            final JobID jid = new JobID();
            final JobVertexID vid = new JobVertexID();
            final ExecutionAttemptID eid = new ExecutionAttemptID();

            final ResultPartitionID partitionId = new ResultPartitionID();

            // Local location (on the same TM though) for the partition
            final ResultPartitionLocation loc = ResultPartitionLocation.createLocal();

            final InputChannelDeploymentDescriptor[] icdd =
                new InputChannelDeploymentDescriptor[] {
                    new InputChannelDeploymentDescriptor(partitionId, loc)};

            final InputGateDeploymentDescriptor igdd =
                new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);

            final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
                jid, "TestJob", vid, eid,
                new SerializedValue<>(new ExecutionConfig()),
                "Receiver", 1, 0, 1, 0,
                new Configuration(), new Configuration(),
                Tasks.AgnosticReceiver.class.getName(),
                Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                Collections.singletonList(igdd),
                Collections.emptyList(),
                Collections.emptyList(), 0);

            new Within(new FiniteDuration(120, TimeUnit.SECONDS)) {
                @Override
                protected void run() {
                    // Submit the task
                    tm.tell(new SubmitTask(tdd), testActorGateway);
                    expectMsgClass(Acknowledge.get().getClass());

                    // Wait to be notified about the final execution state by the mock JM
                    TaskExecutionState msg = expectMsgClass(TaskExecutionState.class);

                    // The task should fail after repeated requests.
                    // assertEquals takes (expected, actual).
                    assertEquals(ExecutionState.FAILED, msg.getExecutionState());

                    Throwable error = msg.getError(getClass().getClassLoader());
                    if (error.getClass() != PartitionNotFoundException.class) {
                        // Attach the unexpected error as the cause instead of printing it,
                        // so the test report contains the full stack trace.
                        throw new AssertionError("Wrong exception: " + error.getMessage(), error);
                    }
                }
            };
        }
        catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
        finally {
            TestingUtils.stopActor(taskManager);
            TestingUtils.stopActor(jobManager);
        }
    }};
}
示例22
/**
 * Records a {@link PartitionNotFoundException} for this channel's {@code partitionId} by
 * passing it to {@code setError}.
 */
private void failPartitionRequest() {
    final PartitionNotFoundException notFound = new PartitionNotFoundException(partitionId);
    setError(notFound);
}
示例23
/**
 * Records a {@link PartitionNotFoundException} for this channel's {@code partitionId} by
 * passing it to {@code setError}.
 */
private void failPartitionRequest() {
    final PartitionNotFoundException notFound = new PartitionNotFoundException(partitionId);
    setError(notFound);
}
示例24
/**
 * Covers task failures caused by data consumption errors. In that case the region of the
 * failed task, the region of the unavailable result partition and all of their consumer
 * regions are expected to be restarted.
 *
 * <p>Topology built by this test (every edge is {@code BLOCKING}):
 * <pre>
 *     v1 --> v4, v5
 *     v2 --> v4, v5
 *     v3 --> v6
 * </pre>
 * Each vertex is in an individual region.
 */
@Test
public void testRegionFailoverForDataConsumptionErrors() throws Exception {
    TestingSchedulingTopology graph = new TestingSchedulingTopology();

    TestingSchedulingExecutionVertex v1 = graph.newExecutionVertex();
    TestingSchedulingExecutionVertex v2 = graph.newExecutionVertex();
    TestingSchedulingExecutionVertex v3 = graph.newExecutionVertex();
    TestingSchedulingExecutionVertex v4 = graph.newExecutionVertex();
    TestingSchedulingExecutionVertex v5 = graph.newExecutionVertex();
    TestingSchedulingExecutionVertex v6 = graph.newExecutionVertex();

    // The connect order matters: the iterators below rely on v1's result coming before v2's.
    graph.connect(v1, v4, ResultPartitionType.BLOCKING);
    graph.connect(v1, v5, ResultPartitionType.BLOCKING);
    graph.connect(v2, v4, ResultPartitionType.BLOCKING);
    graph.connect(v2, v5, ResultPartitionType.BLOCKING);
    graph.connect(v3, v6, ResultPartitionType.BLOCKING);

    RestartPipelinedRegionFailoverStrategy failover = new RestartPipelinedRegionFailoverStrategy(graph);

    // when v4 fails to consume data from v1, {v1,v4,v5} should be restarted
    Iterator<TestingSchedulingResultPartition> v4Inputs = v4.getConsumedResults().iterator();
    HashSet<ExecutionVertexID> restartForV4FromV1 = new HashSet<>();
    restartForV4FromV1.add(v1.getId());
    restartForV4FromV1.add(v4.getId());
    restartForV4FromV1.add(v5.getId());
    ResultPartitionID v4FirstInput = new ResultPartitionID(v4Inputs.next().getId(), new ExecutionAttemptID());
    PartitionConnectionException v4ConnectionError =
        new PartitionConnectionException(v4FirstInput, new Exception("Test failure"));
    assertEquals(restartForV4FromV1, failover.getTasksNeedingRestart(v4.getId(), v4ConnectionError));

    // when v4 fails to consume data from v2, {v2,v4,v5} should be restarted
    HashSet<ExecutionVertexID> restartForV4FromV2 = new HashSet<>();
    restartForV4FromV2.add(v2.getId());
    restartForV4FromV2.add(v4.getId());
    restartForV4FromV2.add(v5.getId());
    ResultPartitionID v4SecondInput = new ResultPartitionID(v4Inputs.next().getId(), new ExecutionAttemptID());
    PartitionNotFoundException v4NotFound = new PartitionNotFoundException(v4SecondInput);
    assertEquals(restartForV4FromV2, failover.getTasksNeedingRestart(v4.getId(), v4NotFound));

    // when v5 fails to consume data from v1, {v1,v4,v5} should be restarted
    Iterator<TestingSchedulingResultPartition> v5Inputs = v5.getConsumedResults().iterator();
    HashSet<ExecutionVertexID> restartForV5FromV1 = new HashSet<>();
    restartForV5FromV1.add(v1.getId());
    restartForV5FromV1.add(v4.getId());
    restartForV5FromV1.add(v5.getId());
    ResultPartitionID v5FirstInput = new ResultPartitionID(v5Inputs.next().getId(), new ExecutionAttemptID());
    PartitionConnectionException v5ConnectionError =
        new PartitionConnectionException(v5FirstInput, new Exception("Test failure"));
    assertEquals(restartForV5FromV1, failover.getTasksNeedingRestart(v5.getId(), v5ConnectionError));

    // when v5 fails to consume data from v2, {v2,v4,v5} should be restarted
    HashSet<ExecutionVertexID> restartForV5FromV2 = new HashSet<>();
    restartForV5FromV2.add(v2.getId());
    restartForV5FromV2.add(v4.getId());
    restartForV5FromV2.add(v5.getId());
    ResultPartitionID v5SecondInput = new ResultPartitionID(v5Inputs.next().getId(), new ExecutionAttemptID());
    PartitionNotFoundException v5NotFound = new PartitionNotFoundException(v5SecondInput);
    assertEquals(restartForV5FromV2, failover.getTasksNeedingRestart(v5.getId(), v5NotFound));

    // when v6 fails to consume data from v3, {v3,v6} should be restarted
    Iterator<TestingSchedulingResultPartition> v6Inputs = v6.getConsumedResults().iterator();
    HashSet<ExecutionVertexID> restartForV6FromV3 = new HashSet<>();
    restartForV6FromV3.add(v3.getId());
    restartForV6FromV3.add(v6.getId());
    ResultPartitionID v6OnlyInput = new ResultPartitionID(v6Inputs.next().getId(), new ExecutionAttemptID());
    PartitionConnectionException v6ConnectionError =
        new PartitionConnectionException(v6OnlyInput, new Exception("Test failure"));
    assertEquals(restartForV6FromV3, failover.getTasksNeedingRestart(v6.getId(), v6ConnectionError));
}