Apache Curator操作zookeeper的API使用——watcher「终于解决」

Apache Curator操作zookeeper的API使用——watcher「终于解决」curator在注册watch事件上,提供了一个usingWatcher方法,使用这个方法注册的watch事件和默认watch事件一样,监听只会触发一次,监听完毕后就会销毁,也就是一次性的。而这个方法有两种参数可选,一个是zk原生API的Watcher接口的实现类,另一个是Curator提供的CuratorWatcher接口的实现类,不过在usingWatcher方法上使用哪一个效果都是一样的,都…

大家好,欢迎来到IT知识分享网。

curator在注册watch事件上,提供了一个usingWatcher方法,使用这个方法注册的watch事件和默认watch事件一样,监听只会触发一次,监听完毕后就会销毁,也就是一次性的。而这个方法有两种参数可选,一个是zk原生API的Watcher接口的实现类,另一个是Curator提供的CuratorWatcher接口的实现类,不过在usingWatcher方法上使用哪一个效果都是一样的,都是一次性的

新建一个 MyWatcher 实现类,实现 Watcher 接口。代码如下:

package org.zero01.zk.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * @program: zookeeper-connection
 * @description:  zk原生API的Watcher接口实现
 * @author: 01
 * @create: 2018-04-28 13:41
 **/
public class MyWatcher implements Watcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) {
        System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath());
    }
}

新建一个 MyCuratorWatcher 实现类,实现 CuratorWatcher 接口。代码如下:

package org.zero01.zk.curator;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

/**
 * @program: zookeeper-connection
 * @description: Curator提供的CuratorWatcher接口实现
 * @author: 01
 * @create: 2018-04-28 13:40
 **/
public class MyCuratorWatcher implements CuratorWatcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath());
    }
}

修改 CuratorConnect 类的main方法代码如下,因为在usingWatcher方法上使用一个接口的实现类效果都是一样的,所以这里就只演示其中一种。代码如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 获取当前客户端的状态
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));

        // 节点路径
        String nodePath = "/super/testNode";

        // 添加 watcher 事件,当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
        curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        // curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

        Thread.sleep(100000);

        // 关闭客户端
        curatorConnect.closeZKClient();

        // 获取当前客户端的状态
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    }
}

运行该类,然后到zookeeper服务器上修改/super/testNode节点的数据:

[zk: localhost:2181(CONNECTED) 35] set /workspace/super/testNode new-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb0000002b
mtime = Sat Apr 28 21:40:58 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 3
[zk: localhost:2181(CONNECTED) 36] 

修改完成后,此时控制台输出内容如下,因为workspace是命名空间节点,所以不会被打印出来:

触发watcher,节点路径为:/super/testNode

curator之nodeCache一次注册N次监听

想要实现watch一次注册n次监听的话,我们需要使用到curator里的一个NodeCache对象。这个对象可以用来缓存节点数据,并且可以给节点添加nodeChange事件,当节点的数据发生变化就会触发这个事件。

我们依旧是使用之前的demo进行演示,修改 CuratorConnect 类中的main方法代码如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 获取当前客户端的状态
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));

        // 节点路径
        String nodePath = "/super/testNode";

        // NodeCache: 缓存节点,并且可以监听数据节点的变更,会触发事件
        final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);

        // 参数 buildInitial : 初始化的时候获取node的值并且缓存
        nodeCache.start(true);

        // 获取缓存里的节点初始化数据
        if (nodeCache.getCurrentData() != null) {
            System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("节点初始化数据为空...");
        }

        // 为缓存的节点添加watcher,或者说添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            // 节点数据change事件的通知方法
            public void nodeChanged() throws Exception {
                // 防止节点被删除时发生错误
                if (nodeCache.getCurrentData() == null) {
                    System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
                    return;
                }
                // 获取节点最新的数据
                String data = new String(nodeCache.getCurrentData().getData());
                System.out.println(nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
            }
        });

        Thread.sleep(200000);

        // 关闭客户端
        curatorConnect.closeZKClient();

        // 获取当前客户端的状态
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    }
}

