当前位置:首页 > 科技  > 软件

Go-Zero 是如何实现令牌桶限流的?

来源: 责编: 时间:2023-08-14 22:01:53 3408观看
导读上一篇文章介绍了 如何实现计数器限流。主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。但是采用固定窗口实现的限流器会有两个问题:会出现请求量超出限制值两倍的情况无

上一篇文章介绍了 如何实现计数器限流。主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。8ZL28资讯网——每日最新资讯28at.com

但是采用固定窗口实现的限流器会有两个问题:8ZL28资讯网——每日最新资讯28at.com

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。8ZL28资讯网——每日最新资讯28at.com

工作原理

算法概念如下:8ZL28资讯网——每日最新资讯28at.com

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

图片图片8ZL28资讯网——每日最新资讯28at.com

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。8ZL28资讯网——每日最新资讯28at.com

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。8ZL28资讯网——每日最新资讯28at.com

// core/limit/tokenlimit.go// 生成 token 速率script = `local rate = tonumber(ARGV[1])// 通容量local capacity = tonumber(ARGV[2])// 当前时间戳local now = tonumber(ARGV[3])// 请求数量local requested = tonumber(ARGV[4])// 需要多少秒才能把桶填满local fill_time = capacity/rate// 向下取整,ttl 为填满时间 2 倍local ttl = math.floor(fill_time*2)// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量local last_tokens = tonumber(redis.call("get", KEYS[1]))if last_tokens == nil then    last_tokens = capacityend// 上次请求时间戳,如果为 nil 则赋值 0local last_refreshed = tonumber(redis.call("get", KEYS[2]))if last_refreshed == nil then    last_refreshed = 0end// 距离上一次请求的时间跨度local delta = math.max(0, now-last_refreshed)// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和// 与桶容量比较,取二者的小值local filled_tokens = math.min(capacity, last_tokens+(delta*rate))// 判断请求数量和桶内 token 数量的大小local allowed = filled_tokens >= requested// 被请求消耗掉之后,更新剩余 token 数量local new_tokens = filled_tokensif allowed then    new_tokens = filled_tokens - requestedend// 更新 redis tokenredis.call("setex", KEYS[1], ttl, new_tokens)// 更新 redis 刷新时间redis.call("setex", KEYS[2], ttl, now)return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。8ZL28资讯网——每日最新资讯28at.com

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。8ZL28资讯网——每日最新资讯28at.com

限流器初始化:8ZL28资讯网——每日最新资讯28at.com

// A TokenLimiter controls how frequently events are allowed to happen with in one second.type TokenLimiter struct {    // 生成 token 速率    rate           int    // 桶容量    burst          int    store          *redis.Redis    // 桶 key    tokenKey       string    // 桶刷新时间 key    timestampKey   string    rescueLock     sync.Mutex    // redis 健康标识    redisAlive     uint32    // redis 健康监控启动状态    monitorStarted bool    // 内置单机限流器    rescueLimiter  *xrate.Limiter}// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits// bursts of at most burst tokens.func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {    tokenKey := fmt.Sprintf(tokenFormat, key)    timestampKey := fmt.Sprintf(timestampFormat, key)    return &TokenLimiter{        rate:          rate,        burst:         burst,        store:         store,        tokenKey:      tokenKey,        timestampKey:  timestampKey,        redisAlive:    1,        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),    }}

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。8ZL28资讯网——每日最新资讯28at.com

图片图片8ZL28资讯网——每日最新资讯28at.com

提供了四个可调用方法:8ZL28资讯网——每日最新资讯28at.com

// Allow is shorthand for AllowN(time.Now(), 1).func (lim *TokenLimiter) Allow() bool {    return lim.AllowN(time.Now(), 1)}// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {    return lim.AllowNCtx(ctx, time.Now(), 1)}// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate.// Otherwise, use Reserve or Wait.func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {    return lim.reserveN(context.Background(), now, n)}// AllowNCtx reports whether n events may happen at time now with incoming context.// Use this method if you intend to drop / skip events that exceed the rate.// Otherwise, use Reserve or Wait.func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {    return lim.reserveN(ctx, now, n)}

