本文介绍了如何通过Java在Kafka中创建Topic的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个topic.如果我在命令提示符下创建主题,并且通过 java api 推送消息,则它工作正常.但是我想通过java api创建一个主题.经过长时间的搜索,我找到了下面的代码,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它显示该主题已创建,但我无法在该主题中推送消息.我的代码有什么问题吗?或者任何其他方式来实现上述目标?

解决方案

原答案

我修好了..经过长时间的研究..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

从上面的代码来看,ZkClient会创建一个topic,但是这个topic信息不会被kafka感知.所以我们要做的是,我们需要通过以下方式为ZkClient创建对象,

首先导入下面的语句,

import kafka.utils.ZKStringSerializer$;

并通过以下方式为 ZkClient 创建对象,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

编辑 1:(@ajkret 评论)

由于api已更改,因此上述代码不适用于kafka > 0.9,对 kafka > 0.9

使用以下代码
import java.util.Properties;导入 kafka.admin.AdminUtils;导入 kafka.utils.ZKStringSerializer$;导入 kafka.utils.ZkUtils;导入 org.I0Itec.zkclient.ZkClient;导入 org.I0Itec.zkclient.ZkConnection;公共类 KafkaTopicCreationInJava{public static void main(String[] args) 抛出异常 {zkClient zkClient = null;zkUtils zkUtils = null;尝试 {String zookeeperHosts = "192.168.20.1:2181";//如果有多个zookeeper,则->String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";int sessionTimeOutInMs = 15 * 1000;//15 秒int connectionTimeOutInMs = 10 * 1000;//10 秒zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);String topicName = "testTopic";int noOfPartitions = 2;int noOfReplication = 3;Properties topicConfiguration = new Properties();AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);} 捕捉(异常前){ex.printStackTrace();} 最后 {如果(zkClient != null){zkClient.close();}}}}

I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through java. It is working fine if I create a topic in command prompt, and If I push message through java api. But I want to create a topic through java api. After a long search I found below code,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

I tried above code and it is showing that topic is created but I am not able to push message in the topic. Any thing wrong in my code? Or any other way to achieve the above?

解决方案



Original answer

I fixed it.. After a long research..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

From the above code, ZkClient will create a topic but this topic information will not have awareness for the kafka. So what we have to do is, we need to create object for ZkClient in following way,

First import the below statement,

import kafka.utils.ZKStringSerializer$;

and create object for ZkClient in the following way,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());


Edit 1: (for @ajkret comment)


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

这篇关于如何通过Java在Kafka中创建Topic的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-11 15:41