本文介绍了根据异常类型调用ContainerStoppingErrorHandler的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是 spring kafka 2.2.4 版和 Kafka 2.11 版.我使用 ContainerStoppingErrorHandler 作为我的错误处理程序.每当出现异常时,都会调用此方法并停止容器.现在我需要根据异常类型停止容器,如果发生某些数据库异常,它应该停止其他异常类型的容器,它应该向组发送电子邮件.下面是我的错误处理程序代码

I am using spring kafka version 2.2.4 Release and Kafka version 2.11. I am using ContainerStoppingErrorHandler as my error handler. Whenever there is an exception this method is called and stops the container. Now I need to stop the container based on the exception type if some DB exception occurs it should stop the container for other exception types it should send email to the group. Below is my error handler code

public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> messageKafkaListenerContainerFactory() {
//consumer configs...
factory.setErrorHandler(new ContainerStoppingErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {


            if (thrownException instanceof ConnectionException) {

                LOGGER.error("Database exception occured stopping the container");
                super.handle(thrownException, records, consumer, container);

            } else {
                //send email about error without discarding the records
            }

        }
    }
}

我能够根据数据库异常停止容器,但对于其他异常,包括错误记录在内的轮询中的记录将被丢弃,因此我正在丢失数据.有没有办法根据类型处理异常并调用错误处理程序,如果数据库异常停止,否则继续而不像这样丢弃剩余的记录.

I am able to stop the container based on the DB exception but for other exceptions the records in the poll including the error record is getting discarded so I am losing the data. Is there any way to handle the exceptions based on the type and invoke error handler if DB exception stop else continue without discarding the remaining records like that.

推荐答案

对于其他异常,委托给 SeekToCurrentErrorHandler 这将导致对所有未处理记录(包括失败记录)的主题进行搜索所以他们会在下一次投票时重新发送.

For other exceptions, delegate to a SeekToCurrentErrorHandler which will cause seeks for the topics for all the unprocessed records (including the failed record) so they will be redelivered on the next poll().

默认情况下,STCEH 在 10 次尝试后放弃失败的记录,但您可以通过设置 maxAttempts 构造函数参数来更改它.

The STCEH gives up on the failed record after 10 attempts by default, but you can change that by setting the maxAttempts constructor argument.

编辑

factory.setErrorHandler(new ContainerStoppingErrorHandler() {

        private final SeekToCurrentErrorHandler stceh = new SeekToCurrentErrorHandler(...);

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {


            if (thrownException.getCause() instanceof ConnectionException) {

                LOGGER.error("Database exception occured stopping the container");
                super.handle(thrownException, records, consumer, container);

            } else {
                //send email about error without discarding the records
                this.stceh.handle(thrownException, records, consumer, container);
            }

        }
    }
}

这篇关于根据异常类型调用ContainerStoppingErrorHandler的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 17:29