这篇“Argo Event云原生的事件驱动架构怎么用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Argo Event云原生的事件驱动架构怎么用”文章吧。
1、Argo Event简介
Event事件大家都很熟悉,可以说Kubernetes就是完全由事件驱动的,不同的controller manager本质就是实现了不同的事件处理函数,比如所有ReplicaSet对象是由ReplicaSetController控制器管理,该控制器通过Informer监听ReplicaSet以及其关联的Pod的事件变化,从而维持运行状态和我们声明spec保持一致。
当然Kubernetes无论是什么Controller,其监听和处理的都是内部事件,而在应用层上我们也有很多外部事件,比如CICD事件、Webhook事件、日志事件等等,如何处理这些事件呢,目前Kubernetes原生是无法实现的。
当然你可以自己实现一个event handler运行在Kubernetes平台,不过实现难度也不小。而Argo Event组件完美解决了这个问题。
如图是Argo Event官方提供的的流程图:
首先事件源EventSource可以是Webhook、S3、Github、SQS等等,中间会经过一个叫Gateway(新版本叫EventBus)的组件,更准确地说老版本原来gateway的配置功能已经合并到EventSource了,EventBus是新引入的组件,后端默认基于高性能分布式消息中间件NATS[1]实现,当然其他中间件比如Kafka也是可以的。
这个EventBus可以看做是事件的一个消息队列,消息生产者连接EvenSource,EventSource又连接到Sensor。更详细地说EvenSource把事件发送给EvenBus,Sensor会订阅EvenBus的消息队列,EvenBus负责把事件转发到已订阅该事件的Sensor组件,EventSorce在上图中没有体现,具体设计文档可以参考Argo-events Enhancement Proposals[2]。
有些人可能会说为什么EventBus不直接到Trigger,中间引入一个Sensor,这主要是两个原因,一是为了使事件转发和处理松耦合,二是为了实现Trigger事件的参数化,通过Sensor不仅可以实现事件的过滤,还可以实现事件的参数化,比如后面的Trigger是创建一个Kubernetes Pod,那这个Pod的metadata、env等,都可以根据事件内容进行填充。
Sensor组件注册关联了一个或者多个触发器,这些触发器可以触发AWS Lambda事件、Argo Workflow事件、Kubernetes Objects等,通俗简单地说,可以执行Lambda函数,可以动态地创建Kubernetes的对象或者创建前面的介绍的Workflow。
还记得前面介绍的Argo Rollout吗,我们演示了手动promote实现应用发布或者回滚,通过Argo Event就可以很完美地和测试平台或者CI/CD事件结合起来,实现自动应用自动发布或者回滚。
2、一个简单的Webhook例子
关于Argo Event的部署非常简单,直接通过kubecl apply或者helm均可,可以参考文档Installation[3],这里不再赘述。
Argo Event部署完成后注意还需要部署EventBus,官方推荐使用NATS中间件,文档中有部署NATS stateful的文档。
接下来我们以一个最简单的Webhook事件为例,从而了解Argo Event的几个组件功能以及用法。
首先按照前面的介绍,我们需要先定义EventSource:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
webhook_example:
port: "12000"
endpoint: /webhook
method: POST
这个EventSource定义了一个webhook webhook_example
,端口为12000,路径为/webhook
,一般Webhook为POST方法,因此该Webhhok处理器我们配置只接收POST
方法。
为了把这个Webhook EventSource暴露,我们还创建了一个Service,端口也是12000。
此时我们可以手动curl该Service:
# kubectl get svc -l eventsource-name=webhook
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
webhook-eventsource-svc ClusterIP 10.96.93.24 <none> 12000/TCP 5m49s
# curl -X POST -d '{}' 10.96.93.24:12000/webhook
success
当然此时由于没有注册任何的Sensor,因此什么都不会发生。
接下来我们定义Sensor:
首先在dependencies
中定义了订阅的EventSource以及具体的Webhook,由于一个EventSource可以定义多个Webhook,因此必须同时指定EventSource和Webhook两个参数。
在Trigger中我们定义了对应Action为create一个workflow,这个workflow的spec定义在resource中配置。
最后的parameters
部分定义了workflow的参数,这些参数值从event中获取,这里我们会把整个event都当作workflow的input。当然你可以通过dataKey只汲取body部分:dataKey: body.message
。
此时我们再次curl这个webhook事件:
curl -X POST -d '{"message": "HelloWorld!"}' 10.96.93.24:12000/webhook
此时我们获取argo workflow列表发现新创建了一个实例:
# argo list
NAME STATUS AGE DURATION PRIORITY
webhook-8xt4s Succeeded 1m 18s 0
查看workflow输出如下:
由于我们是把整个event作为workflow input发过去的,因此data内容部分是base64编码,我们可以查看解码后的内容如下:
{
"header": {
"Accept": [
"*/*"
],
"Content-Length": [
"26"
],
"Content-Type": [
"application/x-www-form-urlencoded"
],
"User-Agent": [
"curl/7.58.0"
]
},
"body": {
"message": "HelloWorld!"
}
}
从这里我们也可以看出Event包含两个部分,一个是context,一个是data,data中又包含header部分以及body部分,在parameters
中可以通过Key获取任意部分内容。
如上的webhook触发是通过手动curl的,你可以很容易地在github或者bitbucket上配置到webhook中,这样一旦代码有更新就能触发这个事件了。
3、Kubernetes触发AWS Lambda函数
前面的例子中的EventSource使用了Webhook,除了Webhook,Argo Event还支持很多的EventSource,比如:
amqp
aws-sns
aws-sqs
github/gitlab
hdfs
kafka
redis
Kubernetes resource
...
Trigger也同样支持很多,比如:
如上官方都提供了非常丰富的例子,可以参考argo events examples[4]。
这里以Kubernetes resource事件源为例,这个事件监听Kubernetes的资源状态,比如Pod创建、删除等,这里以创建Pod为例:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: k8s-resource-demo
spec:
template:
serviceAccountName: argo-events-sa
resource:
pod_demo:
namespace: argo-events
version: v1
resource: pods
eventTypes:
- ADD
filter:
afterStart: true
labels:
- key: app
operation: "=="
value: my-pod
如上例子监听Pods的ADD事件,即创建Pod,filter中过滤只有包含app=my-pod标签的Pod,特别需要注意的是使用的serviceaccount argo-events-sa
必须具有Pod的list、watch权限。
接下来我们使用AWS Lambda触发器,Lambda函数已经在AWS提前创建好:
这个Lambda函数很简单,直接返回event本身。
创建Sensor如下:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: aws-lambda-trigger-demo
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: test-dep
eventSourceName: k8s-resource-demo
eventName: pod_demo
triggers:
- template:
name: lambda-trigger
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: cn-northwest-1
payload:
- src:
dependencyName: test-dep
dataKey: body.name
dest: name
如上AWS access key和access secret需要提前放到aws-secret中。
此时我们创建一个新的Pod my-pod:
apiVersion: v1
kind: Pod
metadata:
labels:
app: my-pod
name: my-pod
spec:
containers:
- image: nginx
name: my-pod
dnsPolicy: ClusterFirst
restartPolicy: Always
当Pod启动后,我们发现AWS Lambda函数被触发执行:
4、event filter
前面的例子中webhook中所有的事件都会被sensor触发,我们有时不需要处理所有的事件,Argo Event支持基于data以及context过滤,比如我们只处理message为hello
或者为hey
的事件,其他消息忽略,只需要在原来的dependencies
中test-dep
增加filter即可:
dependencies:
- name: test-dep
eventSourceName: webhook
eventName: webhook_example
filters:
- name: data-filter
data:
- path: body.message
type: string
value:
- "hello"
- "hey"
filter指定了基于data
过滤,过滤的字段为body.message
,匹配的内容为hello、hey
。
5、trigger policy
trigger policy主要用来判断最后触发器执行的结果是成功还是失败,如果是创建Kubernetes资源比如Workflow,可以根据Workflow最终状态决定这个Trigger的执行结果,而如果是触发一个HTTP或者AWS Lambda,则需要自定义policy status。
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: us-east-1
payload:
- src:
dependencyName: test-dep
dataKey: body.message
dest: message
policy:
status:
allow:
- 200
- 201
如上表示当AWS Lambda返回200或者201时表示Trigger成功。
以上就是关于“Argo Event云原生的事件驱动架构怎么用”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注天达云行业资讯频道。