Java源码示例:org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage

示例1
/**
 * Creates a transfer task for block {@code b}. The task is meant to connect
 * to the first entry of {@code targets} and pass along the whole target
 * list, the block, and the data.
 *
 * @param targets datanodes the block is transferred to
 * @param targetStorageTypes storage types for the targets; {@code null} is
 *          accepted and is logged as {@code []}
 * @param b the block to transfer
 * @param stage block construction stage stored for this transfer
 * @param clientname client name stored for this transfer
 */
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
    ExtendedBlock b, BlockConstructionStage stage,
    final String clientname)  {
  if (DataTransferProtocol.LOG.isDebugEnabled()) {
    // Build the debug line piece by piece; it is only assembled when debug
    // logging is actually enabled.
    final StringBuilder msg = new StringBuilder();
    msg.append(getClass().getSimpleName()).append(": ");
    msg.append(b).append(" (numBytes=").append(b.getNumBytes()).append(")");
    msg.append(", stage=").append(stage);
    msg.append(", clientname=").append(clientname);
    msg.append(", targets=").append(Arrays.asList(targets));
    msg.append(", target storage types=");
    if (targetStorageTypes == null) {
      msg.append("[]");
    } else {
      msg.append(Arrays.asList(targetStorageTypes));
    }
    DataTransferProtocol.LOG.debug(msg.toString());
  }

  this.targets = targets;
  this.targetStorageTypes = targetStorageTypes;
  this.b = b;
  this.stage = stage;
  // The registration is taken from the offer service of the block's pool.
  bpReg = blockPoolManager.get(b.getBlockPoolId()).bpRegistration;
  this.clientname = clientname;
  this.cachingStrategy =
      new CachingStrategy(true, getDnConf().readaheadLength);
}
 
示例2
/**
 * Resets the send/receive buffers, writes a block header for the given stage
 * and then performs the exchange that matches the expected outcome.
 *
 * @param block the block to write
 * @param stage block construction stage passed to {@code writeBlock}
 * @param newGS generation stamp passed to {@code writeBlock}
 * @param description text forwarded to the send/receive helpers
 * @param eofExcepted when true, an error response is queued and
 *          {@code sendRecvData} is invoked with {@code true}; being a boxed
 *          {@code Boolean}, a {@code null} value fails on unboxing
 * @throws IOException propagated from the underlying helpers
 */
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
    String description, Boolean eofExcepted) throws IOException {
  sendBuf.reset();
  recvBuf.reset();
  writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);

  if (eofExcepted) {
    sendResponse(Status.ERROR, null, null, recvOut);
    sendRecvData(description, true);
    return;
  }

  if (stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    writeZeroLengthPacket(block, description);
    return;
  }

  // Close-recovery stage: finish by writing a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  sendRecvData(description, false);
}
 
示例3
/**
 * Creates a transfer task for block {@code b}. The task is meant to connect
 * to the first entry of {@code targets} and pass along the whole target
 * list, the block, and the data.
 *
 * @param targets datanodes the block is transferred to
 * @param targetStorageTypes storage types for the targets; {@code null} is
 *          accepted and is logged as {@code []}
 * @param b the block to transfer
 * @param stage block construction stage stored for this transfer
 * @param clientname client name stored for this transfer
 */
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
    ExtendedBlock b, BlockConstructionStage stage,
    final String clientname)  {
  if (DataTransferProtocol.LOG.isDebugEnabled()) {
    // Build the debug line piece by piece; it is only assembled when debug
    // logging is actually enabled.
    final StringBuilder msg = new StringBuilder();
    msg.append(getClass().getSimpleName()).append(": ");
    msg.append(b).append(" (numBytes=").append(b.getNumBytes()).append(")");
    msg.append(", stage=").append(stage);
    msg.append(", clientname=").append(clientname);
    msg.append(", targets=").append(Arrays.asList(targets));
    msg.append(", target storage types=");
    if (targetStorageTypes == null) {
      msg.append("[]");
    } else {
      msg.append(Arrays.asList(targetStorageTypes));
    }
    DataTransferProtocol.LOG.debug(msg.toString());
  }

  this.targets = targets;
  this.targetStorageTypes = targetStorageTypes;
  this.b = b;
  this.stage = stage;
  // The registration is taken from the offer service of the block's pool.
  bpReg = blockPoolManager.get(b.getBlockPoolId()).bpRegistration;
  this.clientname = clientname;
  this.cachingStrategy =
      new CachingStrategy(true, getDnConf().readaheadLength);
}
 
