简介

要配置kafka,首先要配置zookeeper保证集群的高可用。因此本教程包含两者的配置。

1、下载kafka:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz

2、下载zookeoper:http://mirror.bit.edu.cn/apache/zookeeper/

0、准备工作

1、目前配置三台机器的集群,配置好hosts文件:

192.168.102.131 h131
192.168.102.132 h132
192.168.102.133 h133

2、配置好三台机器间的SSH无秘钥登录(虽然不是必须,但是作为集群间更好的管理,这是必要的步骤)

3、关闭防火墙。当然,为了安全起见,最好还是不要偷懒,应该针对性的开启需要开启的端口。

4、配置java jdk环境。

1、安装zookeeper

1、解压安装包并复制zoo.cfg模板文件为zoo.cfg

# cd /opt/module/zookeeper-3.4.10/conf
# cp zoo_sample.cfg zoo.cfg

2、配置zoo.cfg

# vi zoo.cfg

zoo.cfg配置如下:

#心跳间隔
tickTime=2000
#其他服务器连接到Leader时,最长能忍受的心跳间隔数:10*2000 = 20秒
initLimit=10
#发送消息时,多长忍受心跳间隔数:5*2000 = 10秒
syncLimit=5
#快照日志
dataDir=/opt/module/zookeeper-3.4.10/zkdata
#事务日志
dataLogDir=/opt/module/zookeeper-3.4.10/zkdatalog
#客户端连接zookeeper服务器的端口
clientPort=2181
#可以待后续克隆完剩下两台后,再写上其他两台的ip
server.1=192.168.102.131:2888:3888
server.2=192.168.102.132:2888:3888
server.3=192.168.102.133:2888:3888

3、分发到zookeeper到其他两台机器

# scp -r zookeeper-3.4.10/ root@h132:/opt/module/
# scp -r zookeeper-3.4.10/ root@h133:/opt/module/

4、在三台机器上的dataDir目录/data/zookeeper/下写一个myid文件,对应zoo.cfg里的server.1、server.2、server.3

*** server1、2和3
# cd /opt/module/zookeeper-3.4.10/
# mkdir zkdata
# mkdir zkdatalog

*** server1
# echo "1" > /opt/module/zookeeper-3.4.10/zkdata/myid

*** server2
# echo "2" > /opt/module/zookeeper-3.4.10/zkdata/myid

*** server3
# echo "3" > /opt/module/zookeeper-3.4.10/zkdata/myid

5、启动各台服务

# ./bin/zkServer.sh start

6、查看状态

# /bin/zkServer.sh status
2 follower, 1 leader.

X、其他命令

*** 停止服务
# bin/zkServer.sh stop

2、配置kafka

1、解压kafka。

2、进入安装目录修改配置文件

# cd /opt/module/kafka_2.12-2.1.0/config
# vi server.properties

server.properties的配置信息如下,注意每台机器略微不同,详情见注释

# 三台机器分别是1,2以及3
broker.id=1
# 对应自身ip
listeners=PLAINTEXT://192.168.102.131:9092
# 对应自身ip
advertised.listeners=PLAINTEXT://192.168.102.131:9092
# zookeeper集群地址
zookeeper.connect=192.168.102.131:2181,192.168.102.132:2181,192.168.102.133:2181

3、将安装包分发到其他两台机器

# scp -r kafka_2.12-2.1.0/ root@h132:/opt/module/
# scp -r kafka_2.12-2.1.0/ root@h133:/opt/module/

4、修改各自的IP配置:broker.id、listeners以及advertised。

# vi server.properties

具体见第2步中的注释,例如,第二台机器的配置应该是这样

# 三台机器分别是1,2以及3
broker.id=2
# 对应自身ip
listeners=PLAINTEXT://192.168.102.132:9092
# 对应自身ip
advertised.listeners=PLAINTEXT://192.168.102.132:9092
# zookeeper集群地址
zookeeper.connect=192.168.102.131:2181,192.168.102.132:2181,192.168.102.133:2181

5、启动集群

bin/kafka-server-start.sh -daemon ./config/server.properties

6、测试集群

(1)任意一台机器上创建话题,例如创建一个名为zhaoyi的话题(指定分区为2):

# bin/kafka-topics.sh --create --zookeeper 192.168.102.131:2181,192.168.102.132:2181,192.168.102.133:2181 --replication-factor 2 --partitions 2 --topic zhaoyi

得到如下的输出

Created topic "zhaoyi".

(2)查看所有分区

# bin/kafka-topics.sh --zookeeper 192.168.102.131:2181 --list

得到如下的输出

__consumer_offsets
zhaoyi

(3)启动一个生产者(通过控制台),往topic队列(zhaoyi)中写入数据,例如,我们在192.168.102.133上执行:

# bin/kafka-console-producer.sh --broker-list 192.168.102.131:9092,192.168.102.132:9092,192.168.102.132:9092 --topic zhaoyi
>

(4) (3)执行之后进入一个等待输入命令行,即需要我们输入消息。这时候,我们再从另一台机器,例如192.168.102.132上启动一个消费者(通过控制台)。)

bin/kafka-console-consumer.sh --bootstrap-server 192.168.102.131:9092,192.168.102.132:9092,192.168.102.133:9092 --topic zhaoyi

执行该命令之后,控制台处于阻塞状态,因为他已经进入了监听模式,监听zhaoyi话题的任何产出信息。

(5)回到创建生产者的机器上,往等待命令行输入任何消息,敲击回车

>I love you, deer!
>

(6)这时候我们观察进入阻塞模式的消费者控制台,即192.168.102.132服务器,可以看到接收到了信息:

I love you, deer!

至此,配置结束。

X、其他命令

*** 停止服务
# bin/kafka-server-stop.sh

*** 查看某个topic的消息
# bin/kafka-topics.sh --zookeeper 192.168.102.131:2181 --describe --topic zhaoyi
02-19 12:11