Java源码示例:org.apache.kafka.streams.processor.ProcessorSupplier
示例1
/**
 * Declares a two-input consumer that attaches a {@link Processor} to the first stream and
 * connects it to the state stores named "my-store" and "other-store".
 *
 * <p>The second stream is accepted but not used by this function.
 *
 * @return the consumer that wires the processor onto the first input stream
 */
@Bean
public java.util.function.BiConsumer<KStream<Object, String>, KStream<Object, String>> process() {
    return (input0, input1) -> {
        final ProcessorSupplier<Object, String> storeAwareSupplier =
                () -> new Processor<Object, String>() {

                    /** Looks up both state stores from the context and keeps them in the enclosing fields. */
                    @SuppressWarnings("unchecked")
                    @Override
                    public void init(ProcessorContext context) {
                        // getStateStore returns an untyped store, hence the unchecked casts.
                        state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
                        state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
                    }

                    /** Records that at least one record reached this processor. */
                    @Override
                    public void process(Object key, String value) {
                        processed1 = true;
                    }

                    /** Nothing to release. */
                    @Override
                    public void close() {
                    }
                };

        input0.process(storeAwareSupplier, "my-store", "other-store");
    };
}
示例2
/**
 * Assembles the processing topology: one source reading {@code TOPIC_NAME}, one processor
 * supplied with {@link CPUCumulativeAverageProcessor} instances, and two in-memory state
 * stores attached to that processor.
 *
 * @return the configured {@link TopologyBuilder}
 */
private TopologyBuilder processingTopologyBuilder() {
    final TopologyBuilder topology = new TopologyBuilder();
    topology.addSource(SOURCE_NAME, TOPIC_NAME);

    // Each call to get() hands out a fresh processor instance.
    final ProcessorSupplier averagingSupplier = new ProcessorSupplier() {
        @Override
        public Processor get() {
            return new CPUCumulativeAverageProcessor();
        }
    };
    topology.addProcessor(PROCESSOR_NAME, averagingSupplier, SOURCE_NAME);

    // String -> Double store, kept in memory.
    final StateStoreSupplier avgCpuUsageStore = Stores.create(AVG_STORE_NAME)
            .withStringKeys()
            .withDoubleValues()
            .inMemory()
            .build();
    topology.addStateStore(avgCpuUsageStore, PROCESSOR_NAME);

    // String -> Integer store, kept in memory.
    final StateStoreSupplier recordsReadStore = Stores.create(NUM_RECORDS_STORE_NAME)
            .withStringKeys()
            .withIntegerValues()
            .inMemory()
            .build();
    topology.addStateStore(recordsReadStore, PROCESSOR_NAME);

    LOGGER.info("Kafka streams processing topology ready");
    return topology;
}
示例3
/**
 * Creates a {@link SinkEntry} from its formatter supplier and serializers.
 *
 * @param formatterSupplier - Supplies the processor that formats {@link ProcessorResult}s
 *   for output to the sink. (not null)
 * @param keySerializer - Serializes the keys written to the sink. (not null)
 * @param valueSerializer - Serializes the values written to the sink. (not null)
 * @throws NullPointerException if any argument is null
 */
public SinkEntry(
        final ProcessorSupplier<Object, ProcessorResult> formatterSupplier,
        final Serializer<K> keySerializer,
        final Serializer<V> valueSerializer) {
    this.formatterSupplier = requireNonNull(formatterSupplier);
    this.keySerializer = requireNonNull(keySerializer);
    this.valueSerializer = requireNonNull(valueSerializer);
}
示例4
/**
 * Creates a supplier that remembers the tracing component, the span name and the supplier it
 * delegates to.
 *
 * @param kafkaStreamsTracing the tracing component stored for later use
 * @param spanName the span name stored for later use
 * @param processorSupplier the supplier this instance delegates to
 */
TracingProcessorSupplier(
        KafkaStreamsTracing kafkaStreamsTracing,
        String spanName,
        ProcessorSupplier<K, V> processorSupplier) {
    this.delegateProcessorSupplier = processorSupplier;
    this.spanName = spanName;
    this.kafkaStreamsTracing = kafkaStreamsTracing;
}
示例5
/**
 * Runs a stream whose processor is wrapped by {@code kafkaStreamsTracing.processor(...)} and
 * checks that a remote consumer span tagged with the input topic is reported, followed by a
 * local span that {@code assertChildOf} accepts as its child.
 */