示例4
/**
 * Resets the send/receive buffers, writes a block header for the given stage
 * and then performs the exchange that matches the expected outcome.
 *
 * @param block the block to write
 * @param stage block construction stage passed to {@code writeBlock}
 * @param newGS generation stamp passed to {@code writeBlock}
 * @param description text forwarded to the send/receive helpers
 * @param eofExcepted when true, an error response is queued and
 *          {@code sendRecvData} is invoked with {@code true}; being a boxed
 *          {@code Boolean}, a {@code null} value fails on unboxing
 * @throws IOException propagated from the underlying helpers
 */
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
    String description, Boolean eofExcepted) throws IOException {
  sendBuf.reset();
  recvBuf.reset();
  writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);

  if (eofExcepted) {
    sendResponse(Status.ERROR, null, null, recvOut);
    sendRecvData(description, true);
    return;
  }

  if (stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    writeZeroLengthPacket(block, description);
    return;
  }

  // Close-recovery stage: finish by writing a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  sendRecvData(description, false);
}
 
示例5
/**
 * Prepares this streamer for data streaming: sets its name from the file and
 * the current block, starts a new {@code ResponseProcessor} for the current
 * nodes, and moves the stage to {@code DATA_STREAMING}.
 */
private void initDataStreaming() {
  final String streamerName = "DataStreamer for file " + src +
      " block " + block;
  this.setName(streamerName);

  final ResponseProcessor processor = new ResponseProcessor(nodes);
  response = processor;
  processor.start();

  stage = BlockConstructionStage.DATA_STREAMING;
}
 
示例6
/**
 * Finishes the current block: resets the streamer name to the file-only
 * form, closes the responder and the stream, clears the pipeline, and moves
 * the stage back to {@code PIPELINE_SETUP_CREATE}.
 */
private void endBlock() {
  if (DFSClient.LOG.isDebugEnabled()) {
    final String closingMsg = "Closing old block " + block;
    DFSClient.LOG.debug(closingMsg);
  }

  final String streamerName = "DataStreamer for file " + src;
  this.setName(streamerName);

  closeResponder();
  closeStream();
  setPipeline(null, null, null);

  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
 
示例7
/**
 * Transfer a replica to the datanode targets.
 *
 * @param b the block to transfer.
 *          The corresponding replica must be an RBW or a Finalized.
 *          Its GS and numBytes will be set to
 *          the stored GS and the visible length.
 * @param targets targets to transfer the block to; nothing is transferred
 *          when the array is empty
 * @param targetStorageTypes storage types handed to the transfer together
 *          with the targets
 * @param client client name
 * @throws IOException if the block is not stored on this datanode, if the
 *           stored generation stamp is older than the one in {@code b}, or
 *           if the replica is neither an RBW nor a Finalized
 */
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
    final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
    final String client) throws IOException {
  final BlockConstructionStage stage;
  final long visible;

  // Look up the replica state while holding the dataset lock.
  synchronized (data) {
    final Block stored = data.getStoredBlock(b.getBlockPoolId(),
        b.getBlockId());
    if (stored == null) {
      throw new IOException(b + " not found in datanode.");
    }

    final long storedGS = stored.getGenerationStamp();
    if (b.getGenerationStamp() > storedGS) {
      throw new IOException(storedGS
          + " = storedGS < b.getGenerationStamp(), b=" + b);
    }

    // From here on the block carries the stored generation stamp.
    b.setGenerationStamp(storedGS);

    final boolean rbw = data.isValidRbw(b);
    if (!rbw && !data.isValidBlock(b)) {
      final String replicaInfo =
          data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
      throw new IOException(b + " is neither a RBW nor a Finalized, r="
          + replicaInfo);
    }
    stage = rbw
        ? BlockConstructionStage.TRANSFER_RBW
        : BlockConstructionStage.TRANSFER_FINALIZED;

    visible = data.getReplicaVisibleLength(b);
  }

  // The block length becomes the visible length of the replica.
  b.setNumBytes(visible);

  if (targets.length == 0) {
    return;
  }
  // The transfer is executed on the calling thread via run().
  new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
 
示例8
/**
 * Issues a write-block request for {@code block} through {@code sender},
 * using the dummy block token, the client name {@code "cl"}, one-element
 * target arrays with unset entries, and the block's own byte count for both
 * length arguments.
 *
 * @param block the block to write
 * @param stage block construction stage of the request
 * @param newGS generation stamp passed with the request
 * @param checksum checksum passed with the request
 * @throws IOException propagated from the sender
 */
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, DataChecksum checksum) throws IOException {
  final DatanodeInfo[] targets = new DatanodeInfo[1];
  final StorageType[] targetStorageTypes = new StorageType[1];
  final long numBytes = block.getNumBytes();
  final CachingStrategy cachingStrategy = CachingStrategy.newDefaultStrategy();

  sender.writeBlock(block, StorageType.DEFAULT,
      BlockTokenSecretManager.DUMMY_TOKEN, "cl",
      targets, targetStorageTypes, null, stage,
      0, numBytes, numBytes, newGS,
      checksum, cachingStrategy, false, false, null);
}
 
