Skip to main content

Dapr 源码解析 | Resiliency

·4 mins

Dapr 1.17 版本已经发布, 本文介绍 1.17 版本最重磅的功能 Resiliency 弹性容错功能.

背景 #

分布式系统往往由非常多的微服务组成, 系统故障出现的可能性就大大增加了. 例如, 一个实例可能由于硬件、过多的请求、应用重启或其他一些原因而失败或无响应, 这些问题会导致应用间调用失败. 所以我们需要让我们的应用具有 容错性, 即检测、缓解和响应故障的能力, 使得我们的应用具有自愈能力.

service mesh 往往都会在网络层提供这样的弹性能力. 对于 dapr 来说, 它不但接管了我们的服务间请求, 还提供很多应用和中间件交互的组件, 所以 dapr 在 1.17 版本提供了三种策略, 超时重试熔断, 并且我们可以选择在 服务间调用, componentactor 维度使用他们.

配置 #

Resiliency 配置是 kind 是 Resiliency , 类型为 CRD 的全局配置, 主要分为 policiestargets 两个部分, 还支持 scopes 控制配置生效范围.

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
scopes:
  # 可以指定对哪些 app 生效(范围)
spec:
  policies: # 声明三种策略, targets 来引用
    timeouts:
      # 超时模块配置
    retries:
      # 重试策略配置
    circuitBreakers:
      # 熔断策略配置
  targets: # 为不同的目标指定上述策略
    apps:
      # 服务间调用
    actors:
      # actors 相关
    components:
      # 组件模块

policies 配置可以分别声明出多种策略, targets 则是通过引用上面的 policy 实现对不同模块的弹性功能配置.

load 配置 #

配置加载之前文章也有讲过. 分别实现了 k8s 集群内通过 operator 获取 CRD 配置, 非 k8s 集群从文件中加载配置. 具体代码可见 pkg/resiliency/resiliency.go 方法为 LoadKubernetesResiliencyLoadStandaloneResiliency, 值得注意的是最终都会通过 filterResiliencyConfigs 方法保证只会加载 scope 中包含自己的配置.

源码分析 #

resiliency module #

Resiliency 模块的核心实现在 pkg/resiliency 文件夹下.

首先分析 Policy 方法, 它是一个高阶函数, 提供了根据三种弹性策略配置生成一个具有弹性能力的 function wrapper, 将应用逻辑放进去执行就可以获得对应的弹性能力.

// https://github.com/zcong1993/dapr-1/blob/3c4510a685470eb8a5403e9d9a451bc93e67e6c4/pkg/resiliency/policy.go#L37
func Policy(ctx context.Context, log logger.Logger, operationName string, t time.Duration, r *retry.Config, cb *breaker.CircuitBreaker) Runner {
  return func(oper Operation) error {
    operation := oper
    // 超时策略
    if t > 0 {
      operCopy := operation
      // 非常经典的 go 语言超时控制
      operation = func(ctx context.Context) error {
        ctx, cancel := context.WithTimeout(ctx, t)
        defer cancel()

        done := make(chan error, 1)
        go func() {
          done <- operCopy(ctx)
        }()

        select {
        case err := <-done:
          return err
        case <-ctx.Done():
          return ctx.Err()
        }
      }
    }

    // 熔断策略
    // 使用 github.com/sony/gobreaker 库
    if cb != nil {
      operCopy := operation
      operation = func(ctx context.Context) error {
        err := cb.Execute(func() error {
          return operCopy(ctx)
        })
        // 熔断生效时, 返回 backoff.Permanent 错误,
        // 让重试失效
        if r != nil && breaker.IsErrorPermanent(err) {
          err = backoff.Permanent(err)
        }
        return err
      }
    }

    if r == nil {
      return operation(ctx)
    }

    // 重试
    // 使用 github.com/cenkalti/backoff/v4 库
    b := r.NewBackOffWithContext(ctx)
    return retry.NotifyRecover(func() error {
      return operation(ctx)
    }, b, func(_ error, _ time.Duration) {
      log.Infof("Error processing operation %s. Retrying...", operationName)
    }, func() {
      log.Infof("Recovered processing operation %s.", operationName)
    })
  }
}

整个函数逻辑是简单的使用装饰器模式分别为上层的 Operation 增加了超时, 重试和熔断的能力.

但是从参数可以看出超时和重试仅仅需要传递配置, 但是熔断则是传递了 CircuitBreaker 实例, 因为熔断是需要根据历史数据计算熔断器状态的, 也就是有状态的, 并且熔断的粒度在不同场景下是不一样的, 例如服务间调用往往是以 API 请求类型作为维度(rpc method 或者 http endpoint)设置熔断器, 而数据库则经常以整体为维度设置熔断器. 因此引入了下一个关键部分 ProviderResiliency 管理组织共享超时, 重试配置和熔断器实例.

// https://github.com/zcong1993/dapr-1/blob/3c4510a685470eb8a5403e9d9a451bc93e67e6c4/pkg/resiliency/resiliency.go#L62
// Provider 抽象出了 resiliency 提供者
type Provider interface {
  // 为服务间调用提供 policy runner
  EndpointPolicy(ctx context.Context, service string, endpoint string) Runner
  // actor 相关(不分析)
  ActorPreLockPolicy(ctx context.Context, actorType string, id string) Runner
  // actor 相关(不分析)
  ActorPostLockPolicy(ctx context.Context, actorType string, id string) Runner
  // component 出口相关 (dapr sidecar -> 外部系统(db, queue, redis))
  ComponentOutboundPolicy(ctx context.Context, name string) Runner
  // component 入口相关 (dapr sidecar -> 用户应用)
  ComponentInboundPolicy(ctx context.Context, name string) Runner
}

