Loading...
墨滴

一介刁民

2021/05/16  阅读:39  主题:橙心

为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源

为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源

我们都知道,Faas 最主要的两个特征是事件驱动自动扩速容(支持缩容到零),本文就带你研究 Faas 事件驱动的进阶开发:如何为你的 Faas 平台 添加自己的事件驱动源。

注意:仅适用于云原生的 Faas 平台,即基于 k8s 的 Faas 平台,因为本文的所有逻辑都是基于 k8s 的资源来进行操作的。

1. 基本原理

解释图中的几个概念

  • your Source CR: 这个是 k8s 的自定义资源 (CRD),CR 中主要包含事件源的相关字段,以及接收方的信息,(如果你是一个定时器的事件源,那么包含事件消息,事件的生成时间,接收方的地址等)如果你不知道 k8s 的 CRD 是什么,可以参考 k8s 官网 [https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/]
  • Adapter:这个是需要自己开发的,是 k8sDeployment,主要是用来生成事件,或者将其他信息源转为你的 Faas 平台统一的消息格式 (比如统一转为 Serverless 的标准格式 CloudEvents),即事件生成器或者事件适配器。
    • 消息如何产生:
      • adapter 自己生成:这种情况是没有外部输入的,adapter 自己生成消息数据,转化为统一的消息格式 (如 CloudEvents)
      • 基于外部信息源:如果是基于外部信息的事件源,则根据需要主动去外部轮询 pull 指定接口或者被动监听相关数据,然后转化为统一的消息格式 (如 CloudEvents)
    • 消息如何发出:
      • adapter 中需要有环境变量指定接收方的地址,图中 key 为 SinkURL 的环境变量就指定了消息接收方的地址,这里可以是 k8sservice 的集群内域名(yoursvc.yournamespace.svc.cluster.local) 或者 knative serviceurl
  • controller:这个是需要自己开发的, k8s 的自定义 controller,主要作用是将 CR 资源 生成 Adapter Deployment

开发流程

介绍完 基本原理之后,下面就带你研究如何一步步开发自己的事件驱动事件源。

前提条件:熟悉 kubernetes 开发,熟悉 Go 开发, 如果能利用 Ko 什么?云原生开发你竟然不知道 “Ko”,那更好不过了

代码参考 https://github.com/knative-sandbox/sample-source

1. API 定义

定义 事件源 CRD 结构体, CRD 结构体包含 事件驱动事件源的必要信息,如 duckv1.SourceSpec (包含 Sink )定义了事件接收方的地址

# pkg/apis/samples/v1alpha1/samplesource_types.go

// +genclient
// +genreconciler
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
type SampleSource struct {
 metav1.TypeMeta `json:",inline"`
 // +optional
 metav1.ObjectMeta `json:"metadata,omitempty"`

 // Spec holds the desired state of the SampleSource (from the client).
 Spec SampleSourceSpec `json:"spec"`

 // Status communicates the observed state of the SampleSource (from the controller).
 // +optional
 Status SampleSourceStatus `json:"status,omitempty"`
}

// SampleSourceSpec holds the desired state of the SampleSource (from the client).
type SampleSourceSpec struct {
 // inherits duck/v1 SourceSpec, which currently provides:
    // * Sink - a reference to an object that will resolve to a domain name or
    //   a URI directly to use as the sink.
    // * CloudEventOverrides - defines overrides to control the output format
    //   and modifications of the event sent to the sink.
    duckv1.SourceSpec `json:",inline"`

    // Interval is the time interval between events.
    //
    // The string format is a sequence of decimal numbers, each with optional
    // fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
    // units are "ns", "us" (or "µs"), "ms", "s", "m", "h". If unspecified
    // this will default to "10s".
    Interval string `json:"interval"`
}

// SampleSourceStatus communicates the observed state of the SampleSource (from the controller).
type SampleSourceStatus struct {
 duckv1.Status `json:",inline"`

 // SinkURI is the current active sink URI that has been configured
 // for the SampleSource.
 // +optional
 SinkURI *apis.URL `json:"sinkUri,omitempty"`
}

