Loading...
墨滴

楼仔

2021/04/24  阅读:50  主题:默认主题

Reids实现分布式锁

用Redis实现一个分布式锁,基于Go语言。

序言

前面的文章都是理论知识,写多了头有点大,突然想写点实战方面的内容,刚好最近公司在做异步任务迁移,用到了分布式锁和任务分片,所以打算写2篇实战方面的文章,分别介绍分布式锁和任务分片的实现方式,这个在实际项目中,应该会经常用到,今天这篇文章就先讲解分布式锁的实现方式。

使用场景

分布式锁的使用场景其实很多,在小米这边我主要遇到以下场景:

  • 在服务集群中执行定时任务,我们希望只有一台机器去执行,就需要用到分布式锁,只有拿到锁的机器,才能执行该定时任务;
  • 当外部请求打到集群中时,比如该请求是对订单进行操作,为了避免请求重入,我们需要在入口加上订单维度的分布式锁。

Redis分布式锁

Redis分布式锁是面试常面的考题,很多同学都知道用SetNx()去获取锁,如果面试官问你下面2个问题,你知道怎么回答么?

  • 如果获取锁的机器挂掉,如何处理?
  • 当锁超时时,A/B两台机器同时获取锁,可能会同时获取,如何解决?

其实Redis分布式锁,肯定不仅仅是SetNx()就能解决的,什么?你不知道什么是SetNx(),楼哥是暖男嘛,马上给你解释一下:

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。(返回值:设置成功,返回 1,设置失败,返回 0)

如果调用SetNx()返回1,表示获取到锁,如果返回0,表示没有获取到锁,为了避免机器宕机&重启,导致锁一直没有释放,所以我们需要记录锁的超时时间,整体执行流程如下:

  • 先通过SetNx()获取锁,并将value设置成超时时间,如果成功获取锁,直接返回;
  • 如果未获取到锁,可能是机器宕机&重启等,需要通过GetKey()获取锁的超时时间value,如果锁未超时,证明机器未宕机&重启,获取锁失败;
  • 如果锁已经超时,就可以重新去获取锁,并设置锁的新的超时时间,为了避免多台机器机器同时拿到锁,需要使用GetSet()方法,因为GetSet()会返回之前的旧值,如果此时有两台机器A/B同时执行GetSet()方法,假如A先执行,B后执行,那么A调用GetSet()返回的值,其实就等于之前调用GetKey()获取的的值current_time,B调用GetKey()返回的值,其实就是A设置的新值,肯定不等于current_time,所以我们可以通过两个时间是否相等,来判断是谁先拿到锁。(这里应该是分布式锁最难理解的地方,我每次重温这个逻辑,都会在这个地方卡一下。。。)

Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。(返回值:返回给定 key 的旧值。 当 key 没有旧值时,即 key 不存在时,返回 nil;当 key 存在但不是字符串类型时,返回一个错误。)

可能有同学说,写了一堆,看的我头都大了,来来来,楼哥给你画了一幅图,是不是就清晰很多了呢

具体实现

基本原理讲清楚了,下面就开始堆代码了哈,先看看获取锁的逻辑,里面的注释写的相当详细,即使不懂编程的同学,应该都能看懂:

// 获取分布式锁,需要考虑以下情况:
// 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置;
// 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。
func GetDistributeLock(key string, expireTime int64) bool {

 currentTime := time.Now().Unix()
 expires := currentTime + expireTime
 redisAlias := "jointly"

 // 1.获取锁,并将value值设置为锁的超时时间
 redisRet, err := redis.SetNx(redisAlias, key, expires)
 if nil == err && utils.MustInt64(1) == redisRet {
  // 成功获取到锁
  return true
 }

 // 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁
 // 2.1 获取锁的超时时间
 currentLockTime, err := redis.GetKey(redisAlias, key)
 if err != nil {
  return false
 }

 // 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回
 if utils.MustInt64(currentLockTime) >= currentTime {
  return false
 }

 // 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间
 oldLockTime, err := redis.GetSet(redisAlias, key, expires)
 if err != nil {
  return false
 }

 // 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁
 // 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间
 if utils.MustString(oldLockTime) == currentLockTime {
  return true
 }
 return false
}

对于里面的一些函数utils.MustString()、utils.MustInt64(),其实就是一些底层封装好的类型转换函数,应该不会影响大家理解哈,如果想直接拿去使用,这里需要简单修改一下。

再看看删除锁的逻辑:

// 删除分布式锁
// @return bool true-删除成功;false-删除失败
func DelDistributeLock(key string) bool {
 redisAlias := "jointly"
 redisRet := redis.Del(redisAlias, key)
 if redisRet != nil {
  return false
 }
 return true
}

然后是业务处理逻辑:

func DoProcess(processId int) {

 fmt.Printf("启动第%d个线程\n", processId)

 redisKey := "redis_lock_key"
 for {
  // 获取分布式锁
  isGetLock := GetDistributeLock(redisKey, 10)
  if isGetLock {
   fmt.Printf("Get Redis Key Success, id:%d\n", processId)
   time.Sleep(time.Second * 3)
   // 删除分布式锁
   DelDistributeLock(redisKey)
  } else {
   // 如果未获取到该锁,为了避免redis负载过高,先睡一会
   time.Sleep(time.Second * 1)
  }
 }
}

最后起个10个多线程,去执行这个DoProcess():

func main() {
 // 初始化资源
 var group string = "i18n"
 var name string = "jointly_shop"
 var host string

 // 初始化资源
 host = "http://ip:port"
 _, err := xrpc.NewXRpcDefault(group, name, host)
 if err != nil {
  panic(fmt.Sprintf("initRpc when init rpc  failed, err:%v", err))
 }
 redis.SetRedis("jointly""redis_jointly")

 // 开启10个线程,去抢Redis分布式锁
 for i := 0; i <= 9; i ++ {
  go DoProcess(i)
 }

 // 避免子线程退出,主线程睡一会
 time.Sleep(time.Second * 100)
 return
}

程序跑了100s,最后是执行结果:

启动第0个线程
启动第6个线程
启动第9个线程
启动第4个线程
启动第5个线程
启动第2个线程
启动第1个线程
启动第8个线程
启动第7个线程
启动第3个线程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3

遇到的坑

中间出现过一些坑,我简单说一下:

  • 之前我们做个一次服务迁移,需要将物理机迁移到Neo云,当把流量从物理机迁移Neo云后,千万不要忘了停掉物理机上的定时任务,否则物理机会去抢占这个分布式锁,特别是代码有变更后,物理机如果抢到锁,会继续执行旧的代码,那就是个大坑了。
  • 不要轻易去修改分布式锁的超时时间,之前为了能快速排查问题,修改过一次,然后出现了非常诡异的问题,当时排查了一天,具体问题也记不太清了,大家感兴趣,可以自己模拟一下。

后记

这个分布式锁其实是我2019年写的,已经在线上跑了2年,只需要进行简单修改,就可以放到线上跑,不用担心里面有坑哈,因为坑已经被我趟过了。

上周写了一篇限流的文章,加上今天这个分布式锁,其实都是最近项目中使用的,所以就整理一下,其实我最想写的,是任务分片的实现方式,也是最近在公司做异步任务时Get到的新技能,它支持多机并发执行一个任务,是不是很神奇,后面会分享给大家。

更多文章,请关注微信公众号“楼仔进阶之路”,点关注,不迷路~~

楼仔

2021/04/24  阅读:50  主题:默认主题

作者介绍

楼仔