Dapr 源码解析 | 服务间调用
Table of Contents
Service Invocation 是 dapr 对外提供的最基础功能, 也就是服务间调用. 另外别的一些功能也会间接使用它.
本文不会介绍功能如何使用, 相关资料请查看官方文档.
总览 #
官方文档给出来 Service Invocation 服务间调用的示意图, 已经是非常清晰了. 因为 dapr 是 sidecar 模式运行在业务 APP 旁边, 所以我们服务间调用也是通过 dapr 做的转发, dapr 也就是在这一步做了监控, 追踪, mTLS 等功能.
- service A 想通过 HTTP/gRPC 调用 service B, 会先请求自己本地 dapr A(sidecar)应用
- dapr A 通过
name resolution
服务发现模块获取到 dapr B 服务的地址 - dapr A 通过
gRPC
调用 dapr B (dapr 之间的调用都会用 gRPC 提高性能) - dapr B 收到请求后转发请求给 service B
- service B 返回响应给 dapr B
- dapr B 将收到的响应返回给 dapr A
- dapr A 将收到的相应返回给 service A
整个流程经过了 dapr runtime 三个部分, 首先是 service A 通过 dapr API 调用 dapr sidecar(1, 7), 接着 sidecar 之间通过 internal grpc client 调用 internal grpc server(3, 6), 最后是目标 sidecar 通过 App Channel 调用 service B(4, 5).
Dapr API #
dapr sidecar 会启动 HTTP 和 gRPC 两种 API 服务供用户 app 调用.
以 HTTP API 为例:
1. 入口 #
dapr 对外提供的服务间调用 HTTP API 为:
POST/GET/PUT/DELETE http://localhost:<daprPort>/v1.0/invoke/<appId>/method/<method-name>
2. onDirectMessage (1, 7) #
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/http/api.go#L260
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{router.MethodWild},
Route: "invoke/{id}/method/{method:*}",
Alias: "{method:*}",
Version: apiVersionV1,
Handler: a.onDirectMessage,
},
}
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/http/api.go#L865
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
// 从请求中解析到目标 app-id
targetID := a.findTargetID(reqCtx)
// 拿到请求 method, 后续构建转发请求
verb := strings.ToUpper(string(reqCtx.Method()))
invokeMethodName := reqCtx.UserValue(methodParam).(string)
// 构建转发的请求, 转发一些 metadata
req := invokev1.NewInvokeMethodRequest(invokeMethodName).WithHTTPExtension(verb, reqCtx.QueryArgs().String())
// ...
// 通过 directMessaging 转发请求
resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
// err does not represent user application response
if err != nil {
// 响应错误
return
}
// 响应请求, 如果是 grpc 响应则转化一下
statusCode := int(resp.Status().Code)
respond(reqCtx, with(statusCode, body))
}
onDirectMessage 为 API handler, 简单来说主要做了以下几点事情:
- 解析请求获取目标 app-id, 和一些简单校验工作
- 通过用户请求构建出内部请求(内部请求为 protobuf 格式, body 为 pb.Any 格式)
- 调用
directMessaging.Invoke
转发请求 - 根据响应构建响应返回给用户
Internal gRPC remote call #
这一步到了 dapr sidecar 之间互相调用. 每个 dapr sidecar 都会启动一个 grpc 服务供其他 sidecar 调用, 叫做 Internal Server(proto 声明可见 dapr/proto/internals/v1).
1. directMessaging #
dapr 也是很标准的大写开头定义 interface 接口, 小写字母开头定义实现.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L42
type DirectMessaging interface {
Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L102
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 通过 name resolution 获取目标 app 信息
app, err := d.getRemoteApp(targetAppID)
if err != nil {
return nil, err
}
// 发现目标为自己时, invokeLocal 调用自己 sidecar 本地的用户 app
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req)
}
// 目标不是自己, 调用目标 dapr sidecar
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}
directMessaging 做了下面几件事情:
- 通过 name resolution 获取目标 app 的 address
- 如果发现调用是自己, 则通过
invokeLocal
调用自己 sidecar 本地的用户 app - 发现不是自己时, 通过
invokeWithRetry
调用目标 dapr sidecar
2. name resolution (2) #
name resolution 部分会在后续讲解, 现在只需要知道它和我们服务发现差不多, ResolveID(req ResolveRequest) (string, error)
通过 interface 定义可以看出, 就是通过 appId 和 namespace 获取 address.
3. invokeWithRetry (3, 6) #
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L129
func (d *directMessaging) invokeWithRetry(
ctx context.Context,
numRetries int,
backoffInterval time.Duration,
app remoteApp,
fn func(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error),
req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
for i := 0; i < numRetries; i++ {
// 简单封装了带有 backoff 的 retry
// fn 为 d.invokeRemote
resp, err := fn(ctx, app.id, app.namespace, app.address, req)
if err == nil {
return resp, nil
}
time.Sleep(backoffInterval)
code := status.Code(err)
if code == codes.Unavailable || code == codes.Unauthenticated {
// 重新建立连接, recreateIfExists = true
_, connerr := d.connectionCreatorFn(context.TODO(), app.address, app.id, app.namespace, false, true, false)
if connerr != nil {
return nil, connerr
}
continue
}
return resp, err
}
return nil, errors.Errorf("failed to invoke target %s after %v retries", app.id, numRetries)
}
invokeWithRetry 做了两件事情:
- 简单封装了带有 backoff 的 retry, 真正执行逻辑的函数为传进来的
d.invokeRemote
- 在出现 Unavailable 和 Unauthenticated 错误时, 尝试重新建立连接
3.1 connectionCreatorFn #
dapr 使用 [Manager](http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/grpc.go#L42-L42)
来管理 grpc client 连接, 保证每一个服务仅创建一个连接.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/grpc.go#L77
func (g *Manager) GetGRPCConnection(ctx context.Context, address, id string, namespace string, skipTLS, recreateIfExists, sslEnabled bool, customOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
g.lock.RLock()
if val, ok := g.connectionPool[address]; ok && !recreateIfExists {
g.lock.RUnlock()
return val, nil
}
g.lock.RUnlock()
g.lock.Lock()
defer g.lock.Unlock()
// read the value once again, as a concurrent writer could create it
if val, ok := g.connectionPool[address]; ok && !recreateIfExists {
return val, nil
}
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(grpcServiceConfig),
}
// ...
opts = append(opts, customOpts...)
conn, err := grpc.DialContext(ctx, dialPrefix+address, opts...)
if err != nil {
return nil, err
}
if c, ok := g.connectionPool[address]; ok {
c.Close()
}
g.connectionPool[address] = conn
return conn, nil
}
GetGRPCConnection
简单管理共享复用 grpc client 连接, map[string]*grpc.ClientConn
. 当参数 recreateIfExists
为 true 时会关闭旧连接创建新连接. 后续 d.invokeRemote
也是通过此函数拿连接.
3.2 invokeRemote #
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/messaging/direct_messaging.go#L173
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 获取复用连接
conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
if err != nil {
return nil, err
}
ctx = d.setContextSpan(ctx)
d.addForwardedHeadersToMetadata(req)
d.addDestinationAppIDHeaderToMetadata(appID, req)
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))
// grpc 调用
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
if err != nil {
return nil, err
}
return invokev1.InternalInvokeResponse(resp)
}
invokeRemote 通过 grpc CallLocal
方法调用另一个 dapr sidecar 的 internal grpc server 方法. 这里的 CallLocal
不是本地调用的意思, 而是代表它是 dapr 的 internal 内部方法.
4. grpcServer.CallLocal (4, 5) #
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/api.go#L147
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
// ...
// 包装转化 request
req, err := invokev1.InternalInvokeRequest(in)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, messages.ErrInternalInvokeRequest, err.Error())
}
// 路由权限管理
if a.accessControlList != nil {
// ...
callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)
if !callAllowed {
return nil, status.Errorf(codes.PermissionDenied, errMsg)
}
}
// 转发请求
resp, err := a.appChannel.InvokeMethod(ctx, req)
if err != nil {
err = status.Errorf(codes.Internal, messages.ErrChannelInvoke, err)
return nil, err
}
return resp.Proto(), err
}
发现 CallLocal 部分除了转换下请求和路由权限管理外, 将核心逻辑交给了 appChannel.InvokeMethod
来处理.
AppChannel #
dapr sidecar 通过 AppChannel
来调用用户服务, 由于用户服务可以是 gRPC 协议也可以是 HTTP 协议. 所以 appChannel 也有 HTTP 和 gRPC 两种实现.
dapr 会根据 runtimeConfig.ApplicationProtocol
中指定的类型(可以通过 dapr run--app-protocol
参数指定)选择初始化哪种作为 appChannel
:
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/channel.go#L21
type AppChannel interface {
GetBaseAddress() string
GetAppConfig() (*config.ApplicationConfig, error)
InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1962
func (a *DaprRuntime) createAppChannel() error {
if a.runtimeConfig.ApplicationPort > 0 {
var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int) (channel.AppChannel, error)
// 根据用户 app 协议类型初始化不同 appChannel
switch a.runtimeConfig.ApplicationProtocol {
case GRPCProtocol:
channelCreatorFn = a.grpc.CreateLocalChannel
case HTTPProtocol:
channelCreatorFn = http_channel.CreateLocalChannel
default:
return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
}
ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize)
// ...
// set appChannel
a.appChannel = ch
}
return nil
}
4.1 http_channel.InvokeMethod #
我们以 HTTP 协议为例, 继续分析 http_channel.InvokeMethod
:
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/http/http_channel.go#L118
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 简单做了参数校验, 和 api 版本管理方便后续扩展
// 目前只会调用 invokeMethodV1
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/channel/http/http_channel.go#L142
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 转换请求, 将请求地址转化为 http://localhost:3000/method?query1=value1
// 请求体 body protobuf 类型转化成 content-type 类型, 并且设置 header
channelReq := h.constructRequest(ctx, req)
// 限速
if h.ch != nil {
h.ch <- 1
}
// 指标监控相关
verb := string(channelReq.Header.Method())
diag.DefaultHTTPMonitoring.ClientRequestStarted(ctx, verb, req.Message().Method, int64(len(req.Message().Data.GetValue())))
startRequest := time.Now()
// 发送请求
resp := fasthttp.AcquireResponse()
err := h.client.Do(channelReq, resp)
defer func() {
fasthttp.ReleaseRequest(channelReq)
fasthttp.ReleaseResponse(resp)
}()
elapsedMs := float64(time.Since(startRequest) / time.Millisecond)
if err != nil {
diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(nethttp.StatusInternalServerError), int64(resp.Header.ContentLength()), elapsedMs)
return nil, err
}
if h.ch != nil {
<-h.ch
}
// 将用户 app 返回的 response 转化为 dapr 内部的 protobuf 类型
rsp := h.parseChannelResponse(req, resp)
diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(int(rsp.Status().Code)), int64(resp.Header.ContentLength()), elapsedMs)
return rsp, nil
}
invokeMethodV1 最终做的就是将内部的请求类型转化为用户服务类型, 例如 body 转为 json 类型, 拿到响应后再转为内部类型.
gRPC API #
dapr 1.4 使用 gRPC proxying
这个新特性来处理用户 app 和 dapr sidecar 之间的 gRPC 调用.
简单来说就是现在使用 dapr 进行 gRPC 调用时, 使用方式和直接调用自己服务没有区别, 仅仅需要把 client 连接地址换成 dapr sidecar gRPC API 的地址, 并且调用时需要在 metadata 中增加 dapr-app-id
.
为什么需要这个特性? #
gRPC 很重要的一个特性是, 它可以帮助我们根据 proto 定义生成出函数类型定义, 请求和响应定义, 也就是说 gRPC 相当于编程中的静态语言. 但是 dapr 的 InvokeService
为了通用性, 只能将 request/response data 定义为 any
类型, 相当于变成了动态语言, 使用方式也变成了下面这种:
content := &dapr.DataContent{
ContentType: "application/json",
Data: []byte(`{ "id": "a123", "value": "demo", "valid": true }`),
}
resp, err = client.InvokeMethodWithContent(ctx, "app-id", "method-name", "post", content)
在使用上是难以接受的.
源码部分 #
https://github.com/zcong1993/dapr-1/tree/learn-1.4.3/pkg/grpc/proxy
grpc proxy 部分基于 https://github.com/trusch/grpc-proxy 项目修改而来. 主要在 grpc server 部分.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/grpc/server.go#L221
func (s *server) getGRPCServer() (*grpc_go.Server, error) {
opts := s.getMiddlewareOptions()
// ...
opts = append(opts, grpc_go.MaxRecvMsgSize(s.config.MaxRequestBodySize*1024*1024), grpc_go.MaxSendMsgSize(s.config.MaxRequestBodySize*1024*1024))
if s.proxy != nil {
// 如果使用 proxy, 注册 proxy.Handler 到 grpc.UnknownServiceHandler
opts = append(opts, grpc_go.UnknownServiceHandler(s.proxy.Handler()))
}
return grpc_go.NewServer(opts...), nil
}
// 处理当前 grpc server 不认识的请求
func (p *proxy) Handler() grpc.StreamHandler {
return grpc_proxy.TransparentHandler(p.intercept)
}
// 主要是通过 metadata 中的 dapr-app-id 拿到请求的地址, 然后代理请求
func (p *proxy) intercept(ctx context.Context, fullName string) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)
v := md.Get(diagnostics.GRPCProxyAppIDKey)
if len(v) == 0 {
return ctx, nil, errors.Errorf("failed to proxy request: required metadata %s not found", diagnostics.GRPCProxyAppIDKey)
}
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
appID := v[0]
if appID == p.appID {
// proxy locally to the app
if p.acl != nil {
ok, authError := acl.ApplyAccessControlPolicies(ctx, fullName, common.HTTPExtension_NONE, config.GRPCProtocol, p.acl)
if !ok {
return ctx, nil, status.Errorf(codes.PermissionDenied, authError)
}
}
conn, err := p.connectionFactory(outCtx, p.localAppAddress, p.appID, "", true, false, false, grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
return ctx, conn, err
}
// proxy to a remote daprd
remote, err := p.remoteAppFn(appID)
if err != nil {
return ctx, nil, err
}
conn, err := p.connectionFactory(outCtx, remote.address, remote.id, remote.namespace, false, false, false, grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
outCtx = p.telemetryFn(outCtx)
return outCtx, conn, err
}
主要为两个步骤:
intercept
拿到 metadata 中的dapr-app-id
, 根据 name resolution 拿到目标 gRPC 服务地址后, 从 connectionFactory 拿到共享连接grpc_proxy.TransparentHandler
通过双向转发流数据, 进行代理
假如上面流程图里面 server A 和 service B 都是 gRPC 服务, 而且 A 调用 B 的 Echo
方法, 会经过一下流程:
- 直接连接 dapr A gRPC server, 调用
Echo
方法, metadatadapr-app-id
需要设置为 service-b - dapr A 不认识这个方法, grpc_proxy.TransparentHandler 处理请求, 根据
dapr-app-id
与 dapr B 建立连接, 并将请求转发给 dapr B - dapr B 也不认识这个方法, grpc_proxy.TransparentHandler 处理请求, 根据
dapr-app-id
发现请求目标是自己, 与 service B 建立连接, 并将请求转发给 service B - service B
Echo
处理请求, 并响应结果