结构体中 Spec 主要用于定义事件源的期望状态,status 中定义事件源的实际状态,statuscontroller来同步。

这是一个定时事件源,spec 中主要定义了 定时器的时间间隔 interval ,接收方的地址duckv1.SourceSpec.Sinkstatus 中主要定义了事件源 CR 的状态,(是否Ready),接收方的地址 SinkURI

其中,status 的状态由 controller 中对应资源的 Reconciler 来更改,可以通过下面的函数来对 status 进行更改

# pkg/apis/samples/VERSION/samplesource_lifecycle.go
// InitializeConditions sets relevant unset conditions to Unknown state.
func (s *SampleSourceStatus) InitializeConditions() {
 SampleCondSet.Manage(s).InitializeConditions()
}

...

// MarkSink sets the condition that the source has a sink configured.
func (s *SampleSourceStatus) MarkSink(uri *apis.URL) {
 s.SinkURI = uri
 if len(uri.String()) > 0 {
  SampleCondSet.Manage(s).MarkTrue(SampleConditionSinkProvided)
 } else {
  SampleCondSet.Manage(s).MarkUnknown(SampleConditionSinkProvided, "SinkEmpty""Sink has resolved to empty.%s""")
 }
}

// MarkNoSink sets the condition that the source does not have a sink configured.
func (s *SampleSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) {
 SampleCondSet.Manage(s).MarkFalse(SampleConditionSinkProvided, reason, messageFormat, messageA...)
}

2. Controller

其中 controller 的部分与之前的文章 如何基于 Knative 开发 自定义controller 类似,想深究的小伙伴可以翻看一下那篇文章,此处不做过多的详细解析。

controller 入口

#cmd/controller
import (
 // The set of controllers this controller process runs.
 "knative.dev/sample-source/pkg/reconciler/sample"

 // This defines the shared main for injected controllers.
 "knative.dev/pkg/injection/sharedmain"
)

func main() {
 sharedmain.Main("sample-source-controller", sample.NewController)
}

sharedmain.Main 会传入 controller 的实例化方法

#pkg/reconciler/sample/controller.go

func NewController(
 ctx context.Context,
 cmw configmap.Watcher,
)
 *controller.Impl
 {

  // 借助 injection 从 context 中获取 informer 
 deploymentInformer := deploymentinformer.Get(ctx)
 sampleSourceInformer := samplesourceinformer.Get(ctx)

  // 实例化 Reconciler
 r := &Reconciler{
  dr: &reconciler.DeploymentReconciler{KubeClientSet: kubeclient.Get(ctx)},
  // Config accessor takes care of tracing/config/logging config propagation to the receive adapter
  configAccessor: reconcilersource.WatchConfigurations(ctx, "sample-source", cmw),
 }
 if err := envconfig.Process("", r); err != nil {
  logging.FromContext(ctx).Panicf("required environment variable is not defined: %v", err)
 }

  // 实例化 controller.impl 返回 供 controller 框架调用
 impl := samplesource.NewImpl(ctx, r)

 r.sinkResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)

 logging.FromContext(ctx).Info("Setting up event handlers")

  // 添加 informer 的hander 函数
 sampleSourceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

 deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
  FilterFunc: controller.FilterControllerGK(v1alpha1.Kind("SampleSource")),
  Handler:    controller.HandleAll(impl.EnqueueControllerOf),
 })

 return impl
}

3. Reconciler

接下来看一下 Reconciler 的实现, Reconcile 的作用有:

  • 根据 SampleSource 这个 CR 资源, 创建 DeploymentDeployment 中运行的进程是 Receive Adapter
  • 根据 Receive Adapter 这个 Deployment 的状态和接收方的状态,更新 SampleSourcestatus