示例9
/**
 * Prepares this streamer for data streaming: sets its name from the file and
 * the current block, starts a new {@code ResponseProcessor} for the current
 * nodes, and moves the stage to {@code DATA_STREAMING}.
 */
private void initDataStreaming() {
  final String streamerName = "DataStreamer for file " + src +
      " block " + block;
  this.setName(streamerName);

  final ResponseProcessor processor = new ResponseProcessor(nodes);
  response = processor;
  processor.start();

  stage = BlockConstructionStage.DATA_STREAMING;
}
 
示例10
/**
 * Finishes the current block: resets the streamer name to the file-only
 * form, closes the responder and the stream, clears the pipeline, and moves
 * the stage back to {@code PIPELINE_SETUP_CREATE}.
 */
private void endBlock() {
  if (DFSClient.LOG.isDebugEnabled()) {
    final String closingMsg = "Closing old block " + block;
    DFSClient.LOG.debug(closingMsg);
  }

  final String streamerName = "DataStreamer for file " + src;
  this.setName(streamerName);

  closeResponder();
  closeStream();
  setPipeline(null, null, null);

  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
 
示例11
/**
 * Transfer a replica to the datanode targets.
 *
 * @param b the block to transfer.
 *          The corresponding replica must be an RBW or a Finalized.
 *          Its GS and numBytes will be set to
 *          the stored GS and the visible length.
 * @param targets targets to transfer the block to; nothing is transferred
 *          when the array is empty
 * @param targetStorageTypes storage types handed to the transfer together
 *          with the targets
 * @param client client name
 * @throws IOException if the block is not stored on this datanode, if the
 *           stored generation stamp is older than the one in {@code b}, or
 *           if the replica is neither an RBW nor a Finalized
 */
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
    final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
    final String client) throws IOException {
  final BlockConstructionStage stage;
  final long visible;

  // Look up the replica state while holding the dataset lock.
  synchronized (data) {
    final Block stored = data.getStoredBlock(b.getBlockPoolId(),
        b.getBlockId());
    if (stored == null) {
      throw new IOException(b + " not found in datanode.");
    }

    final long storedGS = stored.getGenerationStamp();
    if (b.getGenerationStamp() > storedGS) {
      throw new IOException(storedGS
          + " = storedGS < b.getGenerationStamp(), b=" + b);
    }

    // From here on the block carries the stored generation stamp.
    b.setGenerationStamp(storedGS);

    final boolean rbw = data.isValidRbw(b);
    if (!rbw && !data.isValidBlock(b)) {
      final String replicaInfo =
          data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
      throw new IOException(b + " is neither a RBW nor a Finalized, r="
          + replicaInfo);
    }
    stage = rbw
        ? BlockConstructionStage.TRANSFER_RBW
        : BlockConstructionStage.TRANSFER_FINALIZED;

    visible = data.getReplicaVisibleLength(b);
  }

  // The block length becomes the visible length of the replica.
  b.setNumBytes(visible);

  if (targets.length == 0) {
    return;
  }
  // The transfer is executed on the calling thread via run().
  new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
 
示例12
/**
 * Issues a write-block request for {@code block} through {@code sender},
 * using the dummy block token, the client name {@code "cl"}, one-element
 * target arrays with unset entries, and the block's own byte count for both
 * length arguments.
 *
 * @param block the block to write
 * @param stage block construction stage of the request
 * @param newGS generation stamp passed with the request
 * @param checksum checksum passed with the request
 * @throws IOException propagated from the sender
 */
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, DataChecksum checksum) throws IOException {
  final DatanodeInfo[] targets = new DatanodeInfo[1];
  final StorageType[] targetStorageTypes = new StorageType[1];
  final long numBytes = block.getNumBytes();
  final CachingStrategy cachingStrategy = CachingStrategy.newDefaultStrategy();

  sender.writeBlock(block, StorageType.DEFAULT,
      BlockTokenSecretManager.DUMMY_TOKEN, "cl",
      targets, targetStorageTypes, null, stage,
      0, numBytes, numBytes, newGS,
      checksum, cachingStrategy, false, false, null);
}
 
