目录

1.介绍

1.1概念

zookeeper作用:用于维护配置信息、命名、提供分布式同步和提供组服务
zookeeper主要是文件系统和通知机制

文件系统主要是用来存储数据
通知机制主要是服务器或者客户端进行通知,并且监督
基于观察者模式设计的分布式服务管理框架,开源的分布式框架

1.2特点

1):一个leader,多个follower的集群
2):集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器
3):全局数据一致,每个服务器都保存同样的数据,实时更新
4):更新的请求顺序保持顺序(来自同一个服务器)
5):数据更新的原子性,数据要么成功要么失败(大事务)
6):数据实时更新性很快(因为数据量很小)

1.3主要的集群步骤

1):服务端启动时去注册信息(创建都是临时节点)
2):获取到当前在线服务器列表,并且注册监听
3):服务器节点下线
4):服务器节点上下线事件通知
5):process(){重新再去获取服务器列表,并注册监听}

1.4数据结构

与 Unix 文件系统很类似,可看成树形结构,每个节点称做一个ZNode。每一个ZNode默认能够存储 1MB 的数据。也就是只能存储小数据(一般配置信息,注册信息等)

1.5应用场景

1):统一命名服务(域名服务)
    在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,eg:ip不容易记住,而域名容易记住

2):统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)

3):将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器

4):统一集群管理(掌握实时状态)

5):将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化

6):服务器节点动态上下线

7):软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)

zookeeper从小白到精通-LMLPHP

2.本地安装

2.1安装jdk

1.查看是否已安装JDK
yum list installed |grep java

2.卸载CentOS系统Java环境
# yum -y remove java-1.8.0-openjdk*   *表示卸载所有openjdk相关文件输入
# yum -y remove tzdata-java.noarch       卸载tzdata-java

3.查看JDK软件包版本
# yum -y list java*      或者使用# yum searchjava | grep -i --color JDK

4.安装
yum install java-1.8.0-openjdk*  //安装java1.8.0所有程序
java -version //查看java版本信息

注意:使用yum安装环境变量自动就配好了

2.2下载安装

官网:https://zookeeper.apache.org/
下载3.5.7稳定版本
下载:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg
bin目录 框架启动停止,客户端和服务端的
conf 配置文件信息
docs文档
lib 配置文档的依赖

2.3配置文件修改

配置文件:/opt/module/zookeeper-3.5.7/conf/zoo.cfg

修改:
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径

2.4启动服务端

cd /opt/module/zookeeper-3.5.7/bin
[root@sg-15 bin]# ./zkServer.sh start  // 启动服务端

//查看进程
[root@sg-15 bin]# jps -l
21172 sun.tools.jps.Jps
21110 org.apache.zookeeper.server.quorum.QuorumPeerMain

2.5启动客户端

[root@sg-15 bin]# ./zkCli.sh  // 启动客户端
[zk: localhost:2181(CONNECTED) 4] ls /
[zookeeper]

[zk: localhost:2181(CONNECTED) 5] quit //退出客户端

