Skip to main content

Dapr 源码解析 | 服务间调用

·8 mins

Service Invocation 是 dapr 对外提供的最基础功能, 也就是服务间调用. 另外别的一些功能也会间接使用它.

本文不会介绍功能如何使用, 相关资料请查看官方文档.

https://docs.dapr.io/developing-applications/building-blocks/service-invocation/howto-invoke-discover-services/

https://docs.dapr.io/developing-applications/building-blocks/service-invocation/howto-invoke-services-grpc/

总览 #

dapr-si-overview

官方文档给出来 Service Invocation 服务间调用的示意图, 已经是非常清晰了. 因为 dapr 是 sidecar 模式运行在业务 APP 旁边, 所以我们服务间调用也是通过 dapr 做的转发, dapr 也就是在这一步做了监控, 追踪, mTLS 等功能.

  1. service A 想通过 HTTP/gRPC 调用 service B, 会先请求自己本地 dapr A(sidecar)应用
  2. dapr A 通过 name resolution 服务发现模块获取到 dapr B 服务的地址
  3. dapr A 通过 gRPC 调用 dapr B (dapr 之间的调用都会用 gRPC 提高性能)
  4. dapr B 收到请求后转发请求给 service B
  5. service B 返回响应给 dapr B
  6. dapr B 将收到的响应返回给 dapr A
  7. dapr A 将收到的相应返回给 service A

整个流程经过了 dapr runtime 三个部分, 首先是 service A 通过 dapr API 调用 dapr sidecar(1, 7), 接着 sidecar 之间通过 internal grpc client 调用 internal grpc server(3, 6), 最后是目标 sidecar 通过 App Channel 调用 service B(4, 5).

Dapr API #

dapr sidecar 会启动 HTTP 和 gRPC 两种 API 服务供用户 app 调用.

以 HTTP API 为例:

1. 入口 #

dapr 对外提供的服务间调用 HTTP API 为:

POST/GET/PUT/DELETE http://localhost:<daprPort>/v1.0/invoke/<appId>/method/<method-name>

2. onDirectMessage (1, 7) #

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/http/api.go#L260
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
   return []Endpoint{
      {
        Methods: []string{router.MethodWild},
        Route:   "invoke/{id}/method/{method:*}",
        Alias:   "{method:*}",
        Version: apiVersionV1,
        Handler: a.onDirectMessage,
      },
   }
}

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/http/api.go#L865
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
  // 从请求中解析到目标 app-id
  targetID := a.findTargetID(reqCtx)

  // 拿到请求 method, 后续构建转发请求
  verb := strings.ToUpper(string(reqCtx.Method()))
  invokeMethodName := reqCtx.UserValue(methodParam).(string)

  // 构建转发的请求, 转发一些 metadata
  req := invokev1.NewInvokeMethodRequest(invokeMethodName).WithHTTPExtension(verb, reqCtx.QueryArgs().String())
  // ...

  // 通过 directMessaging 转发请求
  resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
  // err does not represent user application response
  if err != nil {
    // 响应错误
    return
  }

  // 响应请求, 如果是 grpc 响应则转化一下
  statusCode := int(resp.Status().Code)
  respond(reqCtx, with(statusCode, body))
}

onDirectMessage 为 API handler, 简单来说主要做了以下几点事情:

  1. 解析请求获取目标 app-id, 和一些简单校验工作
  2. 通过用户请求构建出内部请求(内部请求为 protobuf 格式, body 为 pb.Any 格式)
  3. 调用directMessaging.Invoke 转发请求
  4. 根据响应构建响应返回给用户

Internal gRPC remote call #

这一步到了 dapr sidecar 之间互相调用. 每个 dapr sidecar 都会启动一个 grpc 服务供其他 sidecar 调用, 叫做 Internal Server(proto 声明可见 dapr/proto/internals/v1).

1. directMessaging #

dapr 也是很标准的大写开头定义 interface 接口, 小写字母开头定义实现.

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L42
type DirectMessaging interface {
  Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L102
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 通过 name resolution 获取目标 app 信息
  app, err := d.getRemoteApp(targetAppID)
  if err != nil {
    return nil, err
  }
  // 发现目标为自己时, invokeLocal 调用自己 sidecar 本地的用户 app
  if app.id == d.appID && app.namespace == d.namespace {
    return d.invokeLocal(ctx, req)
  }
  // 目标不是自己, 调用目标 dapr sidecar
  return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}

