文章目录
一 、Curator API 增删改查
本文主讲 Curator API的使用。
关于 Curator的介绍,请参考:
Zookeeper原生Java API、ZKClient和Apache Curator 区别对比 :https://blog.csdn.net/xiaojin21cen/article/details/88538102
Apache Curator 的简单介绍: https://blog.csdn.net/xiaojin21cen/article/details/88538102
1.1、maven 依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
1.2、重试策略
Curator 四种重连策略 :
-
ExponentialBackoffRetry
指定重试的次数, 且每一次重试之间停顿的时间逐渐增加。 -
RetryNTimes
指定最大重试次数的重试策略 -
RetryOneTime
重试一次,简单粗暴 -
RetryUntilElapsed
一直重试直到达到规定的时间
1.2.1、ExponentialBackoffRetry(使用人较多) 指定重试的次数
指定重试的次数, 且每一次重试之间停顿的时间逐渐增加。
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
参数说明:
- baseSleepTimeMs :初始 sleep 时间 (毫秒)
- maxRetries : 最大重试次数
- maxSleepMs 每次重试的最大睡眠时间(毫秒)。此参数不指定,默认是
Integer.MAX_VALUE
时间间隔 的公式计算:
时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt( 1<<(retryCount+1)) )
说明:
(1<<(retryCount+1) )的取值是 2,4,8,16。(retryCount>=0 )
随着重试次数的增加,计算出来的sleep 时间会越来越大。如果sleep 时间在 maxSleepMs 的范围内,那么就使用该 sleep 时间,否则使用;maxSleepMs 。另外,maxSleepMs 参数控制了最大重试次数,以避免无制的重试。
1.2.2、RetryNTimes 指定最大重试次数的重试策略
RetryNTimes(int n, int sleepMsBetweenRetries)
1.2.3、RetryOneTime 重连一次,简单粗暴
RetryOneTime(int sleepMsBetweenRetry)
1.2.4、RetryUntilElapsed 一直重试直到达到规定的时间
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
1.3、连接
@Before
public void before() {
// 1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 重试策略,上面讲过了。
// 2 通过工厂创建连接
cf = CuratorFrameworkFactory.builder() //
.connectString("127.0.0.1:2181") //连接地址
.sessionTimeoutMs(10000) //超时时间
.retryPolicy(retryPolicy) //重试策略
//.namespace("super") //命名空间
.build();//
// 3 开启连接
cf.start();
System.out.println("-------连接成功----");
}
1.4、新增
@Test
public void test2() throws Exception {
// 4 建立节点。指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
byte[] data = "c1内容".getBytes();
cf.create() //
.creatingParentsIfNeeded() //级联创建。如果父节点不存在,则将父节点、父父节点一块创建
.withMode(CreateMode.PERSISTENT) //节点类型
.forPath("/super/c1", data);
}
-
creatingParentsIfNeeded():递归创建父节点。创建节点时,如果其父节点不存在,也一并创建。如果不加此方法时,父节点不存在时,则抛异常。
-
withMode(CreateMode.PERSISTENT) : 节点类型。默认是PERSISTENT类型。
CreateMode 中的类型有:
- PERSISTENT :持久化节点
- EPHEMERAL :临时时节点
- PERSISTENT_SEQUENTIAL :持久化顺序节点
- EPHEMERAL_SEQUENTIAL :监时顺序节点
-
forPath(String path, byte[] data) :path 表示 节点路径;data 表示节点内容。
1.5、修改节点
@Test
public void test3_2() throws Exception {
// 修改节点
cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
String ret2 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret2);
}
参数说明:
- setData() :修改节点
1.6、删除节点
@Test
public void test2_2() throws Exception {
// 5 删除节点
cf.delete() //
//.guaranteed() //确保节点被删除
//.withVersion(-1) //指定删除的哪个版本
.deletingChildrenIfNeeded() //递归删除
.forPath("/super");//
}
参数说明:
- guaranteed() :确保节点被删除
- withVersion(-1): 指定删除的哪个版本
- deletingChildrenIfNeeded() : 递归删除。包括子节点。
1.7、读取节点内容:
/** * 创建节点、读取节点内容 */
@Test
public void test3() throws Exception {
// 创建节点
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/super/c2", "c2内容".getBytes());
// 读取节点内容
String ret1 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret1);
}
参数说明 :
- getData() : 读取节点内容
- forPath(String path) :节点路径
1.8、异步的回调函数
原生的ZooKeeper 的CRUD API有同步和异步之分,对于异步API,需要传递 AsyncCallback回调,对于getData,getChildren,exists这三个API,还可以设置Watcher。
在Curator中也有也有 2种类型 的异步API ,如下:
- inBackground( 回调函数 )
- getListenable().addListener( 回调函数 )
getListenable().addListener() 以后再说, 本文只讲 inBackground() 的使用。
inBackground() 的方法:
public T inBackground();
public T inBackground(Object context);
public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Executor executor);
1.8.1、BackgroundCallback 接口
public interface BackgroundCallback{
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
参数说明:
- CuratorFramework 当前客户端实例
- CuratorEvent 服务端事件
1.8.1.1、CuratorEventType 事件类型
CuratorEvent 中的 CuratorEventType( 事件类型):
CREATE,
DELETE,
EXISTS,
GET_DATA,
SET_DATA,
CHILDREN,
SYNC,
GET_ACL,
SET_ACL,
WATCHED,
CLOSING
1.8.1.2、getResultCode 响应码
0 (OK)、-4 (CollectionLoss)、-110 (NodeExists)、-112 (SessionExpired)
1.8.2、Executor 参数重点说明
在ZooKeeper 中,所有异步通知都是由EventThread 线程来处理的 —— EventThread 线程用于串行处理所有的事件通知。
EventThread 的“串行处理机制” 在绝大部分应用场景下能够保证事件处理的顺序性,但是也有弊端,就是一旦碰到复杂的处理单元,就会消耗过长的处理时间,从而影响对其他事件的处理。因此 inBackground 接口中,允许传入一个 Executor 实例,就可以把一些复杂的事件处理放到一个专门的线程池中。
1.8.3、示例:
@Test
public void test4() throws Exception {
ExecutorService pool = Executors.newCachedThreadPool(); //创建线程池
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) //创建节点
.inBackground(new BackgroundCallback() {
//指定回调函数
@Override
public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
System.out.println("code:" + ce.getResultCode());
System.out.println("type:" + ce.getType());
System.out.println("线程为:" + Thread.currentThread().getName());
}
}, pool) //指定池程池
.forPath("/super/c3", "c3内容".getBytes());//
Thread.sleep(Integer.MAX_VALUE);
}
运行结果:
-------连接成功----
code:0
type:CREATE
线程为:pool-3-thread-1
1.9、读取子节点
@Test
public void test5() throws Exception {
List<String> list = cf.getChildren().forPath("/super");
for(String p : list){
System.out.println(p);
}
}
1.10、判断节点是否存在
@Test
public void test6() throws Exception {
Stat stat = cf.checkExists().forPath("/super/c3");
System.out.println(stat);
}
运行结果:
如果节点存在:
631,631,1557570121762,1557570121762,0,0,0,0,8,0,631
如果节点不存在:
null
二、示例代码
2.1、示例代码1
package com.aop8.curator.base;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorBase_junit {
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;// ms
static CuratorFramework cf = null;
@Before
public void before() {
// 1 重试策略:初次间隔时间为1s(1000毫秒), 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// 2 通过工厂创建连接
cf = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy)
// .namespace("super")
.build();
// 3 开启连接
cf.start();
System.out.println("-------连接成功----");
}
@After
public void after() {
cf.close();
System.out.println("======关闭连接功=====");
}
/** * 查看状态 */
@Test
public void test1() {
System.out.println(States.CONNECTED);
System.out.println(cf.getState());
}
/** * 新增、删除 * <pre> * creatingParentsIfNeeded():级联创建节点。创建节点时,如果父节点不存在,也一并创建 * deletingChildrenIfNeeded:级联删除节点。删除节点时,如果存在子节点,也级联删除 * </pre> */
@Test
public void test2() throws Exception {
// 4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
// withMode()方法指定创建的节点类型,跟原生的Zookeeper API一样,默认为PERSISTENT类型。
cf.create().withMode(CreateMode.PERSISTENT).forPath("/super0", "c0内容".getBytes());
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/super/c1", "c1内容".getBytes());
Thread.sleep(1000*30);//休眠30S后,删除节点
// 5 删除节点
cf.delete()
.guaranteed()
//.withVersion(version)
.deletingChildrenIfNeeded()
.forPath("/super");
}
/** * 创建节点、读取节点内容 */
@Test
public void test3() throws Exception {
// 创建节点
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/super/c1", "c1内容".getBytes());
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/super/c2", "c2内容".getBytes());
// 读取节点
String ret1 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret1);
}
/** * 修改节点、读取 */
@Test
public void test3_2() throws Exception {
// 修改节点
cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
String ret2 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret2);
}
/** * 绑定回调函数 */
@Test
public void test4() throws Exception {
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)//
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
System.out.println("code:" + ce.getResultCode());
System.out.println("type:" + ce.getType());
System.out.println("线程为:" + Thread.currentThread().getName());
}
}, pool)//
.forPath("/super/c3", "c3内容".getBytes());//
Thread.sleep(Integer.MAX_VALUE);
}
/** * 子节点 */
@Test
public void test5() throws Exception {
List<String> list = cf.getChildren().forPath("/super");
for(String p : list){
System.out.println(p);
}
}
/** * 判断节点是否存在 */
@Test
public void test6() throws Exception {
Stat stat = cf.checkExists().forPath("/super/c4");
System.out.println(stat);
}
@Test
public void test7() throws Exception {
Thread.sleep(2000);
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
}
}
2.2、示例代码2
主要依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
<!-- 时间操作组件 -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<!-- Apache工具组件 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
package com.imooc.curator;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorOperator2 {
public CuratorFramework curatorFramework = null;
public static final String zkServerPath = "127.0.0.1:2181";
@Before
public void testCuratorFramework(){
test1();
}
//@Test
public void test1(){
/** * 同步创建zk示例,原生api是异步的 * * curator链接zookeeper的策略:ExponentialBackoffRetry * baseSleepTimeMs:初始sleep的时间 * maxRetries:最大重试次数 * maxSleepMs:最大重试时间 */
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/** * curator链接zookeeper的策略:RetryNTimes * n:重试的次数 * sleepMsBetweenRetries:每次重试间隔的时间 */
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/** * curator链接zookeeper的策略:RetryOneTime * sleepMsBetweenRetry:每次重试间隔的时间 */
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/** * 永远重试,不推荐使用 */
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/** * curator链接zookeeper的策略:RetryUntilElapsed * maxElapsedTimeMs:最大重试时间 * sleepMsBetweenRetries:每次重试间隔 * 重试时间超过maxElapsedTimeMs后,就不再重试 */
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
curatorFramework.start();
// 实例化
boolean isZkCuratorStarted = curatorFramework.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
}
/** * * @Description: 关闭zk客户端连接 */
@After
public void closeZKClient() {
if (curatorFramework != null) {
this.curatorFramework.close();
}
boolean isZkCuratorStarted2 = curatorFramework.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
}
// 创建节点
String nodePath = "/super/imooc";
@Test
public void test2() throws Exception{
byte[] data = "superme".getBytes();
curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
}
@Test
public void test3() throws Exception{
// 更新节点数据
byte[] newData = "batman".getBytes();
curatorFramework.setData().withVersion(0).forPath(nodePath, newData);
}
@Test
public void test33() throws Exception{
// 删除节点
curatorFramework.delete()
.guaranteed() // 如果删除失败,那么在后端还是继续会删除,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,就删除
.withVersion(4)
.forPath(nodePath);
}
@Test
public void test4() throws Exception{
// 读取节点数据
Stat stat = new Stat();
byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());
}
@Test
public void test5() throws Exception{
// 查询子节点
List<String> childNodes = curatorFramework.getChildren()
.forPath(nodePath);
System.out.println("开始打印子节点:");
for (String s : childNodes) {
System.out.println(s);
}
}
@Test
public void test6() throws Exception{
// 判断节点是否存在,如果不存在则为空
Stat statExist = curatorFramework.checkExists().forPath(nodePath + "/abc");
System.out.println(statExist);
}
@Test
public void test7() throws Exception{
// watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
curatorFramework.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
curatorFramework.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
Thread.sleep(100000);
}
@Test
public void test8() throws Exception{
// 为节点添加watcher
// NodeCache: 监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
// buildInitial : 初始化的时候获取node的值并且缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("空");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
}
});
Thread.sleep(100000);
}
@Test
public void test9() throws Exception{
// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,会触发事件
String childNodePathCache = nodePath;
// cacheData: 设置缓存节点的数据状态
final PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, childNodePathCache, true);
/** * StartMode: 初始化方式 * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 * NORMAL:异步初始化 * BUILD_INITIAL_CACHE:同步初始化 */
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("添加不正确...");
}
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});
Thread.sleep(100000);
}
public final static String ADD_PATH = "/super/imooc/d";
}
三、参考
《从Paxos到Zookeeper–分布式一致性原理与实践》 -2015年2月
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/21994.html