运行该类后,我们到zookeeper服务器上,对/super/testNode节点进行如下操作:

[zk: localhost:2181(CONNECTED) 2] set /workspace/super/testNode change-data     
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000037
mtime = Sat Apr 28 23:49:42 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 3      
[zk: localhost:2181(CONNECTED) 3] set /workspace/super/testNode change-agin-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000038
mtime = Sat Apr 28 23:50:01 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 3
[zk: localhost:2181(CONNECTED) 8] delete /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 9] create /workspace/super/testNode test-data
Created /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 10]

输出

当前客户端的状态:连接中...
节点初始化数据为:new-data
/super/testNode 节点的数据发生变化,最新的数据为:change-data
/super/testNode 节点的数据发生变化,最新的数据为:change-agin-data
获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除
/super/testNode 节点的数据发生变化,最新的数据为:test-data
当前客户端的状态:已关闭...

从控制台输出的内容可以看到,只要数据发生改变了都会触发这个事件,并且是可以重复触发的,而不是一次性的。

curator之PathChildrenCache子节点监听

使用NodeCache虽然能实现一次注册n次监听,但是却只能监听一个nodeChanged事件,也就是说创建、删除以及子节点的事件都无法监听。如果我们要监听某一个节点的子节点的事件,或者监听某一个特定节点的增删改事件都需要借助PathChildrenCache来实现。从名称上可以看到,PathChildrenCache也是用缓存实现的,并且也是一次注册n次监听。当我们传递一个节点路径时是监听该节点下的子节点事件,如果我们要限制监听某一个节点,只需要加上判断条件即可。

我们这里演示简单子节点事件的监听,修改 CuratorConnect 类的main方法代码如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 获取当前客户端的状态
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));

        // 父节点路径
        String nodePath = "/super/testNode";

        // 为子节点添加watcher
        // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, nodePath, true);

        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前节点的子节点详细数据列表:");
        for (ChildData childData : childDataList) {
            System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData()));
        }

        // 添加事件监听器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通过判断event type的方式来实现不同事件的触发
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                    System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                    System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                }
            }
        });

        Thread.sleep(200000);

        // 关闭客户端
        curatorConnect.closeZKClient();

        // 获取当前客户端的状态
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    }
}

运行该类,然后到zookeeper服务器上执行如下操作:

[zk: localhost:2181(CONNECTED) 0] create /workspace/super/testNode/fourNode four-node-data
Created /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 1] set /workspace/super/testNode/oneNode change-node-data
cZxid = 0xc00000002
ctime = Sun Apr 29 18:23:57 CST 2018
mZxid = 0xc00000023
mtime = Sun Apr 29 20:16:22 CST 2018
pZxid = 0xc00000002
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] delete /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 5] create /workspace/super/testNode/fiveNode five-node-data
Created /workspace/super/testNode/fiveNode
[zk: localhost:2181(CONNECTED) 6] 

输出

当前客户端的状态:连接中...
当前节点的子节点详细数据列表:
    * 子节点路径:/super/testNode/oneNode,该节点的数据为:one-node-data
    * 子节点路径:/super/testNode/threeNode,该节点的数据为:three-node-data
    * 子节点路径:/super/testNode/twoNode,该节点的数据为:two-node-data

--------------

子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data

--------------

子节点:/super/testNode/oneNode 数据更新成功,子节点:/super/testNode/oneNode 新的数据为:change-node-data

--------------

子节点:/super/testNode/fourNode 删除成功

--------------

子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data
当前客户端的状态:已关闭...

以上的演示例子中为了获取子节点列表,所以我们的代码使用的是同步初始化模式。如果使用异步初始化是获取不到子节点列表的,例如修改childrenCache.start代码如下:

childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

这种模式会在初始化时触发子节点初始化以及添加子节点这两个事件,将初始化模式修改成POST_INITIALIZED_EVENT后,运行该类,此时控制台输出信息如下:

当前客户端的状态:连接中...
当前节点的子节点详细数据列表:

--------------

子节点:/super/testNode/threeNode 添加成功,该子节点的数据为:three-node-data

--------------

