当前位置:首页 > 科技  > 软件

讲完Go并发控制,讲讲并发抑制

来源: 责编: 时间:2024-06-17 17:40:32 213观看
导读已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。当多个相同的关键词并发查询(

已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。QU928资讯网——每日最新资讯28at.com

让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。QU928资讯网——每日最新资讯28at.com

当多个相同的关键词并发查询(调用search函数)时,我们希望只产生一次数据库调用(调用query),第一个查询未完成时后续的重复查询会等待,当第一个查询完成时则会与其他查询分享结果,这样一来虽然只执行了一次数据库调用但是所有查询都拿到了最终的结果。QU928资讯网——每日最新资讯28at.com

图片图片QU928资讯网——每日最新资讯28at.com

什么是并发抑制:QU928资讯网——每日最新资讯28at.com

package mainimport ( "context" "fmt" "sync" "time" "golang.org/x/sync/errgroup")func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}// 实现search,在重复并发调用下仅执行一次query// 其他并发共享这次query的结果func search(ctx context.Context, word string) (string, error) {    return query(ctx, word)}func coSearch(ctx context.Context, words []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) results := make([]string, len(words)) for i, word := range words {  i, word := i, word  g.Go(func() error {   result, err := search(ctx, word)   if err != nil {    return err   }   results[i] = result   return nil  }) } err := g.Wait() return results, err}func main() { words := []string{"Go","Go", "Go", "Rust", "PHP", "JavaScript", "Java"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}

好了,可以先暂停想想该如何实现search函数了。QU928资讯网——每日最新资讯28at.com

一步一步实现并发抑制

我们先假设所有查询关键词都一样,那么问题简化成并发执行search时,只在第一次search时调用query,其他的search并发调用等待并共享这次的查询结果。QU928资讯网——每日最新资讯28at.com

通过waiting变量,其他goroutine等待第一个goroutine数据库调用完成,那么如何让其他goroutine等待在这个位置呢?QU928资讯网——每日最新资讯28at.com

