当前位置:首页 > 科技  > 软件

讲完Go并发控制,讲讲并发抑制

来源: 责编: 时间:2024-06-17 17:40:32 233观看
导读已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。当多个相同的关键词并发查询(

已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。NO428资讯网——每日最新资讯28at.com

让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。NO428资讯网——每日最新资讯28at.com

当多个相同的关键词并发查询(调用search函数)时,我们希望只产生一次数据库调用(调用query),第一个查询未完成时后续的重复查询会等待,当第一个查询完成时则会与其他查询分享结果,这样一来虽然只执行了一次数据库调用但是所有查询都拿到了最终的结果。NO428资讯网——每日最新资讯28at.com

图片图片NO428资讯网——每日最新资讯28at.com

什么是并发抑制:NO428资讯网——每日最新资讯28at.com

package mainimport ( "context" "fmt" "sync" "time" "golang.org/x/sync/errgroup")func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}// 实现search,在重复并发调用下仅执行一次query// 其他并发共享这次query的结果func search(ctx context.Context, word string) (string, error) {    return query(ctx, word)}func coSearch(ctx context.Context, words []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) results := make([]string, len(words)) for i, word := range words {  i, word := i, word  g.Go(func() error {   result, err := search(ctx, word)   if err != nil {    return err   }   results[i] = result   return nil  }) } err := g.Wait() return results, err}func main() { words := []string{"Go","Go", "Go", "Rust", "PHP", "JavaScript", "Java"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}

好了,可以先暂停想想该如何实现search函数了。NO428资讯网——每日最新资讯28at.com

一步一步实现并发抑制

我们先假设所有查询关键词都一样,那么问题简化成并发执行search时,只在第一次search时调用query,其他的search并发调用等待并共享这次的查询结果。NO428资讯网——每日最新资讯28at.com

通过waiting变量,其他goroutine等待第一个goroutine数据库调用完成,那么如何让其他goroutine等待在这个位置呢?NO428资讯网——每日最新资讯28at.com

