我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例:
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));
String avroSchemaName = null;
// some of my AVRO schemas are unions, some are simple:
if (_avroSchema.getTypes().size() == 1) {
avroSchemaName = _avroSchema.getTypes().get(0).getName();
} else if (_avroSchema.getTypes().size() == 2) {
avroSchemaName = _avroSchema.getTypes().get(1).getName();
}
// some custom header items...
producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
avroSchemaName.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
avroConverter.getSchemaId().toString().getBytes());
if (multiline) {
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
MULTILINE_RECORD_NAME.getBytes());
}
try {
Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
RecordMetadata sendResult = result.get();
MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
sendResult.offset());
} catch (Exception e) {
MessageLogger.logError(e);
throw e;
}
代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子):
这把我们的Graylog砸了。
我使用类似的代码发送字符串格式的消息。此代码在不产生INFO消息的情况下执行…
ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
recordToSend.headers().add("messageID", messageID.getBytes());
Future<RecordMetadata> result = _producerConnection.send(recordToSend);
我知道INFO消息是从classorg.apache.kafka.clients.producer.KafkaProducer
记录的。我需要删除这些消息,但我无法访问日志记录。mxl定义Graylog的记录器属性。
有没有办法通过POM条目或编程来消除这些消息?
代码行为的原因是一个设计缺陷:上面帖子中的代码被放在一个方法中,该方法被调用来向Kafka发送消息。KafkaProducer
类在该方法中以及每次调用该方法时都被实例化。令人惊讶的是,KafkaProducer
不仅在调用代码的显式close()处,而且在实例的强引用丢失时(在我的例子中,当代码离开方法时),发出关闭timeoutMillis=
Kafka Producer的命令。在后一种情况下,timeoutMillits设置为9223372036854775807(最大的长数字)。
为了消除许多消息,我将KafkaProducer
实例化移出了方法,并将实例变量设置为类属性,我不再在send(…)
之后调用显式的。
此外,我将实例化 KafkaProducer
的类的实例更改为强引用类成员。
通过这样做,我在实例化时收到Kafka生产者
的一些消息,然后沉默了。