[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

2.6zookeeper常用命令

[root@sg-15 bin]# ./zkServer.sh start  // 启动服务端
[root@sg-15 bin]# ./zkServer.sh stop   // 停止服务端
[root@sg-15 bin]# jps -l               // 查看进程
[root@sg-15 bin]# ./zkCli.sh           // 启动客户端
[zk: localhost:2181(CONNECTED) 5] quit // 退出客户端
[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态

2.7配置文件解读

配置文件的5大参数:

tickTime = 2000  //通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit = 10   //Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
syncLimit = 5    //Leader和Follower之间通信时间如果超过5个心跳时间,Leader认为Follwer死掉,从服务器列表中删除Follwer。
dataDir =/tmp/zookeeper //保存zookeeper的数据,这是默认值,会定时被系统清除
dataDir保存zookeeper的数据,默认是temp会被系统定期清除,通常改为自己的路径

dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径

clientPort = 2181  //客户端的连接端口,一般不需要修改

3.集群安装

3.1集群规划

sg15,sg16,sg17三台机器部署Zookeeper

3.2安装

解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg

3.3配置

[root@sg-15 zookeeper-3.5.7]# mkdir zkData  // 创建目录zkData
[root@sg-15 zkData]# vi myid                // 创建一个 myid 的文件
在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)
5    // 192.168.0.215,这里设置服务器尾号,其他的不重复就行

[root@sg-15 conf]# mv zoo_sample.cfg zoo.cfg  // 配置文件改名
[root@sg-15 conf]# vi zoo.cfg                //修改配置文件
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径
// 添加配置
#######################cluster##########################
server.5=192.168.0.215:2888:3888
server.6=192.168.0.216:2888:3888
server.7=192.168.0.217:2888:3888

##配置参数解读
server.A=B:C:D
A 是一个数字,表示这个是第几号服务器,就是myid中的值
B 是这个服务器的地址
C 是这个服务器Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的
Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

注意:一台机器弄好之后,打包发送到其他机器。其他机器修改myid值
tar -cf module.tar ./module  // 打包
scp -r module.tar root@192.168.0.216:/opt/  //发送
tar -xf module.tar          // 解包

3.4启动zookeeper集群

注意:集群只要有半数以上包括半数就可正常服务。

三台机器启动:
[root@sg-15 bin]# ./zkServer.sh start    //启动
[root@sg-15 bin]# ./zkServer.sh restart  //重启
[root@sg-17 bin]# ./zkServer.sh status   //检查状态
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader                  //本机器为leader

[root@sg-15 bin]# ./zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower           //本机器为follower

4.选举机制

选举谁当leader
介绍:
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一样
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是

4.1触发选举时机

1.服务器刚启动
2.服务器运行期间无法和leader保持连接

当一台机器进入leader选举流程时,当前集群出现两种状态:
 1.集群中已经存在一个leader(此机器和leader建立连接,并状态同步)
 2.集群中不存在leader(触发选举),looking状态

4.2zookeeper选举机制---第一次启动

服务器1:myid=1
服务器2:myid=2
服务器3:myid=3
(1)	服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2)	服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:服务器1和服务器2比较谁的myid大,更改选票为推举服务器myid大的。此时服务器1票数0票,服务器2票数2票,大于半数以上结果,选举完成。服务器1为follower,2状态为leader
(3)	服务器3启动,发起一次选举。此时服务器1,2已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为1票,此时服务器3服从多数,更改选票信息为服务器2,并更改状态为FOLLOWING;

4.2zookeeper选举机制---非第一次启动

服务器1:myid=1
服务器2:myid=2
服务器3:myid=3
服务器运行期间无法和leader保持连接触发重新选举:
假设zookeeper由5台服务器组成,SID分别为1,2,3,4,5,ZXID分别为8,8,8,7,7,并且此时SID为3的服务器此时时leader。某时刻3和5出现故障,因此开始进行leader选举:
 SID为1的机器:(1,8,1)
 SID为2的机器:(1,8,2)
 SID为4的机器:(1,7,4)

选举规则:
优先比较SID,再比较ZXID,其次比较Epoch。大的直接胜出当选leader

4.3选举机制总结

半数机制,超过半数的投票通过,即通过。
第一次启动选举规则:投票过半数时,服务器 myid 大的胜出当leader
第二次启动选举规则:①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,任期代号id大的胜出

5.客户端命令行操作

5.1常用命令整合

登陆客户端操作:很多命令和linux命令相似,比如ls,history等
jps : 查看zookeeper运行的进程
help :显示所有操作命令
ls / :查看当前znode中包含的内容
ls -s / :查看当前节点详细数据
create : 创建普通节点
create -s :创建带序号节点
create -e :创建临时普通节点
create -e -s :创建临时有序节点
delete:删除节点

create /wangzhe "this is wangzhe"  //新建节点(永久节点,不带序号)
create /wangzhe/fashi "this is fashi" // //新建节点(永久节点,不带序号)
get /wangzhe  // this is wangzhe  取值
get -s /wangzhe  //节点详情
set set /wangzhe "this is wangzherongyao" //修改值