最终调用的都是 reverveN 方法:8ZL28资讯网——每日最新资讯28at.com

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {    // 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器    if atomic.LoadUint32(&lim.redisAlive) == 0 {        return lim.rescueLimiter.AllowN(now, n)    }    // 执行限流脚本    resp, err := lim.store.EvalCtx(ctx,        script,        []string{            lim.tokenKey,            lim.timestampKey,        },        []string{            strconv.Itoa(lim.rate),            strconv.Itoa(lim.burst),            strconv.FormatInt(now.Unix(), 10),            strconv.Itoa(n),        })    // redis allowed == false    // Lua boolean false -> r Nil bulk reply    if err == redis.Nil {        return false    }    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {        logx.Errorf("fail to use rate limiter: %s", err)        return false    }    if err != nil {        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)        // 如果有异常的话,会启动进程内限流        lim.startMonitor()        return lim.rescueLimiter.AllowN(now, n)    }    code, ok := resp.(int64)    if !ok {        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)        lim.startMonitor()        return lim.rescueLimiter.AllowN(now, n)    }    // redis allowed == true    // Lua boolean true -> r integer reply with value of 1    return code == 1}

最后看一下进程内限流的启动与恢复:8ZL28资讯网——每日最新资讯28at.com

func (lim *TokenLimiter) startMonitor() {    lim.rescueLock.Lock()    defer lim.rescueLock.Unlock()    // 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动    if lim.monitorStarted {        return    }    lim.monitorStarted = true    atomic.StoreUint32(&lim.redisAlive, 0)    go lim.waitForRedis()}func (lim *TokenLimiter) waitForRedis() {    ticker := time.NewTicker(pingInterval)    // 更新监控进程的状态    defer func() {        ticker.Stop()        lim.rescueLock.Lock()        lim.monitorStarted = false        lim.rescueLock.Unlock()    }()    for range ticker.C {        // 对 redis 进行健康监测,如果 redis 服务恢复了        // 则更新 redisAlive 标识,并退出 goroutine        if lim.store.Ping() {            atomic.StoreUint32(&lim.redisAlive, 1)            return        }    }}

参考文章:8ZL28资讯网——每日最新资讯28at.com

  • https://juejin.cn/post/7052171117116522504
  • https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

本文链接:http://www.28at.com/showinfo-26-5770-0.htmlGo-Zero 是如何实现令牌桶限流的?

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 「Go面经」算法 并发模型 缓存落盘 etcd actor模型

下一篇: 阿里云还会继续降价吗?

标签:
  • 热门焦点
  • 一加Ace2 Pro真机揭晓 钛空灰配色质感拉满

    终于,在经过了几波预热之后,一加Ace2 Pro的外观真机图在网上出现了。还是博主数码闲聊站曝光的,这次的外观设计还是延续了一加11的方案,只是细节上有了调整,例如新加入了钛空灰
  • 直屏旗舰来了 iQOO 12和K70 Pro同台竞技

    旗舰机基本上使用的都是双曲面屏幕,这就让很多喜欢直屏的爱好者在苦等一款直屏旗舰,这次,你们等到了。据博主数码闲聊站带来的最新爆料称,Redmi下代旗舰K70 Pro和iQOO 12两款手
  • 线程通讯的三种方法!通俗易懂

    线程通信是指多个线程之间通过某种机制进行协调和交互,例如,线程等待和通知机制就是线程通讯的主要手段之一。 在 Java 中,线程等待和通知的实现手段有以下几种方式:Object 类下
  • 多线程开发带来的问题与解决方法

    使用多线程主要会带来以下几个问题:(一)线程安全问题  线程安全问题指的是在某一线程从开始访问到结束访问某一数据期间,该数据被其他的线程所修改,那么对于当前线程而言,该线程
  • 自动化在DevOps中的力量:简化软件开发和交付

    自动化在DevOps中扮演着重要角色,它提升了DevOps的效能。通过自动化工具和方法,DevOps团队可以实现以下目标:消除手动和重复性任务。简化流程。在整个软件开发生命周期中实现更
  • 华为举行春季智慧办公新品发布会 首次推出电子墨水屏平板

    北京时间2月27日晚,华为在巴塞罗那举行春季智慧办公新品发布会,在海外市场推出之前已经在中国市场上市的笔记本、平板、激光打印机等办公产品,并首次推出搭载
  • 英特尔Xe HPG游戏显卡:拥有512EU,单风扇版本

    据10 月 30 日外媒 TheVerge 消息报道,英特尔 Xe HPG Arc Alchemist 的正面实被曝光,不仅拥有 512 EU 版显卡,还拥有 128EU 的单风扇版本。另外,这款显卡 PCB
  • 上海举办人工智能大会活动,建设人工智能新高地

    人工智能大会在上海浦江两岸隆重拉开帷幕,人工智能新技术、新产品、新应用、新理念集中亮相。8月30日晚,作为大会的特色活动之一的上海人工智能发展盛典人工
  • “买真退假” 这种“羊毛”不能薅

    □ 法治日报 记者 王春   □ 本报通讯员 胡佳丽  2020年初,还在上大学的小东加入了一个大学生兼职QQ群。群主“七王”在群里介绍一些刷单赚
Top