由于 dapr 支持服务间调用, component 和 actor 三个部分的弹性功能, 所以 Provider 需要为各自模块提供获取 Policy Runner 的能力.

Resiliency 实现了 Provider, 核心点就在于共享熔断器实例.

// https://github.com/zcong1993/dapr-1/blob/3c4510a685470eb8a5403e9d9a451bc93e67e6c4/pkg/resiliency/resiliency.go#L78
type Resiliency struct {
  log logger.Logger

  // policies 中声明的 timeout 策略
  timeouts        map[string]time.Duration
  // policies 中声明的 retry 策略
  retries         map[string]*retry.Config
  // policies 中声明的熔断策略, 会作为 template 使用
  circuitBreakers map[string]*breaker.CircuitBreaker

  actorCBCaches map[string]*lru.Cache
  serviceCBs    map[string]*lru.Cache
  componentCBs  *circuitBreakerInstances

  apps       map[string]PolicyNames
  actors     map[string]ActorPolicies
  components map[string]ComponentPolicyNames
}

type circuitBreakerInstances struct {
  sync.RWMutex
  cbs map[string]*breaker.CircuitBreaker
}

timeouts, retries 都会将 policies 中声明的超时, 重试策略保存成 map[name]config 的形式, 后面会通过 name 来引用. 但是熔断器配置这里有点特殊, 由于会根据需求存在多个熔断器实例, 所以策略中的熔断器配置会作为 template 来使用.

actorCBCaches, serviceCBscomponentCBs 用来管理, 存储, 共享各自模块的熔断器实例.

服务间调用的熔断粒度为 API endpoint, 所以 serviceCBs 类型为 map[serviceName]lru<endpoint, CircuitBreaker>, 使用 lru 防止 endpoint 过多导致内存中有太多熔断器实例. 这个 lru 的默认大小为 100, 可以通过配置修改.

接着分析 EndpointPolicy, 它的作用是为服务间调用提供 policy runner.

// https://github.com/zcong1993/dapr-1/blob/3c4510a685470eb8a5403e9d9a451bc93e67e6c4/pkg/resiliency/resiliency.go#L360
func (r *Resiliency) EndpointPolicy(ctx context.Context, app string, endpoint string) Runner {
  var t time.Duration
  var rc *retry.Config
  var cb *breaker.CircuitBreaker
  operationName := fmt.Sprintf("endpoint[%s, %s]", app, endpoint)
  if r == nil {
    return Policy(ctx, r.log, operationName, t, rc, cb)
  }
  // 判断目标服务 target 有没有配置
  policyNames, ok := r.apps[app]
  if ok {
    // 根据 timeout 配置 name 获取对应超时策略配置
    if policyNames.Timeout != "" {
      t = r.timeouts[policyNames.Timeout]
    }
    // 根据 retry 配置 name 获取对应超时策略配置
    if policyNames.Retry != "" {
      rc = r.retries[policyNames.Retry]
    }
    if policyNames.CircuitBreaker != "" {
      // 根据熔断器名称获取对应的熔断策略配置
      template, ok := r.circuitBreakers[policyNames.CircuitBreaker]
      if ok {
        // 根据目标 app 拿到 lru 实例
        cache, ok := r.serviceCBs[app]
        if ok {
          // 从缓存中根据 endpoint 拿到对应熔断器
          cbi, ok := cache.Get(endpoint)
          if ok {
            cb, _ = cbi.(*breaker.CircuitBreaker)
          } else {
            // 不存在时根据模板配置创建并缓存
            cb = &breaker.CircuitBreaker{
              Name:        endpoint,
              MaxRequests: template.MaxRequests,
              Interval:    template.Interval,
              Timeout:     template.Timeout,
              Trip:        template.Trip,
            }
            cb.Initialize(r.log)
            cache.Add(endpoint, cb)
          }
        }
      }
    }
  }

  // 最终返回 policy runner factory
  return Policy(ctx, r.log, operationName, t, rc, cb)
}

ComponentOutboundPolicyComponentInboundPolicy 也是同理, 不过 component 的熔断器是以 component 实例为维度的, 数目是一定的, 所以不需要使用 lru.

应用 #

Resiliency 初始化后会放在 runtime 实例上, 在 dapr 的各个组件中共享. dapr sidecar 的 http/grpc server 模块也会拿到这个实例, 最终在对应的 API handler 中使用对应的 factory 初始化 policy runner 包装原来的逻辑.

以 http 服务间调用为例:

// https://github.com/zcong1993/dapr-1/blob/3c4510a685470eb8a5403e9d9a451bc93e67e6c4/pkg/http/api.go#L985
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
  // ...
  // 通过 resiliency.EndpointPolicy 创建 policy runner
  policy := a.resiliency.EndpointPolicy(reqCtx, targetID, fmt.Sprintf("%s:%s", targetID, invokeMethodName))
  err := policy(func(ctx context.Context) (rErr error) {
    // 原有逻辑
  })
  // ...
}

grpc server 分为两部分, InvokeService 方法和 proxy 模块. component 模块也是同理.

总结 #

dapr 对于弹性功能的实现非常简洁, 都是通过组合开源组件实现的. 并且配置分为了声明和引用两个部分, 减少了配置重复性. 我们自己的服务也可以借鉴这种方式实现自己的弹性模块. 使用 provider 的形式也可以实现 noopProvider 空实现的形式兼容未开启此功能的情况.

参考资料 #

wxmp