Curator API 节点的增删改查

Curator API 节点的增删改查文章目录一、curatorAPI增删改查1.1、maven依赖1.2、重试策略1.2.1、ExponentialBackoffRetry重试指定的次数1.2.2、RetryNTimes指定最大重试次数的重试策略1.2.3、RetryOneTime重连一次,简单粗暴1.2.4、RetryUntilElapsed一直重试直到达到规定的时间1.3、连接1.4、新增1.5、删除节…

大家好,欢迎来到IT知识分享网。Curator

一 、Curator API 增删改查

本文主讲 Curator API的使用。

关于 Curator的介绍,请参考:
Zookeeper原生Java API、ZKClient和Apache Curator 区别对比 :https://blog.csdn.net/xiaojin21cen/article/details/88538102
Apache Curator 的简单介绍: https://blog.csdn.net/xiaojin21cen/article/details/88538102

1.1、maven 依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.4.2</version>
</dependency>

1.2、重试策略

Curator 四种重连策略 :

  • ExponentialBackoffRetry 指定重试的次数, 且每一次重试之间停顿的时间逐渐增加。

  • RetryNTimes 指定最大重试次数的重试策略

  • RetryOneTime 重试一次,简单粗暴

  • RetryUntilElapsed 一直重试直到达到规定的时间

1.2.1、ExponentialBackoffRetry(使用人较多) 指定重试的次数

指定重试的次数, 且每一次重试之间停顿的时间逐渐增加。

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 

参数说明:

  • baseSleepTimeMs :初始 sleep 时间 (毫秒)
  • maxRetries : 最大重试次数
  • maxSleepMs 每次重试的最大睡眠时间(毫秒)。此参数不指定,默认是 Integer.MAX_VALUE

时间间隔 的公式计算:

时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt( 1<<(retryCount+1)) ) 

说明:
(1<<(retryCount+1) )的取值是 2,4,8,16。(retryCount>=0 )

随着重试次数的增加,计算出来的sleep 时间会越来越大。如果sleep 时间在 maxSleepMs 的范围内,那么就使用该 sleep 时间,否则使用;maxSleepMs 。另外,maxSleepMs 参数控制了最大重试次数,以避免无制的重试。

1.2.2、RetryNTimes 指定最大重试次数的重试策略

RetryNTimes(int n, int sleepMsBetweenRetries)

1.2.3、RetryOneTime 重连一次,简单粗暴

RetryOneTime(int sleepMsBetweenRetry)

1.2.4、RetryUntilElapsed 一直重试直到达到规定的时间

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

1.3、连接

@Before
public void before() { 
   
    // 1 重试策略:初试时间为1s 重试10次
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);  // 重试策略,上面讲过了。

    // 2 通过工厂创建连接
    cf = CuratorFrameworkFactory.builder() //
    	.connectString("127.0.0.1:2181")   //连接地址
    	.sessionTimeoutMs(10000)    //超时时间
    	.retryPolicy(retryPolicy)   //重试策略
   		 //.namespace("super") //命名空间
   		 .build();//
    
    // 3 开启连接
    cf.start();
    System.out.println("-------连接成功----");
}	

1.4、新增

@Test
public void test2() throws Exception { 
   
    // 4 建立节点。指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 
    byte[] data = "c1内容".getBytes();
    cf.create()      //
    	.creatingParentsIfNeeded()     //级联创建。如果父节点不存在,则将父节点、父父节点一块创建
    	.withMode(CreateMode.PERSISTENT)   //节点类型
    	.forPath("/super/c1", data);
}
  • creatingParentsIfNeeded():递归创建父节点。创建节点时,如果其父节点不存在,也一并创建。如果不加此方法时,父节点不存在时,则抛异常。

  • withMode(CreateMode.PERSISTENT) : 节点类型。默认是PERSISTENT类型。

    CreateMode 中的类型有:

    • PERSISTENT :持久化节点
    • EPHEMERAL :临时时节点
    • PERSISTENT_SEQUENTIAL :持久化顺序节点
    • EPHEMERAL_SEQUENTIAL :监时顺序节点
  • forPath(String path, byte[] data) :path 表示 节点路径;data 表示节点内容。

1.5、修改节点

@Test
public void test3_2() throws Exception { 
   
	// 修改节点
	cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
	
	String ret2 = new String(cf.getData().forPath("/super/c2"));
	System.out.println(ret2);
}

参数说明:

  • setData() :修改节点

1.6、删除节点

@Test
public void test2_2() throws Exception { 
   	
	// 5 删除节点
	cf.delete() //
		//.guaranteed() //确保节点被删除
		//.withVersion(-1) //指定删除的哪个版本
		.deletingChildrenIfNeeded()  //递归删除
		.forPath("/super");//
}