func main() { words := []string{"Go", "Go", "Go", "Go", "Go"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}var ( waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) {  if waiting {    // 等待resp, err被赋值,即第一个query完成后再返回    // ...?      return resp, err  }  waiting = true  resp, err = query(ctx, word)  waiting = false  return resp, err}func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}

sync.WaitGroup{}并发控制

sync.WaitGroup{}是并发控制的核心,这里再次重申下用法:NO428资讯网——每日最新资讯28at.com

  • 当新运行一个goroutine时,我们需要调用wg.Add(1)。
  • 当一个goroutine运行完成的时候,我们需要调用wg.Done()。
  • wg.Wait()让程序阻塞在此处,直到所有的goroutine运行完毕。

利用 sync.WaitGroup{}便可实现上文代码中等待的效果:NO428资讯网——每日最新资讯28at.com

var ( wg      sync.WaitGroup waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) { if waiting {  // 其他goroutine等待第一个goroutine执行完成  wg.Wait()  return resp, err } waiting = true     wg.Add(1) resp, err = query(ctx, word)    wg.Done() // 第一个goroutine执行完成     waiting = false return resp, err}

并发安全

当多个goroutine对同一个内存区域进行读写时,就会产生并发安全的问题,它会导致程序运行的结果不符合预期,而上文的程序并发的读写了waiting变量,需要给waiting变量加把锁。NO428资讯网——每日最新资讯28at.com

释放锁的位置非常的有技巧,如果在在wg.Add(1)之前mu.Unlock(),可能 wg.Add(1)还未来得执行其他goroutine已经执行了wg.Wait(),并获取到了错误的数据。NO428资讯网——每日最新资讯28at.com

unlock在add之前;NO428资讯网——每日最新资讯28at.com

var (  wg      sync.WaitGroup  mu      sync.Mutex  waiting bool  resp    string  err     error)func search(ctx context.Context, word string) (string, error) { mu.Lock() if waiting {  mu.Unlock()  wg.Wait()  return resp, err } waiting = true     wg.Add(1)    // 在wg.Add(1)之后释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() resp, err = query(ctx, word)    wg.Done() mu.Lock() waiting = false mu.Unlock()      return resp, err}

完整版本

现在可以针对不同的关键词做区分了,使用一个map来代替原有的waiting,并将每一个关键词查询的WaitGroup和结果打包到map的value中。NO428资讯网——每日最新资讯28at.com

type call struct { wg   sync.WaitGroup resp string err  error}var (    mu sync.Mutex    m = make(map[string]*call))func search(ctx context.Context, word string) (string, error) { mu.Lock() if c, ok := m[word]; ok {  mu.Unlock()  c.wg.Wait()  return c.resp, c.err } c := &call{} m[word] = c c.wg.Add(1) // 在wg.Add(1)之后才释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() c.resp, c.err = query(ctx, word) c.wg.Done() mu.Lock() delete(m, word) mu.Unlock() return c.resp, c.err}

开源库 golang.org/x/sync/singleflight

上面一步一步教大家手搓了一个并发抑制的逻辑,我们的基本逻辑和开源库golang.org/x/sync/singleflight没有区别,只是singleflight内部实现更加严谨NO428资讯网——每日最新资讯28at.com

直接使用singleflight非常简单的就可以实现我们的诉求NO428资讯网——每日最新资讯28at.com

  • singleflight.Group 创建一个需要并发控制的范围
  • Do函数

第一个参数接收一个key来判断否重复调用NO428资讯网——每日最新资讯28at.com

第二个参数为要执行的函数,函数可以返回正常值或者errorNO428资讯网——每日最新资讯28at.com

Do函数返回值除了闭包函数的返回值之外,还返回了此次返回值是否由其他goroutine共享NO428资讯网——每日最新资讯28at.com

import ( "golang.org/x/sync/singleflight")var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  return query(ctx, word) }) return resp.(string), err}

错误处理

因为共享第一个goroutine的结果,因此如果第一次调用失败,那其他goroutine也都会失败NO428资讯网——每日最新资讯28at.com

如果在某些场景下允许第一个调用失败后再次尝试调用该函数,那么可以通过调用Forget方法来忘记这个keyNO428资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  val, err := query(ctx, word)  // 当出错并且允许重试时  if err != nil && true {   g.Forget(word)   return "", err  }  return val, err }) return resp.(string), err}

超时控制

当使用Do函数时,如果query长时间未响应(这里假设qeury不具备超时能力),那么所有的goroutine都会被阻塞并等待,利用DoChan+select可以实现超时逻辑NO428资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) {    ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel()     result := g.DoChan(word, func() (interface{}, error) {  return query(ctx, word) }) select { case r := <-result:  return r.Val.(string), r.Err case <-ctx.Done():  return "", ctx.Err() }}

使用场景

预防缓存穿透NO428资讯网——每日最新资讯28at.com

在高并发的状态下,一般会给热点数据设置缓存。但数据第一次访问或者缓存失效的状态下,如果直接去查询数据库,会给数据库造成极大压力,甚至直接打爆数据库。NO428资讯网——每日最新资讯28at.com

以上各种分享中被反复提到的场景,但!注意!使用singleflight就一劳永逸了么,不是的,在大规模集群下可能有数百台机器,当处在高并发状态时,即使每台机器只发起一个请求,也足以打爆你的数据库!结合实际,搭配适当的缓存策略、数据预热、限流等手段才能避免潜在的风险。挖个坑,以后有机会聊聊这些问题NO428资讯网——每日最新资讯28at.com

总结

本篇作为一个例子,给你讲透典型的Go并发控制的姊妹篇,讲述了另外一种并发控制模型,并介绍了开源库golang.org/x/sync/singleflight。NO428资讯网——每日最新资讯28at.com

当由一个goroutine并发向下发展成多个goroutine时,使用golang.org/x/sync/errgroupNO428资讯网——每日最新资讯28at.com

当多个goroutine并发向下抑制成一个goroutine时,使用golang.org/x/sync/singleflightNO428资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-94296-0.html讲完Go并发控制,讲讲并发抑制

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 超火超实用的 10 个前端工具库,可能就是你一直在寻找的!

下一篇: 盘点历届 Java 语言的关键字,一定有你不认识的

标签:
  • 热门焦点
  • 5月安卓手机好评榜:魅族20 Pro夺冠

    性能榜和性价比榜之后,我们来看最后的安卓手机好评榜,数据来源安兔兔评测,收集时间2023年5月1日至5月31日,仅限国内市场。第一名:魅族20 Pro好评率:97.50%不得不感慨魅族老品牌还
  • 如何通过Python线程池实现异步编程?

    线程池的概念和基本原理线程池是一种并发处理机制,它可以在程序启动时创建一组线程,并将它们置于等待任务的状态。当任务到达时,线程池中的某个线程会被唤醒并执行任务,执行完任
  • 从零到英雄:高并发与性能优化的神奇之旅

    作者 | 波哥审校 | 重楼作为公司的架构师或者程序员,你是否曾经为公司的系统在面对高并发和性能瓶颈时感到手足无措或者焦头烂额呢?笔者在出道那会为此是吃尽了苦头的,不过也得
  • 2天涨粉255万,又一赛道在抖音爆火

    来源:运营研究社作者 | 张知白编辑 | 杨佩汶设计 | 晏谈梦洁这个暑期,旅游赛道彻底火了:有的「地方」火了&mdash;&mdash;贵州村超旅游收入 1 个月超过 12 亿;有的「博主」火了&m
  • 梁柱接棒两年,腾讯音乐闯出新路子

    文丨田静 出品丨牛刀财经(niudaocaijing)7月5日,企鹅FM发布官方公告称由于业务调整,将于9月6日正式停止运营,这意味着腾讯音乐长音频业务走向消亡。腾讯在长音频领域还在摸索。为
  • 阿里大调整

    来源:产品刘有媒体报道称,近期淘宝天猫集团启动了近年来最大的人力制度改革,涉及员工绩效、层级体系等多个核心事项,目前已形成一个初步的&ldquo;征求意见版&rdquo;:1、取消P序列
  • iQOO Neo8 Pro评测:旗舰双芯加持 最强性能游戏旗舰

    【Techweb评测】去年10月,iQOO推出了一款Neo7手机,该机搭载了联发科天玑9000+,配备独显芯片Pro+,带来了同价位段最佳的游戏体验,一经上市便受到了诸多用
  • 联想小新Pad Pro 12.6将要推出,搭载高通骁龙 870 处理器

    联想小新Pad Pro 12.6将于秋季新品会上推出,官方按照惯例直接在发布会前给出了机型的所有参数。联想小新 Pad Pro 12.6 将搭载高通骁龙 870 处理器,重量为 5
  • 微软发布Windows 11新版 引入全新任务栏状态

    近日,微软发布了Windows 11新版,而Build 22563更新主要引入了几周前曝光的平板模式任务栏等,系统更流畅了。更新中,Windows 11加入了专门针对平板优化的任务栏
Top