Zookeeper(2)-分布式锁的基础实现
Zookeeper 分布式锁
什么是分布式锁?
在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用 Java API 自带的 Lock 或者是 synchronize 就可以解决多线程带来的并发问题。但是在集群环境
中,上述的方法并不能解决服务与服务之间的并发问题。
分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务
具体可以看这位大佬的解释,说的通俗易懂。
实现分布式锁的方式
- 数据库实现(效率低,不推荐)
- redis 实现(使用 redission 实现,当需要考虑死锁和释放问题,比较繁琐)
- Zookeeper 实现(使用临时节点,效率高)
- Spring Cloud 实现全局锁(内置的)
Zookeeper 实现分布式锁
实现原理
使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点
,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。
案例实战
下面的代码基于 Zookeeper(1)-安装与基础使用
原生 Zookeeper 案例
编写分布式锁的代码
public class DistributedLock {
private final String connectString = "192.168.3.33:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zooKeeper;
private final String rootNode = "locks";
private final String subNode = "seq-";
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
public DistributedLock() throws IOException, KeeperException, InterruptedException {
// 获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
connectLatch.await();
// 判断节点/locks 是否存在
Stat stat = zooKeeper.exists("/" + rootNode, false);
// 如果根节点不存在则创建永久根节点
if (stat == null) {
System.out.println("根节点不存在!");
zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 获取锁
public void zkLock() throws KeeperException, InterruptedException {
// 在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有的节点
List <String> children = zooKeeper.getChildren("/" + rootNode, false);
// 列表中只有一个节点,就直接获取到锁
if (children.size() == 0) {
return;
} else {
// 对节点进行排序
Collections.sort(children);
//当前节点名称
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
// 获取当前节点在数组中的位置
int indexOf = children.indexOf(thisNode);
if (indexOf == -1) {
System.out.println("数据异常");
} else if (indexOf == 0) {
// index == 0 说明 thisNode 在列表中最小,当前 client 获取锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zooKeeper.getData(waitPath, true, new Stat());
waitLatch.await();
return;
}
}
}
// 释放锁
public void unZkLock() {
try {
zooKeeper.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
测试代码
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
// 创建分布式锁 1
final DistributedLock lock1 = new DistributedLock();
// 创建分布式锁 2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("线程 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("线程 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
输出信息
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
可能在测试中会报:Will not attempt to authenticate using SASL (unknown error) 这个错误信息。
解决方案
在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig
配置信息,将 ENABLE_CLIENT_SASL_KEY
改成 false。
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
// 获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, config);
Curator 案例
导入 POM 文件
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
实战代码
public class CuratorLockTest {
// 测试代码
public static void main(String[] args) throws Exception {
// 创建分布式锁1
InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取到锁
locks1.acquire();
System.out.println("线程 1 获取到锁了!");
locks1.acquire();
System.out.println("线程 1 再次获取到锁了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks1.release();
System.out.println("线程 1 释放锁了!");
locks1.release();
System.out.println("线程 1 释放锁了!");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取到锁
locks2.acquire();
System.out.println("线程 2 获取到锁了!");
locks2.acquire();
System.out.println("线程 2 再次获取到锁了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks2.release();
System.out.println("线程 2 释放锁了!");
locks2.release();
System.out.println("线程 2 释放锁了!");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 创建连接
private static CuratorFramework getCuratorFramework() throws Exception {
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory();
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutMs(2000).retryPolicy(backoffRetry)..build();
client.start();
System.out.println("客户端启动成功!");
return client;
}
}
输出信息
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
可能在测试中会报:Will not attempt to authenticate using SASL (unknown error) 这个错误信息。
解决方案
使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。
创建自定义 ZookeeperFactory
public class DefaultZookeeperFactory implements ZookeeperFactory {
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean b) throws Exception {
// 自定义 ZKClientConfig 配置
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
return new ZooKeeper(connectString, sessionTimeout, watcher, b, config);
}
}
使用 CuratorFrameworkFactory 创建连接的时候导入自定义 ZookeeperFactory
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutMs(2000).retryPolicy(backoffRetry).
zookeeperFactory(zookeeperFactory).build();