我有一个Spring Boot应用程序,该应用程序的使用者从一个群集中的主题进行消费,并在另一个群集中产生另一个主题。

现在,我正在尝试使用Spring Embedded Kafka编写集成测试用例,但是出现问题KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource

消费阶层

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
    pro.forEach(kafkaProducerService::produce);

   }

}


生产者阶层

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
    kafkaTemplate.send(topic,"professor",pro);
  }

 }


在我的测试案例中,我想覆盖KafkaTemplate,以便当我在kafkaConsumerService.professor中调用Test方法时,它应该将数据生成到嵌入式Kafka中,并且我应该对其进行验证。

测试配置

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

@Autowired
 KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer,
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   }

 }


测试班

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception {
     kafkaConsumerService.professor(Arrays.asList(new Professor()));

     //How to check messages is sent to kafka?
}

 }


错误

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered.
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true




并且有人可以帮助我如何验证发送到嵌入式Kafka服务器的消息吗?

注意我有一些过时的警告


  不推荐使用KafkaEmbedded类型
  
  不推荐使用KafkaEmbedded类型的getPartitionsPerTopic()方法
  
  不推荐使用来自KafkaTestUtils类型的方法producerProps(KafkaEmbedded)

最佳答案

引导2.1 disables bean overriding by default


  默认情况下禁用Bean覆盖,以防止意外覆盖Bean。如果要依赖覆盖,则需要将spring.main.allow-bean-definition-overriding设置为true


关于弃用;请参阅@EmbeddedKafka的javadocs。它由EmbeddedKafkaBroker代替。

关于java - 嵌入式Kafka的 Spring Kafka集成测试,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54754662/

10-09 01:58