对于 创建 Receive Adapter 的过程,下面来看下详细的步骤

  1. 组装 Receive Adapter 的参数
# pkg/reconciler/sample/samplesource.go

//1. 组装 Receive Adapter 的参数
raArgs := resources.ReceiveAdapterArgs{
  EventSource:    src.Namespace + "/" + src.Name,
        Image:          r.ReceiveAdapterImage,
        Source:         src,
        Labels:         resources.Labels(src.Name),
        AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
 }

2. 查看需要创建 的 Deployment 存在不存在,如果不存在需要重新创建

# pkg/reconciler/deployment.go
namespace := owner.GetObjectMeta().GetNamespace()
 ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(ctx, expected.Name, metav1.GetOptions{})

如果 deployment 已经存在,判断需要更新的情况下更新 Deployment

else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
    ra.Spec.Template.Spec = expected.Spec.Template.Spec
    if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
        return ra, err
    }

第一章中说明的 Deployment 中 的接收方的地址 SinkURI是怎么传进去的呢?别急,一步步带你研究

再来回头看下 ReconcileDeployment (sample-source/pkg/reconciler/sample/samplesource.go)这个函数

 ra, sb, event := r.dr.ReconcileDeployment(ctx, src, makeSinkBinding(src),
  resources.MakeReceiveAdapter(&resources.ReceiveAdapterArgs{
   EventSource:    src.Namespace + "/" + src.Name,
   Image:          r.ReceiveAdapterImage,
   Source:         src,
   Labels:         resources.Labels(src.Name),
   AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
  }),
 )

传入了 SinkBinding 对象,由 makeSinkBinding 构造完成 对象中包含 source中的 SourceSpec (这里面看之前的结构体可以知道,这里包含 接收方的地址 SinkURI

func makeSinkBinding(src *v1alpha1.SampleSource) *sourcesv1.SinkBinding {
 return &sourcesv1.SinkBinding{
  ObjectMeta: metav1.ObjectMeta{
   // this is necessary to track the change of sink reference.
   Name:      src.GetName(),
   Namespace: src.GetNamespace(),
  },
  Spec: sourcesv1.SinkBindingSpec{
   SourceSpec: src.Spec.SourceSpec,
  },
 }
}

ReconcileDeployment 中有个 syncSink 函数,这个比较关键,这是将 SinkURI 传入 deployment 环境变量的关键一步,

syncSink(ctx, binder, expected.Spec.Template.Spec)


func syncSink(ctx context.Context, binder *sourcesv1.SinkBinding, now corev1.PodSpec) {
	// call Do() to project sink information.
	ps := &duckv1.WithPod{}
	ps.Spec.Template.Spec = now

	binder.Do(ctx, ps)
}

binder.Do 中有以下逻辑,将 Sink 注入 PodEnv

  spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
   Name:  "K_SINK",
   Value: uri.String(),
  })

****

4. Receive Adapter

Receive Adapter 是 用来产生事件源的组件,事件源可以由 Receive Adapter 独自产生,或者 Receive Apapter 将其他信息转换成事件源。下面解析 Receive Adapter 的实现

  1. 入口函数
# sample-source/cmd/receive_adapter/main.go
package main

import (
 "knative.dev/eventing/pkg/adapter"
 myadapter "knative.dev/sample-source/pkg/adapter"
)

func main() {
 adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
}

adapter.Main 函数入参为 adapter.NewEnvadapter.NewAdapter 构造函数

  • adapter.Main 会解析运行时的环境变量并加载到 adapter.NewEnv 中定义的变量中,并调用 Adapter 结构提的 Start() 方法 knative.dev/eventing/pkg/adapter/v2/main.go#197 (`https://github.com/knative/eventing/blob/main/pkg/adapter/v2/main.go`),
  • 另外还是根据环境变量构造 CloudEvents client,传递给 adapter.NewAdapter
 #knative.dev/eventing/pkg/adapter/v2/cloudevents.go#48
 
 // 1. 获取到事件接受方的地址
 if target == "" && env != nil {
  target = env.GetSink()
 }
 
 #knative.dev/eventing/pkg/adapter/v2/cloudevents.go#54
 
 // 2. 构造 eventclient的 target 参数
  if len(target) > 0 {
  pOpts = append(pOpts, cloudevents.WithTarget(target))
 }

