文章目录

版本声明

  1. Zookeeper服务器版本:3.4.6

java原生客户端

  1. 依赖

     compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.4.6'
    
  2. 原生客户端比较底层,使用起来比较麻烦,需要处理很多细节,而且容易出错,没有对各种场景的封装(比如共享锁、分布式锁、集群领导选举机制等),在实际生产环境推荐使用Curator客户端。

创建会话

  1. 创建会话

        class InitWatch implements Watcher {
    
            private CountDownLatch countDownLatch;
    
            public InitWatch(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void process(WatchedEvent watchedEvent) {
                logger.info(watchedEvent.toString());
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    logger.info("连接成功...");
                    countDownLatch.countDown();
                }
            }
        }
    
        @Test
        public void testCreateClient() throws Exception {
            //String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            //是否支持只读模式,当集群过半失去网络连接,是否还继续支持读服务(可写肯定是不行了)
            boolean canBeReadOnly = false;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch),canBeReadOnly);
    
            countDownLatch.await();
            //CONNECTED
            logger.info(zooKeeper.getState().toString());
        }
    
        @Test
        public void testCreateClient2() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            boolean canBeReadOnly = false;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch),canBeReadOnly);
    
            countDownLatch.await();
            //CONNECTED
            logger.info(zooKeeper.getState().toString());
    
            //sessionId和sessionPasswd这两个参数能唯一确定一个会话
            long sessionId = zooKeeper.getSessionId();
            byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            //复用之前的sessionId和sessionPasswd创建连接
            ZooKeeper zooKeeper1 = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch2), sessionId, sessionPasswd);
            countDownLatch2.await();
            logger.info(zooKeeper1.getState().toString());
        }
    
    

创建节点

  1. 创建节点

    
        class InitWatch implements Watcher {
    
            private CountDownLatch countDownLatch;
    
            public InitWatch(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void process(WatchedEvent watchedEvent) {
                logger.info(watchedEvent.toString());
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    logger.info("连接成功...");
                    countDownLatch.countDown();
                }
            }
        }
    
        @Test
        public void testCreateZnodeSync() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
    
            countDownLatch.await();
            String path = "/zookeeper-create";
            byte[] data = "jannal".getBytes(StandardCharsets.UTF_8);
            path = zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //zookeeper-create创建成功
            logger.info("{}创建成功", path);
        }
    
       @Test
       public void testCreateZnodeAsync() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
    
            countDownLatch.await();
            String path = "/zookeeper-create-async";
            byte[] data = "jannal".getBytes(StandardCharsets.UTF_8);
            //Context用于传递一个对象,可以在回调方法执行的时候使用
            String context = "我是Context";
            zooKeeper.create(path,
                    data,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT,
                    new AsyncCallback.StringCallback() {
                        /**
                         *
                         * @param resultCode
                         *          0 (OK) 接口调用成功
                         *          -4(ConnectionLoss):客户端与服务器连接断开
                         *          -100(NodeExists)
                         *          -112(SessionExpired)
                         * @param path
                         * @param ctx
                         * @param name 实际在服务上创建的节点名称
                         */
                        @Override
                        public void processResult(int resultCode, String path, Object ctx, String name) {
                            // rc:0,path:/zookeeper-create-async,ctx:我是Context,name:/zookeeper-create-async
                            logger.info("rc:{},path:{},ctx:{},name:{}", resultCode, path, ctx, name);
                        }
                    }, context);
            TimeUnit.SECONDS.sleep(5000);
    
        }
    
    
    