get -w :监听值
ls -w /wangzhe :监听数量
监听注册一个生效一次

5.2启动客户端

[root@sg-15 bin]# ./zkCli.sh -server 192.168.0.215:2181
quit //退出

5.3znode节点数据信息

命令:ls -s /
[zk: 192.168.0.215:2181(CONNECTED) 3] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

##################################
1):czxid:创建节点的事务id zxid
  每次修改ZooKeeper 状态都会产生一个ZooKeeper 事务 ID。事务 ID 是ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么zxid1 在 zxid2 之前发生。

2):ctime:znode 被创建的毫秒数(从 1970 年开始)
3):mzxid:znode 最后更新的事务zxid
4):mtime:znode 最后更新的毫秒数(从 1970 年开始)
5):pZxid:znode 最后更新的子节点zxid
6):cversion:znode 子节点变化号,znode 子节点修改次数
7):dataversion:znode 数据变化号
8):aclVersion:znode 访问控制列表的变化号
9):ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
10):dataLength:znode 的数据长度
11):numChildren:znode 子节点数量

5.4节点类型(持久/短暂/有序号/无序号)

持久:客户端和服务端端开连接后,创建的节点不删除
序号:在分布式系统中,顺序号可以被用于所有事件排序,这样客户端可以通过顺序号推断事件的顺序
{
  持久有序号:客户端和服务端端开连接后,创建的节点不删除。且节点名称顺序编号
  持久无序号:客户端和服务端端开连接后,创建的节点不删除。无序号
}

短暂:客户端和服务端端开连接后,创建的节点自己删除
{
  短暂有序号:客户端和服务端端开连接后,创建的节点自己删除。且节点名称顺序编号
  短暂无序号:客户端和服务端端开连接后,创建的节点自己删除。无序号
}

5.5节点操作

5.5.1创建/删除节点

create : 创建普通节点
create -s :创建带序号节点
create -e :创建临时普通节点
create -e -s :创建临时有序节点
delete:删除一个节点(如果这个节点下有子节点,删除失败)
deleteall:递归删除所有节点(如果这个节点下有子节点,全部删除)
eg:delete /wangzhe/fashi //只删除fashi节点
deleteall /wangzhe  //递归删除wangzhe所有节点

//1.创建节点:普通永久节点
[zk: 192.168.0.215:2181(CONNECTED) 27] create /wangzhe "this is wangzhe"
Created /wangzhe

[zk: 192.168.0.215:2181(CONNECTED) 40] create /wangzhe/fashi "this is fashi"
Created /wangzhe/fashi

//2.创建节点:有序永久节点
[zk: 192.168.0.215:2181(CONNECTED) 66] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000000

[zk: 192.168.0.215:2181(CONNECTED) 70] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000001 //再次创建daji,序号+1

//3.创建节点:普通临时节点
[zk: 192.168.0.215:2181(CONNECTED) 71] create -e /wangzhe/fashi/ganjiang "ganjiang"
Created /wangzhe/fashi/ganjiang

//4.创建节点:有序临时节点
[zk: 192.168.0.215:2181(CONNECTED) 73] create -e -s /wangzhe/fashi/anqila "anqila"
Created /wangzhe/fashi/anqila0000000003

//查看
[zk: 192.168.0.215:2181(CONNECTED) 74] ls /wangzhe/fashi
[anqila0000000003, daji0000000000, daji0000000001, ganjiang]


//delete删除
[zk: 192.168.0.215:2181(CONNECTED) 19] delete /wangzhe/fashi/xiaoqiao
xiaoqiao             xiaoqiao0000000007

5.5.2获取/查看的值

注意:get获取有序节点的值时:必须获取最后一个序号的值,比如获取下面ganjiang0000000004报错,获取ganjiang0000000005正常。
ls
get
get -s

