分布式锁与实现(二)—基于 zookeeper 实现

发布于 2019-10-03

分布式锁与实现(二)—基于 zookeeper 实现

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。

什么是分布式锁
分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务。

分布式锁有那些实现方案

1.使用数据库实现分布式锁(不推荐使用)
缺点:效率低,性能差、线程出现异常时,容易出现死锁。
2.使用redis实现分布式锁
缺点:锁的失效时间难控制、容易产生死锁、非阻塞式、不可重入。
3.使用zookeeper实现分布式锁
实现相对简单、可靠性强、使用临时节点,失效时间容易控制。
4.springcloud内置实现全局锁
比较冷门,但是效率最高,实现简单,失效时间容易控制。

应用场景
分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。

实现原理
利用Zookeeper来实现分布式锁,主要基于其临时(或临时有序)节点和watch机制。

架构分析

实现思路和流程
1、在zookeeper指定节点(locker)下创建临时顺序节点node_n。
2、获取locker下所有子节点children。
3、对子节点按节点自增序号从小到大排序。
4、判断本节点是不是第一个子节点,若是,则获取锁;若不是,则等待。
5、使用zookeeper感知节点的功能,对本节点的上一个节点进行感知。
6、当上一个节点被删除了,zookeeper会通知该线程,该线程就结束等待,并获取锁。
7、释放锁,并删除该临时节点。

具体实现代码
下面就具体使用java和zookeeper实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。

分布式锁类

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Lock:Java的锁接口,并实现该接口的抽象方法
 * Watcher:zookeeper的节点感知接口,并实现process方法,当节点有改变时,会调用该方法
 */
public class DistributedLock  implements Lock, Watcher {


    private ZooKeeper zk = null;
    // 根节点
    private String ROOT_LOCK = "/locker";
    // 竞争的资源
    private String lockName;
    // 等待的前一个锁
    private String WAIT_LOCK;
    // 当前锁
    private String CURRENT_LOCK;
    // 同步计数器
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 30000;

    /**
     * 配置分布式锁
     * @param config 连接的url
     * @param lockName 竞争资源
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // 连接zookeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // 如果根节点不存在,则创建根节点
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 节点感知器,感知到节点变化会调用该方法
    public void process(WatchedEvent event) {
        System.out.println("感知节点变化类型:"+event.getType().name());
        if (this.countDownLatch != null) {
            //如果同步计数器不为null,则减一
            this.countDownLatch.countDown();
        }
    }

    public void lock() {

        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
                return;
            } else {
                // 等待锁
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new RuntimeException("锁名有误");
            }
            // 创建临时有序节点
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " 已经创建");
            // 取所有子节点
            List subNodes = zk.getChildren(ROOT_LOCK, false);
            // 取出所有lockName的锁
            List lockObjects = new ArrayList();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
            // 若当前节点为最小节点,则获取锁成功
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }

            // 若不是最小节点,则找到自己的前一个节点
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // 等待锁
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {

        //获取并感知上一个节点
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
            //初始化同步计数器,计数为1,当同步计数器为0,主线程才会向下执行
            this.countDownLatch = new CountDownLatch(1);
            // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " 等到了锁");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("释放锁 " + CURRENT_LOCK);
            zk.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }


    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
    
}

测试类

public class Test {

    static int n = 500;

    public static void secskill() {
        System.out.println(--n);
    }

    public static void main(String[] args) {

        Runnable runnable = new Runnable() {
            public void run() {
                DistributedLock lock = null;
                try {
                    lock = new DistributedLock("127.0.0.1:2181", "node");
                    lock.lock();
                    secskill();
                    System.out.println(Thread.currentThread().getName() + "正在运行");
                } finally {
                    if (lock != null) {
                        lock.unlock();
                    }
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}
喜欢 0
奋楫笃行,臻于至善!

相关文章

使用 Mycat 中间件搭建 MySQL 高可用实现分库分表及读写分离

Mycat 是一款基于阿里开源产品Cobar而研发的开源数据库分库分表中间件(基于Java语言开发),可以用来方便地搭建面向企业应用开发的大数据库集群,支持事务、ACID等特性,其核心是基于代理方案实...
阅读全文

通用架构模式和通用架构服务

架构模式是在给定上下文的软件架构中,针对常发生问题的一种通用、复用的解决方案。架构模式类似于软件设计模式,但是范畴更广。一个好的软件产品往往需要有良好的架构思想和架构服务来支撑整个软件的生命周期,本文...
阅读全文

Java 的可重入锁和不可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java中Reentra...
阅读全文

Redis 的两种持久化方式及使用场景分析

Redis是内存数据库,数据都是存储在内存中,为了避免进程退出导致数据的永久丢失,需要定期将Redis中的数据以某种形式(数据或命令)从内存保存到硬盘。当下次Redis重启时,利用持久化文件实现数据恢...
阅读全文

redis 高可用主从,哨兵,集群解决方案

Redis因为其高性能和易用性在我们后端的服务中发挥了巨大的作用,并且很多重要功能的实现都会依赖redis。除了常用的缓存,还有队列,发布订阅等重要用处。所以redis的服务高可用就显得尤为关键。这里...
阅读全文

Redis 缓存穿透、缓存击穿、缓存雪崩的区别及解决方案

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很...
阅读全文

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注