• zookeeper快速上手



    zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务

    Apache Hadoop大数据生态里面的组件的图标大部分都是动物,例如🐘、🐝、🐖、🐬,而zookeeper的图标是一个拿着铲子的👨‍🏭,是管理大数据生态系统各组件的管理员。

    1972306-20210328192107325-998987816

    zookeeper的详细理论讲解和功能请查看菜鸟教程zookeeper,本篇文章全部为实战

    1. docker安装zookeeper

    下载zookeeper 最新版镜像

    docker search zookeeper    
    docker pull zookeeper 
    docker images              //查看下载的本地镜像
    docker inspect zookeeper   //查看zookeeper详细信息
    
    • 1
    • 2
    • 3
    • 4

    新建一个文件夹

    mkdir zookeeper
    
    • 1

    挂载本地文件夹并启动服务

    docker run -d -e TZ="Asia/Shanghai" -p 2181:2181 -v /root/docker/zookeeper:/data --name zookeeper --restart always zookeeper
    
    • 1

    参数解释

    -e TZ="Asia/Shanghai" # 指定上海时区 
    -d # 表示在一直在后台运行容器
    -p 2181:2181 # 对端口进行映射,将本地2181端口映射到容器内部的2181端口
    --name # 设置创建的容器名称
    -v # 将本地目录(文件)挂载到容器指定目录;
    --restart always #始终重新启动zookeeper
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    查看容器

    docker ps
    
    • 1

    进入容器(zookeeper)

    第一种方式

     docker run -it --rm --link zookeeper:zookeeper zookeeper zkCli.sh -server zookeeper       //这样的话,直接登录到容器时,进入到 zkCli中
    
    • 1

    第二种方式(推荐)

    docker exec -it zookeeper bash      //只登录容器,不登录 zkCli
    
    ./bin/zkCli.sh    //执行脚本新建一个Client,即进入容器
    
    • 1
    • 2
    • 3

    2. zookeeper基本使用(Linux)

    新增结点

    格式:create [-s] [-e] [-c] [-t ttl] path [data] [acl]
    解释: 
    [-s] : 创建有序结点
    [-e] : 创建临时结点
    [-c] : 创建一个容器结点,容器结点主要用来容纳字结点,如果没有给其创建子结点,容器结点表现和持久化结点一样,如果给容器结点创建了子结点,后续又把子结点清空,容器结点也会被zookeeper删除。定时任务默认 60s 检查一次
    [-t ttl] : 创建一个TTL结点, -t 时间(单位毫秒)
    path: 路径 ,因为没有中括号,所以是必须参数。
    [data]:结点的数据,可选,如果不使用时,结点数据就为null
    [acl] :权限相关,后面文章会专门讲
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    演示 :

    创建持久结点: create /test '测试' (默认创建无序持久结点)

    在这里插入图片描述

    创建持久化有序结点:create -s /1_node 1

    创建有序结点时,Zookeeper会在我们知道的结点名称后面补一个有序的,唯一的递增数字后缀

    在这里插入图片描述

    在这里插入图片描述

    创建临时结点:create -e /3_node

    当前客户端和zookeeper连接断开后,临时结点将被清除

    在这里插入图片描述

    断开连接:quit

    当重新连接后:

    在这里插入图片描述

    创建临时有序结点:create -e -s /node3 "2"

    创建容器结点:create -c /node4 "容器结点"

    创建TTL结点:create -t 2000 /node5 "TTL结点"

    查看命令

    格式: get [-s] [-w] path //查看结点存储的值及其结点状态
    解释:
    [-s] : 查看结点数据和结点状态(字段含义看下面的stat)
    [-w] : 查看结点,并添加一个监听器,当指定的znode或znode的子结点数据更改(set path data)时,监视器会显示通知,但只会显示一次,显示后会将监听删除
    
    格式: stat [-w] path //查看结点状态
    解释: 
    [-w] :查看结点并为结点添加一个监听,当结点被修改时,该客户端会收到一个回调。之前版本是在path 后面加一个watch实现:stat path watch
    结点状态参数解释:
    cZxid:创建znode的事务ID(Zxid的值)
    mZxid:最后修改znode的事务ID。
    pZxid:最后添加或删除子结点的事务ID(子结点列表发生变化才会发生改变)。
    ctime:znode创建时间。
    mtime:znode最近修改时间。
    dataVersion:znode的当前数据版本。
    cversion:znode的子结点结果集版本(一个结点的子结点增加、删除都会影响这个版本)。
    aclVersion:表示对此znode的acl版本。
    ephemeralOwner:znode是临时znode时,表示znode所有者的 session ID。 如果znode不是临时znode,则该字段设置为零。
    dataLength:znode数据字段的长度。
    numChildren:znode的子znode的数量
    
    格式: ls [-s] [-w] [-R] path //查看某一结点下的子结点
    解释:
    [-s] : 查看某一结点下的子结点加当前结点的元信息,相当于之前版本的ls2命令
    [-w] : 查看结点并为结点添加一个监听,当结点被修改时,该客户端会收到一个回调。之前版本是在path 后面加一个watch实现:ls path watch 
    [-R] : 返回当前结点路径,当前结点的子结点,当前结点的子结点的子结点(递归)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    get [-s] [-w] path //查看结点存储的值及其结点状态

    在这里插入图片描述

    查看信息并添加监视器:get -w /test,注意监视器只会起一次作用并且在此命令中只监视结点数据更新

    image-20220509204100460

    stat [-w] path //查看结点状态

    在这里插入图片描述

    ls [-s] [-w] [-R] path //查看某一结点下的子结点

    image-20220509210209439

    修改命令

    格式: set [-s] [-v version] path data
    解释:
    [-s] :返回修改后结点的元信息
    [-v version] :指定数据的版本,版本不符合时修改失败,类似关系型数据库的乐观锁,Java中的CAS机制,当传入数据版本号和当前不一致时拒绝修改,初始版本号dataVersion为0,每次修改后会加1
    path :修改结点路径
    data :修改的数据
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    image-20220509212612042

    删除命令

    格式: delete [-v version] path //删除结点,删除的结点必须没有任何子结点,否则会删除失败
    解释:
    [-v version] :和修改命令一样
    
    deleteall path // 递归结点,会递归删除该结点及其所有子结点,旧版本是rmr path
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其他命令

    history 查看当前连接最新的11条历史命令
    
    connect host:port 连接其他Zookeeper服务器 
    
    close 关闭客户端连接,把连接设置为关闭状态,实质关闭Socket连接,关闭之后发送命令就会报错
    	
    printwatches on|off 是否开启watch机制,如果设置为off,则该客户端监听的结点事件都不会生效、默认on
    
    removewatches path 删除在某结点上设置的监听
    
    sync path  把当前Zookeeper服务器的指定结点同步到主从集群中的其他Zookeeper服务器上
    
    quie 断开连接
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3. zookeeper ACL(Linux)

    此处参考菜鸟教程Zookeeper 权限控制 ACL

    zookeeper 的 ACL(Access Control List,访问控制表)权限在生产环境是特别重要的,所以本章节特别介绍一下。

    ACL 权限可以针对结点设置相关读写等权限,保障数据安全性。

    permissions 可以指定不同的权限范围及角色。

    ACL 命令行

    • getAcl 命令:获取某个结点的 acl 权限信息。
    • setAcl 命令:设置某个结点的 acl 权限信息。
    • addauth 命令:输入认证授权信息,注册时输入明文密码,加密形式保存。

    ACL 构成

    zookeeper 的 acl 通过 [scheme :id :permissions] 来构成权限列表。

    • scheme:代表授权的策略,包括 world、auth、digest、ip、super 几种。
    • id:代表授权的对象,值依赖于schema
    • permissions:表示授予的权限(权限组合字符串),由 cdrwa 组成,其中每个字母代表支持不同权限, 创建权限 create©、删除权限 delete(d)、读权限 read®、写权限 write(w)、管理权限admin(a)

    ACL 特性

    1. Zookeeper的权限控制是基于znode节点的,需要对每个节点设置权限。
    2. 每个znode支持设置多种权限控制方案和多个权限。
    3. 子节点不会继承父节点的权限。客户端无法访问某个节点,但是可以访问他的子节点

    scheme :权限模式

    模式描述
    world这种模式方法的授权对象只有一个anyone,代表登录到服务器的所有客户端都能对该节点执行某种权限
    ip对连接的客户端使用IP地址认证方式进行认证
    auth使用以添加认证的用户进行认证
    digest使用 用户:密码方式验证

    id :授权对象

    代表授权的对象,值依赖于schema

    permission :授予的权限

    即当前用户对结点的权限,具体有:

    类型ACL简写描述
    readr读取节点及显示子节点列表的权限
    writew设置节点数据的权限
    createc创建子节点的权限
    deleted删除子节点的权限
    admina设置该节点ACL权限的权限

    world授权模式

    授权格式:

    setAcl path world:anyone:   //此模式下只能修改acl
    
    • 1

    我们可以通过getAcl 结点路径查看一个结点的访问权限,可以看到创建一个结点的默认访问权限是world授权,即任何人都拥有对该结点的所有权限

    在这里插入图片描述

    例如我想设置某个结点只读,可以这样设置:setAcl /node world:anyone:r

    image-20220510123229426


    ip授权模式

    setAcl path ip:: //用来限制访问者的ip地址
    
    • 1

    可以看到这里我设置了只能由指定ip有权限操作结点,由于不是指定的ip,所以提示无权限访问

    image-20220510124440546

    auth模式

    此模式需要配合addauth命令

    • 第一步:先添加授权用户 addauth digest username:password
    • 第二步:设置该节点只有登录了该授权用户的客户端连接才能进行操作
    格式为:
    addauth digest <user>:<password>  //添加认证用户
    setAcl <path> auth:<user>:<acl>   //授权指定用户访问结点
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    digest授权模式

    digest授权模式其实和auth认证模式一样,区别在于auth认证会自动将你的密码加密存储,而digest需要手动加密密码再进行认证

    • 在Linux中调用系统库函数手动加密密码

    格式为echo -n username:password | openssl dgst -binary -sha1 | openssl base64

    • 添加授权用户,这里的密码要用上一步生成的密文

    addauth digest username:password

    • 给结点赋予授权用户(和auth一样)

    setAcl /node digest:username:password:permissions

    添加授权用户和为节点赋予授权是两个独立的操作,它们的顺序不会相互影响

    一般情况下先添加授权用户,然后为特定节点设置该用户的权限

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    当然可以设置多种acl,用逗号隔开就行

    4. IDEA操作Zookeeper

    官方文档:zookeeper3.8.0官方文档

    首先需要添加以下依赖,其实核心就只有org.apache.zookeeper这个依赖,这里我是为了打印日志才添加了一些其他的依赖

    <properties>
        <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
        <lombok.version>1.18.22lombok.version>
        <slf4j.version>1.7.36slf4j.version>
        <junit.version>4.7junit.version>
        <logback.version>1.2.3logback.version>
        <zookeeper.version>3.7.0zookeeper.version>
    properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
            <version>${lombok.version}version>
        dependency>
        <dependency>
            <groupId>org.slf4jgroupId>
            <artifactId>slf4j-apiartifactId>
            <version>${slf4j.version}version>
        dependency>
        <dependency>
            <groupId>ch.qos.logbackgroupId>
            <artifactId>logback-classicartifactId>
            <version>${logback.version}version>
        dependency>
        <dependency>
            <groupId>junitgroupId>
            <artifactId>junitartifactId>
            <version>${junit.version}version>
            <scope>compilescope>
        dependency>
        <dependency>
            <groupId>org.apache.zookeepergroupId>
            <artifactId>zookeeperartifactId>
            <version>${zookeeper.version}version>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    这里的每一个例子都会将常用的方法都写到一个工具类中,这样我们以后就可以复用我们的工具类了

    我们先新建一个工具类,我这里叫ZkHelper,也可以叫ZkUtils,主要为了封装一些常用的方法

    /**
     * zookeeper帮助类
     *
     * 

    创建一个新的帮助类{@code ZookeeperConnection}并添加一个方法{@code connect} * connect方法创建一个{@link ZooKeeper}对象,连接到Zookeeper集合,然后返回对象 * *

    这里通过{@link CountDownLatch}控制主线程,让其等待客户端线程连接之后再进行进行 */ @Slf4j(topic = "ZkHelper") public class ZkHelper { private static final String connectUri = "ip地址:端口号"; private static final int sessionTimeout = 20000; //超时时间 private static ZooKeeper zkClient = null;//zk的客户端,相当于zkCli.sh private static final CountDownLatch connectedSemaphore = new CountDownLatch(1); public ZkHelper() { try { zkClient = connect(); } catch (IOException | InterruptedException ioException) { ioException.printStackTrace(); } } public ZkHelper(Watcher watcher){ try { zkClient = new ZooKeeper(connectUri,sessionTimeout,watcher); } catch (IOException ioException) { ioException.printStackTrace(); } } /** * 建立一个{@link ZooKeeper}连接 * * @return {@link ZooKeeper}连接 * @throws IOException IO异常 * @throws InterruptedException 中断异常 */ private ZooKeeper connect() throws IOException, InterruptedException { zkClient = new ZooKeeper(connectUri, sessionTimeout, event -> { log.debug("ZooKeeper客户端初始化"); //收到事件通知后的回调函数(用户的业务逻辑) log.debug("事件信息:事件类型{}--事件发生的结点的路径{}--服务器状态{}", event.getType(), event.getPath(), event.getState()); if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { //只有回调的状态值 log.debug("客户端建立与服务器的连接"); connectedSemaphore.countDown();//只有连接建立了才释放锁,让主线程继续运行 } }); connectedSemaphore.await(); //在主线程中堵塞,等待连接建立好 log.debug("客户端主线程运行完"); return zkClient; } /** * 关闭连接 */ public void close() throws InterruptedException { zkClient.close(); } /** * 拿到连接uri */ public String getConnectUri() { return connectUri; } /** * 拿到zookeeper连接 * * @return zkClient */ public ZooKeeper getZookeeper() { return zkClient; } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    在上面的代码中已经有连接的方法了,这里有一点需要注意,就是第十七行代码中的CountDownLatch,是Java JUC中的一个类,主要作用是让一个或多个线程等待,一直等到其他线程中执行完成一组操作。有countDown方法和await方法,CountDownLatch在初始化时,需要指定用给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。

    我们看一下Zookeeper的构造器:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    
    • 1

    他会传入一个监视器,可以想到这里肯定不会只有主线程一个线程,如果我们不用CountDownLatch堵塞主线程的话,可能zookeeper还没有初始化话线程就已经执行完了,所以这里我们用CountDownLatch做通知,当new ZooKeeper执行完后再通知主线程继续往下执行

    连接zookeeper

    @Slf4j(topic = "Test1_ZkHelper")
    public class Test1_ZkHelper {
        public static void main(String[] args) {
            ZkHelper zkHelper = new ZkHelper();
            log.debug("客户端运行完毕,关闭连接成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    执行结果:

    在这里插入图片描述

    创建结点

    这里东西有点多,我们对应着Linux命令一个个实现

    格式:create [-s] [-e] [-c] [-t ttl] path [data] [acl]
    解释: 
    [-s] : 创建有序结点
    [-e] : 创建临时结点
    [-c] : 创建一个容器结点,容器结点主要用来容纳字结点,如果没有给其创建子结点,容器结点表现和持久化结点一样,如果给容器结点创建了子结点,后续又把子结点清空,容器结点也会被zookeeper删除。定时任务默认 60s 检查一次
    [t ttl] : 创建一个TTL结点, -t 时间(单位毫秒)
    path: 路径 ,因为没有中括号,所以是必须参数。
    [data]:结点的数据,可选,如果不使用时,结点数据就为null
    [acl] :权限相关,后面文章会专门讲
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们先看一个zookeeper连接对象里面的create方法,一共有三个

    public String create(final String path,byte[] data,List<ACL> acl,CreateMode createMode)
    public String create(final String path,byte[] data,List<ACL> acl,CreateMode createMode,Stat stat)
    public String create(final String path,byte[] data,List<ACL> acl,CreateMode createMode,Stat stat,long ttl) 
    
    • 1
    • 2
    • 3

    前面有提过,Acl可以设置多个,用逗号隔开,在这里我们传入一个List集合,表示我们的控制权限

    我们来看一下Acl的构造函数:

    public ACL(int perms,Id id)
    
    • 1

    我们一个个来看具体的参数:

    • int perms:其实就是permission ,文件的权限,取值包括read、write、create、delete、admin(rwcda)这里用数字表示:

      READ = 1  //二进制
      WRITE = 10
      CREATE = 100
      DELETE = 1000
      ADMIN = 10000
      READ | WRITE | CREATE | DELETE | ADMIN -> 运算结果为: 11111 即31,表示所有权限
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • Id id:代表授权的对象,值依赖于schema

      public Id(String sch,String id) //例如world模式下为 -> Id("world", "anyone")
      
      • 1

    综上所示,如果我们想要创造一个默认的结点(无序、持久、任何人都拥有所有权限),我们得这样写:

    Linux -> create /node
    java中:
    ArrayList<ACL> acls = new ArrayList<>(Collections.singletonList(new ACL(31, new Id("world", "anyone"))));
    zooKeeper.create("/node","结点".getBytes(),acls, CreateMode.PERSISTENT);
    
    • 1
    • 2
    • 3
    • 4

    看上去有一点点繁琐,所有zookeeper官方为我们封装了一个类ZooDefs.Ids,用来表示访问的权限,我们看一下Ids,是一个接口

    public interface Ids {
        ANYONE_ID_UNSAFE   //所有用户可以访问,其实就是  -> Id("world", "anyone")
        AUTH_IDS 		   //auth认证 -> new Id("auth", "")
        OPEN_ACL_UNSAFE    //开放所有权限,相当于 -> world:anyone:rwcda
        CREATOR_ALL_ACL    //给创建结点的用户赋予所有权限
        READ_ACL_UNSAFE    //任何人都只能读
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    有了这个类,上面的代码我们可以这样写:

    zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    
    • 1

    当然只封装了一些简单的模式,复杂模式还是需要我们自己手动创建Acl,但是懂的了原理就很简单了

    这里简单封装一下ACL的四种模式:

    //world授权模式
    ArrayList<ACL> acls = new ArrayList<>(Collections.singletonList(new ACL(31, new Id("world", "anyone"))));
    //ip授权模式
    ArrayList<ACL> acls = new ArrayList<>(Collections.singletonList(new ACL(31, new Id("ip", "127.0.0.1"))));
    //auth模式
    zooKeeper.addAuthInfo("digest","用户名:密码".getBytes()) //先添加
    ArrayList<ACL> acls = new ArrayList<>(Collections.singletonList(new ACL(31, new Id("auth", ""))));
    //digest模式
    ArrayList<ACL> acls = new ArrayList<>(Collections.singletonList(new ACL(31, new Id("digest", "用户名:加密后的密码"))));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    CreateMode是一个枚举类型,用来表示创建结点的类型,包括有序、无序、持久、临时

    枚举对应取值有:

    单词中文含义
    PERSISTENT(persistent)持久
    EQUENTIAL(ephemeral)临时
    SEQUENTIAL(sequential)有序
    public enum CreateMode {
        PERSISTENT(0, false, false, false, false),              //持久
        PERSISTENT_SEQUENTIAL持久(2, false, true, false, false), //持久且有序
        EPHEMERAL(1, true, false, false, false),                //临时
        EPHEMERAL_SEQUENTIAL(3, true, true, false, false),      //临时且有序
        CONTAINER(4, false, false, true, false),                //容器结点
        PERSISTENT_WITH_TTL(5, false, false, false, true),      //带TTL的持久结点
        PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true); //带TTL的持久有序结点
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    讲了很多理论知识,现在开始实践一下

    创建无序持久结点

    在ZkHelper中添加方法

    /**
     * 创建结点
     *
     * 

    先复习一下zk中的ACL情况:格式为 schema:id:permission
    * 对应含义为:

      *
    • schema可选:world ip digest auth
    • *
    • id可选: anyone ip地址 用户名:密码 用户
    • *
    • permission权限列表: {@code cdrwa}
    * *

    {@link ZooDefs.Ids#ANYONE_ID_UNSAFE}表示[world:anyone]
    * {@link ZooDefs.Ids#OPEN_ACL_UNSAFE}:ANYONE_ID_UNSAFE + * {@link org.apache.zookeeper.ZooDefs.Perms#ALL}[READ | WRITE | CREATE | DELETE | ADMIN] * *

    授予权限详情为:

      *
    • READ = 1 << 0 任为1
    • *
    • WRITE = 1 << 1 为10
    • *
    • CREATE = 1 << 2 为100
    • *
    • DELETE = 1 << 3 为1000
    • *
    • ADMIN = 1 << 4 为10000
    • *
    • READ | WRITE | CREATE | DELETE | ADMIN -> 运算结果为: 11111 即31,表示所有权限
    * * @param path 结点路径 * @param data 结点中存放的数据 */
    public String create(String path, byte[] data, ArrayList<ACL> aclList, CreateMode createMode){ //先要拿到连接 String resultPath = null; try { resultPath = zkClient.create(path, data, aclList, createMode); log.debug("结点[{}]创建成功", resultPath); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return resultPath; } /** * 创建结点并设置访问默认权限,即world:anyone:rwcda * *

    权限设置详情{@link com.hnit.zookeeper.util.ZkHelper#create} * * @param path 结点路径 * @param data 结点中存放的数据 */ public void createAndSetDefaultAcl(String path, byte[] data) { create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    测试一下:

    @Slf4j(topic = "Test2_create")
    public class Test2_create {
        public static void main(String[] args) throws Exception {
            ZkHelper zkHelper = new ZkHelper();
            zkHelper.createAndSetDefaultAcl("/MyFirstZkNode","This is my first Zookeeper Node!".getBytes());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    在这里插入图片描述

    常见有序持久结点

    无序持久结点是默认的结点类型,其他的结点也可以封装方法,或者直接用最原始的create方法,通过CreateMode控制就好了

    public class Test2_create {
        public static void main(String[] args) throws Exception {
            ZkHelper zkHelper = new ZkHelper();
            zkHelper.create("/node","结点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    创建auth模式结点

    在ZkHelper中添加方法

    /**
     * 添加认证信息
     *
     * @param scheme 例如:"digest"
     * @param auth 例如:"用户名:密码"
     */
    public void addAuthInfo(String scheme, byte[] auth) {
        zkClient.addAuthInfo(scheme,auth);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    测试:

    public class Test2_create {
        public static void main(String[] args) throws Exception {
            ZkHelper zkHelper = new ZkHelper();
            //添加认证信息
            zkHelper.addAuthInfo("digest","zjp:123".getBytes());
            //注意要选CREATOR_ALL_ACL,只有创建者才有权限,创建了有序持久结点
            zkHelper.create("/node","结点".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT_SEQUENTIAL);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    结果:
    在这里插入图片描述

    在这里插入图片描述

    其他类型结点可以执行封装工具类

    查看信息

    查看的命令为:

    get [-s] [-w] path //查看结点存储的值及其结点状态
    stat [-w] path //查看结点状态
    ls [-s] [-w] [-R] path //查看某一结点下的子结点
    
    • 1
    • 2
    • 3

    这里封装的方法就有点多了,先封装get和stat方法

    ZkHelper添加以下方法

    /**
     * 获取指定结点的值
     *
     * @param path 结点路径
     * @param watcher 监听器
     * @return 返回byte[]类型的结点信息
     */
    public byte[] getData(String path) {
        byte[] data = null;
        try {
            Stat stat;
            if ((stat = zkClient.exists(path, false)) != null) {
                data = zkClient.getData(path, false, stat);
            } else {
                log.error("znode:{}不存在",path);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("取到znode:" + path + "出现问题!!", e);
        }
        return data;
    }
    
    /**
     * 获取指定结点的值
     *
     * @param path 结点路径
     * @return 返回结点信息
     */
    public String get(String path) {
        String data = null;
        try {
            Stat stat;
            if ((stat = zkClient.exists(path, false)) != null) {
                byte[] bt = zkClient.getData(path, false, stat);
                if(bt != null){
                    data = new String(bt,StandardCharsets.UTF_8);
                }else {
                    log.info("该结点{}中没有数据",path);
                }
            } else {
                log.error("znode:{}不存在",path);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("取到znode:" + path + "出现问题!!", e);
        }
        return data;
    }
    
    /**
     * 根据结点路径返回Stat对象
     */
    public Stat getStat(String path){
        Stat stat = null;
        try {
            stat = zkClient.exists(path, false);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        if (stat == null) {
            throw new RuntimeException("node路径[" + path + "]不存在");
        }
        return stat;
    }
    /**
     * 获取state格式化的信息
     */
    public String getStatInfo(String path){
        Stat stat;
        stat = getStat(path);
        assert stat != null;
        return printZnodeInfo(stat);
    }
    
    /**
     * 格式化{@link Stat} 信息
     *
     * @param stat {@link Stat}
     * @return 返回格式化信息
     */
    public static String printZnodeInfo(Stat stat) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
        StringBuilder sb = new StringBuilder();
        sb.append("\n*******************************\n");
        sb.append("创建znode的事务id czxid:").append(stat.getCzxid()).append("\n");
        //格式化时间
        sb.append("创建znode的时间 ctime:").append(df.format(stat.getCtime())).append("\n");
        sb.append("更新znode的事务id mzxid:").append(stat.getMzxid()).append("\n");
        sb.append("更新znode的时间 mtime:").append(df.format(stat.getMtime())).append("\n");
        sb.append("更新或删除本节点或子节点的事务id pzxid:").append(stat.getPzxid()).append("\n");
        sb.append("子节点数据更新次数 cversion:").append(stat.getCversion()).append("\n");
        sb.append("本节点数据更新次数 dataVersion:").append(stat.getVersion()).append("\n");
        sb.append("节点ACL(授权信息)的更新次数 aclVersion:").append(stat.getAversion()).append("\n");
        if (stat.getEphemeralOwner() == 0) {
            sb.append("本节点为持久节点\n");
        } else {
            sb.append("本节点为临时节点,创建客户端id为:").append(stat.getEphemeralOwner()).append("\n");
        }
        sb.append("数据长度为:").append(stat.getDataLength()).append("字节\n");
        sb.append("子节点个数:").append(stat.getNumChildren()).append("\n");
        sb.append("\n*******************************\n");
        return sb.toString();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    测试:

    @Slf4j(topic = "Test3_Stat")
    public class Test3_Stat {
        public static void main(String[] args) {
            ZkHelper zkHelper = new ZkHelper();
            String str1 = zkHelper.get("/MyFirstZkNode");
            log.debug(str1);
            log.debug("==================================================");
            String statInfo = zkHelper.getStatInfo("/MyFirstZkNode");
            log.debug(statInfo);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    接下来是ls [-s] [-w] [-R] path

    /**
     * 获得子结点路径,只会显示一层
     */
    public List<String> getChildren(String path){
        List<String> childrenList = null;
        try {
            childrenList = zkClient.getChildren(path, false);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return childrenList;
    }
    
    /**
     * 将当前路径及其子路径呈树状结构递归打印出来
     *
     * 

    打印结果示例: * *

     * /
     * ----/node
     * --------/node/node2
     * --------/node/node3
     * --------/node/node1
     * ------------/node/node1/node1_1
     * ----/zookeeper
     * --------/zookeeper/config
     * --------/zookeeper/quota
     * ----/test
     * ----/MyFirstZkNode
     * 
    * * @param path 当前结点路径 * @param level 表示当前路径在第几层,默认第0层 */
    public void showTree(String path, int level) { List<String> childList;//子路径 //每递归一级打印一段占位符 for (int i = 0; i < level; i++) { System.out.print("----"); } //打印已经有的路径 System.out.println(path); //递归打印树状结构 childList = getChildren(path); if(!childList.isEmpty()){ childList.forEach(sonPath -> { if(level == 0){ //第0层是/, / + xxx showTree(path + sonPath, level + 1); }else { //其他层为 xxx showTree(path + "/" + sonPath, level + 1); } }); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    测试:

    public class Test4_showTree {
        public static void main(String[] args) {
            ZkHelper zkHelper = new ZkHelper();
            List<String> children = zkHelper.getChildren("/");
            children.forEach(System.out::println);
            zkHelper.showTree("/",0);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    结果:

    image-20220510195140267

    其他方法可以执行封装和测试

    删除结点

    /**
     * 删除结点,如果包含 / ,则递归删除
     *
     * @param path 结点
     * @return true 删除键结点成功  false表示结点不存在
     */
    public boolean delete(String path) {
        try {
            Stat stat;
            if ((stat = zkClient.exists(path, true)) != null) {
                List<String> subPaths = zkClient.getChildren(path, false);
                if (subPaths.isEmpty()) {
                    zkClient.delete(path, stat.getVersion());
                    return true;
                } else {
                    for (String subPath : subPaths) {
                        delete(path + "/" + subPath);
                    }
                }
            }
        } catch (InterruptedException | KeeperException e) {
            throw new RuntimeException("删除znode:" + path + "出现问题!!", e);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    更新结点

    /**
     * 更新指定结点的值
     *
     * @param path 结点路径
     * @param data 新的值
     */
    public boolean update(String path, String data) {
        //更新源数据最新版本的值,不设置监视器
        try {
            Stat stat;
            if ((stat = zkClient.exists(path, true)) != null) {
                zkClient.setData(path, data.getBytes(), stat.getVersion());
                return true;
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("修改znode:" + path + "出现问题!!", e);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    设置监视器

    监视器是zookeeper中非常重要的组件,可以说zookeeper所有的特性都离不开监视器!

    我们观察API可以发现很多地方都可以传入监视器Watcher,但是监视的内容不太一样

    stat path[watch]: 对当前结点更新数据起作用
    get path[watch]: 对当前结点更新数据起作用
    ls path[watch]: 对创建、删除子结点事件起作用
    
    • 1
    • 2
    • 3

    这里简单演示一下,在ZkHelper中添加

    /**
     * 获取结点信息并绑定监听器
     *
     * 

    事件绑定类型:

      * *
    • stat path[watch]: 对当前结点更新数据起作用
    • *
    • get path[watch]: 对当前结点更新数据起作用
    • *
    • ls path[watch]: 对创建、删除子结点事件起作用
    • *
    • ls2 path[watch]: 对创建、删除子结点事件起作用
    * * @param path 要访问结点路径 * @param watcher 监听器 */
    public byte[] getAndSetWatch(String path, Watcher watcher) { Stat stat = getStat(path); byte[] data = null; try { data = zkClient.getData(path, watcher, stat); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return data; }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    测试

    @Slf4j(topic = "Test8_watch")
    public class Test8_watch implements Watcher {
        private static final String path = "/node";
        private static final CountDownLatch countDownLatch = new CountDownLatch(1);
        private static ZkHelper zkHelper;
    
        public static void main(String[] args) throws InterruptedException {
            Test8_watch myWatcher = new Test8_watch();
            zkHelper = new ZkHelper();
            byte[] andSetWatch = zkHelper.getAndSetWatch(path, myWatcher);
            countDownLatch.await();
        }
    
        @Override
        public void process(WatchedEvent event) {
            //监听结点改变事件
            if (event.getType() == Event.EventType.NodeDataChanged) {
                Stat stat = new Stat();
                byte[] data = null;
                data = zkHelper.getData(path);
                assert data != null;
                String dataStr = new String(data, StandardCharsets.UTF_8);
                log.debug("监听此节点[{}]的新数据[{}]", path, dataStr);
                log.debug("当前结点stat新信息为:\n{}", ZkHelper.printZnodeInfo(stat));
                countDownLatch.countDown();
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                //子结点发生改变
                log.debug("子结点发生改变,类型为[{}]", event.getType());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    结果(可以看到只对更新结点起作用):

    在这里插入图片描述

  • 相关阅读:
    配置nginx常用命令
    android事件分发机制详解
    MIT 6.S081 Operating System Lecture5 (随意的笔记)
    Ubuntu16.04离线安装socat
    必不可少的UI组件二——组件库开发的基础知识(工程化篇)
    idea工具配置隐藏文件及文件夹
    kafka入门(一):kafka消息发送与消费
    apache dophinscheduler 3.1.0 编译
    C语言中操作符的详细介绍
    二叉搜索树(Binary Search Tree,BST)
  • 原文地址:https://blog.csdn.net/qq_56265207/article/details/136245279