//查看
[zk: 192.168.0.215:2181(CONNECTED) 10] ls /wangzhe/fashi
[daji0000000000, daji0000000001, ganjiang0000000004, ganjiang0000000005, xiaoqiao]

//获取值
[zk: 192.168.0.215:2181(CONNECTED) 12] get /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao

// 获取详细
[zk: 192.168.0.215:2181(CONNECTED) 13] get -s /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao
cZxid = 0x30000002f
ctime = Sat Mar 26 18:37:35 CST 2022
mZxid = 0x30000002f
mtime = Sat Mar 26 18:37:35 CST 2022
pZxid = 0x30000002f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0

[zk: 192.168.0.215:2181(CONNECTED) 14] get -s /wangzhe/fashi
this is fashi
cZxid = 0x300000020
ctime = Sat Mar 26 17:31:12 CST 2022
mZxid = 0x300000020
mtime = Sat Mar 26 17:31:12 CST 2022
pZxid = 0x30000002f
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 6

5.6节点监听

get -w :监听值
ls -w /wangzhe :监听数量
监听注册一个生效一次

5.6.1监听节点值改变

[zk: 192.168.0.215:2181(CONNECTED) 25] get -w /wangzhe //开启一次监听
this is wangzhe
[zk: 192.168.0.215:2181(CONNECTED) 26] set /wangzhe "jianting:this is wangzhe"  //修改值

WATCHER::  //监听值发生变化
WatchedEvent state:SyncConnected type:NodeDataChanged path:/wangzhe

5.6.2监听节点下子节点数量改变

[zk: 192.168.0.215:2181(CONNECTED) 27] ls -w /wangzhe  ////开启一次监听
[fashi, fashi0000000001]
[zk: 192.168.0.215:2181(CONNECTED) 28] delete /wangzhe/fashi0000000001  //删除一个节点

WATCHER::  //监听数量发生变化

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/wangzhe

6.goang操作zookeeper

6.1创建节点create(增)

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
  // 任意一个ip都可以,但是建议放主节点ip
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// 1.创建的普通永久节点
	path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))
	if err != nil {
		fmt.Println("err:",err)
	}
	fmt.Println("创建的普通永久节点:", path)

  // 2.创建的普通临时节点,创建此节点的会话结束后立即清除此节点
	ephemeral, err := conn.Create("/ephemeral", []byte("world"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err != nil {
		fmt.Println("err:",err)
	}
	fmt.Println("创建的普通临时节点:", ephemeral)

  // 3.创建的有序永久节点
	sequence, err := conn.Create("/sequence", []byte("world"), zk.FlagSequence, zk.WorldACL(zk.PermAll))
	if err != nil {
		panic(err)
	}
	fmt.Println("创建的有序永久节点:", sequence)

  // 4.创建的有序临时节点,创建此节点的会话结束后立即清除此节点
	ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("world"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
	if err != nil {
		panic(err)
	}
	fmt.Println("创建的有序临时节点:", ephemeralSequence)
}

6.2查看节点get(查)

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	result, state, err := conn.Get("/wangzhe/fashi")
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	fmt.Println("result: ", string(result)) // this is fashi
	fmt.Printf("%#v",state)
  //状态结果:&zk.Stat{Czxid:12884901920, Mzxid:12884901920, Ctime:1648287072539, Mtime:1648287072539, Version:0, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:13, NumChildren:5, Pzxid:12884901936}
}

6.3修改节点set(改)

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	path := "/wangzhe/fashi"
	_, state, _ := conn.Get(path) // 先查询,拿到当前版本

	state, err = conn.Set(path, []byte("hello"), state.Version)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	fmt.Printf("%#v",state)

  //结果:&zk.Stat{Czxid:12884901920, Mzxid:12884902012, Ctime:1648287072539, Mtime:1648305221598, Version:1, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:5, NumChildren:5, Pzxid:12884901936}2022/03/26 22:33:39 recv loop

}

6.4删除节点delete(删)

