我从最新的Kafka文档http://kafka.apache.org/documentation.html开始。但是,当我尝试使用新的Consumer API时遇到了一些问题。我已经完成了以下步骤:

1.添加一个新的依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>


2.添加配置

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");


3.使用KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");


但是,当我尝试轮询来自代理的消息时,除了null外,我什么也没有:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");


然后,在检查了源代码之后,我知道使用者有什么问题:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}


更糟糕的是,我找不到关于0.8.2 API的任何其他有用信息,因为有关Kafka的所有用法均与最新版本不兼容。有人可以帮我吗?非常感谢。

最佳答案

我还试图在Kafka 0.8.2.1之上编写一个Consumer,以读取新Producer产生的消息。

到目前为止,我已经知道Producer API已经准备就绪并且可以使用,而在使用者方面,我们必须等待0.8.3(如@habsq所指出的那样),您已经发现为Consumer提供了一些代码,但是仍然无法运行。

因此,要使用的客户端(当前的客户端API)是在您当前的Kafka版本(即0.8.2.1)的“核心”项目中找到的客户端(最好不要将客户端降级到任何其他版本)。

因此,现在我们需要导入两个jar:一个用于“新” java客户端,一个用于核心项目,具体取决于您使用的scala版本(我使用2.11)。

就我而言,我使用graddle来管理依赖项,所以我只需要导入

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}


当您更新依赖项时,它将获取所有需要的库。

如果您使用的是其他Scala版本,则只需更改版本即可。无论如何,您可以在Maven Central上找到所有不同的版本或完整的pom:
http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

如果使用这些Consumer实现,那么所有当前示例都应照常运行。

PS参考:Kafka-用户ml线程http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

10-08 01:11