示例13
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
  isAppend = false;
  isLazyPersistFile = isLazyPersist(stat);
  this.block = block;
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
 
示例14
/**
 * Construct a data streamer for appending to the last partial block.
 *
 * @param lastBlock last block of the file to be appended
 * @param stat status of the file to be appended
 * @param bytesPerChecksum number of bytes per checksum
 * @throws IOException if the last block has no free space left, or if no
 *           node is available for it after the pipeline has been set
 */
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
    int bytesPerChecksum) throws IOException {
  isAppend = true;
  stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
  block = lastBlock.getBlock();
  bytesSent = block.getNumBytes();
  accessToken = lastBlock.getBlockToken();
  isLazyPersistFile = isLazyPersist(stat);
  long usedInLastBlock = stat.getLen() % blockSize;
  // Keep the free space as a long: narrowing it to int would wrap around
  // for block sizes above Integer.MAX_VALUE and could produce a negative
  // packet size further down.
  long freeInLastBlock = blockSize - usedInLastBlock;

  // calculate the amount of free space in the pre-existing
  // last crc chunk
  int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
  int freeInCksum = bytesPerChecksum - usedInCksum;

  // A free size equal to the block size means the file length is an exact
  // multiple of the block size, i.e. there is no partial last block to
  // append to.
  if (freeInLastBlock == blockSize) {
    throw new IOException("The last block for file " +
        src + " is full.");
  }

  if (usedInCksum > 0 && freeInCksum > 0) {
    // if there is space in the last partial chunk, then
    // setup in such a way that the next packet will have only
    // one chunk that fills up the partial chunk.
    computePacketChunkSize(0, freeInCksum);
    setChecksumBufSize(freeInCksum);
    appendChunk = true;
  } else {
    // if the remaining space in the block is smaller than
    // the expected size of a packet, then create a
    // smaller size packet. The minimum never exceeds the configured packet
    // size, so the cast back to int is lossless.
    computePacketChunkSize(
        (int) Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
        bytesPerChecksum);
  }

  // setup pipeline to append to the last block XXX retries??
  setPipeline(lastBlock);
  errorIndex = -1;   // no errors yet.
  if (nodes.length < 1) {
    throw new IOException("Unable to retrieve blocks locations" +
        " for last block " + block +
        " of file " + src);
  }
}
 