directMessaging 做了下面几件事情:

  1. 通过 name resolution 获取目标 app 的 address
  2. 如果发现调用是自己, 则通过 invokeLocal 调用自己 sidecar 本地的用户 app
  3. 发现不是自己时, 通过 invokeWithRetry 调用目标 dapr sidecar

2. name resolution (2) #

name resolution 部分会在后续讲解, 现在只需要知道它和我们服务发现差不多, ResolveID(req ResolveRequest) (string, error) 通过 interface 定义可以看出, 就是通过 appId 和 namespace 获取 address.

3. invokeWithRetry (3, 6) #

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L129
func (d *directMessaging) invokeWithRetry(
  ctx context.Context,
  numRetries int,
  backoffInterval time.Duration,
  app remoteApp,
  fn func(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error),
  req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  for i := 0; i < numRetries; i++ {
    // 简单封装了带有 backoff 的 retry
    // fn 为 d.invokeRemote
    resp, err := fn(ctx, app.id, app.namespace, app.address, req)
    if err == nil {
      return resp, nil
    }
    time.Sleep(backoffInterval)

    code := status.Code(err)
    if code == codes.Unavailable || code == codes.Unauthenticated {
      // 重新建立连接, recreateIfExists = true
      _, connerr := d.connectionCreatorFn(context.TODO(), app.address, app.id, app.namespace, false, true, false)
      if connerr != nil {
        return nil, connerr
      }
      continue
    }
    return resp, err
  }
  return nil, errors.Errorf("failed to invoke target %s after %v retries", app.id, numRetries)
}

invokeWithRetry 做了两件事情:

  1. 简单封装了带有 backoff 的 retry, 真正执行逻辑的函数为传进来的 d.invokeRemote
  2. 在出现 Unavailable 和 Unauthenticated 错误时, 尝试重新建立连接

3.1 connectionCreatorFn #

dapr 使用 [Manager](http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/grpc.go#L42-L42) 来管理 grpc client 连接, 保证每一个服务仅创建一个连接.

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/grpc.go#L77
func (g *Manager) GetGRPCConnection(ctx context.Context, address, id string, namespace string, skipTLS, recreateIfExists, sslEnabled bool, customOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
  g.lock.RLock()
  if val, ok := g.connectionPool[address]; ok && !recreateIfExists {
    g.lock.RUnlock()
    return val, nil
  }
  g.lock.RUnlock()

  g.lock.Lock()
  defer g.lock.Unlock()
  // read the value once again, as a concurrent writer could create it
  if val, ok := g.connectionPool[address]; ok && !recreateIfExists {
    return val, nil
  }

  opts := []grpc.DialOption{
    grpc.WithDefaultServiceConfig(grpcServiceConfig),
  }
  // ...
  opts = append(opts, customOpts...)
  conn, err := grpc.DialContext(ctx, dialPrefix+address, opts...)
  if err != nil {
    return nil, err
  }

  if c, ok := g.connectionPool[address]; ok {
    c.Close()
  }

  g.connectionPool[address] = conn

  return conn, nil
}

GetGRPCConnection 简单管理共享复用 grpc client 连接, map[string]*grpc.ClientConn . 当参数 recreateIfExists 为 true 时会关闭旧连接创建新连接. 后续 d.invokeRemote 也是通过此函数拿连接.

3.2 invokeRemote #

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L173
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 获取复用连接
  conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
  if err != nil {
    return nil, err
  }

  ctx = d.setContextSpan(ctx)

  d.addForwardedHeadersToMetadata(req)
  d.addDestinationAppIDHeaderToMetadata(appID, req)

  clientV1 := internalv1pb.NewServiceInvocationClient(conn)

  var opts []grpc.CallOption
  opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))

  // grpc 调用
  resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
  if err != nil {
    return nil, err
  }

  return invokev1.InternalInvokeResponse(resp)
}

invokeRemote 通过 grpc CallLocal 方法调用另一个 dapr sidecar 的 internal grpc server 方法. 这里的 CallLocal 不是本地调用的意思, 而是代表它是 dapr 的 internal 内部方法.