#knative.dev/eventing/pkg/adapter/v2/main.go#166

 // 3. 调用adapter.NewAdapter 构造函数,实例化 Adapter 结构体
 adapter := ctor(ctx, env, eventsClient)


#knative.dev/eventing/pkg/adapter/v2/main.go#196

  // 4. 调用 Adapter Start 函数
 if err := adapter.Start(ctx); err != nil {
  logger.Fatalw("Start returned an error", zap.Error(err))
 }

2.下面看 adapter.NewAdapter

下面看 Adapter 结构体,Adapter 实现了 Start 方法,用于 main函数的调用

// Adapter generates events at a regular interval.
type Adapter struct {
 logger   *zap.Logger
 interval time.Duration
 nextID   int
 client   cloudevents.Client
}
...

Start() 方法

Start() 方法中 调用了 Main函数中传入的 Cloudevent client 来发送消息 a.client.Send...

func (a *Adapter) Start(ctx context.Context) error {
 a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String()))
 for {
  select {
  case <-time.After(a.interval):
   event := a.newEvent()
   a.logger.Infow("Sending new event", zap.String("event", event.String()))
   if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) {
    a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result))
    // We got an error but it could be transient, try again next interval.
    continue
   }
  case <-ctx.Done():
   a.logger.Info("Shutting down...")
   return nil
  }
 }
}

5. CRD 定义与 示例 CR

CRD 的生成方法参考本公众号发的这篇文章: 如何基于 Knative 开发 自定义controller

# sample-source/config/300-samplesource.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  labels:
    samples.knative.dev/release: devel
    eventing.knative.dev/source: "true"
    knative.dev/crd-install: "true"
  annotations:
    registry.knative.dev/eventTypes: |
      [
        { "type": "dev.knative.sample" }
      ]
  name: samplesources.samples.knative.dev
spec:
  group: samples.knative.dev
  versions:
    - &version
      name: v1alpha1
      served: true
      storage: true
      subresources:
        status: {}
      schema:
        openAPIV3Schema:
          type: object
          # this is a work around so we don't need to flesh out the
          # schema for each version at this time
          #
          # see issue: https://github.com/knative/serving/issues/912
          x-kubernetes-preserve-unknown-fields: true
      additionalPrinterColumns:
        - name: Ready
          type: string
          jsonPath: ".status.conditions[?(@.type=='Ready')].status"
        - name: Reason
          type: string
          jsonPath: ".status.conditions[?(@.type=='Ready')].reason"
        - name: Sink
          type: string
          jsonPath: .status.sinkUri
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
  names:
    categories:
    - all
    - knative
    - eventing
    - sources
    kind: SampleSource
    plural: samplesources
  scope: Namespaced

自定义事件源举例

apiVersion: samples.knative.dev/v1alpha1
kind: SampleSource
metadata:
  name: sample-source
  namespace: knative-samples
spec:
  interval: "10s"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

6. Debug

详细代码可以 fork 仓库 https://github.com/knative-sandbox/sample-source

如果已经看过之前的这篇文章[什么?云原生开发你竟然不知道 “Ko”],那么调试起来就非常轻松了,只需一行命令就可以:

ko apply -f config/*

还等什么,赶紧操练起来吧!

关注公众号: Knative,了解更多 Serverless 、Knative,云原生相关资讯

关注公众号,回复 "进群",即可进群与众多云原生 Serverless 技术大佬探讨技术,探讨人生。

在这里插入图片描述
在这里插入图片描述

一介刁民

2021/05/16  阅读:39  主题:橙心

作者介绍

一介刁民