func main() { words := []string{"Go", "Go", "Go", "Go", "Go"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}var ( waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) {  if waiting {    // 等待resp, err被赋值,即第一个query完成后再返回    // ...?      return resp, err  }  waiting = true  resp, err = query(ctx, word)  waiting = false  return resp, err}func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}

sync.WaitGroup{}并发控制

sync.WaitGroup{}是并发控制的核心,这里再次重申下用法:QU928资讯网——每日最新资讯28at.com

  • 当新运行一个goroutine时,我们需要调用wg.Add(1)。
  • 当一个goroutine运行完成的时候,我们需要调用wg.Done()。
  • wg.Wait()让程序阻塞在此处,直到所有的goroutine运行完毕。

利用 sync.WaitGroup{}便可实现上文代码中等待的效果:QU928资讯网——每日最新资讯28at.com

var ( wg      sync.WaitGroup waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) { if waiting {  // 其他goroutine等待第一个goroutine执行完成  wg.Wait()  return resp, err } waiting = true     wg.Add(1) resp, err = query(ctx, word)    wg.Done() // 第一个goroutine执行完成     waiting = false return resp, err}

并发安全

当多个goroutine对同一个内存区域进行读写时,就会产生并发安全的问题,它会导致程序运行的结果不符合预期,而上文的程序并发的读写了waiting变量,需要给waiting变量加把锁。QU928资讯网——每日最新资讯28at.com

释放锁的位置非常的有技巧,如果在在wg.Add(1)之前mu.Unlock(),可能 wg.Add(1)还未来得执行其他goroutine已经执行了wg.Wait(),并获取到了错误的数据。QU928资讯网——每日最新资讯28at.com

unlock在add之前;QU928资讯网——每日最新资讯28at.com

var (  wg      sync.WaitGroup  mu      sync.Mutex  waiting bool  resp    string  err     error)func search(ctx context.Context, word string) (string, error) { mu.Lock() if waiting {  mu.Unlock()  wg.Wait()  return resp, err } waiting = true     wg.Add(1)    // 在wg.Add(1)之后释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() resp, err = query(ctx, word)    wg.Done() mu.Lock() waiting = false mu.Unlock()      return resp, err}

完整版本

现在可以针对不同的关键词做区分了,使用一个map来代替原有的waiting,并将每一个关键词查询的WaitGroup和结果打包到map的value中。QU928资讯网——每日最新资讯28at.com

type call struct { wg   sync.WaitGroup resp string err  error}var (    mu sync.Mutex    m = make(map[string]*call))func search(ctx context.Context, word string) (string, error) { mu.Lock() if c, ok := m[word]; ok {  mu.Unlock()  c.wg.Wait()  return c.resp, c.err } c := &call{} m[word] = c c.wg.Add(1) // 在wg.Add(1)之后才释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() c.resp, c.err = query(ctx, word) c.wg.Done() mu.Lock() delete(m, word) mu.Unlock() return c.resp, c.err}

开源库 golang.org/x/sync/singleflight

上面一步一步教大家手搓了一个并发抑制的逻辑,我们的基本逻辑和开源库golang.org/x/sync/singleflight没有区别,只是singleflight内部实现更加严谨QU928资讯网——每日最新资讯28at.com

直接使用singleflight非常简单的就可以实现我们的诉求QU928资讯网——每日最新资讯28at.com

  • singleflight.Group 创建一个需要并发控制的范围
  • Do函数

第一个参数接收一个key来判断否重复调用QU928资讯网——每日最新资讯28at.com

第二个参数为要执行的函数,函数可以返回正常值或者errorQU928资讯网——每日最新资讯28at.com

Do函数返回值除了闭包函数的返回值之外,还返回了此次返回值是否由其他goroutine共享QU928资讯网——每日最新资讯28at.com

import ( "golang.org/x/sync/singleflight")var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  return query(ctx, word) }) return resp.(string), err}

错误处理

因为共享第一个goroutine的结果,因此如果第一次调用失败,那其他goroutine也都会失败QU928资讯网——每日最新资讯28at.com

如果在某些场景下允许第一个调用失败后再次尝试调用该函数,那么可以通过调用Forget方法来忘记这个keyQU928资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  val, err := query(ctx, word)  // 当出错并且允许重试时  if err != nil && true {   g.Forget(word)   return "", err  }  return val, err }) return resp.(string), err}

超时控制

当使用Do函数时,如果query长时间未响应(这里假设qeury不具备超时能力),那么所有的goroutine都会被阻塞并等待,利用DoChan+select可以实现超时逻辑QU928资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) {    ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel()     result := g.DoChan(word, func() (interface{}, error) {  return query(ctx, word) }) select { case r := <-result:  return r.Val.(string), r.Err case <-ctx.Done():  return "", ctx.Err() }}

使用场景

预防缓存穿透QU928资讯网——每日最新资讯28at.com

在高并发的状态下,一般会给热点数据设置缓存。但数据第一次访问或者缓存失效的状态下,如果直接去查询数据库,会给数据库造成极大压力,甚至直接打爆数据库。QU928资讯网——每日最新资讯28at.com

以上各种分享中被反复提到的场景,但!注意!使用singleflight就一劳永逸了么,不是的,在大规模集群下可能有数百台机器,当处在高并发状态时,即使每台机器只发起一个请求,也足以打爆你的数据库!结合实际,搭配适当的缓存策略、数据预热、限流等手段才能避免潜在的风险。挖个坑,以后有机会聊聊这些问题QU928资讯网——每日最新资讯28at.com

总结

本篇作为一个例子,给你讲透典型的Go并发控制的姊妹篇,讲述了另外一种并发控制模型,并介绍了开源库golang.org/x/sync/singleflight。QU928资讯网——每日最新资讯28at.com

当由一个goroutine并发向下发展成多个goroutine时,使用golang.org/x/sync/errgroupQU928资讯网——每日最新资讯28at.com

当多个goroutine并发向下抑制成一个goroutine时,使用golang.org/x/sync/singleflightQU928资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-94296-0.html讲完Go并发控制,讲讲并发抑制

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 超火超实用的 10 个前端工具库,可能就是你一直在寻找的!

下一篇: 盘点历届 Java 语言的关键字,一定有你不认识的

标签:
  • 热门焦点
  • Mate60手机壳曝光 致敬自己的经典设计

    8月3日消息,今天下午博主数码闲聊站带来了华为Mate60的第三方手机壳图,可以让我们在真机发布之前看看这款华为全新旗舰的大致轮廓。从曝光的图片看,Mate 60背后摄像头面积依然
  • 跑分安卓第一!Redmi K60至尊版8月发布!卢伟冰:目标年度性能之王

    8月5日消息,Redmi K60至尊版将于8月发布,在此前举行的战略发布会上,官方该机将搭载搭载天玑9200+处理器,安兔兔V10跑分超177万分,是目前安卓阵营最高的分数
  • 一文看懂为苹果Vision Pro开发应用程序

    译者 | 布加迪审校 | 重楼苹果的Vision Pro是一款混合现实(MR)头戴设备。Vision Pro结合了虚拟现实(VR)和增强现实(AR)的沉浸感。其高分辨率显示屏、先进的传感器和强大的处理能力
  • 微信语音大揭秘:为什么禁止转发?

    大家好,我是你们的小米。今天,我要和大家聊一个有趣的话题:为什么微信语音不可以转发?这是一个我们经常在日常使用中遇到的问题,也是一个让很多人好奇的问题。让我们一起来揭开这
  • JavaScript学习 -AES加密算法

    引言在当今数字化时代,前端应用程序扮演着重要角色,用户的敏感数据经常在前端进行加密和解密操作。然而,这样的操作在网络传输和存储中可能会受到恶意攻击的威胁。为了确保数据
  • Python异步IO编程的进程/线程通信实现

    这篇文章再讲3种方式,同时讲4中进程间通信的方式一、 Python 中线程间通信的实现方式共享变量共享变量是多个线程可以共同访问的变量。在Python中,可以使用threading模块中的L
  • 拼多多APP上线本地生活入口,群雄逐鹿万亿市场

    Tech星球(微信ID:tech618)文 | 陈桥辉 Tech星球独家获悉,拼多多在其APP内上线了&ldquo;本地生活&rdquo;入口,位置较深,位于首页的&ldquo;充值中心&rdquo;内,目前主要售卖美食相关的
  • 华为HarmonyOS 4升级计划公布:首批34款机型今日开启公测

    8月4日消息,今天下午华为正式发布了HarmonyOS 4系统,在更流畅的前提下,还带来了不少新功能,UI设计也有变化,会让手机焕然一新。华为宣布,首批机型将会在
  • 华为和江淮汽车合作开发百万元问界MPV?双方回应来了

    8月1日消息,郭明錤今天在社交平台发文称,华为正在和江淮汽车合作,开发售价在100万元的问界MPV,预计在2024年第2季度量产,销量目标为上市首年交付5万辆。
Top