分布式锁的三种实现方式

为什么要使用分布式锁

使用分布式锁的目的,无外乎就是保证同一时间只有一个客户端可以对共享资源进行操作。

zookeeper可靠性比redis强太多,只是效率低了点,如果并发量不是特别大,追求可靠性,首选zookeeper。为了效率,则首选redis实现。

1.数据库方式

/*** 该类实现Lock类,用于分布式锁(也可以不实现该类,自己添加lock和unlock方法)* 实现思路:* 1.在数据库中建立一张表,创建一个属性唯一的字段* 2.当有用户访问时,先查询该字段是否有指定的值,* 如果有则说明该数据正在被访问,拒绝其他用户进行写操作* 如果没有则向数据库添加指定的值(上锁)* 3.该用户用完需要的数据后删除表中添加的指定数据(解锁)*/public class MysqlLock implements Lock {@AutowiredTestLockMapper testLockMapper;private static final String FLAG = "isLock";
/*** 上锁就是向数据库中加上一条数据,该字段设置为唯一* 当其他人想上锁时发现该数据有人占用就会等待*/@Overridepublic void lock() {while(true){ //循环直到获取到锁boolean b = tryLock(); //如果锁被占用则获取不到if(b){TestLock testLock = new TestLock();testLock.setFlag(FLAG);testLockMapper.insert(testLock);break;}else {System.out.println("该数据被占用...请等待。。。");}}}
@Overridepublic void lockInterruptibly() throws InterruptedException {
}
/*** 尝试获取锁,如果数据库中有该值,则获取不到锁* @return*/@Overridepublic boolean tryLock() {QueryWrapper wrapper = new QueryWrapper<>();wrapper.eq("flag",FLAG);TestLock testLock = testLockMapper.selectOne(wrapper);if(testLock==null){return true;}return false;}
@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}
/*** 释放锁,将上锁时设置的数据删除,其他请求可以设置数据来得到锁*/@Overridepublic void unlock() {QueryWrapper wrapper = new QueryWrapper<>();wrapper.eq("flag",FLAG);testLockMapper.delete(wrapper);}
@Overridepublic Condition newCondition() {return null;}}

2.redis方式

死锁问题:当进行lock方法上锁以后,在执行操作时有异常而无法执行unlock释放锁

解决死锁:在redis中可以通过expire设置数据的过期时间,指定的时间内即使我们不释放锁也会自动删除

redis分布式锁实现基于setnt(set if not exists),设置成功返回1,失败返回0,释放锁通过del指令完成

/*** 实现思路* 基本和mysql一致* 在redis中使用命令判断该值是否存在,存在则等待,不存在则设置值,获取锁* 当操作完成后删除值并且释放锁*/public class RedisLock {@AutowiredRedisTemplate redisTemplate;private static final String NAME = "lock";private static final String FLAG = "isLock";public void lock(){while (true){//向redis中设置指定的key/value,//该方法有可能导致死锁//    Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG);//设置过期时间为 1 分钟 ,即使1分钟后没有人释放锁他也会自动消失        Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG,1, TimeUnit.MINUTES);if (b){return ;}else{System.out.println(" 该数据被占用...请等待。。。");}}}
public void unlock(){redisTemplate.delete(NAME);}
}

3.zookeeper方式

zookeeper实现分布式锁:原理:有序临时节点+watch监听来实现

实现思路:为每一个执行的线程创建一个有序的临时节点,为了确保有序性,在创建完节点,会再获取全部节点,再重新进行—次排序,排序过程中,每个线程要判断自己剩下的临时节点的序号是否是显小的, 如果是最小的,将会获取到锁,执行相关操作,释放锁 如果不是最小的,会监听它的前一个节点,当它的前一个节点被删除时,它就会获得锁,依次类推

public class ZkLock {//ZooKeeper客户端private ZooKeeper zk;//zk的一个目录结构locks,代表根目录private String root ="/locks";//锁的名称private String lockName;//当前线程创建的序列nodeprivate ThreadLocal nodeId = new ThreadLocal<>();//用来同步等待zkclient链接到了服务端//CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待private CountDownLatch countDownLatch = new CountDownLatch(1);//超时时间private static final int sessionTimeout = 3000;private final static byte[] data = new byte[0];//在构造方法中连接zookeeper,创建根节点lockpublic  ZkLock(String config,String lockName){this.lockName = lockName;try {zk = new ZooKeeper(config, sessionTimeout, new Watcher() {//watcher用于监听@Overridepublic void process(WatchedEvent event) {if(event.getState()==Event.KeeperState.SyncConnected){//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。countDownLatch.countDown();}}});//使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。countDownLatch.await();Stat stat = zk.exists(root,false);if(null==stat){//创建根节点zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {e.printStackTrace();}}
/*** 添加watch监听临时顺序节点的删除*/class LockWatcher implements Watcher {private CountDownLatch latch = null;public LockWatcher(CountDownLatch latch) {this.latch = latch;}@Overridepublic void process(WatchedEvent event) {if(event.getType()== Event.EventType.NodeDeleted){latch.countDown();}}}
public void lock(){try{//创建临时子节点String myNode = zk.create(root+"/"+lockName,data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName()+myNode+"created");//取出所有子节点,并排序List subNode = zk.getChildren(root,false);TreeSet sortedNode = new TreeSet<>();for (String node: subNode) {sortedNode.add(root+"/"+node);}String smallNode = sortedNode.first();if(myNode.equals(smallNode)){//如果是最小节点,则表示获得锁this.nodeId.set(myNode);return;}String preNode = sortedNode.lower(myNode);CountDownLatch latch = new CountDownLatch(1);Stat stat = zk.exists(preNode,new LockWatcher(latch));//同时注册监听//判断比自己小一个数的节点是否存在,如果不存在则不等待锁,同时注册监听if(stat != null){latch.await();//等待其他线程释放锁nodeId.set(myNode);latch = null;}
} catch (Exception e) {e.printStackTrace();}}
public void unlock(){try {if(null!= nodeId){zk.delete(nodeId.get(),-1);}nodeId.remove();//释放锁} catch (Exception e) {e.printStackTrace();}
}
}

关于Redundant declaration:@SpringBootApplication already applies given @ComponentScan异常

1.@ComponentScan默认扫描使用该注解的类所在的包,包括这个包下的类和子包,所以如果没有配置basepackages,并且类都放在子包中,是可以正常访问的2.如果配置了@ComponentScn中的basepackages,那么就要把所有需要扫描的包都配置.这种情况下,@ComponentScan是不会再去扫描当前类所在的包的.之前我之所以以为@ComponentScan对启动类之外的包无能为力,就是因为配置了domain包,但是没有配controller类的包,导致程序无法访问.


推荐内容