@Override
public AmazonKinesisFirehose amazonKinesisFirehose() {
return decorateWithConfigsAndBuild(
AmazonKinesisFirehoseClientBuilder.standard(),
LocalstackDocker::getEndpointFirehose
);
}
@Override
protected List<ConfigIssue> init() {
List<ConfigIssue> issues = super.init();
errorRecordHandler = new DefaultErrorRecordHandler(getContext());
if (!issues.isEmpty()) {
return issues;
}
conf.init(getContext(), issues);
if (!issues.isEmpty()) {
return issues;
}
generatorFactory = conf.dataFormatConfig.getDataGeneratorFactory();
try {
AmazonKinesisFirehoseClientBuilder builder = AmazonKinesisFirehoseClientBuilder
.standard()
.withCredentials(AWSUtil.getCredentialsProvider(conf.awsConfig));
if (conf.region == AwsRegion.OTHER) {
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(conf.endpoint, null));
} else {
builder.withRegion(conf.region.getId());
}
firehoseClient = builder.build();
} catch (StageException ex) {
LOG.error(Utils.format(Errors.KINESIS_12.getMessage(), ex.toString()), ex);
issues.add(getContext().createConfigIssue(
Groups.KINESIS.name(),
"kinesisConfig.awsConfig.awsAccessKeyId",
Errors.KINESIS_12,
ex.toString()
));
}
return issues;
}
public static Map<String,List<DeliveryStreamVH>> fetchDeliveryStreamInfo(BasicSessionCredentials temporaryCredentials, String skipRegions,String accountId,String accountName) {
Map<String,List<DeliveryStreamVH>> deliveryStream = new LinkedHashMap<>();
AmazonKinesisFirehose amazonKinesisFirehose;
String expPrefix = InventoryConstants.ERROR_PREFIX_CODE+accountId + "\",\"Message\": \"Exception in fetching info for resource\" ,\"type\": \"deliverystream\"" ;
for(Region region : RegionUtils.getRegions()) {
try{
if(!skipRegions.contains(region.getName())){
amazonKinesisFirehose = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(temporaryCredentials)).withRegion(region.getName()).build();
ListDeliveryStreamsResult listDeliveryStreamsResult = amazonKinesisFirehose.listDeliveryStreams(new ListDeliveryStreamsRequest().withLimit(100));
List<String> deliveryStreamNamesTemp = listDeliveryStreamsResult.getDeliveryStreamNames();
List<String> deliveryStreamNames = new ArrayList<>(deliveryStreamNamesTemp);
while (listDeliveryStreamsResult.isHasMoreDeliveryStreams() && !deliveryStreamNamesTemp.isEmpty()) {
listDeliveryStreamsResult = amazonKinesisFirehose.listDeliveryStreams(new ListDeliveryStreamsRequest().withExclusiveStartDeliveryStreamName(deliveryStreamNamesTemp.get(deliveryStreamNamesTemp.size() - 1)).withLimit(100));
deliveryStreamNamesTemp = listDeliveryStreamsResult.getDeliveryStreamNames();
deliveryStreamNames.addAll(deliveryStreamNamesTemp);
}
List<DeliveryStreamVH> deliveryStreamList = new ArrayList<>();
for(String deliveryStreamName : deliveryStreamNames) {
DeliveryStreamDescription deliveryStreamDescription = amazonKinesisFirehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName(deliveryStreamName).withLimit(100)).getDeliveryStreamDescription();
ListTagsForDeliveryStreamResult listTagsForDeliveryStreamResult = amazonKinesisFirehose.listTagsForDeliveryStream(new ListTagsForDeliveryStreamRequest().withDeliveryStreamName(deliveryStreamName));
List<Tag> tagsTemp = listTagsForDeliveryStreamResult.getTags();
List<Tag> tags = new ArrayList<>(tagsTemp);
while (listTagsForDeliveryStreamResult.isHasMoreTags() && !tagsTemp.isEmpty()) {
listTagsForDeliveryStreamResult = amazonKinesisFirehose.listTagsForDeliveryStream(new ListTagsForDeliveryStreamRequest().withExclusiveStartTagKey(tagsTemp.get(tagsTemp.size() - 1).getKey()));
tagsTemp = listTagsForDeliveryStreamResult.getTags();
tags.addAll(tagsTemp);
}
if(deliveryStreamDescription.getDestinations().isEmpty()) {
deliveryStreamList.add(new DeliveryStreamVH(deliveryStreamDescription,null, tags));
} else {
deliveryStreamList.add(new DeliveryStreamVH(deliveryStreamDescription,deliveryStreamDescription.getDestinations().get(0), tags));
}
}
if( !deliveryStreamList.isEmpty() ) {
log.debug(InventoryConstants.ACCOUNT + accountId +" Type : deliverystream "+region.getName() + " >> "+deliveryStreamList.size());
deliveryStream.put(accountId+delimiter+accountName+delimiter+region.getName(),deliveryStreamList);
}
}
} catch(Exception e){
log.warn(expPrefix+ region.getName()+InventoryConstants.ERROR_CAUSE +e.getMessage()+"\"}");
ErrorManageUtil.uploadError(accountId, region.getName(),"deliverystream",e.getMessage());
}
}
return deliveryStream;
}