参数说明:

  • guaranteed() :确保节点被删除
  • withVersion(-1): 指定删除的哪个版本
  • deletingChildrenIfNeeded() : 递归删除。包括子节点。

1.7、读取节点内容:

/** * 创建节点、读取节点内容 */
@Test
public void test3() throws Exception { 
   
    // 创建节点 
    cf.create()
    	.creatingParentsIfNeeded()
    	.withMode(CreateMode.PERSISTENT)
    	.forPath("/super/c2", "c2内容".getBytes());

    // 读取节点内容
    String ret1 = new String(cf.getData().forPath("/super/c2"));
    System.out.println(ret1);
}

参数说明 :

  • getData() : 读取节点内容
  • forPath(String path) :节点路径

1.8、异步的回调函数

原生的ZooKeeper 的CRUD API有同步和异步之分,对于异步API,需要传递 AsyncCallback回调,对于getData,getChildren,exists这三个API,还可以设置Watcher。

在Curator中也有也有 2种类型 的异步API ,如下:

  • inBackground( 回调函数 )
  • getListenable().addListener( 回调函数 )

getListenable().addListener() 以后再说, 本文只讲 inBackground() 的使用。

inBackground() 的方法:

public T inBackground();

public T inBackground(Object context);

public T inBackground(BackgroundCallback callback);

public T inBackground(BackgroundCallback callback, Executor executor);

1.8.1、BackgroundCallback 接口

public interface BackgroundCallback{ 
      
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

参数说明:

  • CuratorFramework 当前客户端实例
  • CuratorEvent 服务端事件

1.8.1.1、CuratorEventType 事件类型

CuratorEvent 中的 CuratorEventType( 事件类型):

CREATE,
DELETE,
EXISTS,
GET_DATA,
SET_DATA,
CHILDREN,
SYNC,
GET_ACL,
SET_ACL,
WATCHED,
CLOSING

1.8.1.2、getResultCode 响应码

0 (OK)、-4 (CollectionLoss)、-110 (NodeExists)、-112 (SessionExpired)

1.8.2、Executor 参数重点说明

在ZooKeeper 中,所有异步通知都是由EventThread 线程来处理的 —— EventThread 线程用于串行处理所有的事件通知。

EventThread 的“串行处理机制” 在绝大部分应用场景下能够保证事件处理的顺序性,但是也有弊端,就是一旦碰到复杂的处理单元,就会消耗过长的处理时间,从而影响对其他事件的处理。因此 inBackground 接口中,允许传入一个 Executor 实例,就可以把一些复杂的事件处理放到一个专门的线程池中。

1.8.3、示例:

@Test
public void test4() throws Exception { 
   
	
	ExecutorService pool = Executors.newCachedThreadPool(); //创建线程池

	cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)  //创建节点
		.inBackground(new BackgroundCallback() { 
    //指定回调函数
			@Override
			public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { 
   
				System.out.println("code:" + ce.getResultCode());
				System.out.println("type:" + ce.getType());
				System.out.println("线程为:" + Thread.currentThread().getName());
			}
		}, pool)  //指定池程池
		.forPath("/super/c3", "c3内容".getBytes());//

	Thread.sleep(Integer.MAX_VALUE);
}

运行结果:

-------连接成功----
code:0
type:CREATE
线程为:pool-3-thread-1

1.9、读取子节点

@Test
public void test5() throws Exception { 
   
	List<String> list = cf.getChildren().forPath("/super");
	for(String p : list){ 
   
		System.out.println(p);
	}
}

1.10、判断节点是否存在

@Test
public void test6() throws Exception { 
   
	Stat stat = cf.checkExists().forPath("/super/c3");
	System.out.println(stat);
}

运行结果:

如果节点存在:

631,631,1557570121762,1557570121762,0,0,0,0,8,0,631

如果节点不存在:

null

二、示例代码

2.1、示例代码1

