我对某些情况下spring-kafka的行为有几个疑问。任何答案或指针都很好。

背景:我正在建立一个kafka用户,该用户与外部api对话并发送回确认。我的配置如下所示:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getString("kafka-generic.consumer.group.id"));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "6000000");

    return props;
}


@Bean
public RetryTemplate retryTemplate() {
    final ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
    backOffPolicy.setInitialInterval(this.configuration.getLong("retry-exp-backoff-init-interval"));
    final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(this.configuration.getInt("retry-max-attempts"));
    final RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(this.configuration.getInt("kafka-concurrency"));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setIdleEventInterval(this.configuration.getLong("kafka-rtm-idle-time"));
    //factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(kafkaConsumerErrorHandler);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}


可以说我的分区数是4。我的KafkaListener分区分配是:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"0", "1"}),
            containerFactory = "retryKafkaListenerContainerFactory")
public void receive(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"2", "3"}),
        containerFactory = "retryKafkaListenerContainerFactory")
public void receive1(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}


现在我的问题是:


假设我有2台机器在其中部署了此代码(具有相同的使用者组ID)。如果我理解正确,如果我得到一个分区事件,则其中一台计算机的相应分区的kafkalistener会监听,而另一台计算机的kafkalistener不会监听此事件。是吗?
我的错误处理程序是:



  @Named
        public class KafkaConsumerErrorHandler implements ErrorHandler {
          @Inject
          private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

          @Override
          public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
              System.out.println("Shutting down all the containers");
              kafkaListenerEndpointRegistry.stop();
          }
        }



让我们说一个场景,在该场景中,调用消费者的kafkalistener调用serviceInvoker.callService(event);,但服务关闭,然后根据retryKafkaListenerContainerFactory重试3次,然后失败,然后调用错误处理程序,从而停止kafkaListenerEndpointRegistry。这是否将关闭具有相同使用者组的所有其他使用者或计算机,或者仅关闭该使用者或计算机?


让我们谈谈2中的场景。是否有任何需要更改的配置,以使kafka知道在这么长的时间内推迟确认?
我的kafka生产者每10分钟产生一条消息。我是否需要在“使用者”代码中的任何地方配置10分钟,还是与之无关?
在我的KafkaListener批注中,我对主题名称和分区进行了硬编码。我可以在运行时更改它吗?


任何帮助都非常感谢。提前致谢。 :)

最佳答案

正确;只有1个会得到它。
它只会停止本地容器-Spring对您的其他实例一无所知。
由于您具有ackOnError=false,因此不会提交偏移量。
消费者不需要知道消息发布的频率。
您不能在运行时更改它们,但是可以在应用程序初始化期间使用属性占位符${...}或Spel表达式#{...}进行设置。

10-08 01:36