我有一个Spring Boot应用程序,尝试从本地Kafka主题读取。

该应用程序开始旋转,暂停2分钟,并引发以下错误:

19-11-10 14:49:24.700  INFO 20476 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1573390164696
2019-11-10 14:51:24.793  WARN 20476 --- [  restartedMain] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [XYZ] is/are not present and missingTopicsFatal is true
2019-11-10 14:51:24.795  INFO 20476 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2019-11-10 14:51:24.796  INFO 20476 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2019-11-10 14:51:24.810  INFO 20476 --- [  restartedMain] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2019-11-10 14:51:24.837  INFO 20476 --- [  restartedMain] ConditionEvaluationReportLoggingListener :

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-11-10 14:51:24.844 ERROR 20476 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [XYZ] is/are not present and missingTopicsFatal is true
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at com.example.main(NyApplication.java:38) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.2.0.RELEASE.jar:2.2.0.RELEASE]
Caused by: java.lang.IllegalStateException: Topic(s) [ZYX] is/are not present and missingTopicsFatal is true
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:366) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:323) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:309) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:256) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    ... 19 common frames omitted    `
kafka在本地计算机上的docker中运行:
 docker run --detach --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.1.89 --env ADVERVTISED_PORT=9092 --env AUTO.CREATE.TOPICS.ENABLE spotify/kafka

bash放入容器时
docker exec -it 1234567896b5 bash

我可以看到主题:
$/opt/kafka_2.11-0.10.1.0/bin# ./kafka-topics.sh --list --zookeeper localhost:2181
$XYZ

该主题是使用以下命令创建的:
./kafka-topics.sh --zookeeper  localhost:2181 --create --replication-factor 1 --partitions 3 --topic XYZ

我在应用程序的application.yml中有此设置(也尝试使用端口9092):
spring:
  kafka:
consumer:
  bootstrap-servers: localhost:2181
  group-id: group_id
  auto-offset-reset: earliest
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
  bootstrap-servers: localhost:2181
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer

最佳答案

您的配置使用的是Zookeeper,而不是Kafka作为引导服务器

使用端口9092,而不是2181

另外,该Spotify容器未维护,因此我建议找到一个更新的容器

10-04 10:06