Zookeeper(2)-分布式锁的基础实现

Zookeeper(2)-分布式锁的基础实现

Zookeeper 分布式锁

什么是分布式锁?

在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用 Java API 自带的 Lock 或者是 synchronize 就可以解决多线程带来的并发问题。但是在集群环境中,上述的方法并不能解决服务与服务之间的并发问题。

分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务

具体可以看这位大佬的解释,说的通俗易懂。

实现分布式锁的方式

  1. 数据库实现(效率低,不推荐)
  2. redis 实现(使用 redission 实现,当需要考虑死锁和释放问题,比较繁琐)
  3. Zookeeper 实现(使用临时节点,效率高)
  4. Spring Cloud 实现全局锁(内置的)

Zookeeper 实现分布式锁

实现原理

使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。

案例实战

下面的代码基于 Zookeeper(1)-安装与基础使用

原生 Zookeeper 案例

编写分布式锁的代码
public class DistributedLock {

    private final String connectString = "192.168.3.33:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zooKeeper;
    private final String rootNode = "locks";
    private final String subNode = "seq-";

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String waitPath;
    private String currentNode;

    public DistributedLock() throws IOException, KeeperException, InterruptedException {
        // 获取连接
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // 发生了 waitPath 的删除事件
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });
        connectLatch.await();
        // 判断节点/locks 是否存在
        Stat stat = zooKeeper.exists("/" + rootNode, false);
        // 如果根节点不存在则创建永久根节点
        if (stat == null) {
            System.out.println("根节点不存在!");
            zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 获取锁
    public void zkLock() throws KeeperException, InterruptedException {
        // 在根节点下创建临时顺序节点,返回值为创建的节点路径
        currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // 获取所有的节点
        List <String> children = zooKeeper.getChildren("/" + rootNode, false);
        // 列表中只有一个节点,就直接获取到锁
        if (children.size() == 0) {
            return;
        } else {
            // 对节点进行排序
            Collections.sort(children);
            //当前节点名称
            String thisNode = currentNode.substring(("/" + rootNode + "/").length());
            // 获取当前节点在数组中的位置
            int indexOf = children.indexOf(thisNode);
            if (indexOf == -1) {
                System.out.println("数据异常");
            } else if (indexOf == 0) {
                // index == 0 说明 thisNode 在列表中最小,当前 client 获取锁
                return;
            } else {
                // 获得排名比 currentNode 前 1 位的节点
                this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1);
                // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                zooKeeper.getData(waitPath, true, new Stat());
                waitLatch.await();
                return;
            }
        }
    }
    
    // 释放锁
    public void unZkLock() {
        try {
            zooKeeper.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}
测试代码
public class DistributedLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        // 创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();
        // 创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.unZkLock();
                    System.out.println("线程 1 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.unZkLock();
                    System.out.println("线程 2 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
输出信息
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!

可能在测试中会报:Will not attempt to authenticate using SASL (unknown error) 这个错误信息。

解决方案

在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig 配置信息,将 ENABLE_CLIENT_SASL_KEY 改成 false。

    ZKClientConfig config = new ZKClientConfig();
    config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
    // 获取连接
    zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
            if (event.getState() == Event.KeeperState.SyncConnected) {
                connectLatch.countDown();
            }
            // 发生了 waitPath 的删除事件
            if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                waitLatch.countDown();
            }
        }
    }, config);

Curator 案例

导入 POM 文件
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
实战代码
public class CuratorLockTest {

	// 测试代码
    public static void main(String[] args) throws Exception {
        // 创建分布式锁1
        InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 获取到锁
                    locks1.acquire();
                    System.out.println("线程 1 获取到锁了!");
                    locks1.acquire();
                    System.out.println("线程 1 再次获取到锁了!");
                    System.out.println("休息一下!");
                    Thread.sleep(5 * 1000);
                    locks1.release();
                    System.out.println("线程 1 释放锁了!");
                    locks1.release();
                    System.out.println("线程 1 释放锁了!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 获取到锁
                    locks2.acquire();
                    System.out.println("线程 2 获取到锁了!");
                    locks2.acquire();
                    System.out.println("线程 2 再次获取到锁了!");
                    System.out.println("休息一下!");
                    Thread.sleep(5 * 1000);
                    locks2.release();
                    System.out.println("线程 2 释放锁了!");
                    locks2.release();
                    System.out.println("线程 2 释放锁了!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 创建连接
    private static CuratorFramework getCuratorFramework() throws Exception {
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
        DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory();
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
                .sessionTimeoutMs(2000).retryPolicy(backoffRetry)..build();
        client.start();
        System.out.println("客户端启动成功!");
        return client;
    }

}
输出信息
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!

可能在测试中会报:Will not attempt to authenticate using SASL (unknown error) 这个错误信息。

解决方案

使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。

创建自定义 ZookeeperFactory
public class DefaultZookeeperFactory implements ZookeeperFactory {

    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean b) throws Exception {
        // 自定义 ZKClientConfig 配置
        ZKClientConfig config = new ZKClientConfig();
        config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
        return new ZooKeeper(connectString, sessionTimeout, watcher, b, config);
    }
}
使用 CuratorFrameworkFactory 创建连接的时候导入自定义 ZookeeperFactory
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
                .sessionTimeoutMs(2000).retryPolicy(backoffRetry).
                        zookeeperFactory(zookeeperFactory).build();
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » Zookeeper(2)-分布式锁的基础实现