Java Code Examples: org.apache.kafka.streams.processor.ProcessorSupplier

Example 1
@Bean
public java.util.function.BiConsumer<KStream<Object, String>, KStream<Object, String>> process() {
	return (input0, input1) ->
			input0.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
				@Override
				@SuppressWarnings("unchecked")
				public void init(ProcessorContext context) {
					state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
					state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
				}

				@Override
				public void process(Object key, String value) {
					processed1 = true;
				}

				@Override
				public void close() {

				}
			}, "my-store", "other-store");
}
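
The bean above references state1, state2, and processed1 without declaring them; they belong to the enclosing test class, which this snippet omits. Judging from the casts and the store names, the hypothetical fields would look roughly like this:

private KeyValueStore<Long, Long> state1;   // bound to "my-store"
private WindowStore<Long, Long> state2;     // bound to "other-store"
private volatile boolean processed1;        // flag asserted by the enclosing test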
 
Example 2
private TopologyBuilder processingTopologyBuilder() {

        StateStoreSupplier machineToAvgCPUUsageStore
                = Stores.create(AVG_STORE_NAME)
                        .withStringKeys()
                        .withDoubleValues()
                        .inMemory()
                        .build();

        StateStoreSupplier machineToNumOfRecordsReadStore
                = Stores.create(NUM_RECORDS_STORE_NAME)
                        .withStringKeys()
                        .withIntegerValues()
                        .inMemory()
                        .build();

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource(SOURCE_NAME, TOPIC_NAME)
                .addProcessor(PROCESSOR_NAME, new ProcessorSupplier() {
                    @Override
                    public Processor get() {
                        return new CPUCumulativeAverageProcessor();
                    }
                }, SOURCE_NAME)
                .addStateStore(machineToAvgCPUUsageStore, PROCESSOR_NAME)
                .addStateStore(machineToNumOfRecordsReadStore, PROCESSOR_NAME);

        LOGGER.info("Kafka streams processing topology ready");

        return builder;
    }
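
The topology registers a CPUCumulativeAverageProcessor that the example never defines. A minimal sketch of what such a processor might look like against the same pre-1.0 Processor API, assuming the record key is a machine ID and the value is CPU usage as a parsable string (both assumptions, not shown above):

// Hypothetical processor: maintains a cumulative average per machine using
// the two state stores attached above (pre-1.0 AbstractProcessor API).
private class CPUCumulativeAverageProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Double> avgStore;
    private KeyValueStore<String, Integer> countStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        avgStore = (KeyValueStore<String, Double>) context.getStateStore(AVG_STORE_NAME);
        countStore = (KeyValueStore<String, Integer>) context.getStateStore(NUM_RECORDS_STORE_NAME);
    }

    @Override
    public void process(String machineId, String cpuUsage) {
        Integer n = countStore.get(machineId);
        Double avg = avgStore.get(machineId);
        if (n == null) {
            n = 0;
            avg = 0.0;
        }
        // Incremental cumulative average: newAvg = (avg * n + x) / (n + 1)
        double newAvg = (avg * n + Double.parseDouble(cpuUsage)) / (n + 1);
        countStore.put(machineId, n + 1);
        avgStore.put(machineId, newAvg);
    }
}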
 
Example 3
/**
 * Constructs an instance of {@link SinkEntry}.
 *
 * @param formatterSupplier - Formats {@link ProcessingResult}s for output to the sink. (not null)
 * @param keySerializer - Serializes keys that are used to write to the sink. (not null)
 * @param valueSerializer - Serializes values that are used to write to the sink. (not null)
 */
public SinkEntry(
        final ProcessorSupplier<Object, ProcessorResult> formatterSupplier,
        final Serializer<K> keySerializer,
        final Serializer<V> valueSerializer) {
    this.keySerializer = requireNonNull(keySerializer);
    this.valueSerializer = requireNonNull(valueSerializer);
    this.formatterSupplier = requireNonNull(formatterSupplier);
}
 
Example 4
TracingProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
  String spanName,
  ProcessorSupplier<K, V> processorSupplier) {
  this.kafkaStreamsTracing = kafkaStreamsTracing;
  this.spanName = spanName;
  this.delegateProcessorSupplier = processorSupplier;
}
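
The constructor only captures its collaborators; the span creation happens in get(), which this example omits. A minimal sketch of that method, assuming a TracingProcessor decorator that opens a span named spanName around the delegate's process() call (the decorator itself is not shown here):

@Override public Processor<K, V> get() {
  // Wrap the Processor produced by the delegate in a tracing decorator.
  return new TracingProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get());
}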
 
Example 5
@Test
public void should_create_spans_from_stream_with_tracing_processor() {
  ProcessorSupplier<String, String> processorSupplier =
    kafkaStreamsTracing.processor(
      "forward-1", () ->
        new AbstractProcessor<String, String>() {
          @Override
          public void process(String key, String value) {
            try {
              // Sleep briefly so the processor span has a measurable duration.
              Thread.sleep(100L);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });

  String inputTopic = testName.getMethodName() + "-input";

  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .process(processorSupplier);
  Topology topology = builder.build();

  KafkaStreams streams = buildKafkaStreams(topology);

  send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

  waitForStreamToRun(streams);

  MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
  assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);

  MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
  assertChildOf(spanProcessor, spanInput);

  streams.close();
  streams.cleanUp();
}
 
Example 6
@Test
public void should_create_spans_from_stream_without_tracing_and_tracing_processor() {
  ProcessorSupplier<String, String> processorSupplier =
    kafkaStreamsTracing.processor(
      "forward-1", () ->
        new AbstractProcessor<String, String>() {
          @Override
          public void process(String key, String value) {
            try {
              // Sleep briefly so the processor span has a measurable duration.
              Thread.sleep(100L);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });

  String inputTopic = testName.getMethodName() + "-input";

  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .process(processorSupplier);
  Topology topology = builder.build();

  KafkaStreams streams = buildKafkaStreamsWithoutTracing(topology);

  send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

  waitForStreamToRun(streams);

  assertThat(testSpanHandler.takeLocalSpan().tags())
    .containsOnlyKeys("kafka.streams.application.id", "kafka.streams.task.id");

  streams.close();
  streams.cleanUp();
}
 
Example 7
/**
 * @return Formats {@link ProcessingResult}s for output to the sink.
 */
public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() {
    return formatterSupplier;
}
 
Example 8
/**
 * Creates a new {@link ProcessorEntry}.
 *
 * @param node - The RDF node to be added as a processor. (not null)
 * @param id - The id for the {@link TupleExpr} node. (not null)
 * @param downstreamSide - Which side the current node is on from its downstream processor. (not null)
 * @param supplier - Supplies the {@link Processor} for this node. (not null)
 * @param upstreamNodes - The RDF nodes that will become upstream processing nodes. (not null)
 */
public ProcessorEntry(final TupleExpr node, final String id, final Optional<Side> downstreamSide, final ProcessorSupplier<?, ?> supplier, final List<TupleExpr> upstreamNodes) {
    this.node = requireNonNull(node);
    this.id = requireNonNull(id);
    this.downstreamSide = requireNonNull(downstreamSide);
    this.supplier = requireNonNull(supplier);
    this.upstreamNodes = requireNonNull(upstreamNodes);
}
 
Example 9
/**
 * Create a foreach processor, similar to {@link KStream#foreach(ForeachAction)}, where its action
 * will be recorded in a new span with the indicated name.
 *
 * <p>Simple example using Kafka Streams DSL:
 * <pre>{@code
 * StreamsBuilder builder = new StreamsBuilder();
 * builder.stream(inputTopic)
 *        .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...));
 * }</pre>
 */
public <K, V> ProcessorSupplier<K, V> foreach(String spanName, ForeachAction<K, V> action) {
  return new TracingProcessorSupplier<>(this, spanName, () ->
    new AbstractProcessor<K, V>() {
      @Override public void process(K key, V value) {
        action.apply(key, value);
      }
    });
}
 
Example 10
/**
 * @return - The {@link ProcessorSupplier} used to supply the
 *         {@link Processor} for this node.
 */
public ProcessorSupplier<?, ?> getSupplier() {
    return supplier;
}
 
Example 11
/**
 * Create a tracing-decorated {@link ProcessorSupplier}.
 *
 * <p>Simple example using Kafka Streams DSL:
 * <pre>{@code
 * StreamsBuilder builder = new StreamsBuilder();
 * builder.stream(inputTopic)
 *        .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier));
 * }</pre>
 *
 * @see TracingKafkaClientSupplier
 */
public <K, V> ProcessorSupplier<K, V> processor(String spanName,
  ProcessorSupplier<K, V> processorSupplier) {
  return new TracingProcessorSupplier<>(this, spanName, processorSupplier);
}