红泥小火炉


Zookeeper部分应用场景

Nathaniel 2021-12-14 1132浏览 0条评论
首页/正文
分享到: / / / /

Zookeeper部分应用场景

配置维护

应用所部署的服务节点众多,当需要修改配置信息时,可以利用Zookeeper的watcher机制,实现配置维护,将修改的配置信息同步至各个服务节点。

1.各个服务节点向Zookeeper注册watcher监听事件;
2.配置修改之后,改变对应Zookeeper上的节点信息;
3.触发watcher事件回调;
4.各个服务节点获取最新Zookeeper上配置节点的信息。

场景模拟

以某一节点的逻辑为主要说明,通过日志查看对应的信息输出,以客户端工具来充当发布者,修改Zookeeper节点信息,观察当前服务节点日志。

maven依赖

<dependency>
   <groupId>com.netflix.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>1.3.3</version>
</dependency>
<dependency>
   <groupId>com.netflix.curator</groupId>
   <artifactId>curator-client</artifactId>
   <version>1.3.3</version>
</dependency>
<dependency>
   <groupId>com.netflix.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>1.3.3</version>
</dependency>

配置信息,此处假定配置节点为/gconfig

zkServer=192.168.80.81:2181,192.168.80.82:2181,192.168.80.83:2181
configPath=/gconfig

ConfigHandle.java

package site.zhaoyangjue.handle;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import site.zhaoyangjue.tool.ZkTool;

@Component
public class ConfigHandle implements ApplicationRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigHandle.class);
    @Value("${zkServer}")
    private String zkServer;
    @Value("${configPath}")
    private String configPath;

    public static CuratorFramework client;
    @Override
    public void run(ApplicationArguments args) {
        LOGGER.info("--当前应用启动,开始获取配置信息--");
        LOGGER.info("zkServer集群地址:{},配置信息节点:{}",zkServer,configPath);
        client = ZkTool.getClient(zkServer);
        if (ZkTool.checkNodeExist(configPath)){
            String value = ZkTool.getValue(configPath);
            LOGGER.info("读取到zk上的配置信息:{}",value);
        } else {
            LOGGER.info("--读取默认配置信息--");
        }
        // 注册watcher事件
        watcher();
    }
    public void watcher(){
        try {
            client.getData().usingWatcher((CuratorWatcher) event -> {
                if (Watcher.Event.EventType.NodeDataChanged.equals(event.getType())) {
                    // 节点数据发生变化
                    LOGGER.info("配置节点数据发生了变化");
                    String value = ZkTool.getValue(event.getPath());
                    LOGGER.info("获取到新的配置信息为:{}",value);
                    watcher();
                }
            }).forPath(configPath);
        } catch (Exception e) {
            LOGGER.error("watcher注册异常",e.getMessage());
        }
    }
}

ZkTool.java