读取节点

  1. 读取数据
        @Test
        public void testGetNode() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
            countDownLatch.await();
            zooKeeper.create("/zookeeper-get", "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zooKeeper.create("/zookeeper-get/user", "2".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zooKeeper.create("/zookeeper-get/person", "3".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            List<String> children = zooKeeper.getChildren("/zookeeper-get", null);
            // [user, person]
            logger.info(children.toString());
    
            String path = "/zookeeper-get";
            /**
             *  1. 如果在获取指定节点的子节点列表后,还需要订阅这个子节点列表变化的通知,
             *  可以注册一个watch。zk服务端发送NodeChildrenChanged事件通知时
             *  仅仅只会发出一个通知,不会把子节点的变化情况发送给客户端,客户端需要重新获取
             *  2. Watcher是一次性的,一旦触发一次通知后,该Watch就立即失效,因此客户端需要反复注册Watcher
             *
             */
            List<String> children1 = zooKeeper.getChildren(path, getWatcher(path, zooKeeper, null));
            logger.info(children1.toString());
    
            /**
             * 1. 我们不仅需要获取节点最新的子节点列表
             * 还需要获取最新的节点状态信息,此时可以传递一个Stat,
             * 此变量会被服务器响应的新stat对象替换掉
             */
            Stat stat = new Stat();
            List<String> children2 = zooKeeper.getChildren(path, getWatcher(path, zooKeeper, stat), stat);
    
            logger.info(children2.toString(), ToStringBuilder.reflectionToString(stat));
    
    
            byte[] data = zooKeeper.getData(path, getWatcher(path, zooKeeper, stat), stat);
            logger.info(new String(data, StandardCharsets.UTF_8));
    
            //修改数据,可以看到watcher被调用,version指定数据版本(乐观锁)
            zooKeeper.setData(path,data,stat.getVersion());
            //休眠1s让第一个数据更新的Watcher先调用成功,这样下一个stat才能获取正确的版本
            Thread.sleep(1000);
            zooKeeper.setData(path,data,stat.getVersion());
    
            Thread.sleep(60000);
        }
    
        private Watcher getWatcher(final String path, final ZooKeeper zooKeeper, Stat stat) {
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
                        try {
                            List<String> children = zooKeeper.getChildren(path, true);
                            logger.info("{}子节点发生变化,重新获取:{},stat:{}", path, children.toString(), ToStringBuilder.reflectionToString(stat));
                            //重新注册Watcher
                            zooKeeper.getChildren(path, this, stat);
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    //数据内容或者数据版本变化都会触发NodeDataChanged事件
                    if (Event.EventType.NodeDataChanged == watchedEvent.getType()) {
                        try {
                            byte[] data = zooKeeper.getData(path, true, stat);
                            logger.info("{}数据发生变化{},stat:{}", path,new String(data, StandardCharsets.UTF_8), ToStringBuilder.reflectionToString(stat));
                            //重新注册watcher
                            zooKeeper.getData(path, this, stat);
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
    
                    }
                }
            };
            return watcher;
        }
    
    

更新节点

  1. 更新节点

        @Test
        public void testUpdateNodeSync() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
            countDownLatch.await();
    
            String path = "/zookeeper-updateSync";
            zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Stat stat = new Stat();
            zooKeeper.getData(path, true, stat);
            //version=0
            logger.info(ToStringBuilder.reflectionToString(stat));
            zooKeeper.setData(path, "2".getBytes(StandardCharsets.UTF_8), stat.getVersion());
            byte[] newData = zooKeeper.getData(path, true, stat);
            //2
            logger.info(new String(newData, StandardCharsets.UTF_8));
            //version=1
            logger.info(ToStringBuilder.reflectionToString(stat));
        }
    
    
        @Test
        public void testUpdateNodeAsync() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
            countDownLatch.await();
    
            String path = "/zookeeper-updateAsync";
            zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Stat stat = new Stat();
            String context = "我是Context";
            zooKeeper.getData(path, true, stat);
            //version=0
            logger.info(ToStringBuilder.reflectionToString(stat));
            zooKeeper.setData(path, "2".getBytes(StandardCharsets.UTF_8), stat.getVersion(), new AsyncCallback.StatCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, Stat stat) {
                    if (rc == 0) {
                        logger.info("更新成功,rc:{},path:{},ctx:{},stat:{}", rc, path, ctx, ToStringBuilder.reflectionToString(stat));
                    }
                }
            }, context);
            TimeUnit.SECONDS.sleep(10000);
        }
    
    

删除节点

  1. 删除节点

        @Test
        public void testDeleteNode() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
            countDownLatch.await();
            String path = "/zookeeper-delete";
            zooKeeper.create(path, "1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            //检测节点是否存在
            //不存在返回空
            Stat stat = zooKeeper.exists(path, true);
    
            if (stat != null) {
                zooKeeper.delete(path, stat.getVersion());
            } else {
                logger.error("{}不存在", path);
            }
    
    
        }
    
    

