通过 MapReduce 降低服务响应时间 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
kevinwan
V2EX    Go 编程语言

通过 MapReduce 降低服务响应时间

  •  1
     
  •   kevinwan 2020-09-12 15:59:23 +08:00 1623 次点击
    这是一个创建于 1922 天前的主题,其中的信息可能已经有所发展或是发生改变。

    通过 MapReduce 降低服务响应时间

    在微服务中开发中,api 网关扮演对外提供 restful api 的角色,而 api 的数据往往会依赖其他服务,复杂的 api 更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个 api 的耗时将会大大增加。

    那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go 基础库中为我们提供了 WaitGroup 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且 WaitGroup 中对变量的赋值往往需要加锁,每个依赖函数都需要添加 Add 和 Done 对于新手来说比较容易出错

    基于以上的背景,go-zero 框架中为我们提供了并发处理工具MapReduce,该工具开箱即用,不需要做什么初始化,我们通过下图看下使用 MapReduce 和没使用的耗时对比:

    依赖耗时对比

    相同的依赖,串行处理的话需要 200ms,使用 MapReduce 后的耗时等于所有依赖中最大的耗时为 100ms,可见 MapReduce 可以大大降低服务耗时,而且随着依赖的增加效果就会越明显,减少处理耗时的同时并不会增加服务器压力

    并发处理工具MapReduce

    MapReduce是 Google 提出的一个软件架构,用于大规模数据集的并行运算,go-zero 中的 MapReduce 工具正是借鉴了这种架构思想

    go-zero 框架中的 MapReduce 工具主要用来对批量数据进行并发的处理,以此来提升服务的性能

    mapreduce 原理图

    我们通过几个示例来演示 MapReduce 的用法

    MapReduce 主要有三个参数,第一个参数为 generate 用以生产数据,第二个参数为 mapper 用以对数据进行处理,第三个参数为 reducer 用以对 mapper 后的数据做聚合返回,还可以通过 opts 选项设置并发处理的线程数量

    场景一: 某些功能的结果往往需要依赖多个服务,比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等,一般被依赖的服务都是以 rpc 的形式对外提供,为了降低依赖的耗时我们往往需要对依赖做并行处理

    func productDetail(uid, pid int64) (*ProductDetail, error) { var pd ProductDetail err := mr.Finish(func() (err error) { pd.User, err = userRpc.User(uid) return }, func() (err error) { pd.Store, err = storeRpc.Store(pid) return }, func() (err error) { pd.Order, err = orderRpc.Order(pid) return }) if err != nil { log.Printf("product detail error: %v", err) return nil, err } return &pd, nil } 

    该示例中返回商品详情依赖了多个服务获取数据,因此做并发的依赖处理,对接口的性能有很大的提升

    场景二: 很多时候我们需要对一批数据进行处理,比如对一批用户 id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的结果为效验合法的用户 id

    func checkLegal(uids []int64) ([]int64, error) { r, err := mr.MapReduce(func(source chan<- interface{}) { for _, uid := range uids { source <- uid } }, func(item interface{}, writer mr.Writer, cancel func(error)) { uid := item.(int64) ok, err := check(uid) if err != nil { cancel(err) } if ok { writer.Write(uid) } }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) { var uids []int64 for p := range pipe { uids = append(uids, p.(int64)) } writer.Write(uids) }) if err != nil { log.Printf("check error: %v", err) return nil, err } return r.([]int64), nil } func check(uid int64) (bool, error) { // do something check user legal return true, nil } 

    该示例中,如果 check 过程出现错误则通过 cancel 方法结束效验过程,并返回 error 整个效验过程结束,如果某个 uid 效验结果为 false 则最终结果不返回该 uid

    MapReduce 使用注意事项

    • mapper 和 reducer 中都可以调用 cancel,参数为 error,调用后立即返回,返回结果为 nil, error
    • mapper 中如果不调用 writer.Write 则 item 最终不会被 reducer 聚合
    • reducer 中如果不调用 writer.Wirte 则返回结果为 nil, ErrReduceNoOutput
    • reducer 为单线程,所有 mapper 出来的结果在这里串行聚合

    实现原理分析:

    MapReduce 中首先通过 buildSource 方法通过执行 generate(参数为无缓冲 channel)产生数据,并返回无缓冲的 channel,mapper 会从该 channel 中读取数据

    func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{}) go func() { defer close(source) generate(source) }() return source } 

    在 MapReduceWithSource 方法中定义了 cancel 方法,mapper 和 reducer 中都可以调用该方法,调用后主线程收到 close 信号会立马返回

    cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { // 默认的 error retErr.Set(ErrCancelWithNil) } drain(source) // 调用 close(ouput)主线程收到 Done 信号,立马返回 finish() }) 

    在 mapperDispatcher 方法中调用了 executeMappers,executeMappers 消费 buildSource 产生的数据,每一个 item 都会起一个 goroutine 单独处理,默认最大并发数为 16,可以通过 WithWorkers 进行设置

    var wg sync.WaitGroup defer func() { wg.Wait() // 保证所有的 item 都处理完成 close(collector) }() pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) // 将 mapper 处理完的数据写入 collector for { select { case <-done: // 当调用了 cancel 会触发立即返回 return case pool <- lang.Placeholder: // 控制最大并发数 item, ok := <-input if !ok { <-pool return } wg.Add(1) go func() { defer func() { wg.Done() <-pool }() mapper(item, writer) // 对 item 进行处理,处理完调用 writer.Write 把结果写入 collector 对应的 channel 中 }() } } 

    reducer 单 goroutine 对数 mapper 写入 collector 的数据进行处理,如果 reducer 中没有手动调用 writer.Write 则最终会执行 finish 方法对 output 进行 close 避免死锁

    go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) }() 

    在该工具包中还提供了许多针对不同业务场景的方法,实现原理与 MapReduce 大同小异,感兴趣的同学可以查看源码学习

    • MapReduceVoid 功能和 MapReduce 类似但没有结果返回只返回 error
    • Finish 处理固定数量的依赖,返回 error,有一个 error 立即返回
    • FinishVoid 和 Finish 方法功能类似,没有返回值
    • Map 只做 generate 和 mapper 处理,返回 channel
    • MapVoid 和 Map 功能类似,无返回

    本文主要介绍了 go-zero 框架中的 MapReduce 工具,在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。

    项目地址

    https://github.com/tal-tech/go-zero

    微信交流群

    1 条回复    2020-09-12 20:33:40 +08:00
    kangsheng9527
        1
    kangsheng9527  
       2020-09-12 20:33:40 +08:00
    大神之路免中介。。。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     4725 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 01:08 PVG 09:08 LAX 17:08 JFK 20:08
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86