Java源码示例:org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent

示例1
@Test
public void testIncrementNodeCapacityUnderCapacity() throws Exception {
  resetNodeTotalCapability(nodeOne, 0, 0);
  resetNodeTotalCapability(nodeTwo, 2, 512);
  Offer offerOne = TestObjectFactory.getOffer("localhost-one", "slave-one", "mock", "offer-one", 1.0, 512.0);
  Offer offerTwo = TestObjectFactory.getOffer("localhost-two", "slave-two", "mock", "offer-two", 3.0, 1024.0);
  olManager.addOffers(offerOne);
  olManager.addOffers(offerTwo);
      
  RMNodeStatusEvent eventOne = getRMStatusEvent(nodeOne);
  handler.beforeRMNodeEventHandled(eventOne, context);
  RMNodeStatusEvent eventTwo = getRMStatusEvent(nodeTwo);
  handler.beforeRMNodeEventHandled(eventTwo, context);
  
  assertEquals(512, nodeOne.getTotalCapability().getMemory());
  assertEquals(1, nodeOne.getTotalCapability().getVirtualCores());
  assertEquals(1024, nodeTwo.getTotalCapability().getMemory());
  assertEquals(3, nodeTwo.getTotalCapability().getVirtualCores());
}
 
示例2
@Test
public void testIncrementNodeCapacityOverCapacity() throws Exception {
  resetNodeTotalCapability(nodeOne, 1, 512);
  resetNodeTotalCapability(nodeTwo, 2, 2048);
  
  //Test over memory upper limit
  Offer offerOne = TestObjectFactory.getOffer("localhost-one", "slave-one", "mock", "offer-one", 0.2, 3072.0);
  //Test over CPU cores upper limit
  Offer offerTwo = TestObjectFactory.getOffer("localhost-two", "slave-two", "mock", "offer-two", 8.0, 1024.0);
  olManager.addOffers(offerOne);
  olManager.addOffers(offerTwo);

  RMNodeStatusEvent eventOne = getRMStatusEvent(nodeOne);  
  handler.beforeRMNodeEventHandled(eventOne, context);
  RMNodeStatusEvent eventTwo = getRMStatusEvent(nodeTwo);
  handler.beforeRMNodeEventHandled(eventTwo, context);
  
  assertEquals(512, nodeOne.getTotalCapability().getMemory());
  assertEquals(1, nodeOne.getTotalCapability().getVirtualCores()); 
  assertEquals(2048, nodeTwo.getTotalCapability().getMemory());
  assertEquals(2, nodeTwo.getTotalCapability().getVirtualCores());
}
 
示例3
@Test
public void testNodesDefaultWithUnHealthyNode() throws JSONException,
    Exception {

  WebResource r = resource();
  MockNM nm1 = rm.registerNode("h1:1234", 5120);
  MockNM nm2 = rm.registerNode("h2:1235", 5121);
  rm.sendNodeStarted(nm1);
  rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
  rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);

  MockNM nm3 = rm.registerNode("h3:1236", 5122);
  rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW);
  rm.sendNodeStarted(nm3);
  rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING);
  RMNodeImpl node = (RMNodeImpl) rm.getRMContext().getRMNodes()
      .get(nm3.getNodeId());
  NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
      "test health report", System.currentTimeMillis());
  node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
      new ArrayList<ContainerStatus>(), null, null));
  rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);

  ClientResponse response =
      r.path("ws").path("v1").path("cluster").path("nodes")
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);

  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject json = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, json.length());
  JSONObject nodes = json.getJSONObject("nodes");
  assertEquals("incorrect number of elements", 1, nodes.length());
  JSONArray nodeArray = nodes.getJSONArray("node");
  // 3 nodes, including the unhealthy node and the new node.
  assertEquals("incorrect number of elements", 3, nodeArray.length());
}
 
示例4
private RMNodeStatusEvent getMockRMNodeStatusEvent() {
  NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);

  NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
  Boolean yes = new Boolean(true);
  doReturn(yes).when(healthStatus).getIsNodeHealthy();
  
  RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
  doReturn(healthStatus).when(event).getNodeHealthStatus();
  doReturn(response).when(event).getLatestResponse();
  doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
  return event;
}
 
