《Go 语言并发之道》读书笔记(七)
今天这篇笔记我们来学习Go 限流
限流是分布式系统中经常需要用到的技术,因为我们让请求没有限制,很容易就出现某个用户开很多线程把我们的服务拉跨,进而影响到别的用户。
限流
我们来看下Go语言层面可以怎么做到限流,先看一段不限流的代码,
type APIConnection struct{}
func Open() *APIConnection {
return &APIConnection{}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
//假装我们在这里有运行
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
//假装我们在这里有运行
return nil
}
func main() {
defer log.Printf("Done")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile : %v", err)
}
log.Printf("ReadFile")
}()
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress : %v", err)
}
log.Printf("ResolveAddress")
}()
}
wg.Wait()
}
上面的代码我们定义了两个假想的方法ReadFile 和 ResolveAddress, 假设他们是去访问文件和读取网络,都是比较耗资源的操作。然后开启了20个goroutine去调用这两个方法
这段代码的运行结果如下
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 Done
我们可以看到一瞬间就都运行完了,如果我们访问了实际的资源,然后又开了很多的goroutine,那么很容易就耗尽资源。 为了防止这样的事情发生,我们引入限流,限定一段时间内,只能访问一定的资源。 我们今天要讲的是基于令牌桶算法的限速,令牌桶是什么算法呢? 很简单就是有一个基础的令牌数d, 然后有固定的速度r往令牌桶中放令牌, 用户拿到令牌才能进行下一步,拿不到就等待。
我们来看代码
type APIConnection struct {
rateLimiter *rate.Limiter
}
func Open() *APIConnection {
return &APIConnection{
rateLimiter: rate.NewLimiter(rate.Limit(2), 5),
}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
main func我们没有修改,这里只是在APIConnection 中增加了一个
rateLimiter: rate.NewLimiter(rate.Limit(2), 5)
rate是golang.org/x/time/rate 下面的一个包, rate.NewLimiter是限速器,方法定义如下
func NewLimiter(r Limit, b int) *Limiter
r就是我们前面说的速率,每秒多少个令牌
b 就是令牌桶的高度,开始的时候有几个。
然后在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx), Wait就是等待有令牌出现。
运行的结果如下所示
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ResolveAddress
02:48:17 ResolveAddress
02:48:17 ResolveAddress
02:48:18 ReadFile
02:48:18 ResolveAddress
02:48:19 ResolveAddress
02:48:19 ResolveAddress
02:48:20 ResolveAddress
02:48:20 ReadFile
02:48:21 ResolveAddress
02:48:21 ReadFile
02:48:22 ReadFile
02:48:22 ReadFile
02:48:23 ResolveAddress
02:48:23 ReadFile
02:48:24 ResolveAddress
02:48:24 Done
通过时间我们可以看到前面很快执行了5次,就是拿到了令牌桶中的5个令牌,后面每秒中执行两次,也就是我们的速率2个/秒。程序运行符合我们预期,达到了限速的效果。
组合限流
书中作者还举了两个例子,运用组合来限速,比如要求一秒中不能超过两个,同时一分钟不能超过10个。 属于Go语言的一点组合功能,示例代码如下
type RateLimiter interface {
Wait(context.Context) error
Limit() rate.Limit
}
type multiLimiter struct {
limiters []RateLimiter
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit)
return &multiLimiter{limiters: limiters}
}
func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit()
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
type APIConnection struct {
rateLimiter RateLimiter
}
func Open() *APIConnection {
secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
定义了multiLimiter来组合这些限速器,然后定义了Wait方法。比较简单,这里不详述
还可以不同的设备分不同的限速器, 这里也是贴出代码不详述
type APIConnection struct {
networkLimit,
diskLimit,
apiLimit RateLimiter
}
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter(
rate.NewLimiter(Per(2, time.Second), 1),
rate.NewLimiter(Per(10, time.Minute), 10),
),
diskLimit: MultiLimiter(
rate.NewLimiter(rate.Limit(1), 1),
),
networkLimit: MultiLimiter(
rate.NewLimiter(Per(3, time.Second), 3),
),
}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil {
return err
}
return nil
}
不同用户限流
我们做web请求的时候,会遇到这样的需求,根据不同的用户给不同的限速,这里简单的给个sample, 其实就是用map把用户和限速器关联起来。
var userLimit = make(map[string]*rate.Limiter)
func doWork(user string) {
if userLimit[user].Allow() {
log.Printf("%s do work
", user)
} else {
log.Printf("%s not work
", user)
}
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1)
userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5)
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user1")
}()
time.Sleep(500 * time.Millisecond)
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user2")
}()
time.Sleep(500 * time.Millisecond)
}
wg.Wait()
}
上面的例子中用户1被限制1秒访问2次,用户2被限制1分钟访问2次
03:19:14 user1 do work
03:19:15 user1 do work
03:19:15 user1 do work
03:19:16 user1 do work
03:19:16 user1 do work
03:19:17 user1 do work
03:19:17 user1 do work
03:19:18 user1 do work
03:19:18 user1 do work
03:19:19 user1 do work
03:19:20 user2 do work
03:19:20 user2 do work
03:19:21 user2 do work
03:19:21 user2 do work
03:19:22 user2 do work
03:19:22 user2 not work
03:19:23 user2 not work
03:19:23 user2 not work
03:19:24 user2 not work
03:19:24 user2 not work
这里用户1基本能得到执行,用户2执行了5次后,由于没有拿到令牌,就不能work了。这样达到了不同用户,不同的限速器。
总结
对于限速,可以在服务器层面进行限速,我们这里是在后台程序端进行限速, 也有不少现成的解决方案 https://www.jianshu.com/p/c13843d2e1ec
对于分布式的系统这样的限速自然是不够的,可以结合redis的功能来进行限速,网上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822
没有实际操作,后面我们再实际操作下再来记录。