Java源码示例:org.apache.kafka.common.metrics.JmxReporter
示例1
public static void initialize() {
MetricConfig metricConfig = new MetricConfig()
.samples(100)
.timeWindow(
1000,
TimeUnit.MILLISECONDS
);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter("io.confluent.ksql.metrics"));
// Replace all static contents other than Time to ensure they are cleaned for tests that are
// not aware of the need to initialize/cleanup this test, in case test processes are reused.
// Tests aware of the class clean everything up properly to get the state into a clean state,
// a full, fresh instantiation here ensures something like KsqlEngineMetricsTest running after
// another test that used MetricsCollector without running cleanUp will behave correctly.
metrics = new Metrics(metricConfig, reporters, new SystemTime());
collectorMap = new ConcurrentHashMap<>();
}
示例2
public ClusterTopicManipulationService(String name, AdminClient adminClient) {
LOGGER.info("ClusterTopicManipulationService constructor initiated {}", this.getClass().getName());
_isOngoingTopicCreationDone = true;
_isOngoingTopicDeletionDone = true;
_adminClient = adminClient;
_executor = Executors.newSingleThreadScheduledExecutor();
_reportIntervalSecond = Duration.ofSeconds(1);
_running = new AtomicBoolean(false);
_configDefinedServiceName = name;
// TODO: instantiate a new instance of ClusterTopicManipulationMetrics(..) here.
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(Service.JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", name);
_clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags);
}
示例3
/**
* Mainly contains services for three metrics:
* 1 - ConsumeAvailability metrics
* 2 - CommitOffsetAvailability metrics
* 2.1 - commitAvailabilityMetrics records offsets committed upon success. that is, no exception upon callback
* 2.2 - commitAvailabilityMetrics records offsets commit fail upon failure. that is, exception upon callback
* 3 - CommitOffsetLatency metrics
* 3.1 - commitLatencyMetrics records the latency between last successful callback and start of last recorded commit.
*
* @param name Name of the Monitor instance
* @param topicPartitionResult The completable future for topic partition
* @param consumerFactory Consumer Factory object.
* @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception
* @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied and the thread is interrupted
*/
public ConsumeService(String name,
CompletableFuture<Void> topicPartitionResult,
ConsumerFactory consumerFactory)
throws ExecutionException, InterruptedException {
_baseConsumer = consumerFactory.baseConsumer();
_latencySlaMs = consumerFactory.latencySlaMs();
_name = name;
_adminClient = consumerFactory.adminClient();
_running = new AtomicBoolean(false);
// Returns a new CompletionStage (topicPartitionFuture) which
// executes the given action - code inside run() - when this stage (topicPartitionResult) completes normally,.
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, new SystemTime());
tags = new HashMap<>();
tags.put(TAGS_NAME, name);
_topic = consumerFactory.topic();
_sensors = new ConsumeMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags);
_consumeThread = new Thread(() -> {
try {
consume();
} catch (Exception e) {
LOG.error(name + "/ConsumeService failed", e);
}
}, name + " consume-service");
_consumeThread.setDaemon(true);
});
// In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result.
topicPartitionFuture.get();
}
示例4
/**
* XinfraMonitor constructor creates apps and services for each of the individual clusters (properties) that's passed in.
* For example, if there are 10 clusters to be monitored, then this Constructor will create 10 * num_apps_per_cluster
* and 10 * num_services_per_cluster.
* @param allClusterProps the properties of ALL kafka clusters for which apps and services need to be appended.
* @throws Exception when exception occurs while assigning Apps and Services
*/
@SuppressWarnings({"rawtypes"})
public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
_apps = new ConcurrentHashMap<>();
_services = new ConcurrentHashMap<>();
for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
String name = clusterProperty.getKey();
Map props = clusterProperty.getValue();
if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG);
Class<?> aClass = Class.forName(className);
if (App.class.isAssignableFrom(aClass)) {
App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
_apps.put(name, clusterApp);
} else if (Service.class.isAssignableFrom(aClass)) {
ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY)
.getConstructor(Map.class, String.class)
.newInstance(props, name);
Service service = serviceFactory.createService();
_services.put(name, service);
} else {
throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
}
}
_executor = Executors.newSingleThreadScheduledExecutor();
_offlineRunnables = new ConcurrentHashMap<>();
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX));
Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
(config, now) -> _offlineRunnables.size());
}
示例5
/**
* Configure Application MetricReport instances.
*/
private List<MetricsReporter> configureMetricsReporters(T appConfig) {
List<MetricsReporter> reporters =
appConfig.getConfiguredInstances(
RestConfig.METRICS_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter());
reporters.forEach(r -> r.configure(appConfig.originals()));
return reporters;
}
示例6
public KarelDbLeaderElector(KarelDbConfig config, KarelDbEngine engine) throws KarelDbElectionException {
try {
this.engine = engine;
this.clientId = "kdb-" + KDB_CLIENT_ID_SEQUENCE.getAndIncrement();
this.myIdentity = findIdentity(
config.getList(KarelDbConfig.LISTENERS_CONFIG),
config.getBoolean(KarelDbConfig.LEADER_ELIGIBILITY_CONFIG));
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().tags(metricsTags);
List<MetricsReporter> reporters = Collections.singletonList(new JmxReporter(JMX_PREFIX));
Time time = Time.SYSTEM;
ClientConfig clientConfig = new ClientConfig(config.originalsWithPrefix("kafkacache."), false);
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = clientConfig.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
String groupId = config.getString(KarelDbConfig.CLUSTER_GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[KarelDB clientId=" + clientId + ", groupId="
+ groupId + "] ");
this.metadata = new Metadata(
retryBackoffMs,
clientConfig.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
logContext,
new ClusterResourceListeners()
);
List<String> bootstrapServers
= config.getList(KarelDbConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(bootstrapServers,
clientConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses);
String metricGrpPrefix = "kareldb";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(clientConfig, time);
long maxIdleMs = clientConfig.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG);
NetworkClient netClient = new NetworkClient(
new Selector(maxIdleMs, metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
clientConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
clientConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
clientConfig.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
clientConfig.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
clientConfig.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
ClientDnsLookup.forConfig(clientConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
new ApiVersions(),
logContext);
this.client = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
clientConfig.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
Integer.MAX_VALUE
);
this.coordinator = new KarelDbCoordinator(
logContext,
this.client,
groupId,
300000, // Default MAX_POLL_INTERVAL_MS_CONFIG
10000, // Default SESSION_TIMEOUT_MS_CONFIG)
3000, // Default HEARTBEAT_INTERVAL_MS_CONFIG
metrics,
metricGrpPrefix,
time,
retryBackoffMs,
myIdentity,
this
);
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
initTimeout = config.getInt(KarelDbConfig.KAFKACACHE_INIT_TIMEOUT_CONFIG);
LOG.debug("Group member created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
stop(true);
// now propagate the exception
throw new KarelDbElectionException("Failed to construct kafka consumer", t);
}
}
示例7
public WorkersMetrics(WorkersConfig config) {
List<MetricsReporter> reporters = config.getConfiguredInstances(WorkersConfig.METRIC_REPORTER_CLASSES, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(new MetricConfig(), reporters, Time.SYSTEM);
}
示例8
public WorkerGroupMember(WorkerConfig config,
String restUrl,
TaskConfigManager jobTaskConfigManager,
WorkerRebalanceListener listener,
Time time) {
try {
this.time = time;
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
clientId = clientIdConfig.length() <= 0 ? "datalink-worker-" + DATALINK_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "datalink.worker";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)){
@Override
public boolean awaitMetadataUpdate(long timeout) {
metadata.update(Cluster.bootstrap(addresses),time.milliseconds());
return super.awaitMetadataUpdate(timeout);
}
};
this.coordinator = new WorkerCoordinator(this.client,
config.getString(WorkerConfig.GROUP_ID_CONFIG),
config.getInt(WorkerConfig.REBALANCE_TIMEOUT_MS_CONFIG),
config.getInt(WorkerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(WorkerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
restUrl,
jobTaskConfigManager,
listener);
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("datalink worker group member created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak.
stop(true);
// now propagate the errors
throw new DatalinkException("Failed to construct datalink worker", t);
}
}
示例9
AbstractMirusJmxReporter(Metrics metrics) {
this.metrics = metrics;
this.metrics.addReporter(new JmxReporter("mirus"));
}
示例10
public ProduceService(Map<String, Object> props, String name) throws Exception {
_name = name;
ProduceServiceConfig config = new ProduceServiceConfig(props);
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);
int latencyPercentileMaxMs = config.getInt(ProduceServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
int latencyPercentileGranularityMs = config.getInt(ProduceServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
_partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);
_threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG);
_topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG);
_producerId = config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG);
_produceDelayMs = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_DELAY_MS_CONFIG);
_recordSize = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_SIZE_BYTE_CONFIG);
_sync = config.getBoolean(ProduceServiceConfig.PRODUCE_SYNC_CONFIG);
boolean treatZeroThroughputAsUnavailable =
config.getBoolean(ProduceServiceConfig.PRODUCER_TREAT_ZERO_THROUGHPUT_AS_UNAVAILABLE_CONFIG);
_partitionNum = new AtomicInteger(0);
_running = new AtomicBoolean(false);
_nextIndexPerPartition = new ConcurrentHashMap<>();
_producerPropsOverride = props.containsKey(ProduceServiceConfig.PRODUCER_PROPS_CONFIG)
? (Map) props.get(ProduceServiceConfig.PRODUCER_PROPS_CONFIG) : new HashMap<>();
for (String property: NON_OVERRIDABLE_PROPERTIES) {
if (_producerPropsOverride.containsKey(property)) {
throw new ConfigException("Override must not contain " + property + " config.");
}
}
_adminClient = AdminClient.create(props);
if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) {
_producerClassName = NewProducer.class.getCanonicalName();
} else {
_producerClassName = producerClass;
}
initializeProducer(props);
_produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory());
_handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory());
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", _name);
_sensors =
new ProduceMetrics(metrics, tags, latencyPercentileGranularityMs, latencyPercentileMaxMs, _partitionNum,
treatZeroThroughputAsUnavailable);
}