注意:此方法不能递归删除节点

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	path := "/hello"
	exists, state, err := conn.Exists(path)  //判断是否存在和查询版本
	if exists{
		err = conn.Delete(path, state.Version)
		if err != nil {
			fmt.Println("err:",err)
			return
		}
		fmt.Println("节点删除成功!!!")
	}else {
		fmt.Println("节点不存在!!!")
	}
}

6.5查看节点的子节点Children

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	childrenList, state, err := conn.Children("/wangzhe/fashi")
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	fmt.Println(childrenList)
	fmt.Printf("%#v",state)
}

6.6遍历节点Children后再get

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	childrenList, _, err := conn.Children("/wangzhe/fashi")
	if err != nil {
		fmt.Println("err:",err)
		return
	}

	for _,children:=range childrenList{
		childrenPath:= fmt.Sprintf("/wangzhe/fashi/%s",children)
		result, state, err := conn.Get(childrenPath)
		if err != nil {
			fmt.Println("err:",err)
			return
		}
		fmt.Println(string(result))
		fmt.Println(state)
	}
}

6.7判断节点是否存在-conn.Exists

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	path := "/hello"
	exists, _, err := conn.Exists(path)
  if err != nil {
		fmt.Println("err:",err)
		return
	}

  if exists{
    fmt.Println("节点存在")
  }else{
    fmt.Println("节点不存在")
  }
}

7.监听/watch

7.1监听节点-全局监听

将监听器放到Connect函数中,如果有监听事件发生,会一直执行监听器的回调函数。监听执行了一次之后要重新注册监听。

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)


func callback(e zk.Event) {
	fmt.Println("========================")
	fmt.Println("path:", e.Path)
	fmt.Println("type:", e.Type.String())
	fmt.Println("state:", e.State.String())
}

func main() {
	eventCallbackOption := zk.WithEventCallback(callback)
  // 经过测试,在连接时会执行3次回调函数
	conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second,eventCallbackOption)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()


	// 注册一个 watch
	exists, state, _, err := conn.ExistsW("/watch")
	if err != nil {
		fmt.Println("err:",err)
		return
	}


	fmt.Println("exists:",exists)
	if !exists {
		// 创建 /watch 时,触发监听事件,watch 失效
		_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil {
			fmt.Println("err:",err)
			return
		}

		// 再注册一个 watch
		_, state, _, err = conn.ExistsW("/watch")
		if err != nil {
			fmt.Println("err:",err)
			return
		}
	}

	// 删除 /watch 时,触发监听事件,watch 失效
	err = conn.Delete("/watch", state.Version)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
}

结果:

========================
path:
type: EventSession
state: StateConnecting
========================
path:
type: EventSession
state: StateConnected
2022/03/27 11:09:27 connected to 192.168.0.215:2181
========================
path:
type: EventSession
state: StateHasSession
2022/03/27 11:09:27 authenticated: id=360292329056632906, timeout=4000
2022/03/27 11:09:27 re-submitting `0` credentials after reconnect
exists: false
========================
path: /watch
type: EventNodeCreated
state: Unknown
========================
path: /watch
type: EventNodeDeleted
state: Unknown

7.2监听部分事件

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()

	// 注册一个 watch
	exists, _, eventChannel, err := conn.ExistsW("/watch")
	if err != nil {
		fmt.Println("err:",err)
		return
	}

	go func() {
		// 从事件 channel 中取出事件
		e := <-eventChannel
		fmt.Println("========================")
		fmt.Println("path:", e.Path)
		fmt.Println("type:", e.Type.String())
		fmt.Println("state:", e.State.String())
	}()
	if !exists {
		// 创建 临时节点/watch 时,触发监听事件,watch 失效
		_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil {
			fmt.Println("err:",err)
			return
		}
	}
}

8.微服务动态上下线监听(服务注册/发现)

8.1需求实现

微服务分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求实现:
服务端:服务端启动时,在zookeeper中创建临时有序节点,服务关闭时,临时节点自动删除了(zookeeper临时节点机制)
客户端:监听节点的变化

