/**
 * Returns a factory that the Kinesis client invokes (in Kinesis thread context) to obtain a record processor.
 * Each invocation allocates an ssp for {@code stream} from {@code sspAllocator}, binds a new
 * {@link KinesisRecordProcessor} to it and registers that processor in {@code processors}.
 *
 * <p>Any failure while doing so is remembered in {@code callbackException} and rethrown as a
 * {@link SamzaException}. The expected cause is a dynamic shard split that exhausted the allocator's free ssps;
 * the duplicate-processor check below also fails through the same path. Recording the exception puts the
 * consumer into a failed state which will eventually stop the container. A manual job restart is then required;
 * after it, the newly created shards are discovered and enough ssps are added to the allocator's free pool.
 *
 * @param stream stream whose ssps are handed to the created processors
 * @return factory producing one processor per call
 */
IRecordProcessorFactory createRecordProcessorFactory(String stream) {
  return () -> {
    try {
      SystemStreamPartition allocatedSsp = sspAllocator.allocate(stream);
      KinesisRecordProcessor newProcessor = new KinesisRecordProcessor(allocatedSsp, KinesisSystemConsumer.this);
      KinesisRecordProcessor displaced = processors.put(allocatedSsp, newProcessor);
      String duplicateMessage = String.format(
          "Adding new kinesis record processor %s while the previous processor %s for the same ssp %s is still active.",
          newProcessor, displaced, allocatedSsp);
      Validate.isTrue(displaced == null, duplicateMessage);
      return newProcessor;
    } catch (Exception failure) {
      // Surface the failure to the consumer so it transitions to a failed state.
      callbackException = failure;
      throw new SamzaException(failure);
    }
  };
}
/**
 * Creates {@code numShards} record processors through {@code factory} and initializes each one against a
 * synthetic shard id of the form {@code shard-NNNNN}, starting at extended sequence number "0000".
 *
 * @param factory factory used to create the processors (the result is cast to {@link KinesisRecordProcessor})
 * @param numShards number of processors to create; non-positive values yield an empty map
 * @return map from synthetic shard id to its initialized processor
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  Map<String, KinesisRecordProcessor> shardToProcessor = new HashMap<>();
  for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
    String shardId = String.format("shard-%05d", shardIndex);
    KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
    processor.initialize(new InitializationInput()
        .withShardId(shardId)
        .withExtendedSequenceNumber(new ExtendedSequenceNumber("0000")));
    shardToProcessor.put(shardId, processor);
  }
  return shardToProcessor;
}
/**
 * Builds a KCL {@link Worker} that obtains its processors from {@code recordProcessorFactory}.
 *
 * <p>The client library configuration is derived from {@code conf}:
 * <ul>
 *   <li>the initial position is applied only for AT_TIMESTAMP (using {@code conf.initialTimestamp}),
 *       LATEST or TRIM_HORIZON;</li>
 *   <li>a region of {@code AwsRegion.OTHER} selects the custom {@code conf.endpoint}, otherwise the region id
 *       is used.</li>
 * </ul>
 *
 * @param recordProcessorFactory factory handed to the worker
 * @param maxBatchSize maximum number of records fetched per call
 * @return the configured worker (not started)
 */
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
      conf.applicationName, conf.streamName, credentials, getWorkerId());
  kclConfig.withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withKinesisClientConfig(clientConfiguration);

  InitialPositionInStream startPosition = conf.initialPositionInStream;
  boolean startsAtTimestamp = startPosition == InitialPositionInStream.AT_TIMESTAMP;
  boolean startsAtStreamEdge = startPosition == InitialPositionInStream.TRIM_HORIZON
      || startPosition == InitialPositionInStream.LATEST;
  if (startsAtTimestamp) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  } else if (startsAtStreamEdge) {
    kclConfig.withInitialPositionInStream(startPosition);
  }

  if (conf.region != AwsRegion.OTHER) {
    kclConfig.withRegionName(conf.region.getId());
  } else {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  }

  Worker worker = new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig)
      .build();
  return worker;
}
/**
 * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards:
 * 1. Creation of record processors.
 * 2. Initialization of record processors.
 * 3. Processing records via record processors.
 * 4. Calling checkpoint on record processors.
 * 5. Shutting down (due to re-assignment or lease expiration) record processors.
 *
 * @param system name of the system the registered ssps belong to
 * @param stream name of the stream the registered ssps belong to
 * @param numShards number of shards to simulate (one ssp and one record processor per shard)
 * @param numRecordsPerShard number of records generated per processor; when 0 the helper only verifies that
 *                           no messages are read
 * @throws InterruptedException propagated from reading events off the consumer
 * @throws NoSuchFieldException propagated from the helpers that inspect consumer/processor internals
 * @throws IllegalAccessException propagated from the helpers that inspect consumer/processor internals
 */
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
    throws InterruptedException, NoSuchFieldException, IllegalAccessException {
  KinesisConfig kConfig = new KinesisConfig(new MapConfig());
  // Create the consumer and register one ssp per shard.
  KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
  initializeMetrics(consumer, stream);
  List<SystemStreamPartition> ssps = new ArrayList<>();
  for (int p = 0; p < numShards; p++) {
    SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
    ssps.add(ssp);
    consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET);
  }

  // Create the Kinesis record processor factory, then create and initialize one processor per shard.
  IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
  Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
  List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
  // Generate records to the Kinesis record processors.
  Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);

  // Verification steps: read events from the BEM queue.
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
      readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard);
  if (numRecordsPerShard <= 0) {
    // No input records and hence no messages; nothing further to verify.
    Assert.assertEquals(0, messages.size());
    return;
  }
  Assert.assertEquals(numShards, messages.size());

  Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
  for (SystemStreamPartition ssp : ssps) {
    try {
      KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
      Assert.assertNotNull("No record processor registered for " + ssp, processor);
      List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
      Assert.assertNotNull("No messages read for " + ssp, envelopes);
      List<Record> inputRecords = inputRecordMap.get(processor);
      Assert.assertNotNull("No input records generated for " + ssp, inputRecords);

      // Verify that the read messages are received in order and are the same as the input records.
      Assert.assertEquals(numRecordsPerShard, envelopes.size());
      verifyRecords(envelopes, inputRecords, processor.getShardId());

      // Call checkpoint on the consumer and verify the checkpointer receives the last input sequence number.
      IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
      consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
      verify(getCheckpointer(processor)).checkpoint(argument.capture());
      Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());

      // Shut the processor down (ZOMBIE reason) and verify the ssp mapping was released.
      shutDownProcessor(processor, ShutdownReason.ZOMBIE);
      Assert.assertFalse(sspToProcessorMap.containsValue(processor));
      Assert.assertTrue(isSspAvailable(consumer, ssp));
    } catch (InvalidStateException | ShutdownException ex) {
      // Checked KCL exceptions the method signature does not declare; fail the test with the cause preserved.
      throw new RuntimeException(ex);
    }
  }
}