子节点:/super/testNode/twoNode 添加成功,该子节点的数据为:two-node-data

--------------

子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data

--------------

子节点:/super/testNode/oneNode 添加成功,该子节点的数据为:change-node-data

--------------

子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data

--------------

子节点初始化成功
当前客户端的状态:已关闭...

从控制台输出信息中可以看到,在这种模式下,子节点列表并没有被获取出来。除此之外,会触发添加子节点事件以及子节点初始化事件。因为缓存初始化时是把子节点添加到缓存里,所以会触发添加子节点事件,而添加完成之后,就会触发子节点初始化完成事件。

我们再来看看另一种异步初始化的模式:NORMAL模式,在这种模式下,同样的无法获取子节点列表,并且也会触发添加子节点事件,但是不会触发子节点初始化完成事件。修改childrenCache.start代码如下:

childrenCache.start(PathChildrenCache.StartMode.NORMAL);

将初始化模式修改成NORMAL后,运行该类,此时控制台输出信息如下:

当前客户端的状态:连接中...
当前节点的子节点详细数据列表:

--------------

子节点:/super/testNode/threeNode 添加成功,该子节点的数据为:three-node-data

--------------

子节点:/super/testNode/twoNode 添加成功,该子节点的数据为:two-node-data

--------------

子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data

--------------

子节点:/super/testNode/oneNode 添加成功,该子节点的数据为:change-node-data

--------------

子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data
当前客户端的状态:已关闭...

从控制台输出信息中可以看到,在这种模式下,子节点列表并没有被获取出来。除此之外,还会触发添加子节点事件。通常使用异步初始化的情况下,都是使用POST_INITIALIZED_EVENT模式,NORMAL较为少用

如果我们想要监听某一个特定的节点,例如我们要监听/super/testNode这个节点,那么以上面的代码作为例子,就需要把nodePath改为/super。然后在每一个判断条件中,再加上一个子判断条件,将节点限定为/super/testNode才会触发,这样就能实现监听某一个节点的增删改事件了。如下示例:

...
public class CuratorConnect {
    ...

    private static final String PARENT_NODE_PATH = "/super";  // 父节点
    private static final String NODE_PATH = "/super/testNode";  // 需要被特定监听的节点

    public static void main(String[] args) throws Exception {
        // 实例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 获取当前客户端的状态
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));

        // 为子节点添加watcher
        // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, PARENT_NODE_PATH, true);

        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 添加事件监听器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通过判断event type的方式来实现不同事件的触发
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                        System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                        System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                    }
                }
            }
        });

        Thread.sleep(200000);

        // 关闭客户端
        curatorConnect.closeZKClient();

        // 获取当前客户端的状态
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    }
}

这是最简单粗暴的方式了,当然在实际开发中,肯定会写得好一些,这个演示只是为了说明可以借助PathChildrenCache来实现某个特点节点的增删改事件监听。

zk-watcher应用实例之模拟统一更新N台节点的配置文件

zookeeper有一个比较常见的应用场景就是统一管理、更新分布式集群环境中每个节点的配置文件,我们可以在代码中监听集群中的节点,当节点数据发生改变时就同步到其他节点上。如下图:

Apache Curator操作zookeeper的API使用——watcher「终于解决」

因为我们使用的json作为节点存储的数据格式,所以需要准备一个工具类来做json与pojo对象的一个转换,也就是所谓的反序列化。创建一个 JsonUtils 类,代码如下:

package org.zero01.zk.util;

import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @Title: JsonUtils.java
 * @Package org.zero01.zk.util
 * @Description: JSON/对象转换类
 */
public class JsonUtils {