package com.aop8.curator.base;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorBase_junit { 
   

	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;// ms

	static CuratorFramework cf = null;

	@Before
	public void before() { 
   
		// 1 重试策略:初次间隔时间为1s(1000毫秒), 重试10次
		
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		
		// 2 通过工厂创建连接
		cf = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
				.sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy)
				// .namespace("super")
				.build();
				
		// 3 开启连接
		cf.start();
		System.out.println("-------连接成功----");
	}
	
	@After
	public void after() { 
   
		cf.close();
		System.out.println("======关闭连接功=====");
	}

	/** * 查看状态 */
	@Test
	public void test1() { 
   
		System.out.println(States.CONNECTED);
		System.out.println(cf.getState());
	}

	/** * 新增、删除 * <pre> * creatingParentsIfNeeded():级联创建节点。创建节点时,如果父节点不存在,也一并创建 * deletingChildrenIfNeeded:级联删除节点。删除节点时,如果存在子节点,也级联删除 * </pre> */
	@Test
	public void test2() throws Exception { 
   
		// 4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
		// withMode()方法指定创建的节点类型,跟原生的Zookeeper API一样,默认为PERSISTENT类型。
		
		cf.create().withMode(CreateMode.PERSISTENT).forPath("/super0", "c0内容".getBytes());
		
		cf.create()
			.creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.forPath("/super/c1", "c1内容".getBytes());
		
		Thread.sleep(1000*30);//休眠30S后,删除节点
		
		// 5 删除节点
		cf.delete()
		.guaranteed()
		//.withVersion(version)
		.deletingChildrenIfNeeded()
		.forPath("/super");
	}

	
	/** * 创建节点、读取节点内容 */
	@Test
	public void test3() throws Exception { 
   
		// 创建节点
		cf.create()
			.creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.forPath("/super/c1", "c1内容".getBytes());
		
		cf.create()
			.creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.forPath("/super/c2", "c2内容".getBytes());
		
		// 读取节点
		String ret1 = new String(cf.getData().forPath("/super/c2"));
		System.out.println(ret1);
	}
	
	
	/** * 修改节点、读取 */
	@Test
	public void test3_2() throws Exception { 
   
		// 修改节点
		cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
		String ret2 = new String(cf.getData().forPath("/super/c2"));
		System.out.println(ret2);
	}
	
	

	/** * 绑定回调函数 */
	@Test
	public void test4() throws Exception { 
   
		
		ExecutorService pool = Executors.newCachedThreadPool();

		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)//
			.inBackground(new BackgroundCallback() { 
   
				@Override
				public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { 
   
					System.out.println("code:" + ce.getResultCode());
					System.out.println("type:" + ce.getType());
					System.out.println("线程为:" + Thread.currentThread().getName());
				}
			}, pool)//
			.forPath("/super/c3", "c3内容".getBytes());//

		Thread.sleep(Integer.MAX_VALUE);
	}

	/** * 子节点 */
	@Test
	public void test5() throws Exception { 
   
		List<String> list = cf.getChildren().forPath("/super");
		for(String p : list){ 
   
			System.out.println(p);
		}
	}

	/** * 判断节点是否存在 */
	@Test
	public void test6() throws Exception { 
   
		Stat stat = cf.checkExists().forPath("/super/c4");
		System.out.println(stat);
	}
	
	
	@Test
	public void test7() throws Exception { 
   
		Thread.sleep(2000);
		cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
	}	

}

2.2、示例代码2

主要依赖:

<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>4.0.0</version>
</dependency>

<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>4.0.0</version>
</dependency>

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.4.11</version>
</dependency>
<!-- 时间操作组件 -->
<dependency>
	<groupId>joda-time</groupId>
	<artifactId>joda-time</artifactId>
	<version>2.5</version>
</dependency>
<!-- Apache工具组件 -->
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-lang3</artifactId>
	<version>3.3.2</version>
</dependency>
package com.imooc.curator;