示例5
@Test (timeout = 5000)
public void testExpiredContainer() {
  // Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
  
  // Expire a container
  ContainerId completedContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
  Assert.assertEquals(1, node.getContainersToCleanUp().size());
  
  // Now verify that scheduler isn't notified of an expired container
  // by checking number of 'completedContainers' it got in the previous event
  RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
  ContainerStatus containerStatus = mock(ContainerStatus.class);
  doReturn(completedContainerId).when(containerStatus).getContainerId();
  doReturn(Collections.singletonList(containerStatus)).
      when(statusEvent).getContainers();
  node.handle(statusEvent);
  /* Expect the scheduler call handle function 2 times
   * 1. RMNode status from new to Running, handle the add_node event
   * 2. handle the node update event
   */
  verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));     
}
 
示例6
@Test (timeout = 5000)
public void testStatusChange(){
  //Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  //Add info to the queue first
  node.setNextHeartBeat(false);

  ContainerId completedContainerId1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  ContainerId completedContainerId2 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 1);
      
  RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();

  ContainerStatus containerStatus1 = mock(ContainerStatus.class);
  ContainerStatus containerStatus2 = mock(ContainerStatus.class);

  doReturn(completedContainerId1).when(containerStatus1).getContainerId();
  doReturn(Collections.singletonList(containerStatus1))
      .when(statusEvent1).getContainers();
   
  doReturn(completedContainerId2).when(containerStatus2).getContainerId();
  doReturn(Collections.singletonList(containerStatus2))
      .when(statusEvent2).getContainers();

  verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); 
  node.handle(statusEvent1);
  node.handle(statusEvent2);
  verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
  Assert.assertEquals(2, node.getQueueSize());
  node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
  Assert.assertEquals(0, node.getQueueSize());
}
 
示例7
@Test(timeout=20000)
public void testUpdateHeartbeatResponseForCleanup() {
  RMNodeImpl node = getRunningNode();
  NodeId nodeId = node.getNodeID();

  // Expire a container
ContainerId completedContainerId = BuilderUtils.newContainerId(
		BuilderUtils.newApplicationAttemptId(
				BuilderUtils.newApplicationId(0, 0), 0), 0);
  node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
  Assert.assertEquals(1, node.getContainersToCleanUp().size());

  // Finish an application
  ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
  node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
  Assert.assertEquals(1, node.getAppsToCleanup().size());

  // Verify status update does not clear containers/apps to cleanup
  // but updating heartbeat response for cleanup does
  RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
  node.handle(statusEvent);
  Assert.assertEquals(1, node.getContainersToCleanUp().size());
  Assert.assertEquals(1, node.getAppsToCleanup().size());
  NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
  node.updateNodeHeartbeatResponseForCleanup(hbrsp);
  Assert.assertEquals(0, node.getContainersToCleanUp().size());
  Assert.assertEquals(0, node.getAppsToCleanup().size());
  Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
  Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
  Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
  Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
}
 
示例8
private RMNodeImpl getUnhealthyNode() {
  RMNodeImpl node = getRunningNode();
  NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
      System.currentTimeMillis());
  node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
      new ArrayList<ContainerStatus>(), null, null));
  Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
  return node;
}
 
示例9
@Test
public void testNodesDefaultWithUnHealthyNode() throws JSONException,
    Exception {

  WebResource r = resource();
  MockNM nm1 = rm.registerNode("h1:1234", 5120);
  MockNM nm2 = rm.registerNode("h2:1235", 5121);
  rm.sendNodeStarted(nm1);
  rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
  rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);

  MockNM nm3 = rm.registerNode("h3:1236", 5122);
  rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW);
  rm.sendNodeStarted(nm3);
  rm.NMwaitForState(nm3.getNodeId(), NodeState.RUNNING);
  RMNodeImpl node = (RMNodeImpl) rm.getRMContext().getRMNodes()
      .get(nm3.getNodeId());
  NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
      "test health report", System.currentTimeMillis());
  node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
      new ArrayList<ContainerStatus>(), null, null));
  rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);

  ClientResponse response =
      r.path("ws").path("v1").path("cluster").path("nodes")
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);

  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
  JSONObject json = response.getEntity(JSONObject.class);
  assertEquals("incorrect number of elements", 1, json.length());
  JSONObject nodes = json.getJSONObject("nodes");
  assertEquals("incorrect number of elements", 1, nodes.length());
  JSONArray nodeArray = nodes.getJSONArray("node");
  // 3 nodes, including the unhealthy node and the new node.
  assertEquals("incorrect number of elements", 3, nodeArray.length());
}
 
