Dapr 源码解析 | 组件模块
Table of Contents
Components
是 dapr 抽象出来的提供某种特定功能的可插拔组件, 为 dapr 或者 building blocks 所使用.
例如: 服务发现功能就是 dapr 的一个 component, 它的提供者可以使 mDNS, Kubernetes 和 consul, dapr 允许用户根据需求选择使用提供者的某种.
功能介绍 #
dapr 目前有如下这些组件:
- Secret Stores 秘钥管理
- State Stores 键值存储
- Pub/sub brokers 发布订阅
- Name Resolutions 服务发现
- Bindings 事件绑定
- Middlewares 中间件
实现 #
由于 component 设计成了可插拔的方式, 所以单个功能会有多种实现方式, 为了方便维护 dapr 将 component 实现代码分离在了 https://github.com/dapr/components-contrib 项目.
我们以 Secret Stores
为例, 分析下源码:
首先, 由于是某种特定功能, 所以 component 功能定义为一个 interface.
// http://github.com/zcong1993/components-contrib/blob/ff9f357a77f74a9ebaa0032da71c1f571143a1ca/secretstores/secret_store.go#L9
type SecretStore interface {
// Init authenticates with the actual secret store and performs other init operation
Init(metadata Metadata) error
// GetSecret retrieves a secret using a key and returns a map of decrypted string/string values
GetSecret(req GetSecretRequest) (GetSecretResponse, error)
// BulkGetSecrets retrieves all secrets in the store and returns a map of decrypted string/string values
BulkGetSecret(req BulkGetSecretRequest) (BulkGetSecretResponse, error)
}
基本上所有组件定义都分为两个部分, Init
和功能部分
.
Init 方法是用来校验配置和初始化组件. 而 matadata 就是 map[string]string
形式的组件配置, 通过 component config 获取, 之前配置模块文章介绍过.
https://github.com/zcong1993/components-contrib/tree/dapr-v1.4.3/secretstores 文件夹下面包含了所有实现 SecretStore
的 provider.
以 kubernetes 实现为例:
// http://github.com/zcong1993/components-contrib/blob/ff9f357a77f74a9ebaa0032da71c1f571143a1ca/secretstores/kubernetes/kubernetes.go
type kubernetesSecretStore struct {
kubeClient kubernetes.Interface
logger logger.Logger
}
// 初始化 k8s provider
func NewKubernetesSecretStore(logger logger.Logger) secretstores.SecretStore {
return &kubernetesSecretStore{logger: logger}
}
// impl Init
func (k *kubernetesSecretStore) Init(metadata secretstores.Metadata) error {
client, err := kubeclient.GetKubeClient()
if err != nil {
return err
}
k.kubeClient = client
return nil
}
// impl GetSecret
func (k *kubernetesSecretStore) GetSecret(req secretstores.GetSecretRequest) (secretstores.GetSecretResponse, error) {
resp := secretstores.GetSecretResponse{
Data: map[string]string{},
}
namespace, err := k.getNamespaceFromMetadata(req.Metadata)
if err != nil {
return resp, err
}
secret, err := k.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), req.Name, meta_v1.GetOptions{})
if err != nil {
return resp, err
}
for k, v := range secret.Data {
resp.Data[k] = string(v)
}
return resp, nil
}
// impl BulkGetSecret
func (k *kubernetesSecretStore) BulkGetSecret(req secretstores.BulkGetSecretRequest) (secretstores.BulkGetSecretResponse, error) {
// ...
}
可以看到, 此模块就是简单包装了 k8s secrets
API, Init 初始化 k8s client, GetSecret 和 BulkGetSecret 则是做了简单的校验和结果转换.
Provider 管理 #
component provider 以 runtime option 的形式注册到 runtime 实例上:
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/cmd/daprd/main.go#L155
err = rt.Run(
runtime.WithSecretStores(
secretstores_loader.New("kubernetes", func() secretstores.SecretStore {
return sercetstores_kubernetes.NewKubernetesSecretStore(logContrib)
}),
secretstores_loader.New("azure.keyvault", func() secretstores.SecretStore {
return keyvault.NewAzureKeyvaultSecretStore(logContrib)
}),
secretstores_loader.New("hashicorp.vault", func() secretstores.SecretStore {
return vault.NewHashiCorpVaultSecretStore(logContrib)
}),
secretstores_loader.New("aws.secretmanager", func() secretstores.SecretStore {
return secretmanager.NewSecretManager(logContrib)
}),
secretstores_loader.New("aws.parameterstore", func() secretstores.SecretStore {
return parameterstore.NewParameterStore(logContrib)
}),
secretstores_loader.New("gcp.secretmanager", func() secretstores.SecretStore {
return gcp_secretmanager.NewSecreteManager(logContrib)
}),
secretstores_loader.New("local.file", func() secretstores.SecretStore {
return secretstore_file.NewLocalSecretStore(logContrib)
}),
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(logContrib)
}),
),
// ...
}
之后在 runtime.initRuntime
方法中交给 runtime.secretStoresRegistry
管理.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/components/secretstores/registry.go#L18
type (
// Registry is used to get registered secret store implementations.
Registry interface {
Register(components ...SecretStore)
Create(name, version string) (secretstores.SecretStore, error)
}
secretStoreRegistry struct {
secretStores map[string]func() secretstores.SecretStore
}
)
secretStoreRegistry
是一个 map[string]factorFunc
类型, 通过 Register
方法将所有 providers 注册到这个 map 中, Create
则可以通过 name 作为 key 和 version 找到并调用对应的 factorFunc
返回对应 provider 实例 , version 是为了扩展, 方便后续做多版本支持(目前大都为 v0, v1 版本号可以作为缺省值).
初始化模块 #
之前配置模块文章, 可以知道 dapr 根据运行平台以不同的方式 load components 配置, 然后将所有components
配置交给 runtime.processComponents
处理.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1639
func (a *DaprRuntime) processComponents() {
for comp := range a.pendingComponents {
if comp.Name == "" {
continue
}
err := a.processComponentAndDependents(comp)
if err != nil {
e := fmt.Sprintf("process component %s error: %s", comp.Name, err.Error())
if !comp.Spec.IgnoreErrors {
log.Warnf("process component error daprd process will exited, gracefully to stop")
a.shutdownRuntime(defaultGracefulShutdownDuration)
log.Fatalf(e)
}
log.Errorf(e)
}
}
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1577
func (a *DaprRuntime) loadComponents(opts *runtimeOpts) error {
// ...
for _, comp := range authorizedComps {
a.pendingComponents <- comp
}
}
processComponents
只是简单将 component 分配给 processComponentAndDependents
处理, 它做了以下工作:
- 通过
preprocessOneComponent
检查模块是否依赖其他模块, 如果依赖, 将模块放入pendingComponentDependents
中等待依赖初始化后处理 - 通过
extractComponentCategory
拿到模块分类, 例如: 秘钥管理就是secretstores
- 开启一个 goroutine 调用
doProcessOneComponent
函数初始化当前模块, 并进行超时控制 doProcessOneComponent
只是简单通过不同分类调用不同的初始化方法, 初始化方法通过配置中的 type 和 version 通过对应Registry.Create
拿到 provider, 然后调用它的Init
方法, 最后将初始化完毕的实例保存到runtime
对应模块map
中- 检查是否有模块依赖当前模块, 如果有, 初始化这些模块
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1666
func (a *DaprRuntime) processComponentAndDependents(comp components_v1alpha1.Component) error {
log.Debugf("loading component. name: %s, type: %s/%s", comp.ObjectMeta.Name, comp.Spec.Type, comp.Spec.Version)
// 步骤 1
res := a.preprocessOneComponent(&comp)
if res.unreadyDependency != "" {
a.pendingComponentDependents[res.unreadyDependency] = append(a.pendingComponentDependents[res.unreadyDependency], comp)
return nil
}
// 步骤 2
compCategory := a.extractComponentCategory(comp)
if compCategory == "" {
// the category entered is incorrect, return error
return errors.Errorf("incorrect type %s", comp.Spec.Type)
}
ch := make(chan error, 1)
timeout, err := time.ParseDuration(comp.Spec.InitTimeout)
if err != nil {
timeout = defaultComponentInitTimeout
}
// 步骤 3
go func() {
ch <- a.doProcessOneComponent(compCategory, comp)
}()
select {
case err := <-ch:
if err != nil {
return err
}
case <-time.After(timeout):
return fmt.Errorf("init timeout for component %s exceeded after %s", comp.Name, timeout.String())
}
log.Infof("component loaded. name: %s, type: %s/%s", comp.ObjectMeta.Name, comp.Spec.Type, comp.Spec.Version)
a.appendOrReplaceComponents(comp)
diag.DefaultMonitoring.ComponentLoaded()
// 步骤 5
dependency := componentDependency(compCategory, comp.Name)
if deps, ok := a.pendingComponentDependents[dependency]; ok {
delete(a.pendingComponentDependents, dependency)
for _, dependent := range deps {
if err := a.processComponentAndDependents(dependent); err != nil {
return err
}
}
}
return nil
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1717
// 步骤 4.1
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
case bindingsComponent:
return a.initBinding(comp)
case pubsubComponent:
return a.initPubSub(comp)
case secretStoreComponent:
return a.initSecretStore(comp)
case stateComponent:
return a.initState(comp)
}
return nil
}
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L2011
// 步骤 4.2
func (a *DaprRuntime) initSecretStore(c components_v1alpha1.Component) error {
secretStore, err := a.secretStoresRegistry.Create(c.Spec.Type, c.Spec.Version)
if err != nil {
log.Warnf("failed creating secret store %s/%s: %s", c.Spec.Type, c.Spec.Version, err)
diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation")
return err
}
err = secretStore.Init(secretstores.Metadata{
Properties: a.convertMetadataItemsToProperties(c.Spec.Metadata),
})
if err != nil {
log.Warnf("failed to init state store %s/%s named %s: %s", c.Spec.Type, c.Spec.Version, c.ObjectMeta.Name, err)
diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init")
return err
}
// 将当前实例储存在 runtime map 中
a.secretStores[c.ObjectMeta.Name] = secretStore
diag.DefaultMonitoring.ComponentInitialized(c.Spec.Type)
return nil
}
以下面配置为例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mycustomsecretstore
namespace: default
spec:
type: secretstores.kubernetes
version: v1
metadata:
- name: ''
最终会被初始化并存储在 runtime 中:
runtime.secretStores = map[string]secretstores.SecretStore{
"mycustomsecretstore": &kubernetesSecretStore{...},
}
同理别的部分模块也会被按照分类以 map 的形式存在 runtime 中:
type DaprRuntime struct {
// ...
stateStores map[string]state.Store
inputBindings map[string]bindings.InputBinding
outputBindings map[string]bindings.OutputBinding
secretStores map[string]secretstores.SecretStore
pubSubs map[string]pubsub.PubSub
// ...
}
为什么会出现模块依赖? #
组件配置中经常会需要配置一些敏感信息, 例如数据库密码之类. 使用纯文本是非常不安全的, 所以 dapr 允许在组件配置中引用 secret store
组件中的秘钥配置, 类似于 k8s 中环境变量引用 secrets.
例如, 可以这么声明一个基于 redis 实现的 state store:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
secretKeyRef:
name: redis-secret
key: redis-password
auth:
secretStore: mycustomsecretstore
redisPassword
就是引用的 mycustomsecretstore
这个 secret store 中的 key 为 redis-password
的秘钥. 因此这个组件也就依赖了 mycustomsecretstore
这个 secret store 组件.
如果运行环境为 k8s, 则 auth.secretStore
可以省略, 并且会直接从包装的 kubernetesSecretStore
直接获取, 不算做依赖.
查看源码:
preprocessOneComponent
调用 processComponentSecrets
来检查是否有依赖组件, 也说明了目前 dapr 只有这一种依赖类型.
// http://github.com/zcong1993/dapr-1/blob/a8ee30180e1183e2a2e4d00c283448af6d73d0d0/pkg/runtime/runtime.go#L1831-L1831
func (a *DaprRuntime) processComponentSecrets(component components_v1alpha1.Component) (components_v1alpha1.Component, string) {
cache := map[string]secretstores.GetSecretResponse{}
for i, m := range component.Spec.Metadata {
// metadata[i].SecretKeyRef 为空时代表没有引用
if m.SecretKeyRef.Name == "" {
continue
}
// 获取 auth.secretStore
secretStoreName := a.authSecretStoreOrDefault(component)
secretStore := a.getSecretStore(secretStoreName)
// secretStore 还没 load 好, 返回当前组件和此 secret 作为依赖
if secretStore == nil {
log.Warnf("component %s references a secret store that isn't loaded: %s", component.Name, secretStoreName)
return component, secretStoreName
}
// 下面是依赖 secret 组件已经 load 完成, 或者在 k8s 环境下
// k8s 环境 dapr operator 会帮我们将 secrets 引用放入 Value 中
if a.runtimeConfig.Mode == modes.KubernetesMode && secretStoreName == kubernetesSecretStore {
val := m.Value.Raw
var jsonVal string
err := json.Unmarshal(val, &jsonVal)
if err != nil {
log.Errorf("error decoding secret: %s", err)
continue
}
dec, err := base64.StdEncoding.DecodeString(jsonVal)
if err != nil {
log.Errorf("error decoding secret: %s", err)
continue
}
m.Value = components_v1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: dec,
},
}
component.Spec.Metadata[i] = m
continue
}
// 缓存
resp, ok := cache[m.SecretKeyRef.Name]
if !ok {
// 从 secretStore 获取 secrets
r, err := secretStore.GetSecret(secretstores.GetSecretRequest{
Name: m.SecretKeyRef.Name,
Metadata: map[string]string{
"namespace": component.ObjectMeta.Namespace,
},
})
if err != nil {
log.Errorf("error getting secret: %s", err)
continue
}
resp = r
}
// 优先使用 SecretKeyRef.Key 其次是 SecretKeyRef.Name 获取引用
secretKeyName := m.SecretKeyRef.Key
if secretKeyName == "" {
secretKeyName = m.SecretKeyRef.Name
}
// 将获取到的 secret 放入 Value 中
val, ok := resp.Data[secretKeyName]
if ok {
component.Spec.Metadata[i].Value = components_v1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte(val),
},
}
}
cache[m.SecretKeyRef.Name] = resp
}
return component, ""
}
总结一下:
- 检查组件
metadata[i].SecretKeyRef
是否有引用定义, 没有则代表当前组件没有依赖 - 有引用 secret store 且该组件没有初始化完成时, 会将引用 secret store 作为当前组件的依赖, 此关系会被存入
runtime.pendingComponentDependents
中, 当前组件会在引用 secret store 初始化完成之后再被初始化 - 有引用且 secret store 已初始化完成, k8s 模式下 operator 已经将引用 secret 放入 value 中了, 只需要简单 base64 decode 下; 其他模式下调用
secretStore.GetSecret
获取秘钥并放在 value 中