ACL

  1. 一个Zookeeper集群,不同应用之间如果不想共享数据,则需要解决不用应用之间的访问数据的权限问题。 Zookeeper提供了ACL(Access Control List访问控制列表)机制,就是通过设置Zookeeper服务器上数据节点的ACL,来控制客户端对该节点的访问权限。如果一个客户端符合该ACL控制,那么就可以对其进行访问,否则无法操作。
  2. 如果对一个数据节点添加权限信息后,修改和访问必须携带权限信息,但是可以自由的删除这个节点(删除节点的权限控制比较特殊),但是对于这个节点的子节点,必须使用相应的权限信息才能够删除。
  3. 传统的文件系统中,ACL分为两个维度,一个是属组,一个是权限,子目录/文件默认继承父目录的ACL。而在Zookeeper中,znodeACL是没有继承关系的,是独立控制的。ZookeeperACL,可以从三个维度来理解:一是scheme; 二是user; 三是permission,通常表示为scheme:id:permissions, Zookeeper有5个内置的scheme:
    • world:只有一个唯一的id:anyone;表示任何人都可以做对应的操作。这个scheme没有对应的鉴权实现。只要一个znode的ACL中包含有这个scheme的Id,其对应的操作就运行执行
    • auth:没有对应的id,或者只有一个空串id。这个scheme没有对应的鉴权实现。语义是当前连接绑定的适合做创建者鉴权的autoInfo (通过调用autoInfo的scheme对应的API的isAuthenticated()得知)都拥有对应的权限。遇到这个auth后,Server会根据当前连接绑定的符合要求的autoInfo生成ACL加入到所操作znode的acl列表中。
    • digest:使用username:password格式的字符串生成MD5 hash 作为ACL ID。 具体格式为:username:base64 encoded SHA1 password digest.对应内置的鉴权插件:DigestAuthenticationProvider
    • ip:它对应的id为客户机的IP地址,设置的时候可以设置一个ip段,比如ip:192.168.1.0/16, 表示匹配前16个bit的IP段。对应内置鉴权插件IPAuthenticationProvider
    • super: 在这种scheme情况下,对应的id拥有超级权限,可以做任何事情(cdrwa)
  4. permission,zookeeper支持下面一些权限:
    • CREATE(c): 创建权限,可以在在当前node下创建child node
    • DELETE(d): 删除权限,可以删除当前的node
    • READ(r): 读权限,可以获取当前node的数据,可以list当前node所有的child nodes
    • WRITE(w): 写权限,可以向当前node写数据
    • ADMIN(a): 管理权限,可以设置当前node的permission
  5. 示例代码
       @Test
       public void testAclNode() throws Exception {
           String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
           int sessionTimeoutMs = 25000;
           CountDownLatch countDownLatch = new CountDownLatch(1);
           ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch));
           countDownLatch.await();
    
           String scheme = "digest";
           byte[] auth = "jannal:jannal".getBytes(StandardCharsets.UTF_8);
           /**
            * addAuthInfo主要用于为当前zk会话添加权限信息,之后凡是通过
            * 该会话对zk服务器的任何操作,都会带上权限信息
            * scheme:
            *      权限控制模式,分为world,auth,digest,ip,super
            * auth:
            *      具体权限信息
            *
            */
           zooKeeper.addAuthInfo(scheme, auth);
           String pathParent = "/zookeeper-acl";
    
           /**
            * Ids.CREATOR_ALL_ACL:只有创建节点的客户端才有所有权限
            * Ids.OPEN_ACL_UNSAFE:这是一个完全开放的权限,所有客户端都有权限
            * Ids.READ_ACL_UNSAFE:所有客户端只有读取的
            */
           zooKeeper.create(pathParent,
                   "1".getBytes(StandardCharsets.UTF_8),
                   ZooDefs.Ids.CREATOR_ALL_ACL,
                   CreateMode.PERSISTENT);
    
           String pathChildren = "/zookeeper-acl/user";
           zooKeeper.create(pathChildren,
                   "2".getBytes(StandardCharsets.UTF_8),
                   ZooDefs.Ids.CREATOR_ALL_ACL,
                   CreateMode.PERSISTENT);
           byte[] data = zooKeeper.getData(pathParent, false, null);
           logger.info(new String(data, StandardCharsets.UTF_8));
    
           //使用错误的权限信息访问
           CountDownLatch countDownLatch2 = new CountDownLatch(1);
           ZooKeeper zooKeeper2 = new ZooKeeper(connectString, sessionTimeoutMs, new InitWatch(countDownLatch2));
           countDownLatch2.await();
           String scheme2 = "digest";
           byte[] auth2 = "jannal:123".getBytes(StandardCharsets.UTF_8);
           zooKeeper2.addAuthInfo(scheme2, auth2);
           //抛异常 NoAuth for /zookeeper-acl
           try {
               zooKeeper2.getData(pathParent, false, null);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
           try {
               //错误的权限信息删除父节点  Directory not empty for /zookeeper-acl
               zooKeeper2.delete(pathParent, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
           try {
               //错误的权限信息删除子节点  NoAuth for /zookeeper-acl/user
               zooKeeper2.delete(pathChildren, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
           try {
               //错误的权限信息删除父节点 Directory not empty for /zookeeper-acl
               zooKeeper2.delete(pathParent, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
           try {
               //正确的权限删除父节点 Directory not empty for /zookeeper-acl
               zooKeeper.delete(pathParent, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
           try {
               //正确的权限删除子节点,正确删除
               zooKeeper.delete(pathChildren, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
           try {
               //错误的权限信息删除父节点(此时子节点已经删除),正确删除
               zooKeeper2.delete(pathParent, -1);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           }
    
    
    
       }
    
    
    
10-05 15:04