示例10
private RMNodeStatusEvent getMockRMNodeStatusEvent() {
  NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);

  NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
  Boolean yes = new Boolean(true);
  doReturn(yes).when(healthStatus).getIsNodeHealthy();
  
  RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
  doReturn(healthStatus).when(event).getNodeHealthStatus();
  doReturn(response).when(event).getLatestResponse();
  doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
  return event;
}
 
示例11
@Test (timeout = 5000)
public void testExpiredContainer() {
  // Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
  
  // Expire a container
  ContainerId completedContainerId = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
  Assert.assertEquals(1, node.getContainersToCleanUp().size());
  
  // Now verify that scheduler isn't notified of an expired container
  // by checking number of 'completedContainers' it got in the previous event
  RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
  ContainerStatus containerStatus = mock(ContainerStatus.class);
  doReturn(completedContainerId).when(containerStatus).getContainerId();
  doReturn(Collections.singletonList(containerStatus)).
      when(statusEvent).getContainers();
  node.handle(statusEvent);
  /* Expect the scheduler call handle function 2 times
   * 1. RMNode status from new to Running, handle the add_node event
   * 2. handle the node update event
   */
  verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));     
}
 
示例12
@Test (timeout = 5000)
public void testStatusChange(){
  //Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  //Add info to the queue first
  node.setNextHeartBeat(false);

  ContainerId completedContainerId1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  ContainerId completedContainerId2 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 1);
      
  RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();

  ContainerStatus containerStatus1 = mock(ContainerStatus.class);
  ContainerStatus containerStatus2 = mock(ContainerStatus.class);

  doReturn(completedContainerId1).when(containerStatus1).getContainerId();
  doReturn(Collections.singletonList(containerStatus1))
      .when(statusEvent1).getContainers();
   
  doReturn(completedContainerId2).when(containerStatus2).getContainerId();
  doReturn(Collections.singletonList(containerStatus2))
      .when(statusEvent2).getContainers();

  verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); 
  node.handle(statusEvent1);
  node.handle(statusEvent2);
  verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
  Assert.assertEquals(2, node.getQueueSize());
  node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
  Assert.assertEquals(0, node.getQueueSize());
}
 
示例13
@Test(timeout=20000)
public void testUpdateHeartbeatResponseForCleanup() {
  RMNodeImpl node = getRunningNode();
  NodeId nodeId = node.getNodeID();

  // Expire a container
ContainerId completedContainerId = BuilderUtils.newContainerId(
		BuilderUtils.newApplicationAttemptId(
				BuilderUtils.newApplicationId(0, 0), 0), 0);
  node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
  Assert.assertEquals(1, node.getContainersToCleanUp().size());

  // Finish an application
  ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
  node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
  Assert.assertEquals(1, node.getAppsToCleanup().size());

  // Verify status update does not clear containers/apps to cleanup
  // but updating heartbeat response for cleanup does
  RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
  node.handle(statusEvent);
  Assert.assertEquals(1, node.getContainersToCleanUp().size());
  Assert.assertEquals(1, node.getAppsToCleanup().size());
  NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
  node.updateNodeHeartbeatResponseForCleanup(hbrsp);
  Assert.assertEquals(0, node.getContainersToCleanUp().size());
  Assert.assertEquals(0, node.getAppsToCleanup().size());
  Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
  Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
  Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
  Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
}
 
示例14
private RMNodeImpl getUnhealthyNode() {
  RMNodeImpl node = getRunningNode();
  NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
      System.currentTimeMillis());
  node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
      new ArrayList<ContainerStatus>(), null, null));
  Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
  return node;
}
 