@Test
public void should_create_spans_from_stream_with_tracing_processor() {
    ProcessorSupplier<String, String> processorSupplier =
        kafkaStreamsTracing.processor(
            "forward-1", () ->
                new AbstractProcessor<String, String>() {
                    @Override
                    public void process(String key, String value) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the owning thread can still
                            // observe the interruption instead of silently losing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                });

    String inputTopic = testName.getMethodName() + "-input";

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .process(processorSupplier);
    Topology topology = builder.build();

    KafkaStreams streams = buildKafkaStreams(topology);
    try {
        send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

        waitForStreamToRun(streams);

        MutableSpan spanInput = testSpanHandler.takeRemoteSpan(CONSUMER);
        assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic);

        MutableSpan spanProcessor = testSpanHandler.takeLocalSpan();
        assertChildOf(spanProcessor, spanInput);
    } finally {
        // Always shut the streams instance down, even when sending, waiting or an
        // assertion fails; otherwise it would keep running after the test.
        streams.close();
        streams.cleanUp();
    }
}
示例6
/**
 * Runs a stream built by {@code buildKafkaStreamsWithoutTracing} whose processor is still
 * wrapped by {@code kafkaStreamsTracing.processor(...)} and checks that the reported local
 * span carries only the "kafka.streams.application.id" and "kafka.streams.task.id" tags.
 */
@Test
public void should_create_spans_from_stream_without_tracing_and_tracing_processor() {
    ProcessorSupplier<String, String> processorSupplier =
        kafkaStreamsTracing.processor(
            "forward-1", () ->
                new AbstractProcessor<String, String>() {
                    @Override
                    public void process(String key, String value) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the owning thread can still
                            // observe the interruption instead of silently losing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                });

    String inputTopic = testName.getMethodName() + "-input";

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .process(processorSupplier);
    Topology topology = builder.build();

    KafkaStreams streams = buildKafkaStreamsWithoutTracing(topology);
    try {
        send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE));

        waitForStreamToRun(streams);

        assertThat(testSpanHandler.takeLocalSpan().tags())
            .containsOnlyKeys("kafka.streams.application.id", "kafka.streams.task.id");
    } finally {
        // Always shut the streams instance down, even when sending, waiting or an
        // assertion fails; otherwise it would keep running after the test.
        streams.close();
        streams.cleanUp();
    }
}
示例7
/**
 * @return The supplier of the processor that formats {@link ProcessorResult}s for output
 *   to the sink.
 */
public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() {
    return this.formatterSupplier;
}
示例8
/**
 * Constructs a {@link ProcessorEntry}.
 *
 * @param node - The RDF node to be added as a processor. (not null)
 * @param id - The id for the {@link TupleExpr} node. (not null)
 * @param downstreamSide - Which side the current node is on from its downstream
 *   processor. (not null)
 * @param supplier - Supplies the {@link Processor} for this node. (not null)
 * @param upstreamNodes - The RDF nodes that will become upstream processing nodes. (not null)
 * @throws NullPointerException if any argument is null
 */
public ProcessorEntry(
        final TupleExpr node,
        final String id,
        final Optional<Side> downstreamSide,
        final ProcessorSupplier<?, ?> supplier,
        final List<TupleExpr> upstreamNodes) {
    // Checks run in parameter order; the first null argument aborts construction.
    this.node = requireNonNull(node);
    this.id = requireNonNull(id);
    this.downstreamSide = requireNonNull(downstreamSide);
    this.supplier = requireNonNull(supplier);
    this.upstreamNodes = requireNonNull(upstreamNodes);
}
示例9
/**
 * Creates a processor supplier that behaves like {@link KStream#foreach(ForeachAction)}: every
 * record is handed to {@code action}. The supplier is wrapped in a
 * {@link TracingProcessorSupplier} created with the given span name.
 *
 * <p>Simple example using Kafka Streams DSL:
 * <pre>{@code
 * StreamsBuilder builder = new StreamsBuilder();
 * builder.stream(inputTopic)
 *        .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...));
 * }</pre>
 *
 * @param spanName the span name passed to the tracing supplier
 * @param action the action applied to each key/value pair
 * @return a tracing supplier whose processors invoke {@code action}
 */
public <K, V> ProcessorSupplier<K, V> foreach(String spanName, ForeachAction<K, V> action) {
    // A new processor is created on every get(); each one simply forwards to the action.
    ProcessorSupplier<K, V> actionSupplier = () ->
        new AbstractProcessor<K, V>() {
            @Override
            public void process(K key, V value) {
                action.apply(key, value);
            }
        };
    return new TracingProcessorSupplier<>(this, spanName, actionSupplier);
}
示例10
/**
 * @return - The {@link ProcessorSupplier} that supplies the {@link Processor} for this node.
 */
public ProcessorSupplier<?, ?> getSupplier() {
    return this.supplier;
}
示例11
/**
 * Wraps the given {@link ProcessorSupplier} in a {@link TracingProcessorSupplier} created with
 * the given span name.
 *
 * <p>Simple example using Kafka Streams DSL:
 * <pre>{@code
 * StreamsBuilder builder = new StreamsBuilder();
 * builder.stream(inputTopic)
 *        .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier));
 * }</pre>
 *
 * @param spanName the span name passed to the tracing supplier
 * @param processorSupplier the supplier to decorate
 * @return the tracing-decorated supplier
 * @see TracingKafkaClientSupplier
 */
public <K, V> ProcessorSupplier<K, V> processor(
        String spanName,
        ProcessorSupplier<K, V> processorSupplier) {
    return new TracingProcessorSupplier<K, V>(this, spanName, processorSupplier);
}