示例15
/**
 * Convenience overload: builds an {@code ExtendedBlock} from the pool id and
 * block id and writes it in the {@code PIPELINE_SETUP_CREATE} stage with a
 * generation stamp of 0.
 *
 * @param poolId block pool id of the block to write
 * @param blockId id of the block to write
 * @param checksum checksum passed on to the main overload
 * @throws IOException propagated from the main overload
 */
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
  final ExtendedBlock target = new ExtendedBlock(poolId, blockId);
  final long newGS = 0L;
  writeBlock(target, BlockConstructionStage.PIPELINE_SETUP_CREATE, newGS, checksum);
}
 
示例16
/**
 * Test that when there is a failure replicating a block the temporary
 * and meta files are cleaned up and subsequent replication succeeds.
 *
 * @throws Exception if any step of the test fails unexpectedly
 */
@Test
public void testReplicationError() throws Exception {
  // create a file of replication factor of 1
  final Path fileName = new Path("/test.txt");
  final int fileLen = 1;
  DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 1L);
  DFSTestUtil.waitReplication(fs, fileName, (short)1);

  // get the block belonged to the created file
  LocatedBlocks blocks = NameNodeAdapter.getBlockLocations(
      cluster.getNameNode(), fileName.toString(), 0, (long)fileLen);
  // expected value first, so a failure reports the two numbers correctly
  assertEquals("Should only find 1 block", 1, blocks.locatedBlockCount());
  LocatedBlock block = blocks.get(0);

  // bring up a second datanode
  cluster.startDataNodes(conf, 1, true, null, null);
  cluster.waitActive();
  final int sndNode = 1;
  DataNode datanode = cluster.getDataNodes().get(sndNode);

  // replicate the block to the second datanode
  InetSocketAddress target = datanode.getXferAddress();
  // try-with-resources releases the socket even if sending the header
  // fails; on the normal path the connection is closed at the end of the
  // block, before any block content is sent.
  try (Socket s = new Socket(target.getAddress(), target.getPort());
       DataOutputStream out = new DataOutputStream(s.getOutputStream())) {
    // write the header.
    DataChecksum checksum = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32, 512);
    new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
        BlockTokenSecretManager.DUMMY_TOKEN, "",
        new DatanodeInfo[0], new StorageType[0], null,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
    out.flush();
  }

  // the temporary block & meta files should be deleted
  String bpid = cluster.getNamesystem().getBlockPoolId();
  File storageDir = cluster.getInstanceStorageDir(sndNode, 0);
  File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
  storageDir = cluster.getInstanceStorageDir(sndNode, 1);
  File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
  // poll until both RBW directories are empty
  while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
    Thread.sleep(100);
  }

  // then increase the file's replication factor
  fs.setReplication(fileName, (short)2);
  // replication should succeed
  // NOTE(review): this waits for replication 1 although the factor was just
  // raised to 2 -- confirm whether (short)2 is the intended value.
  DFSTestUtil.waitReplication(fs, fileName, (short)1);

  // clean up the file
  fs.delete(fileName, false);
}
 
示例17
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
  isAppend = false;
  isLazyPersistFile = isLazyPersist(stat);
  this.block = block;
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
 