    // 定义jackson对象
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * 将对象转换成json字符串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
        try {
            String string = MAPPER.writeValueAsString(data);
            return string;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 将json结果集转化为对象
     *
     * @param jsonData json数据
     * @param beanType 对象中的object类型
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 将json数据转换成pojo对象list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
        JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
        try {
            List<T> list = MAPPER.readValue(jsonData, javaType);
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

创建一个pojo类,封装json格式的数据:

package org.zero01.zk.util;

public class RedisConfig {

    private String type;    // add 新增配置 update 更新配置 delete 删除配置
    private String url;        // 如果是add或update,则提供下载地址
    private String remark;    // 备注
    ... gtter stter 略 ...
}

然后创建客户端类,客户端类就是用来监听集群中的节点的。由于是模拟,所以这里的部分代码是伪代码。客户端类我们这里创建了三个,因为集群中有三个节点,由于代码基本上是一样的,所以这里只贴出客户端_1的代码。如下:

package org.zero01.zk.checkconfig;

import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

import org.zero01.zk.util.JsonUtils;
import org.zero01.zk.util.RedisConfig;

public class Client_1 {

    public CuratorFramework client = null;
    public static final String zkServerIp = "192.168.190.128:2181";

    // 初始化重连策略以及客户端对象并启动
    public Client_1() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerIp)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    // 关闭客户端
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    //  public final static String CONFIG_NODE = "/super/testNode/redis-config";
    public final static String CONFIG_NODE_PATH = "/super/testNode";
    public final static String SUB_PATH = "/redis-config";
    public static CountDownLatch countDown = new CountDownLatch(1);  // 计数器

    public static void main(String[] args) throws Exception {
        Client_1 cto = new Client_1();
        System.out.println("client1 启动成功...");

        // 开启子节点缓存
        final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

        // 添加子节点监听事件
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // 监听节点的数据更新事件
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    String configNodePath = event.getData().getPath();
                    if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                        System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);

                        // 读取节点数据
                        String jsonConfig = new String(event.getData().getData());
                        System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);

                        // 从json转换配置
                        RedisConfig redisConfig = null;
                        if (StringUtils.isNotBlank(jsonConfig)) {
                            redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                        }

                        // 配置不为空则进行相应操作
                        if (redisConfig != null) {
                            String type = redisConfig.getType();
                            String url = redisConfig.getUrl();
                            String remark = redisConfig.getRemark();
                            // 判断事件
                            if (type.equals("add")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("监听到新增的配置,准备下载...");
                                // ... 连接ftp服务器,根据url找到相应的配置
                                Thread.sleep(500);
                                System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                                // ... 下载配置到你指定的目录
                                Thread.sleep(1000);
                                System.out.println("下载成功,已经添加到项目中");
                                // ... 拷贝文件到项目目录
                            } else if (type.equals("update")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("监听到更新的配置,准备下载...");
                                // ... 连接ftp服务器,根据url找到相应的配置
                                Thread.sleep(500);
                                System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                                // ... 下载配置到你指定的目录
                                Thread.sleep(1000);
                                System.out.println("下载成功...");
                                System.out.println("删除项目中原配置文件...");
                                Thread.sleep(100);
                                // ... 删除原文件
                                System.out.println("拷贝配置文件到项目目录...");
                                // ... 拷贝文件到项目目录
                            } else if (type.equals("delete")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("监听到需要删除配置");
                                System.out.println("删除项目中原配置文件...");
                            }
                            // TODO 视情况统一重启服务
                        }
                    }
                }
            }
        });

        countDown.await();

        cto.closeZKClient();
    }
}

完成以上代码的编写后,将所有的客户类都运行起来。然后到zookeeper服务器上,进行如下操作:

[zk: localhost:2181(CONNECTED) 14] set /workspace/super/testNode/redis-config {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000043
mtime = Mon Apr 30 01:52:35 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 75
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] set /workspace/super/testNode/redis-config {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000044
mtime = Mon Apr 30 01:53:46 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 81
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] set /workspace/super/testNode/redis-config {"type":"delete","url":"","remark":"delete"}   
cZxid = 0xc00000039               
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000045
mtime = Mon Apr 30 01:54:06 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 44
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

Apache Curator操作zookeeper的API使用——watcher「终于解决」

Apache Curator操作zookeeper的API使用——watcher「终于解决」

Apache Curator操作zookeeper的API使用——watcher「终于解决」

如上,从三个客户端的控制台输出信息可以看到,三个节点都进行了同样操作,触发了同样的watch事件,这样就可以完成统一的配置文件管理。

 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24754.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信