示例15
@VisibleForTesting
protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
  if (!(event instanceof RMNodeStatusEvent)) {
    logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName());
    return;
  }

  RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
  RMNode rmNode = context.getRMNodes().get(event.getNodeId());
  String hostName = rmNode.getNodeID().getHost();

  Node host = nodeStore.getNode(hostName);
  if (host != null) {
    host.snapshotRunningContainers();
  }

  /*
   * Set the new node capacity which is the sum of the current node resources plus those offered by Mesos. 
   * If the sum is greater than the max capacity of the node, reject the offer.
   */
  Resource offeredResources = getNewResourcesOfferedByMesos(hostName);
  Resource currentResources = getResourcesUnderUse(statusEvent);
  
  if (offerWithinResourceLimits(currentResources, offeredResources)) {
    yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(currentResources, offeredResources));
    logger.info("Updated resources for {} with {} cores and {} memory", rmNode.getNode().getName(), 
            offeredResources.getVirtualCores(), offeredResources.getMemory());
  } else {
    logger.info("Did not update {} with {} cores and {} memory, over max cpu cores and/or max memory", 
            rmNode.getNode().getName(), offeredResources.getVirtualCores(), offeredResources.getMemory());
  }
}
 
示例16
@VisibleForTesting
protected Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
  Resource usedResources = Resource.newInstance(0, 0);
  for (ContainerStatus status : statusEvent.getContainers()) {
    if (containerInUse(status)) {
      RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
      // (sdaingade) This check is needed as RMContainer information may not be populated
      // immediately after a RM restart.
      if (rmContainer != null) {
        Resources.addTo(usedResources, rmContainer.getAllocatedResource());
      }
    }
  }
  return usedResources;
}
 
示例17
public static RMNodeStatusEvent getRMStatusEvent(RMNode node) {
  NodeId id = node.getNodeID();
  NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, "HEALTHY", System.currentTimeMillis());
  List<ContainerStatus> cStatus = Lists.newArrayList(getContainerStatus(node));
  List<ApplicationId> keepAliveIds = Lists.newArrayList(getApplicationId(node.getHttpPort()));
  NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl();
  
  return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response);
}
 
示例18
private RMNodeStatusEvent getRMStatusEvent(RMNode node) {
  NodeId id = node.getNodeID();
  NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, "HEALTHY", System.currentTimeMillis());
  List<ContainerStatus> cStatus = Lists.newArrayList(getContainerStatus(node));
  List<ApplicationId> keepAliveIds = Lists.newArrayList(getApplicationId(node.getHttpPort()));
  NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl();
  
  return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response);
}
 
示例19
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
  //Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  
  NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
  RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  node2.handle(new RMNodeStartedEvent(null, null, null));
  
  ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 1);
  ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 2);
 
  RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
  
  ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
  ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
  ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);

  doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode1))
      .when(statusEventFromNode1).getContainers();
  node.handle(statusEventFromNode1);
  Assert.assertEquals(1, completedContainers.size());
  Assert.assertEquals(completedContainerIdFromNode1,
      completedContainers.get(0).getContainerId());

  completedContainers.clear();

  doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode2_1))
      .when(statusEventFromNode2_1).getContainers();

  doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode2_2))
      .when(statusEventFromNode2_2).getContainers();

  node2.setNextHeartBeat(false);
  node2.handle(statusEventFromNode2_1);
  node2.setNextHeartBeat(true);
  node2.handle(statusEventFromNode2_2);

  Assert.assertEquals(2, completedContainers.size());
  Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
      .getContainerId()); 
  Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
      .getContainerId());   
}
 
示例20
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
  //Start the node
  node.handle(new RMNodeStartedEvent(null, null, null));
  
  NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
  RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  node2.handle(new RMNodeStartedEvent(null, null, null));
  
  ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(0, 0), 0), 0);
  ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 1);
  ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1), 2);
 
  RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
  RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
  
  ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
  ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
  ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);

  doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode1))
      .when(statusEventFromNode1).getContainers();
  node.handle(statusEventFromNode1);
  Assert.assertEquals(1, completedContainers.size());
  Assert.assertEquals(completedContainerIdFromNode1,
      completedContainers.get(0).getContainerId());

  completedContainers.clear();

  doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode2_1))
      .when(statusEventFromNode2_1).getContainers();

  doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
      .getContainerId();
  doReturn(Collections.singletonList(containerStatusFromNode2_2))
      .when(statusEventFromNode2_2).getContainers();

  node2.setNextHeartBeat(false);
  node2.handle(statusEventFromNode2_1);
  node2.setNextHeartBeat(true);
  node2.handle(statusEventFromNode2_2);

  Assert.assertEquals(2, completedContainers.size());
  Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
      .getContainerId()); 
  Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
      .getContainerId());   
}