package site.zhaoyangjue.tool;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.locks.InterProcessMutex;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import site.zhaoyangjue.handle.NamingHandle;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ZkTool {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkTool.class);
    private static CuratorFramework client;
    /**
     * 获取到zk客户端
     * @param zkServer
     * @return
     */
    public static CuratorFramework getClient(String zkServer,String namespace){
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServer)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        startClient();
        return client;
    }

    /**
     * 开启zk客户端
     */
    public static void startClient(){
        client.start();
    }

    /**
     * 关闭zk客户端
     */
    public static void closeClient(){
        client.close();
    }

    /**
     * 创建节点
     * @param path
     * @param value
     * @param isEphemeral
     * @return
     * @throws Exception
     */
    public static String createNode(String path,String value,Boolean isEphemeral,Boolean isSort) throws Exception {
        if (isEphemeral == null) {
            isEphemeral = false;
        }
        return isEphemeral ? (client.create().withMode(CreateMode.EPHEMERAL).forPath(path, value.getBytes()))
                : (isSort ? (client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, value.getBytes()))
                : (client.create().forPath(path, value.getBytes())));
    }
    /**
     * 修改节点内容
     * @param path
     * @param newData
     * @return
     */
    public static String setNodeData(String path, String newData) {
        Stat stat;
        try {
            stat = client.setData().forPath(path, newData.getBytes());
        } catch (Exception e) {
            return "更新节点失败";
        }
        return stat.toString();
    }

    /**
     * 获取子节点列表
     * @param path
     * @return
     */
    public static List<String> getChildren(String path){
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            return new ArrayList<>();
        }
    }

    /**
     * 获取节点value
     * @return
     */
    public static String getValue(String path){
        try {
            byte[] bytes = client.getData().forPath(path);
            return new String(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 校验路径节点是否存在
     * @param path
     * @return
     */
    public static boolean checkNodeExist(String path){
        boolean isExists = false;
        try {
            Stat stat = client.checkExists().forPath(path);
            if (stat != null) {
                isExists = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isExists;
    }

    /**
     * 删除节点
     * @param path
     */
    public static void deleteNode(String path){
        if (checkNodeExist(path)){
            try {
                client.delete().forPath(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

    /**
     * zk锁实现
     */
    public static void lock(){
        InterProcessMutex lock = new InterProcessMutex(client, "");
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                LOGGER.info(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                LOGGER.info(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

备注:

  • 主类实现ApplicationRunner接口,实现在Springboot容器启动之后执行run方法中的逻辑;

  • 配置信息读取之后只是日志输出,不做他用,正常生产环境下是需要做相关业务逻辑操作的;

  • 注册接听事件中使用递归来实现循环watcher注册,正常应该是建立一个监听器。

  • ZkTool为Zookeeper常见操作的工具类,实现比较简单,对于异常没有做特别处理。

日志输出:

[  restartedMain] site.zhaoyangjue.handle.ConfigHandle     : 读取到zk上的配置信息:init_config
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle     : 配置节点数据发生了变化
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle     : 获取到新的配置信息为:first_change_config
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle     : 配置节点数据发生了变化
[ain-EventThread] site.zhaoyangjue.handle.ConfigHandle     : 获取到新的配置信息为:second_change_config

命名服务

在分布式环境下有很多的业务模块,均需要获取唯一标识,可以利用Zookeeper中的有序永久节点来实现,对应的节点名称是唯一的,以此来实现唯一标识。

NamingHandle.java

package site.zhaoyangjue.handle;

import com.netflix.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import site.zhaoyangjue.tool.ZkTool;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/naming")
public class NamingHandle {
    private static final Logger LOGGER = LoggerFactory.getLogger(NamingHandle.class);
    @Value("${zkServer}")
    private String zkServer;

    public static CuratorFramework client;

    @GetMapping("/getNames")
    public List<String> getNames(@RequestParam("app") String app){
        client = ZkTool.getClient(zkServer,app);
        List<String> nameList = new ArrayList<>();
        try {
            for (int i = 0; i < 1000; i++) {
                String tempNode = ZkTool.createNode("/" + app + i, "", false, true);
                nameList.add(tempNode);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        LOGGER.info("获取到的名称列表:{}",nameList.toString());
        ZkTool.closeClient();
        return nameList;
    }
}

备注

生产环境下命名服务不建议使用这样的方式。

分布式锁

详见ZkTool.java中lock();

基于原生的zkClientAPI实现分布式锁过程比较繁琐,基本思想如下:

1.客户端向某资源发起读写请求之后,判断根节点是否存在,如果不存在,则创建一个根节点(永久节点)。
2.对于读请求,遍历根节点下的所有写请求,向序号比自己小的最后一个写请求节点添加watcher监听;
  对于写请求,遍历根节点下的请求,向序号比自己小的最后一个请求添加watcher监听;
3.创建完毕之后,执行watcher回调;
4.回调逻辑如下:
	遍历根节点下比自己序号下的所有请求:
	对于当前请求是读请求,如果这些请求都是读请求,则当前可以进行数据读取,如果存在写请求,则等待,直到写请求完毕;
	对于当前请求是写请求,如果这些请求中没有比自己序号小的话,则当前可以进行数据写入,如果存在,则等待,直到前面的写请求完毕。
最后修改:2021-12-14 17:57:19 © 著作权归作者所有
上一篇

评论列表

还没有人评论哦~赶快抢占沙发吧~