
 * Add a metric event to be aggregated.
 * Events with the same name, unit, and attributes will have their values
 * aggregated into {@link StatisticSet}s, with the aggregated data
 * available via {@link #flush}.
 * @param context Metric context to use for dimension information
 * @param name Metric name
 * @param value Recorded value for the metric event
 * @param unit Unit for interpreting the value
public void add(
        final MetricRecorder.RecorderContext context,
        final Metric name,
        final double value,
        final StandardUnit unit)
    //TODO: avoid doing this every time for a context - caching, or?
    // Ask the dimension mapper for the dimensions that apply to this metric name and context.
    List<Dimension> dimensions = dimensionMapper.getDimensions(name, context);

    // The aggregation key is built from the metric name, the unit and the resolved dimensions.
    DatumKey key = new DatumKey(name.toString(), unit, dimensions);
    // NOTE(review): the statement that consumes this StatisticSet is not visible in this
    // excerpt; presumably it merges the new value into the per-key aggregate - confirm.
            new StatisticSet()
 * Flush all the current aggregated MetricDatum and return as a list.
 * This is safe to call concurrently with {@link #add}.
 * All data added prior to a flush call will be included in the returned aggregate.
 * Any data added after the flush call returns will be included in a subsequent flush.
 * Data added while a flush call is processing may be included in the current flush
 * or a subsequent flush, but will not be included twice.
 * The timestamp on the aggregated data will be the time it was flushed,
 * not the time of any of the original metric events.
 * @return list of all data aggregated since the last flush
public List<MetricDatum> flush() {
    // Fast path: when the map holds no entries, return an empty list.
    if (statisticsMap.size() == 0) {
        return Collections.emptyList();

    // Capture all the current metrics, as represented by the set of keys
    // at this time in the statisticsMap.
    // Note that this iterates over the key set of the underlying map, and
    // removes keys from the map at the same time. It is possible keys may
    // be added during this iteration, or data for keys modified between
    // a key being chosen for iteration and being removed from the map.
    // This is ok.  Any new keys will be picked up on subsequent flushes.
    //TODO: use two maps and swap between, to ensure 'perfect' segmentation?
    List<MetricDatum> metricData = new ArrayList<>();
    for (DatumKey key : statisticsMap.keySet()) {
        // Take the aggregate for this key out of the map.
        // NOTE(review): if flush() can run on more than one thread at once, remove() could
        // return null here - confirm the lines not shown in this excerpt tolerate that.
        StatisticSet value = statisticsMap.remove(key);
        //TODO: better to have no timestamp at all?
        // Start from the datum provided by the key and attach a timestamp to it
        // (per the method documentation, the flush time rather than any event time).
        MetricDatum metricDatum = key.getDatum().withTimestamp(Date.from(

    return metricData;
/**
 * Build a {@link MetricDatum} carrying the given metric name and unit.
 * NOTE(review): parts of the body are not visible in this excerpt. Presumably
 * sum/min/max/count populate the StatisticSet and id feeds the single
 * dimension - confirm against the full source.
 * @param id identifier, presumably used as the dimension value - TODO confirm
 * @param name metric name set on the datum
 * @param sum sum of the recorded values
 * @param min smallest recorded value
 * @param max largest recorded value
 * @param count number of recorded values
 * @param unit unit set on the datum
 * @return the datum that was built
 */
private MetricDatum makeDatum(
        final String id,
        final String name,
        final double sum,
        final double min,
        final double max,
        final int count,
        final StandardUnit unit)
    // Start from a datum that already carries the metric name and unit.
    MetricDatum md = new MetricDatum().withMetricName(name).withUnit(unit);

    // NOTE(review): the builder calls that fill this StatisticSet are not visible here.
    final StatisticSet statSet = new StatisticSet()

    // The list is created with an initial capacity of one.
    List<Dimension> dimensions = new ArrayList<>(1);
    // NOTE(review): the dimension name/value arguments are missing from this excerpt.
    Dimension trace = new Dimension().withName(;


    return md;
 * @param rescale multiplier applied to the submitted sum, minimum and maximum; 1.0 is the identity (no rescale).
void reportSampling(Map.Entry<String, ? extends Sampling> entry, String typeDimValue, double rescale, List<MetricDatum> data) {
    // Take a snapshot of the values held by the sampling metric of this entry.
    Sampling metric = entry.getValue();
    Snapshot snapshot = metric.getSnapshot();
    // Total of all snapshot values, multiplied by the rescale factor.
    double scaledSum = sum(snapshot.getValues()) * rescale;
    // The sample count is left unscaled; minimum and maximum use the same multiplier as the sum.
    // NOTE(review): scaledSum is not applied in the lines visible here - confirm that the
    // statistic set's sum is populated in the full source.
    final StatisticSet statisticSet = new StatisticSet()
            .withSampleCount((double) snapshot.size())
            .withMinimum((double) snapshot.getMin() * rescale)
            .withMaximum((double) snapshot.getMax() * rescale);

    // The key is built from the entry's name after appendGlobalDimensions has processed it.
    DemuxedKey key = new DemuxedKey(appendGlobalDimensions(entry.getKey()));
    // Every datum produced by the key is appended to the output list, after the
    // callback has attached the shared statistic set to it.
    Iterables.addAll(data, key.newDatums(typeDimName, typeDimValue, new Function<MetricDatum, MetricDatum>() {
        public MetricDatum apply(MetricDatum datum) {
            return datum.withStatisticValues(statisticSet);
/**
 * Combine two statistic sets into a newly allocated one.
 * The result carries the added sums and sample counts of the inputs,
 * the smaller of the two minimums and the larger of the two maximums.
 * Only getters are called on the inputs, so neither is modified.
 * @param v1 first set to combine
 * @param v2 second set to combine
 * @return a new set describing both inputs together
 */
private static StatisticSet sum( StatisticSet v1, StatisticSet v2 ) {
    //TODO: consider reusing one of the passed sets instead of allocating (would pollute a MetricDatum?)
    final StatisticSet combined = new StatisticSet();

    // Additive statistics.
    combined.setSum(v1.getSum() + v2.getSum());
    combined.setSampleCount(v1.getSampleCount() + v2.getSampleCount());

    // Extremes.
    combined.setMinimum(Math.min(v1.getMinimum(), v2.getMinimum()));
    combined.setMaximum(Math.max(v1.getMaximum(), v2.getMaximum()));

    return combined;
/**
 * Checks that one added metric event is flushed as exactly one datum whose
 * name, unit and statistics all reflect that single value, and that a
 * second flush returns nothing.
 */
public void single() throws Exception {
    // NOTE(review): the builder configuration that follows is not visible in this excerpt.
    final DimensionMapper mapper = new DimensionMapper.Builder()

    final Metric name = Metric.define("SomeMetric");
    final double value = 3.14;
    final StandardUnit unit = StandardUnit.Terabits;

    // Recorder context carrying a random id.
    final MetricRecorder.RecorderContext context = new MetricRecorder.RecorderContext(ContextData.withId(UUID.randomUUID().toString()).build());

    // Record exactly one event.
    MetricDataAggregator aggregator = new MetricDataAggregator(mapper);
    aggregator.add(context, name, value, unit);

    // The first flush is expected to surface that one event as a single datum.
    List<MetricDatum> ags = aggregator.flush();

    assertEquals("One metric datum should aggregate to one entry", 1, ags.size());
    assertEquals("Metric datum has wrong name", name.toString(), ags.get(0).getMetricName());
    assertEquals("Metric datum has wrong unit", unit.toString(), ags.get(0).getUnit());

    // With a single sample, sum, minimum and maximum all equal the value and the count is one.
    StatisticSet stats = ags.get(0).getStatisticValues();
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getSum());
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getMinimum());
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getMaximum());
    assertEquals("Metric datum has wrong stats count", Double.valueOf(1), stats.getSampleCount());

    // The data was already returned by the first flush, so a second flush is expected to be empty.
    assertTrue("Flush with no attributes was non-empty", aggregator.flush().isEmpty());
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    // Obtain a FlowFile from the session.
    FlowFile flowFile = session.get();
    // NOTE(review): the body of this null check is not visible in this excerpt.
    if (flowFile == null) {
    MetricDatum datum = new MetricDatum();

    try {
        // Evaluate the VALUE property against the FlowFile.
        final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue();
        if (valueString != null) {
        } else {
            // No single value is configured, so a StatisticSet is created instead.
            // NOTE(review): the code that fills it is not visible in this excerpt.
            StatisticSet statisticSet = new StatisticSet();


        // Optional timestamp: parsed as a long and handed to the Date constructor,
        // which interprets it as milliseconds since the epoch.
        final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
        if (timestamp != null) {
            datum.setTimestamp(new Date(Long.parseLong(timestamp)));

        // Optional unit. NOTE(review): the code applying it is not visible in this excerpt.
        final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue();
        if (unit != null) {

        // add dynamic properties as dimensions
        if (!dynamicPropertyNames.isEmpty()) {
            final List<Dimension> dimensions = new ArrayList<>(dynamicPropertyNames.size());
            for (String propertyName : dynamicPropertyNames) {
                final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue();
                // Only properties whose evaluated value is not blank become dimensions.
                if (StringUtils.isNotBlank(propertyValue)) {
                    dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue));

        // NOTE(review): the request construction and the call that sends it are not visible here.
        final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest()

        // Route the FlowFile to the success relationship and log it.
        session.transfer(flowFile, REL_SUCCESS);
        getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile});
    } catch (final Exception e) {
        // Any exception: log it, penalize the FlowFile and route it to the failure relationship.
        getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e});
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_FAILURE);