示例18
/**
 * Construct a data streamer for appending to the last partial block.
 *
 * @param lastBlock last block of the file to be appended
 * @param stat status of the file to be appended
 * @param bytesPerChecksum number of bytes per checksum
 * @throws IOException if the last block has no free space left, or if no
 *           node is available for it after the pipeline has been set
 */
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
    int bytesPerChecksum) throws IOException {
  isAppend = true;
  stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
  block = lastBlock.getBlock();
  bytesSent = block.getNumBytes();
  accessToken = lastBlock.getBlockToken();
  isLazyPersistFile = isLazyPersist(stat);
  long usedInLastBlock = stat.getLen() % blockSize;
  // Keep the free space as a long: narrowing it to int would wrap around
  // for block sizes above Integer.MAX_VALUE and could produce a negative
  // packet size further down.
  long freeInLastBlock = blockSize - usedInLastBlock;

  // calculate the amount of free space in the pre-existing
  // last crc chunk
  int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
  int freeInCksum = bytesPerChecksum - usedInCksum;

  // A free size equal to the block size means the file length is an exact
  // multiple of the block size, i.e. there is no partial last block to
  // append to.
  if (freeInLastBlock == blockSize) {
    throw new IOException("The last block for file " +
        src + " is full.");
  }

  if (usedInCksum > 0 && freeInCksum > 0) {
    // if there is space in the last partial chunk, then
    // setup in such a way that the next packet will have only
    // one chunk that fills up the partial chunk.
    computePacketChunkSize(0, freeInCksum);
    setChecksumBufSize(freeInCksum);
    appendChunk = true;
  } else {
    // if the remaining space in the block is smaller than
    // the expected size of a packet, then create a
    // smaller size packet. The minimum never exceeds the configured packet
    // size, so the cast back to int is lossless.
    computePacketChunkSize(
        (int) Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
        bytesPerChecksum);
  }

  // setup pipeline to append to the last block XXX retries??
  setPipeline(lastBlock);
  errorIndex = -1;   // no errors yet.
  if (nodes.length < 1) {
    throw new IOException("Unable to retrieve blocks locations" +
        " for last block " + block +
        " of file " + src);
  }
}
 
示例19
/**
 * Convenience overload: builds an {@code ExtendedBlock} from the pool id and
 * block id and writes it in the {@code PIPELINE_SETUP_CREATE} stage with a
 * generation stamp of 0.
 *
 * @param poolId block pool id of the block to write
 * @param blockId id of the block to write
 * @param checksum checksum passed on to the main overload
 * @throws IOException propagated from the main overload
 */
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
  final ExtendedBlock target = new ExtendedBlock(poolId, blockId);
  final long newGS = 0L;
  writeBlock(target, BlockConstructionStage.PIPELINE_SETUP_CREATE, newGS, checksum);
}
 
示例20
/**
 * Test that when there is a failure replicating a block the temporary
 * and meta files are cleaned up and subsequent replication succeeds.
 *
 * @throws Exception if any step of the test fails unexpectedly
 */
@Test
public void testReplicationError() throws Exception {
  // create a file of replication factor of 1
  final Path fileName = new Path("/test.txt");
  final int fileLen = 1;
  DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 1L);
  DFSTestUtil.waitReplication(fs, fileName, (short)1);

  // get the block belonged to the created file
  LocatedBlocks blocks = NameNodeAdapter.getBlockLocations(
      cluster.getNameNode(), fileName.toString(), 0, (long)fileLen);
  // expected value first, so a failure reports the two numbers correctly
  assertEquals("Should only find 1 block", 1, blocks.locatedBlockCount());
  LocatedBlock block = blocks.get(0);

  // bring up a second datanode
  cluster.startDataNodes(conf, 1, true, null, null);
  cluster.waitActive();
  final int sndNode = 1;
  DataNode datanode = cluster.getDataNodes().get(sndNode);

  // replicate the block to the second datanode
  InetSocketAddress target = datanode.getXferAddress();
  // try-with-resources releases the socket even if sending the header
  // fails; on the normal path the connection is closed at the end of the
  // block, before any block content is sent.
  try (Socket s = new Socket(target.getAddress(), target.getPort());
       DataOutputStream out = new DataOutputStream(s.getOutputStream())) {
    // write the header.
    DataChecksum checksum = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32, 512);
    new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
        BlockTokenSecretManager.DUMMY_TOKEN, "",
        new DatanodeInfo[0], new StorageType[0], null,
        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
    out.flush();
  }

  // the temporary block & meta files should be deleted
  String bpid = cluster.getNamesystem().getBlockPoolId();
  File storageDir = cluster.getInstanceStorageDir(sndNode, 0);
  File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
  storageDir = cluster.getInstanceStorageDir(sndNode, 1);
  File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
  // poll until both RBW directories are empty
  while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
    Thread.sleep(100);
  }

  // then increase the file's replication factor
  fs.setReplication(fileName, (short)2);
  // replication should succeed
  // NOTE(review): this waits for replication 1 although the factor was just
  // raised to 2 -- confirm whether (short)2 is the intended value.
  DFSTestUtil.waitReplication(fs, fileName, (short)1);

  // clean up the file
  fs.delete(fileName, false);
}
 