import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorOperator2 { 
   

	public CuratorFramework curatorFramework = null;
	public static final String zkServerPath = "127.0.0.1:2181";
	
	
	@Before
	public void testCuratorFramework(){ 
   
		test1();
	}
	
	
	//@Test
	public void test1(){ 
   
		
		/** * 同步创建zk示例,原生api是异步的 * * curator链接zookeeper的策略:ExponentialBackoffRetry * baseSleepTimeMs:初始sleep的时间 * maxRetries:最大重试次数 * maxSleepMs:最大重试时间 */
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/** * curator链接zookeeper的策略:RetryNTimes * n:重试的次数 * sleepMsBetweenRetries:每次重试间隔的时间 */
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/** * curator链接zookeeper的策略:RetryOneTime * sleepMsBetweenRetry:每次重试间隔的时间 */
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/** * 永远重试,不推荐使用 */
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/** * curator链接zookeeper的策略:RetryUntilElapsed * maxElapsedTimeMs:最大重试时间 * sleepMsBetweenRetries:每次重试间隔 * 重试时间超过maxElapsedTimeMs后,就不再重试 */
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
		
		curatorFramework = CuratorFrameworkFactory.builder()
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		curatorFramework.start();
		
		// 实例化
		boolean isZkCuratorStarted = curatorFramework.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
	}
	
	/** * * @Description: 关闭zk客户端连接 */
	@After
	public void closeZKClient() { 
   
		if (curatorFramework != null) { 
   
			this.curatorFramework.close();
		}
		boolean isZkCuratorStarted2 = curatorFramework.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
	}
	
	
	
	// 创建节点
	String nodePath = "/super/imooc";
	
	@Test
	public void test2() throws Exception{ 
   
		byte[] data = "superme".getBytes();
		
		curatorFramework.create().creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.withACL(Ids.OPEN_ACL_UNSAFE)
			.forPath(nodePath, data);
	}
	
	
	@Test
	public void test3() throws Exception{ 
   
		// 更新节点数据
		byte[] newData = "batman".getBytes();
		curatorFramework.setData().withVersion(0).forPath(nodePath, newData);
	}
	
	
	@Test
	public void test33() throws Exception{ 
   
		// 删除节点
		curatorFramework.delete()
		.guaranteed()					// 如果删除失败,那么在后端还是继续会删除,直到成功
		.deletingChildrenIfNeeded()	// 如果有子节点,就删除
		.withVersion(4)
		.forPath(nodePath);
		
	}
	
	
	@Test
	public void test4() throws Exception{ 
   
		// 读取节点数据
		Stat stat = new Stat();
		byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(nodePath);
		System.out.println("节点" + nodePath + "的数据为: " + new String(data));
		System.out.println("该节点的版本号为: " + stat.getVersion());
	}
	
	
	@Test
	public void test5() throws Exception{ 
   
		// 查询子节点
		List<String> childNodes = curatorFramework.getChildren()
											.forPath(nodePath);
		System.out.println("开始打印子节点:");
		for (String s : childNodes) { 
   
			System.out.println(s);
		}
	}
	
	
	
	@Test
	public void test6() throws Exception{ 
   
		// 判断节点是否存在,如果不存在则为空
		Stat statExist = curatorFramework.checkExists().forPath(nodePath + "/abc");
		System.out.println(statExist);
	}
	

	@Test
	public void test7() throws Exception{ 
   
		// watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
		curatorFramework.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
		curatorFramework.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
		
		Thread.sleep(100000);
	}
	
	@Test
	public void test8() throws Exception{ 
   
		// 为节点添加watcher
		// NodeCache: 监听数据节点的变更,会触发事件
		final NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
		// buildInitial : 初始化的时候获取node的值并且缓存
		nodeCache.start(true);
		
		if (nodeCache.getCurrentData() != null) { 
   
			System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
		} else { 
   
			System.out.println("节点初始化数据为空...");
		}
		nodeCache.getListenable().addListener(new NodeCacheListener() { 
   
			public void nodeChanged() throws Exception { 
   
				if (nodeCache.getCurrentData() == null) { 
   
					System.out.println("空");
					return;
				}
				String data = new String(nodeCache.getCurrentData().getData());
				System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
			}
		});
		
		Thread.sleep(100000);
	}
	
	
	

	@Test
	public void test9() throws Exception{ 
   
		// 为子节点添加watcher
		// PathChildrenCache: 监听数据节点的增删改,会触发事件
		String childNodePathCache =  nodePath;
		// cacheData: 设置缓存节点的数据状态
		final PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, childNodePathCache, true);
		/** * StartMode: 初始化方式 * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 * NORMAL:异步初始化 * BUILD_INITIAL_CACHE:同步初始化 */
		childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
		
		List<ChildData> childDataList = childrenCache.getCurrentData();
		System.out.println("当前数据节点的子节点数据列表:");
		for (ChildData cd : childDataList) { 
   
			String childData = new String(cd.getData());
			System.out.println(childData);
		}
		
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() { 
   
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 
   
				if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ 
   
					System.out.println("子节点初始化ok...");
				}
				
				else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ 
   
					String path = event.getData().getPath();
					if (path.equals(ADD_PATH)) { 
   
						System.out.println("添加子节点:" + event.getData().getPath());
						System.out.println("子节点数据:" + new String(event.getData().getData()));
					} else if (path.equals("/super/imooc/e")) { 
   
						System.out.println("添加不正确...");
					}
					
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ 
   
					System.out.println("删除子节点:" + event.getData().getPath());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ 
   
					System.out.println("修改子节点路径:" + event.getData().getPath());
					System.out.println("修改子节点数据:" + new String(event.getData().getData()));
				}
			}
		});
		
		Thread.sleep(100000);
		
	}	
	
	public final static String ADD_PATH = "/super/imooc/d";	
}

三、参考

《从Paxos到Zookeeper–分布式一致性原理与实践》 -2015年2月

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/21994.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信