4. grpcServer.CallLocal (4, 5) #

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/api.go#L147
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
  // ...
  // 包装转化 request
  req, err := invokev1.InternalInvokeRequest(in)
  if err != nil {
    return nil, status.Errorf(codes.InvalidArgument, messages.ErrInternalInvokeRequest, err.Error())
  }

  // 路由权限管理
  if a.accessControlList != nil {
    // ...
    callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)
    if !callAllowed {
      return nil, status.Errorf(codes.PermissionDenied, errMsg)
    }
  }

  // 转发请求
  resp, err := a.appChannel.InvokeMethod(ctx, req)
  if err != nil {
    err = status.Errorf(codes.Internal, messages.ErrChannelInvoke, err)
    return nil, err
  }
  return resp.Proto(), err
}

发现 CallLocal 部分除了转换下请求和路由权限管理外, 将核心逻辑交给了 appChannel.InvokeMethod 来处理.

AppChannel #

dapr sidecar 通过 AppChannel 来调用用户服务, 由于用户服务可以是 gRPC 协议也可以是 HTTP 协议. 所以 appChannel 也有 HTTP 和 gRPC 两种实现.

dapr 会根据 runtimeConfig.ApplicationProtocol 中指定的类型(可以通过 dapr run--app-protocol 参数指定)选择初始化哪种作为 appChannel:

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/channel.go#L21
type AppChannel interface {
  GetBaseAddress() string
  GetAppConfig() (*config.ApplicationConfig, error)
  InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1962
func (a *DaprRuntime) createAppChannel() error {
  if a.runtimeConfig.ApplicationPort > 0 {
    var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int) (channel.AppChannel, error)
    // 根据用户 app 协议类型初始化不同 appChannel
    switch a.runtimeConfig.ApplicationProtocol {
    case GRPCProtocol:
      channelCreatorFn = a.grpc.CreateLocalChannel
    case HTTPProtocol:
      channelCreatorFn = http_channel.CreateLocalChannel
    default:
      return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
    }

    ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize)
    // ...
    // set appChannel
    a.appChannel = ch
  }

  return nil
}

4.1 http_channel.InvokeMethod #

我们以 HTTP 协议为例, 继续分析 http_channel.InvokeMethod:

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/http/http_channel.go#L118
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 简单做了参数校验, 和 api 版本管理方便后续扩展
  // 目前只会调用 invokeMethodV1
}

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/http/http_channel.go#L142
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 转换请求, 将请求地址转化为 http://localhost:3000/method?query1=value1
  // 请求体 body protobuf 类型转化成 content-type 类型, 并且设置 header
  channelReq := h.constructRequest(ctx, req)

  // 限速
  if h.ch != nil {
    h.ch <- 1
  }

  // 指标监控相关
  verb := string(channelReq.Header.Method())
  diag.DefaultHTTPMonitoring.ClientRequestStarted(ctx, verb, req.Message().Method, int64(len(req.Message().Data.GetValue())))
  startRequest := time.Now()

  // 发送请求
  resp := fasthttp.AcquireResponse()
  err := h.client.Do(channelReq, resp)
  defer func() {
    fasthttp.ReleaseRequest(channelReq)
    fasthttp.ReleaseResponse(resp)
  }()

  elapsedMs := float64(time.Since(startRequest) / time.Millisecond)

  if err != nil {
    diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(nethttp.StatusInternalServerError), int64(resp.Header.ContentLength()), elapsedMs)
    return nil, err
  }

  if h.ch != nil {
    <-h.ch
  }

  // 将用户 app 返回的 response 转化为 dapr 内部的 protobuf 类型
  rsp := h.parseChannelResponse(req, resp)
  diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(int(rsp.Status().Code)), int64(resp.Header.ContentLength()), elapsedMs)

  return rsp, nil
}

invokeMethodV1 最终做的就是将内部的请求类型转化为用户服务类型, 例如 body 转为 json 类型, 拿到响应后再转为内部类型.

gRPC API #

dapr 1.4 使用 gRPC proxying 这个新特性来处理用户 app 和 dapr sidecar 之间的 gRPC 调用.

简单来说就是现在使用 dapr 进行 gRPC 调用时, 使用方式和直接调用自己服务没有区别, 仅仅需要把 client 连接地址换成 dapr sidecar gRPC API 的地址, 并且调用时需要在 metadata 中增加 dapr-app-id.

