提问者:小点点

如何摆脱“用timeoutMillis = ...关闭Kafka制作人”


我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例:

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
                avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));

        String avroSchemaName = null;

        // some of my AVRO schemas are unions, some are simple:
        if (_avroSchema.getTypes().size() == 1) {
            avroSchemaName = _avroSchema.getTypes().get(0).getName();
        } else if (_avroSchema.getTypes().size() == 2) {
            avroSchemaName = _avroSchema.getTypes().get(1).getName();
        }

        // some custom header items...
        producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
                avroSchemaName.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
                avroConverter.getSchemaId().toString().getBytes());
        if (multiline) {
            producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
                    MULTILINE_RECORD_NAME.getBytes());
        }

        try {
            Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
            RecordMetadata sendResult = result.get();
            MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
                    sendResult.offset());
        } catch (Exception e) {
            MessageLogger.logError(e);
            throw e;
        }

代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子):

  • [生产者客户端 Id=生产者-27902] 关闭 Kafka 生产者超时Millis = 10000 毫秒。
  • [创建者客户端 Id=创建者-27902] 使用超时关闭 Kafka 创建者Millis = 9223372036854775807 毫秒。
  • Kafka开始计时: ...
  • Kafka commitId: ...
  • [创建者客户端 Id = 创建者-27902] 群集 ID:

这把我们的Graylog砸了。

我使用类似的代码发送字符串格式的消息。此代码在不产生INFO消息的情况下执行…

ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
    recordToSend.headers().add("messageID", messageID.getBytes());

    Future<RecordMetadata> result = _producerConnection.send(recordToSend);
    

我知道INFO消息是从classorg.apache.kafka.clients.producer.KafkaProducer记录的。我需要删除这些消息,但我无法访问日志记录。mxl定义Graylog的记录器属性。

有没有办法通过POM条目或编程来消除这些消息?


共1个答案

匿名用户

代码行为的原因是一个设计缺陷:上面帖子中的代码被放在一个方法中,该方法被调用来向Kafka发送消息。KafkaProducer类在该方法中以及每次调用该方法时都被实例化。令人惊讶的是,KafkaProducer不仅在调用代码的显式close()处,而且在实例的强引用丢失时(在我的例子中,当代码离开方法时),发出关闭timeoutMillis=Kafka Producer的命令。在后一种情况下,timeoutMillits设置为9223372036854775807(最大的长数字)。

为了消除许多消息,我将KafkaProducer实例化移出了方法,并将实例变量设置为类属性,我不再在send(…)之后调用显式的

此外,我将实例化 KafkaProducer 的类的实例更改为强引用类成员。

通过这样做,我在实例化时收到Kafka生产者的一些消息,然后沉默了。