Java源码示例:com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory

示例1
IRecordProcessorFactory createRecordProcessorFactory(String stream) {
  return () -> {
    // This code is executed in Kinesis thread context.
    try {
      SystemStreamPartition ssp = sspAllocator.allocate(stream);
      KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
      KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
      Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
              + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
      return processor;
    } catch (Exception e) {
      callbackException = e;
      // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
      // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
      // will be required at this point. After the job restart, the newly created shards will be discovered and enough
      // ssps will be added to sspAllocator freePool.
      throw new SamzaException(e);
    }
  };
}
 
示例2
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
  IntStream.range(0, numShards)
      .forEach(p -> {
        String shardId = String.format("shard-%05d", p);
        // Create Kinesis processor
        KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();

        // Initialize the shard
        ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
        InitializationInput initializationInput =
            new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
        processor.initialize(initializationInput);
        processorMap.put(shardId, processor);
      });
  return processorMap;
}
 
示例3
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig =
      new KinesisClientLibConfiguration(
          conf.applicationName,
          conf.streamName,
          credentials,
          getWorkerId()
      );

  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withKinesisClientConfig(clientConfiguration);

  if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  } else if (conf.initialPositionInStream == InitialPositionInStream.LATEST || conf.initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
    kclConfig.withInitialPositionInStream(conf.initialPositionInStream);
  }

  if (conf.region == AwsRegion.OTHER) {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  } else {
    kclConfig.withRegionName(conf.region.getId());
  }

  return new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig)
      .build();
}
 
示例4
/**
 * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards
 * 1. Creation of record processors.
 * 2. Initialization of record processors.
 * 3. Processing records via record processors.
 * 4. Calling checkpoint on record processors.
 * 5. Shutting down (due to re-assignment or lease expiration) record processors.
 */
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
    throws InterruptedException, NoSuchFieldException, IllegalAccessException {

  KinesisConfig kConfig = new KinesisConfig(new MapConfig());
  // Create consumer
  KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
  initializeMetrics(consumer, stream);

  List<SystemStreamPartition> ssps = new LinkedList<>();
  IntStream.range(0, numShards)
      .forEach(p -> {
        SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
        ssps.add(ssp);
      });
  ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));

  // Create Kinesis record processor factory
  IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);

  // Create and initialize Kinesis record processor
  Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
  List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());

  // Generate records to Kinesis record processor
  Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);

  // Verification steps

  // Read events from the BEM queue
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
      readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard);
  if (numRecordsPerShard > 0) {
    Assert.assertEquals(messages.size(), numShards);
  } else {
    // No input records and hence no messages
    Assert.assertEquals(messages.size(), 0);
    return;
  }

  Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
  ssps.forEach(ssp -> {
    try {
      KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);

      // Verify that the read messages are received in order and are the same as input records
      Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
      List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
      List<Record> inputRecords = inputRecordMap.get(processor);
      verifyRecords(envelopes, inputRecords, processor.getShardId());

      // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
      IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
      consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
      verify(getCheckpointer(processor)).checkpoint(argument.capture());
      Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());

      // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
      shutDownProcessor(processor, ShutdownReason.ZOMBIE);
      Assert.assertFalse(sspToProcessorMap.containsValue(processor));
      Assert.assertTrue(isSspAvailable(consumer, ssp));
    } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
      throw new RuntimeException(ex);
    }
  });
}