Problem description
I ran into an error when I changed the DeadLetterPublishingRecoverer destinationResolver.
When I use:
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());
it works fine.
However, if I use _ERR instead of .ERR, an error occurs:
2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}
2020-08-05 12:53:10,278 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Metadata response reported invalid topics [ABC_TEST_XPTO_ERR]
2020-08-05 12:53:10,309 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='XPTOEvent(super=Event(id=CAPBA2548, destination=ABC_TEST_XPTO, he...' to topic ABC_TEST_XPTO_ERR and partition 0:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]
2020-08-05 12:53:10,320 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication failed for: ProducerRecord(topic=ABC_TEST_XPTO_ERR, partition=0, headers=RecordHeaders(headers = ..
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:290)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:226)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54)
    at org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)
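My topics use _ in the middle of their names, for example ABC_TEST_XPTO, so if possible I would like to name the dead-letter topic with an _ERR suffix.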
My environment
Spring Boot 2.3.2.RELEASE
Spring-Kafka 2.5.3.RELEASE, but the same problem occurs with 2.5.4.RELEASE
Java 11
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition());
@Component
class ContainerFactoryConfigurer {
    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm,
            KafkaTemplate<Object, Object> template) {
        factory.getContainerProperties().setTransactionManager(tm);
        DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {
        }, new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
        factory.setAfterRollbackProcessor(rollbackProcessor);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER),
                new FixedBackOff(0L, Long.valueOf(maxAttemps)));
        errorHandler.setCommitRecovered(true);
        errorHandler.setAckAfterHandle(true);
        factory.setErrorHandler(errorHandler);
    }
}
Thanks, DPG
Recommended answer
This works fine for me...
@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO_ERR")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
Maybe your brokers have some rules about topic names; take a look at the broker logs.
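To narrow down whether the name itself is the problem, the sketch below checks a topic name against the naming rules as I understand them from the Kafka documentation (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."). It also checks whether two names differ only by '.' versus '_'. As far as I know, Kafka treats such names as colliding, so an existing ABC_TEST_XPTO.ERR topic could plausibly get ABC_TEST_XPTO_ERR rejected, but that is an assumption to confirm in the broker logs. The class and method names are hypothetical and not part of any Kafka or Spring API.

```java
import java.util.regex.Pattern;

public class TopicNameCheck {

    // Assumption: mirrors Kafka's documented naming rules, not the broker's actual code.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    // True if the name only uses legal characters and respects the length limit.
    static boolean isValid(String topic) {
        return topic != null
                && !topic.isEmpty()
                && !topic.equals(".")
                && !topic.equals("..")
                && topic.length() <= MAX_LENGTH
                && LEGAL.matcher(topic).matches();
    }

    // True if two distinct names become identical once '.' is replaced by '_'.
    static boolean collides(String a, String b) {
        return !a.equals(b) && a.replace('.', '_').equals(b.replace('.', '_'));
    }

    public static void main(String[] args) {
        System.out.println("valid: " + isValid("ABC_TEST_XPTO_ERR"));
        System.out.println("collides: " + collides("ABC_TEST_XPTO.ERR", "ABC_TEST_XPTO_ERR"));
    }
}
```

By these rules ABC_TEST_XPTO_ERR is a legal name on its own, which is consistent with it working in the test above; the collision check is only one possible explanation for a broker rejecting it.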
EDIT
As I said in my comment, it shouldn't matter where the record is published from; this still works for me...
@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicErr() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @KafkaListener(id = "so63270367err", topics = "ABC_TEST_XPTO_ERR")
    public void listenErr(String in) {
        System.out.println("From DLT:" + in);
    }

    @Bean
    public SeekToCurrentErrorHandler eh(KafkaOperations<String, String> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
                template,
                (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition())),
                new FixedBackOff(0L, 0L));
    }

}
From DLT:foo