8.2服务端创建代码-(注册服务)

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()
	//创建的有序临时节点,创建此节点的会话结束后立即清除此节点 create -e -s
	ephemeralSequence, err := conn.Create("/servers/bikesvc", []byte("bikesvc"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	fmt.Println("创建的有序临时节点:", ephemeralSequence)
	time.Sleep(time.Second*10)
}

8.3客户端监听代码-(服务发现)

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
	snapshots := make(chan []string)
	errors := make(chan error)
	go func() {
		for {
			snapshot, _, events, err := conn.ChildrenW(path)
			if err != nil {
				errors <- err
				return
			}
			snapshots <- snapshot
			evt := <-events
			if evt.Err != nil {
				errors <- evt.Err
				return
			}
		}
	}()
	return snapshots, errors
}

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	defer conn.Close()

	snapshots, errors := mirror(conn, "/servers") //监控的根节点,根节点不能删除
	go func() {
		for {
			select {
			case snapshot := <-snapshots:
				fmt.Println("监控变化:", snapshot)
			case err := <-errors:
				fmt.Println("err:", err)
			}
		}
	}()
	for {

	}
}
结果:
服务端:
创建的有序临时节点: /servers/bikesvc0000000010

客户端:
监控变化: []
监控变化: [bikesvc0000000009]
监控变化: []

9.分布式锁

加锁进行资源保护
go-zookeeper 添加分布式锁的方法为NewLock(c *Conn, path string, acl []ACL)。
锁的结构体为:
type Lock struct {
	c        *Conn
	path     string
	acl      []ACL
	lockPath string
	seq      int
}

这个结构体实现了三个方法:Lock(),LockWithData(data []byte)和Unlock()

9.1分布式锁案例

根节点“/root”判断是否存在,不存在则创建
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁
			err = lock.LockWithData([]byte("it is a lock"))
			if err != nil {
				panic(err)
			}
			fmt.Println("第", n, "个 goroutine 获取到了锁")
			time.Sleep(time.Second*1) // 1 秒后释放锁
			lock.Unlock()  //解锁
		}(i)
	}
	wg.Wait()
}
这里给了两个进程抢锁,ls查看一下锁:
解释:把所有进程按有序排列,当成节点放入lock节点中,按照最小的序号执行。解锁一个删除一个。直到节点为空,进程执行完毕。
[zk: localhost:2181(CONNECTED) 32] ls /root/lock
[_c_1dbbc1ec75b285ef10a6d6154627335c-lock-0000000153, _c_793a837ded040d01608395e5eac65979-lock-0000000152]

先执行152,再执行153

9.2监控锁案例

监控锁节点变化

监控代码

package main

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
)

func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
	snapshots := make(chan []string)
	errors := make(chan error)
	go func() {
		for {
			snapshot, _, events, err := conn.ChildrenW(path)
			if err != nil {
				errors <- err
				return
			}
			snapshots <- snapshot
			evt := <-events
			if evt.Err != nil {
				errors <- evt.Err
				return
			}
		}
	}()
	return snapshots, errors
}

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	defer conn.Close()

	snapshots, errors := mirror(conn, "/root/lock") //监控的根节点,根节点不能删除
	go func() {
		for {
			select {
			case snapshot := <-snapshots:
				fmt.Println("监控变化:", snapshot)
			case err := <-errors:
				fmt.Println("err:", err)
			}
		}
	}()
	for {

	}
}

分布式锁代码

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
	if err != nil {
		fmt.Println("err:",err)
		return
	}
	defer conn.Close()
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁
			err = lock.LockWithData([]byte("it is a lock"))
			if err != nil {
				panic(err)
			}
			fmt.Println("第", n, "个 goroutine 获取到了锁")
			time.Sleep(time.Second*1) // 1 秒后释放锁
			lock.Unlock()  //解锁
		}(i)
	}
	wg.Wait()
}

结果:

zookeeper从小白到精通-LMLPHP

zookeeper从小白到精通-LMLPHP

选择了IT,必定终身学习
03-27 18:22