示例21
/**
 * Starts one connection attempt per location of {@code locatedBlock} and
 * returns a future for each resulting channel, in location order.
 * <p>
 * A single write-block request builder is prepared up front and shared by
 * all connections. When a connection succeeds, {@code initialize} is called
 * with the new channel and the matching promise; when it fails, the promise
 * is failed with the connect error.
 *
 * @param conf configuration supplying the hostname flag and socket timeout
 * @param client DFS client passed on to {@code initialize}
 * @param clientName client name placed in the operation header
 * @param locatedBlock block whose locations, storage types and token are used
 * @param maxBytesRcvd value for the request's max-bytes-received field
 * @param latestGS value for the request's latest-generation-stamp field
 * @param stage block construction stage, mapped to the protobuf enum by name
 * @param summer checksum converted into the requested-checksum field
 * @param eventLoopGroup event loop group used for promises and bootstraps
 * @param channelClass channel implementation used by the bootstraps
 * @return one future per datanode location
 */
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
  StorageType[] storageTypes = locatedBlock.getStorageTypes();
  DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
  boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
  int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);

  // The header carries a copy of the block whose length is set to the
  // located block's size.
  ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
  blockCopy.setNumBytes(locatedBlock.getBlockSize());
  BaseHeaderProto.Builder baseHeader = BaseHeaderProto.newBuilder();
  baseHeader.setBlock(PBHelperClient.convert(blockCopy));
  baseHeader.setToken(PBHelperClient.convert(locatedBlock.getBlockToken()));
  ClientOperationHeaderProto.Builder headerBuilder = ClientOperationHeaderProto.newBuilder();
  headerBuilder.setBaseHeader(baseHeader);
  headerBuilder.setClientName(clientName);
  ClientOperationHeaderProto header = headerBuilder.build();

  ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);

  // Request fields common to every datanode.
  OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder();
  writeBlockProtoBuilder.setHeader(header);
  writeBlockProtoBuilder.setStage(
      OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()));
  writeBlockProtoBuilder.setPipelineSize(1);
  writeBlockProtoBuilder.setMinBytesRcvd(locatedBlock.getBlock().getNumBytes());
  writeBlockProtoBuilder.setMaxBytesRcvd(maxBytesRcvd);
  writeBlockProtoBuilder.setLatestGenerationStamp(latestGS);
  writeBlockProtoBuilder.setRequestedChecksum(checksumProto);
  writeBlockProtoBuilder.setCachingStrategy(
      CachingStrategyProto.newBuilder().setDropBehind(true).build());

  List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
  for (int i = 0; i < datanodeInfos.length; i++) {
    DatanodeInfo datanode = datanodeInfos[i];
    StorageType storageType = storageTypes[i];
    Promise<Channel> channelPromise = eventLoopGroup.next().newPromise();
    futureList.add(channelPromise);
    String dnAddr = datanode.getXferAddr(connectToDnViaHostname);

    Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // we need to get the remote address of the channel so we can only move on after
            // channel connected. Leave an empty implementation here because netty does not allow
            // a null handler.
          }
        });

    ChannelFuture connectFuture = bootstrap.connect(NetUtils.createSocketAddr(dnAddr));
    connectFuture.addListener(new ChannelFutureListener() {

      @Override
      public void operationComplete(ChannelFuture connectResult) throws Exception {
        if (!connectResult.isSuccess()) {
          channelPromise.tryFailure(connectResult.cause());
          return;
        }
        initialize(conf, connectResult.channel(), datanode, storageType, writeBlockProtoBuilder,
          timeoutMs, client, locatedBlock.getBlockToken(), channelPromise);
      }
    });
  }
  return futureList;
}