文章目录
版本声明
Zookeeper
服务器版本:3.4.6
java原生客户端
依赖
compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.4.6'
原生客户端比较底层,使用起来比较麻烦,需要处理很多细节,而且容易出错,没有对各种场景的封装(比如共享锁、分布式锁、集群领导选举机制等),在实际生产环境推荐使用
Curator
客户端。
创建会话
创建会话
class InitWatch implements Watcher { private CountDownLatch countDownLatch; public InitWatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent watchedEvent) { logger.info(watchedEvent.toString()); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { logger.info("连接成功..."); countDownLatch.countDown(); } } } @Test public void testCreateClient() throws Exception { //String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; //是否支持只读模式,当集群过半失去网络连接,是否还继续支持读服务(可写肯定是不行了) boolean canBeReadOnly = false; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch),canBeReadOnly); countDownLatch.await(); //CONNECTED logger.info(zooKeeper.getState().toString()); } @Test public void testCreateClient2() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; boolean canBeReadOnly = false; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch),canBeReadOnly); countDownLatch.await(); //CONNECTED logger.info(zooKeeper.getState().toString()); //sessionId和sessionPasswd这两个参数能唯一确定一个会话 long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); CountDownLatch countDownLatch2 = new CountDownLatch(1); //复用之前的sessionId和sessionPasswd创建连接 ZooKeeper zooKeeper1 = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch2), sessionId, sessionPasswd); countDownLatch2.await(); logger.info(zooKeeper1.getState().toString()); }
创建节点
创建节点
class InitWatch implements Watcher { private CountDownLatch countDownLatch; public InitWatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent watchedEvent) { logger.info(watchedEvent.toString()); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { logger.info("连接成功..."); countDownLatch.countDown(); } } } @Test public void testCreateZnodeSync() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String path = "/zookeeper-create"; byte[] data = "jannal".getBytes(StandardCharsets.UTF_8); path = zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //zookeeper-create创建成功 logger.info("{}创建成功", path); } @Test public void testCreateZnodeAsync() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String path = "/zookeeper-create-async"; byte[] data = "jannal".getBytes(StandardCharsets.UTF_8); //Context用于传递一个对象,可以在回调方法执行的时候使用 String context = "我是Context"; zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { /** * * @param resultCode * 0 (OK) 接口调用成功 * -4(ConnectionLoss):客户端与服务器连接断开 * -100(NodeExists) * -112(SessionExpired) * @param path * @param ctx * @param name 实际在服务上创建的节点名称 */ @Override public void processResult(int resultCode, String path, Object ctx, String name) { // rc:0,path:/zookeeper-create-async,ctx:我是Context,name:/zookeeper-create-async logger.info("rc:{},path:{},ctx:{},name:{}", resultCode, path, ctx, name); } }, context); TimeUnit.SECONDS.sleep(5000); }
读取节点
- 读取数据
@Test public void testGetNode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); zooKeeper.create("/zookeeper-get", "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/zookeeper-get/user", "2".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/zookeeper-get/person", "3".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> children = zooKeeper.getChildren("/zookeeper-get", null); // [user, person] logger.info(children.toString()); String path = "/zookeeper-get"; /** * 1. 如果在获取指定节点的子节点列表后,还需要订阅这个子节点列表变化的通知, * 可以注册一个watch。zk服务端发送NodeChildrenChanged事件通知时 * 仅仅只会发出一个通知,不会把子节点的变化情况发送给客户端,客户端需要重新获取 * 2. Watcher是一次性的,一旦触发一次通知后,该Watch就立即失效,因此客户端需要反复注册Watcher * */ List<String> children1 = zooKeeper.getChildren(path, getWatcher(path, zooKeeper, null)); logger.info(children1.toString()); /** * 1. 我们不仅需要获取节点最新的子节点列表 * 还需要获取最新的节点状态信息,此时可以传递一个Stat, * 此变量会被服务器响应的新stat对象替换掉 */ Stat stat = new Stat(); List<String> children2 = zooKeeper.getChildren(path, getWatcher(path, zooKeeper, stat), stat); logger.info(children2.toString(), ToStringBuilder.reflectionToString(stat)); byte[] data = zooKeeper.getData(path, getWatcher(path, zooKeeper, stat), stat); logger.info(new String(data, StandardCharsets.UTF_8)); //修改数据,可以看到watcher被调用,version指定数据版本(乐观锁) zooKeeper.setData(path,data,stat.getVersion()); //休眠1s让第一个数据更新的Watcher先调用成功,这样下一个stat才能获取正确的版本 Thread.sleep(1000); zooKeeper.setData(path,data,stat.getVersion()); Thread.sleep(60000); } private Watcher getWatcher(final String path, final ZooKeeper zooKeeper, Stat stat) { Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) { try { List<String> children = zooKeeper.getChildren(path, true); logger.info("{}子节点发生变化,重新获取:{},stat:{}", path, children.toString(), ToStringBuilder.reflectionToString(stat)); //重新注册Watcher zooKeeper.getChildren(path, this, stat); } catch (Exception e) { logger.error(e.getMessage(), e); } } //数据内容或者数据版本变化都会触发NodeDataChanged事件 if (Event.EventType.NodeDataChanged == watchedEvent.getType()) { try { byte[] data = zooKeeper.getData(path, true, stat); logger.info("{}数据发生变化{},stat:{}", path,new String(data, StandardCharsets.UTF_8), ToStringBuilder.reflectionToString(stat)); //重新注册watcher zooKeeper.getData(path, this, stat); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }; return watcher; }
更新节点
更新节点
@Test public void testUpdateNodeSync() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String path = "/zookeeper-updateSync"; zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = new Stat(); zooKeeper.getData(path, true, stat); //version=0 logger.info(ToStringBuilder.reflectionToString(stat)); zooKeeper.setData(path, "2".getBytes(StandardCharsets.UTF_8), stat.getVersion()); byte[] newData = zooKeeper.getData(path, true, stat); //2 logger.info(new String(newData, StandardCharsets.UTF_8)); //version=1 logger.info(ToStringBuilder.reflectionToString(stat)); } @Test public void testUpdateNodeAsync() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String path = "/zookeeper-updateAsync"; zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = new Stat(); String context = "我是Context"; zooKeeper.getData(path, true, stat); //version=0 logger.info(ToStringBuilder.reflectionToString(stat)); zooKeeper.setData(path, "2".getBytes(StandardCharsets.UTF_8), stat.getVersion(), new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (rc == 0) { logger.info("更新成功,rc:{},path:{},ctx:{},stat:{}", rc, path, ctx, ToStringBuilder.reflectionToString(stat)); } } }, context); TimeUnit.SECONDS.sleep(10000); }
删除节点
删除节点
@Test public void testDeleteNode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String path = "/zookeeper-delete"; zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //检测节点是否存在 //不存在返回空 Stat stat = zooKeeper.exists(path, true); if (stat != null) { zooKeeper.delete(path, stat.getVersion()); } else { logger.error("{}不存在", path); } }
ACL
- 一个
Zookeeper
集群,不同应用之间如果不想共享数据,则需要解决不用应用之间的访问数据的权限问题。Zookeeper
提供了ACL(Access Control List访问控制列表)
机制,就是通过设置Zookeeper
服务器上数据节点的ACL,来控制客户端对该节点的访问权限。如果一个客户端符合该ACL控制,那么就可以对其进行访问,否则无法操作。 - 如果对一个数据节点添加权限信息后,修改和访问必须携带权限信息,但是可以自由的删除这个节点(删除节点的权限控制比较特殊),但是对于这个节点的子节点,必须使用相应的权限信息才能够删除。
- 传统的文件系统中,
ACL
分为两个维度,一个是属组,一个是权限,子目录/文件默认继承父目录的ACL
。而在Zookeeper
中,znode
的ACL
是没有继承关系的,是独立控制的。Zookeeper
的ACL
,可以从三个维度来理解:一是scheme
; 二是user
; 三是permission
,通常表示为scheme:id:permissions
,Zookeeper
有5个内置的scheme:world
:只有一个唯一的id:anyone;表示任何人都可以做对应的操作。这个scheme没有对应的鉴权实现。只要一个znode的ACL
中包含有这个scheme的Id,其对应的操作就运行执行auth
:没有对应的id,或者只有一个空串id。这个scheme没有对应的鉴权实现。语义是当前连接绑定的适合做创建者鉴权的autoInfo (通过调用autoInfo的scheme对应的API的isAuthenticated()得知)都拥有对应的权限。遇到这个auth后,Server会根据当前连接绑定的符合要求的autoInfo生成ACL加入到所操作znode的acl列表中。digest
:使用username:password格式的字符串生成MD5 hash 作为ACL ID。 具体格式为:username:base64 encoded SHA1 password digest.对应内置的鉴权插件:DigestAuthenticationProviderip
:它对应的id为客户机的IP地址,设置的时候可以设置一个ip段,比如ip:192.168.1.0/16, 表示匹配前16个bit的IP段。对应内置鉴权插件IPAuthenticationProvidersuper
: 在这种scheme情况下,对应的id拥有超级权限,可以做任何事情(cdrwa)
- permission,zookeeper支持下面一些权限:
CREATE(c)
: 创建权限,可以在在当前node下创建child nodeDELETE(d)
: 删除权限,可以删除当前的nodeREAD(r)
: 读权限,可以获取当前node的数据,可以list当前node所有的child nodesWRITE(w)
: 写权限,可以向当前node写数据ADMIN(a)
: 管理权限,可以设置当前node的permission
- 示例代码
@Test public void testAclNode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch)); countDownLatch.await(); String scheme = "digest"; byte[] auth = "jannal:jannal".getBytes(StandardCharsets.UTF_8); /** * addAuthInfo主要用于为当前zk会话添加权限信息,之后凡是通过 * 该会话对zk服务器的任何操作,都会带上权限信息 * scheme: * 权限控制模式,分为world,auth,digest,ip,super * auth: * 具体权限信息 * */ zooKeeper.addAuthInfo(scheme, auth); String pathParent = "/zookeeper-acl"; /** * Ids.CREATOR_ALL_ACL:只有创建节点的客户端才有所有权限 * Ids.OPEN_ACL_UNSAFE:这是一个完全开放的权限,所有客户端都有权限 * Ids.READ_ACL_UNSAFE:所有客户端只有读取的 */ zooKeeper.create(pathParent, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); String pathChildren = "/zookeeper-acl/user"; zooKeeper.create(pathChildren, "2".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); byte[] data = zooKeeper.getData(pathParent, false, null); logger.info(new String(data, StandardCharsets.UTF_8)); //使用错误的权限信息访问 CountDownLatch countDownLatch2 = new CountDownLatch(1); ZooKeeper zooKeeper2 = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch2)); countDownLatch2.await(); String scheme2 = "digest"; byte[] auth2 = "jannal:123".getBytes(StandardCharsets.UTF_8); zooKeeper2.addAuthInfo(scheme2, auth2); //抛异常 NoAuth for /zookeeper-acl try { zooKeeper2.getData(pathParent, false, null); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //错误的权限信息删除父节点 Directory not empty for /zookeeper-acl zooKeeper2.delete(pathParent, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //错误的权限信息删除子节点 NoAuth for /zookeeper-acl/user zooKeeper2.delete(pathChildren, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //错误的权限信息删除父节点 Directory not empty for /zookeeper-acl zooKeeper2.delete(pathParent, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //正确的权限删除父节点 Directory not empty for /zookeeper-acl zooKeeper.delete(pathParent, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //正确的权限删除子节点,正确删除 zooKeeper.delete(pathChildren, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } try { //错误的权限信息删除父节点(此时子节点已经删除),正确删除 zooKeeper2.delete(pathParent, -1); } catch (Exception e) { logger.error(e.getMessage(), e); } }