Zookeeper部分应用场景
配置维护
应用所部署的服务节点众多,当需要修改配置信息时,可以利用Zookeeper的watcher机制,实现配置维护,将修改的配置信息同步至各个服务节点。
1.各个服务节点向Zookeeper注册watcher监听事件;
2.配置修改之后,改变对应Zookeeper上的节点信息;
3.触发watcher事件回调;
4.各个服务节点获取最新Zookeeper上配置节点的信息。
场景模拟
以某一节点的逻辑为主要说明,通过日志查看对应的信息输出,以客户端工具来充当发布者,修改Zookeeper节点信息,观察当前服务节点日志。
maven依赖
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-client</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>1.3.3</version>
</dependency>
配置信息,此处假定配置节点为/gconfig
zkServer=192.168.80.81:2181,192.168.80.82:2181,192.168.80.83:2181
configPath=/gconfig
ConfigHandle.java
package site.zhaoyangjue.handle;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import site.zhaoyangjue.tool.ZkTool;
@Component
public class ConfigHandle implements ApplicationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigHandle.class);
@Value("${zkServer}")
private String zkServer;
@Value("${configPath}")
private String configPath;
public static CuratorFramework client;
@Override
public void run(ApplicationArguments args) {
LOGGER.info("--当前应用启动,开始获取配置信息--");
LOGGER.info("zkServer集群地址:{},配置信息节点:{}",zkServer,configPath);
client = ZkTool.getClient(zkServer);
if (ZkTool.checkNodeExist(configPath)){
String value = ZkTool.getValue(configPath);
LOGGER.info("读取到zk上的配置信息:{}",value);
} else {
LOGGER.info("--读取默认配置信息--");
}
// 注册watcher事件
watcher();
}
public void watcher(){
try {
client.getData().usingWatcher((CuratorWatcher) event -> {
if (Watcher.Event.EventType.NodeDataChanged.equals(event.getType())) {
// 节点数据发生变化
LOGGER.info("配置节点数据发生了变化");
String value = ZkTool.getValue(event.getPath());
LOGGER.info("获取到新的配置信息为:{}",value);
watcher();
}
}).forPath(configPath);
} catch (Exception e) {
LOGGER.error("watcher注册异常",e.getMessage());
}
}
}
ZkTool.java
package site.zhaoyangjue.tool;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.locks.InterProcessMutex;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import site.zhaoyangjue.handle.NamingHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ZkTool {
private static final Logger LOGGER = LoggerFactory.getLogger(ZkTool.class);
private static CuratorFramework client;
/**
* 获取到zk客户端
* @param zkServer
* @return
*/
public static CuratorFramework getClient(String zkServer,String namespace){
client = CuratorFrameworkFactory.builder()
.connectString(zkServer)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace(namespace)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
startClient();
return client;
}
/**
* 开启zk客户端
*/
public static void startClient(){
client.start();
}
/**
* 关闭zk客户端
*/
public static void closeClient(){
client.close();
}
/**
* 创建节点
* @param path
* @param value
* @param isEphemeral
* @return
* @throws Exception
*/
public static String createNode(String path,String value,Boolean isEphemeral,Boolean isSort) throws Exception {
if (isEphemeral == null) {
isEphemeral = false;
}
return isEphemeral ? (client.create().withMode(CreateMode.EPHEMERAL).forPath(path, value.getBytes()))
: (isSort ? (client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, value.getBytes()))
: (client.create().forPath(path, value.getBytes())));
}
/**
* 修改节点内容
* @param path
* @param newData
* @return
*/
public static String setNodeData(String path, String newData) {
Stat stat;
try {
stat = client.setData().forPath(path, newData.getBytes());
} catch (Exception e) {
return "更新节点失败";
}
return stat.toString();
}
/**
* 获取子节点列表
* @param path
* @return
*/
public static List<String> getChildren(String path){
try {
return client.getChildren().forPath(path);
} catch (Exception e) {
return new ArrayList<>();
}
}
/**
* 获取节点value
* @return
*/
public static String getValue(String path){
try {
byte[] bytes = client.getData().forPath(path);
return new String(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 校验路径节点是否存在
* @param path
* @return
*/
public static boolean checkNodeExist(String path){
boolean isExists = false;
try {
Stat stat = client.checkExists().forPath(path);
if (stat != null) {
isExists = true;
}
} catch (Exception e) {
e.printStackTrace();
}
return isExists;
}
/**
* 删除节点
* @param path
*/
public static void deleteNode(String path){
if (checkNodeExist(path)){
try {
client.delete().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
};
}
/**
* zk锁实现
*/
public static void lock(){
InterProcessMutex lock = new InterProcessMutex(client, "");
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
LOGGER.info(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
LOGGER.info(Thread.currentThread().getName() + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
备注:
-
主类实现ApplicationRunner接口,实现在Springboot容器启动之后执行run方法中的逻辑;
-
配置信息读取之后只是日志输出,不做他用,正常生产环境下是需要做相关业务逻辑操作的;
-
注册接听事件中使用递归来实现循环watcher注册,正常应该是建立一个监听器。
-
ZkTool为Zookeeper常见操作的工具类,实现比较简单,对于异常没有做特别处理。
日志输出:
[ restartedMain] site.zhaoyangjue.handle.ConfigHandle : 读取到zk上的配置信息:init_config
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle : 配置节点数据发生了变化
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle : 获取到新的配置信息为:first_change_config
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle : 配置节点数据发生了变化
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle : 获取到新的配置信息为:second_change_config
命名服务
在分布式环境下有很多的业务模块,均需要获取唯一标识,可以利用Zookeeper中的有序永久节点来实现,对应的节点名称是唯一的,以此来实现唯一标识。
NamingHandle.java
package site.zhaoyangjue.handle;
import com.netflix.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import site.zhaoyangjue.tool.ZkTool;
import java.util.ArrayList;
import java.util.List;
@RestController
@RequestMapping("/naming")
public class NamingHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingHandle.class);
@Value("${zkServer}")
private String zkServer;
public static CuratorFramework client;
@GetMapping("/getNames")
public List<String> getNames(@RequestParam("app") String app){
client = ZkTool.getClient(zkServer,app);
List<String> nameList = new ArrayList<>();
try {
for (int i = 0; i < 1000; i++) {
String tempNode = ZkTool.createNode("/" + app + i, "", false, true);
nameList.add(tempNode);
}
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("获取到的名称列表:{}",nameList.toString());
ZkTool.closeClient();
return nameList;
}
}
备注
生产环境下命名服务不建议使用这样的方式。
分布式锁
详见ZkTool.java中lock();
基于原生的zkClientAPI实现分布式锁过程比较繁琐,基本思想如下:
1.客户端向某资源发起读写请求之后,判断根节点是否存在,如果不存在,则创建一个根节点(永久节点)。
2.对于读请求,遍历根节点下的所有写请求,向序号比自己小的最后一个写请求节点添加watcher监听;
对于写请求,遍历根节点下的请求,向序号比自己小的最后一个请求添加watcher监听;
3.创建完毕之后,执行watcher回调;
4.回调逻辑如下:
遍历根节点下比自己序号下的所有请求:
对于当前请求是读请求,如果这些请求都是读请求,则当前可以进行数据读取,如果存在写请求,则等待,直到写请求完毕;
对于当前请求是写请求,如果这些请求中没有比自己序号小的话,则当前可以进行数据写入,如果存在,则等待,直到前面的写请求完毕。
最后修改:2021-12-14 17:57:19
© 著作权归作者所有