haiyux's blog

路漫漫其修远兮,吾将上下而求索


  • 首页

  • 归档

  • 关于我

  • 公益404

  • 搜索

从kratos分析breaker熔断器源码实现

时间: 2021-09-04 分类: microservice   go   字数: 1000 字 阅读: 2分钟 阅读次数:

为什么要用熔断

前面我们讲过限流保证服务的可用性,不被突如其来的流量打爆。但是两种情况是限流解决不了的。

  1. 如果我们服务只能处理1000QPS,但是有10wQPS打过来,服务还是会炸。因为拒绝请求也需要成本。
  2. 服务但是io型的,会把mysql,redis,mq等中间件打挂。

所以,我们遵循一个思路,可不可以client端在失败的多的时候就不调用了,直接返回错误呢?

什么是熔断

熔断器是为了当依赖的服务已经出现故障时,主动阻止对依赖服务的请求。保证自身服务的正常运行不受依赖服务影响,防止雪崩效应。

源码分析

源码地址

  • https://github.com/go-kratos/aegis/tree/main/circuitbreaker

CircuitBreaker 接口

type CircuitBreaker interface {
	Allow() error
	MarkSuccess()
	MarkFailed()
}
  1. Allow()
    • 判断熔断器是否允许通过
  2. MarkSuccess()
    • 熔断器成功的回调
  3. MarkFailed()
    • 熔断器失败的回调

Group 结构体

type Group struct {
   mutex sync.Mutex
   val   atomic.Value

   New func() CircuitBreaker
}
  1. mutex
    • 互斥锁,使val这个map不产生数据竞争
  2. val
    • map,存储name -> CircuitBreaker
  3. New
    • 生成一个CircuitBreaker

Get方法

// Get .
func (g *Group) Get(name string) CircuitBreaker {
	m, ok := g.val.Load().(map[string]CircuitBreaker)
	if ok {
		breaker, ok := m[name]
		if ok {
			return breaker // 很具name从val拿出 breaker 如果存在返回
		}
	}
	// slowpath for group don`t have specified name breaker.
	g.mutex.Lock()
	nm := make(map[string]CircuitBreaker, len(m)+1)
	for k, v := range m {
		nm[k] = v
	}
	breaker := g.New()
	nm[name] = breaker // 如果不存在 生成一个 并放入map 并返回
	g.val.Store(nm)
	g.mutex.Unlock()
	return breaker
}

Breaker 结构体

// Breaker is a sre CircuitBreaker pattern.
type Breaker struct {
   stat window.RollingCounter
   r    *rand.Rand
   // rand.New(...) returns a non thread safe object
   randLock sync.Mutex

   // Reducing the k will make adaptive throttling behave more aggressively,
   // Increasing the k will make adaptive throttling behave less aggressively.
   k       float64
   request int64

   state int32
}
  1. stat
    • 滑动窗口,记录成功失败
  2. r
    • 随机数
  3. randLock
    • 读写锁
  4. k 成功系数
    • total(总数) = success * k
  5. request 请求数
    • 当总数 < request时,不判断是否熔断
  6. state
    • 熔断器状态 打开或者关闭

Allow()方法

// Allow request if error returns nil.
func (b *Breaker) Allow() error {
	success, total := b.summary() // 从活动窗口获取成功数和总数
	k := b.k * float64(success) // 根据k成功系数 获取

	// check overflow requests = K * success
	if total < b.request || float64(total) < k { // 如果总数<request 或者  总数 < k 
		if atomic.LoadInt32(&b.state) == StateOpen { 
			atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) // 如果state是打开 关闭
		}
		return nil
	}
	if atomic.LoadInt32(&b.state) == StateClosed { 
		atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) // 如果state是关闭 打开
	}
	dr := math.Max(0, (float64(total)-k)/float64(total+1)) // 获取系数,当k越大 dr越小
	drop := b.trueOnProba(dr)
	// trueOnProba 获取水机数
  // 返回是否<dr
  
	if drop { // 如果是 拒绝请求
		return circuitbreaker.ErrNotAllowed
	}
	return nil
}

func (b *Breaker) trueOnProba(proba float64) (truth bool) {
	b.randLock.Lock()
	truth = b.r.Float64() < proba
	b.randLock.Unlock()
	return
}

使用trueOnProba的原因是,当熔断器关闭时,随机让一部分请求通过,当success越大,请求的通过的数量就越多。用这些数据成功与否,放入窗口统计,当成功数达到要求时,就可以关闭熔断器了。

MarkSuccess()以及MarkFailed()方法

// MarkSuccess mark requeest is success.
func (b *Breaker) MarkSuccess() {
	b.stat.Add(1) // 成功数+1
}

// MarkFailed mark request is failed.
func (b *Breaker) MarkFailed() {
	// NOTE: when client reject requets locally, continue add counter let the
	// drop ratio higher.
	b.stat.Add(0) // 失败数+1
}

流程图


QQ扫一扫交流

标题:从kratos分析breaker熔断器源码实现

链接:https://www.zhaohaiyu.com/post/microservice/breaker/

作者:haiyux

声明: 本博客文章除特别声明外,均采用 CC BY-NC-SA 3.0许可协议,转载请注明出处!

创作实属不易,如有帮助,那就打赏博主些许茶钱吧 ^_^
WeChat Pay

微信打赏

Alipay

支付宝打赏

Go工程化 - 依赖注入
从kratos分析BBR限流源码实现
  • 文章目录
  • 站点概览
haiyux

haiyux

路漫漫其修远兮,吾将上下而求索

62 日志
8 分类
20 标签
GitHub Email
友情链接
  • kratos
  • farer
  • 我的博客园
  • 李文周
  • 凡梦星尘
标签云
  • Golang 30
  • 微服务 8
  • Mysql 7
  • HTTP 4
  • Protobuf 4
  • 运维 4
  • Grpc 3
  • 设计模式 3
  • Redis 2
  • Ddd 1
  • Docker 1
  • Kratos 1
  • Makefile 1
  • Mq 1
  • Nginx 1
  • Raft 1
  • Thrift 1
  • Traceing 1
  • Wire 1
  • 泛型 1
  • 为什么要用熔断
  • 什么是熔断
  • 源码分析
    • 源码地址
    • CircuitBreaker 接口
    • Group 结构体
    • Breaker 结构体
    • Allow()方法
    • MarkSuccess()以及MarkFailed()方法
  • 流程图
© 2010 - 2022 haiyux's blog
Powered by - Hugo v0.98.0 / Theme by - NexT
Storage by 又拍云存储 / 京ICP备19052634号-2
0%