在Kafka流中处理异常
问题内容:
曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。
我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等,
- 有人可以建议正确的做法吗?我应该使用
setUncaughtExceptionHandler
吗?或者,还有更好的方法? - 如何处理重试?
问题答案:
这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异常(例如,由于网络故障或kafka代理已死),则默认情况下流将死亡。在kafka-streams
1.1.0版中,您可以通过ProductionExceptionHandler
以下方式实现覆盖默认行为:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
从句柄方法中,CONTINUE
如果您不希望流因异常而死,则可以返回FAIL
;如果您希望流停止,则可以返回(FAIL是默认值之一)。并且您需要在流配置中指定此类:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
另外要注意的是ProductionExceptionHandler
手柄只在生产异常,以及与流方法处理邮件时不会处理异常mapValues(..)
,filter(..)
,branch(..)
等你需要用与try
/ catch块,这些方法的逻辑(把所有的方法逻辑到try块来保证您将处理所有例外情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
据我所知,我们不需要在用户端显式处理异常,因为kafka流将在稍后自动重试使用(因为偏移量不会改变,直到消息被使用和处理为止);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获取异常,并且当断开时,kafka流将消耗所有消息。因此,在这种情况下,我们将只是延迟而没有损坏/丢失。
使用,setUncaughtExceptionHandler
您将无法更改默认行为,例如使用ProductionExceptionHandler
,则只能记录错误或将消息发送到失败主题。