本文介绍了DeadLetterPublishingRecoverer - TopicPartition 处的名称主题以 _ERR 结尾,死信发布失败并出现 InvalidTopicException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在更改 DeadLetterPublishingRecoverer destionationResolver 时发现了一个错误.

I identified an error when I changed the DeadLetterPublishingRecoverer destionationResolver.

当我使用时:

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
        DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());

效果很好.

但是,如果您使用 _ERR 而不是 .ERR,则会发生错误:

However, if you use _ERR instead of .ERR, an error occurs:

2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}

2020-08-05 12:53:10,278 [kafka-producer-network-thread |producer-kafka-tx-group1.ABC_TEST_XPTO.0] 错误 org.apache.kafka.clients.Metadata - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] 元数据响应报告无效主题 [ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,309 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 osksLoggingProducerListener - 发送带有 key='null' 和 payload='XPTOEvent(super=Event(id=CAPBA2548, destination=ABC_TEST_XPTO, he...' 到主题 ABC_TEST_XPTO_ERR 和分区 0:org.apache.kafka.common.errors.InvalidTopicException:无效主题:[ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,320 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 osklDeadLetterPublishingRecoverer - 死信发布失败:ProducerRecord(topic=ABC_TEST_XPTO_ERR, partition=0, headers记录头(头= ..org.springframework.kafka.KafkaException:发送失败;嵌套异常是 org.apache.kafka.common.errors.InvalidTopicException:无效主题:[ABC_TEST_XPTO_ERR]在 org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)在 org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)在 org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:290)在 org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:226)在 org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54)在 org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)强文本

2020-08-05 12:53:10,278 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Metadata response reported invalid topics [ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,309 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='XPTOEvent(super=Event(id=CAPBA2548, destination=ABC_TEST_XPTO, he...' to topic ABC_TEST_XPTO_ERR and partition 0:org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]2020-08-05 12:53:10,320 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication failed for: ProducerRecord(topic=ABC_TEST_XPTO_ERR, partition=0, headers=RecordHeaders(headers = ..org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:290)at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:226)at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54)at org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)strong text

我的主题在名字中间使用_,例如ABC_TEST_XPTO,所以如果可能的话,我想用_ERR设置死信主题

我的环境

  1. Spring Boot 2.3.2.RELEASE

  1. Spring Boot 2.3.2.RELEASE

Spring-Kafka 2.5.3.RELEASE 但同样的问题出现在 2.5.4.RELEASE

Spring-Kafka 2.5.3.RELEASE but the same problem occurs with 2.5.4.RELEASE

Java 11

private static final BiFunctionDESTINATION_RESOLVER = (cr, e) ->new TopicPartition(cr.topic() + "_ERR", cr.partition());

private static final BiFunction<ConsumerRecord, Exception, TopicPartition>DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition());

@组件class ContainerFactoryConfigurer {

@Componentclass ContainerFactoryConfigurer {

ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                           ChainedKafkaTransactionManager<?, ?> tm,
                           KafkaTemplate<Object, Object> template) {

    factory.getContainerProperties().setTransactionManager(tm);
    DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {
    }, new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
    factory.setAfterRollbackProcessor(rollbackProcessor);
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER), new FixedBackOff(0L, Long.valueOf(maxAttemps)));
    errorHandler.setCommitRecovered(true);
    errorHandler.setAckAfterHandle(true);
    factory.setErrorHandler(errorHandler);
}

}

谢谢DPG

推荐答案

这对我来说很好用...

This works fine for me...

@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO_ERR")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

也许你的经纪人对主题名称有一些规定;也许看看经纪人日志?

Maybe your brokers have some rules about topic names; maybe look at the broker logs?

编辑

正如我在评论中所说;记录从哪里发布应该无关紧要;这对我仍然有效...

As I said in my comment; it shouldn't matter where the record is published from; this still works for me...

@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicErr() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @KafkaListener(id = "so63270367err", topics = "ABC_TEST_XPTO_ERR")
    public void listenErr(String in) {
        System.out.println("From DLT:" + in);
    }

    @Bean
    public SeekToCurrentErrorHandler eh(KafkaOperations<String, String> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
                template,
                (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition())),
                new FixedBackOff(0L, 0L));
    }

}
From DLT:foo

这篇关于DeadLetterPublishingRecoverer - TopicPartition 处的名称主题以 _ERR 结尾,死信发布失败并出现 InvalidTopicException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 17:28