为什么需要这个特性? #

gRPC 很重要的一个特性是, 它可以帮助我们根据 proto 定义生成出函数类型定义, 请求和响应定义, 也就是说 gRPC 相当于编程中的静态语言. 但是 dapr 的 InvokeService 为了通用性, 只能将 request/response data 定义为 any 类型, 相当于变成了动态语言, 使用方式也变成了下面这种:

content := &dapr.DataContent{
    ContentType: "application/json",
    Data:        []byte(`{ "id": "a123", "value": "demo", "valid": true }`),
}

resp, err = client.InvokeMethodWithContent(ctx, "app-id", "method-name", "post", content)

在使用上是难以接受的.

源码部分 #

https://github.com/zcong1993/dapr-1/tree/learn-1.4.3/pkg/grpc/proxy

grpc proxy 部分基于 https://github.com/trusch/grpc-proxy 项目修改而来. 主要在 grpc server 部分.

// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/server.go#L221
func (s *server) getGRPCServer() (*grpc_go.Server, error) {
  opts := s.getMiddlewareOptions()
  // ...
  opts = append(opts, grpc_go.MaxRecvMsgSize(s.config.MaxRequestBodySize*1024*1024), grpc_go.MaxSendMsgSize(s.config.MaxRequestBodySize*1024*1024))

  if s.proxy != nil {
    // 如果使用 proxy, 注册 proxy.Handler 到 grpc.UnknownServiceHandler
    opts = append(opts, grpc_go.UnknownServiceHandler(s.proxy.Handler()))
  }

  return grpc_go.NewServer(opts...), nil
}

// 处理当前 grpc server 不认识的请求
func (p *proxy) Handler() grpc.StreamHandler {
  return grpc_proxy.TransparentHandler(p.intercept)
}

// 主要是通过 metadata 中的 dapr-app-id 拿到请求的地址, 然后代理请求
func (p *proxy) intercept(ctx context.Context, fullName string) (context.Context, *grpc.ClientConn, error) {
  md, _ := metadata.FromIncomingContext(ctx)

  v := md.Get(diagnostics.GRPCProxyAppIDKey)
  if len(v) == 0 {
    return ctx, nil, errors.Errorf("failed to proxy request: required metadata %s not found", diagnostics.GRPCProxyAppIDKey)
  }

  outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
  appID := v[0]

  if appID == p.appID {
    // proxy locally to the app
    if p.acl != nil {
      ok, authError := acl.ApplyAccessControlPolicies(ctx, fullName, common.HTTPExtension_NONE, config.GRPCProtocol, p.acl)
      if !ok {
        return ctx, nil, status.Errorf(codes.PermissionDenied, authError)
      }
    }

    conn, err := p.connectionFactory(outCtx, p.localAppAddress, p.appID, "", true, false, false, grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
    return ctx, conn, err
  }

  // proxy to a remote daprd
  remote, err := p.remoteAppFn(appID)
  if err != nil {
    return ctx, nil, err
  }

  conn, err := p.connectionFactory(outCtx, remote.address, remote.id, remote.namespace, false, false, false, grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
  outCtx = p.telemetryFn(outCtx)

  return outCtx, conn, err
}

主要为两个步骤:

  1. intercept 拿到 metadata 中的 dapr-app-id, 根据 name resolution 拿到目标 gRPC 服务地址后, 从 connectionFactory 拿到共享连接
  2. grpc_proxy.TransparentHandler 通过双向转发流数据, 进行代理

假如上面流程图里面 server A 和 service B 都是 gRPC 服务, 而且 A 调用 B 的 Echo 方法, 会经过一下流程:

  1. 直接连接 dapr A gRPC server, 调用 Echo 方法, metadata dapr-app-id 需要设置为 service-b
  2. dapr A 不认识这个方法, grpc_proxy.TransparentHandler 处理请求, 根据 dapr-app-id 与 dapr B 建立连接, 并将请求转发给 dapr B
  3. dapr B 也不认识这个方法, grpc_proxy.TransparentHandler 处理请求, 根据 dapr-app-id 发现请求目标是自己, 与 service B 建立连接, 并将请求转发给 service B
  4. service B Echo 处理请求, 并响应结果

参考资料 #

wxmp