KubeEdgeKubeEdge,顾名思义就是依托k8s的容器编排能力和调度能力,实现云边协同、计算下沉、海量设备的平滑接入, 旨在为边缘计算和IoT这两大物联网领域提供统一的平台方案,本次分享将给大家简单介绍目前KubeEdge项目进展,它的架构设计,以及我们基于KubeEdge做了哪些工作, 并详细分析KubeEdge是如何解决云边网络通信问题的。

本文主要内容取自一次我在公司的内部分享

KubeEdge项目介绍

KubeEdge是华为推出的业内首个开源边缘容器平台项目,是由华为云边缘计算产品IEF开源而来,Apache 2.0开源协议,并于2019年3月捐献给CNCF基金会,在2019年9月晋级为孵化级项目, 在CNCF的社区建设上,KubeEdge积极参与kubernetes IoT Edge WG,同时也成立了Device/IoT与MEC两个SIG,可以说KubeEdge先发制人, 在社区已经拥有了足够多话语权。目前参与社区贡献的企业包括:中国联通,ARM,中国移动,谐云,中国电信,时速云,JD.com,浙大SEL实验室,EMQ, InfoBlox,Inovex,Midokura等。

基于kubernetes构建边缘计算的优点和痛点

为什么要基于kubernetes构建边缘计算平台呢?因为kubernetes是在太香了🤪,那我简单概括了kubernetes主要的四核心的优势:

容器化应用封装: 现已成为应用交付的一个趋势,我们可以把应用打包到容器中,且只需要只打包一次, 可以跑在各种地方,如果应用到IoT领域,有很多传统IoT嵌入式设备,它其实很多硬件和软件是强相关的, 更换一个硬件,可能软件(固件)都需要更改,如果使用容器化封装后,设备可以支持容器runtime,我们就可以将容器跑在任何IoT设备上。

通用应用抽象定义 kubernetes的API包括pod的定义以及development,daemonset等控制器的定义现在其实在业内已经形成一套标准, 大家都比较了解和认可,基于这些通用应用负载模型做边缘计算平台,大家也更能接受。

松耦合架构 kubernetess的架构设计具有相当的先进性,可拓展性比较好,比如我们基于k8s之上可以通过CRD来定义一些API,通过设备管理CRD来定义一些IoT里的device的API, 我们即可以通过k8s的一些管理方式管理这些设备;还有一些有意义的可拓展点,比如边缘节点可以对接各种runtime,有些边缘节点它的资源非常有限, 我们就可以对接一些轻量化的runtime。

丰富的社区生态 k8s其丰富的拓展性给我们带来无限可能,同时借助其生态,能够极大地赋能边缘计算平台,减少开发工作量的同时也有强大的社区保障。

下图是KubeEdge给出的其核心的优势,不难看出,大部分都是由于其借助了kubernetes构建平台所带来的。

当然基于kubernetes还有很多优势,那么有哪些边缘计算的常见的关键痛点需要KubeEdge解决呢?

  1. 资源有限:常见的网关设备内存很少,无法满足kubernetes对边缘节点的资源需求
  2. 网络不畅
    • 边缘位于私有网络,无公网IP
    • 云边跨越公网,带宽有限,延时高
    • k8s的List-watch需要数据中心网络
  3. 边缘节点如何自治
    • 网络不稳,随时可能离线
    • 边缘业务离线工作
    • 边缘离线可故障恢复
  4. 设备接入和管理
    • 缺少边缘设备抽象
    • 缺少边缘设备接入协议的支持

KubeEdge的架构及核心理念

KubeEdge架构主要是分了云、边、端三部分,云上边就是我们的控制面,边就是我们的边缘节点, 端就是跑了我们的一些端侧设备,云上左边是一个Kubernetes的master,是没有做过改动的原生的K8s控制), 后边KubeEdge的一个组件叫CloudCore,云上的组件主要是会拿一些Kubernetes控制面上的东西, 通过EdgeController和DeviceController做一些处理,然后通过下边的Cloud Hub,Cloud Hub主要是跟边端通信的, 边端有个EdgeHub和Cloud Hub通信,然后把数据拿下来。

边端是主要做了一个应用管理和设备管理的能力,应用管理左边会有一个Edged, 右边有DeviceTwin、EventBus,分别是应用管理和设备管理,左边有个DataStore,就是我们说的本地自治的能力, 比如说我们这应用或者设备的元素从云上分发下来,我们是先把它存到一个数据库里,然后再到它的Edged或者设备里边, 这样就能保证云边网络断开或者边缘节点重启了以后我应用的Edged它可以从数据库里把应用源数据拿出来, 这样就能保证在故障的情况下业务可以正常恢复。

已实现的核心功能

  1. 云边可靠协同
    • 双向多路服用消息通道,支持边缘节点位于私有网络
    • Websocket + 消息封装,大幅减少通信压力,高延时下仍可正常工作
    • 云边消息校验
  2. 边缘离线自治
    • 节点云数据持久化,实现节点级离线自治
    • 节点故障恢复,无需List-watch,降低网络压力,快速ready
  3. 边缘极致轻量
    • 重组Kubelet功能模块,极致轻量化(~70MB内存占用)
    • 支持CRI集成,Containerd、CRI-O,优化runtime资源消耗
  4. 边缘设备管理
    • 云端通过kubernetes API管理设备Device

KubeEdge2.0 Roadmap

以下是官方给的2.0计划支持的亮点功能:

  1. 支持istio;
  2. 支持FaaS;
  3. 支持更多类型的设备协议到边缘节点,如AMQP,蓝牙,ZigBee等;
  4. 支持千个边缘节点和数百万个设备的超大规模边缘集群(kubernetes推荐单集群最大节点数是1000);
  5. 实现数千节点的应用智能调度;

KubeEdge的不足以及我们需要去做的工作

  1. 相关组件没有容器化以及缺乏高可用性方案
  2. 项目处于发展中期,很多东西处于不稳定状态
  3. 社区不是特别活跃,比如example目前无法在1.4版本上正常运行
  4. 安装工具keadm没有支持离线安装,且有一些缺陷
  5. 没有具体性能测试数据支撑

目前我们做的

  1. 基于公司PaaS产品提供完整的云端用户体验包括统一内置镜像仓,管理界面等;
  2. 自定义EdgeAppController,以满足边缘场景下应用下发的需求;
  3. 完整的监控告警支持,以及自定义指标的开发(Exporter的改造);
  4. 管理员友好的Dashboard,包括更友好的应用部署,节点接入等功能;
  5. 一些稳定性改造(云端组件cloudcore的service化);
  6. KubeEdge的定制化,用于合并一些无法第一时间被社区合并Release的有价值的功能分支,定制化我们的功能;

kubeedge通信原理

我们是否能像管理普通节点一样管理边缘节点呢?比如说查看某个容器的调试日志,资源用量,Exec到某个容器中进行调试, 通过Prometheus监控节点和应用等等,这些功能对于无论是PaaS还是边缘计算平台都非常重要💰。KubeEdge较早版本没有给出解决方案, 也就是说这些功能无法从KubeEdge开箱直接获取,实现以上功能最大的障碍就是网络问题🌎。

目前针对这些功能,较新版本KubeEdge已经给出对应的解决方案,即通过隧道Tunnel+自定义消息实现,主要对应的模块是CloudStream和EdgeStream。 当我们需要这些功能时,必须开启这两个模块,同时进行配置,具体配置可以参考官方文档,以下这两个模块的通信示意图。

这里我以Prometheus抓取Edge节点容器运行时指标数据为例,帮大家大致梳理流程以及主要函数。

在普通kubernetes集群中可以通过安装cadvisor获取容器运行时的一些指标数据, KubeEdge受限于资源的限制,无法在边缘节点安装cadvisor组件,目前Edged已实现了cadvisor的指标定义。

现在,prometheus要抓取edge1(10.31.100.4)上edged所定义的指标时,请求10.31.100.4:10350/metrics/cadvisor, 由于我们配置iptables nat表的OUTPUT,所有target port为10350报文都会被转发到本机的10003的端口, 由于TCP协议栈处于内核,在应用层收到请求的Source IP和Port不会改变,至此该请求将会有CloudStream内置的Http Server处理, 这个Http服务默认端口为10003,它的主要功能就是处理被拦截的各种流量,目前包括Log、Metrics、节点统计等流量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (s *StreamServer) installDebugHandler() {
ws := new(restful.WebService)
ws.Path("/containerLogs")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getContainerLogs))
s.container.Add(ws)
ws = new(restful.WebService)
ws.Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec))
s.container.Add(ws)
ws = new(restful.WebService)
ws.Path("/stats")
ws.Route(ws.GET("").
To(s.getMetrics))
ws.Route(ws.GET("/summary").
To(s.getMetrics))
ws.Route(ws.GET("/container").
To(s.getMetrics))
ws.Route(ws.GET("/{podName}/{containerName}").
To(s.getMetrics))
ws.Route(ws.GET("/{namespace}/{podName}/{uid}/{containerName}").
To(s.getMetrics))
s.container.Add(ws)
// metrics api is widely used for Promethus
ws = new(restful.WebService)
ws.Path("/metrics")
ws.Route(ws.GET("").
To(s.getMetrics))
ws.Route(ws.GET("/cadvisor").
To(s.getMetrics))
s.container.Add(ws)
}

指标对应处理请求函数为getMetrics,主要过程是根据r.Request.Host找到对应Session(cloudstream维护了到每个edge节点的session池,Key即edge的Internal Ip, 每个session负责向Tunnel发送和读取Message),创建新的处理的协程,并将其添加到任务池Session.apiServerConn中,处理协程中将真实的请求数据(原请求)编码为Message, 并通过通道发送edgenode1的edgestream进行处理。

🧐此时,请求已经被发送到edgenode1上,且被edgestream的轮询监听函数收到,edgestream会通过函数ReadMessageFromTunnel对消息进行解码, 判断消息的类型,这里显然是一个Metrics类型的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func ReadMessageFromTunnel(r io.Reader) (*Message, error) {
buf := bufio.NewReader(r)
connectID, err := binary.ReadUvarint(buf)
if err != nil {
return nil, err
}
messageType, err := binary.ReadUvarint(buf)
if err != nil {
return nil, err
}
data, err := ioutil.ReadAll(buf)
if err != nil {
return nil, err
}
klog.Infof("Receive Tunnel message Connectid %d messageType %s data:%v string:[%v]",
connectID, MessageType(messageType), data, string(data))
return &Message{
ConnectID: connectID,
MessageType: MessageType(messageType),
Data: data,
}, nil
}

如果是合法的消息,edgestream创建新的协程进行处理,并把这些处理协程添加统一的任务池中,这里Metrics类型的处理过程为, 将Message进行解码得到原始请求数据,并发起真实请求(这时已经成功穿透到内网啦),并将Metrics数据在编码成Message通过通道发送到cloudstream中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 消息解码
func (s *TunnelSession) serveMetricsConnection(m *stream.Message) error {
metricsCon := &stream.EdgedMetricsConnection{
ReadChan: make(chan *stream.Message, 128),
}
if err := json.Unmarshal(m.Data, metricsCon); err != nil {
klog.Errorf("unmarshal connector data error %v", err)
return err
}

s.AddLocalConnection(m.ConnectID, metricsCon)
return metricsCon.Serve(s.Tunnel)
}
// 发送真实请求
func (ms *EdgedMetricsConnection) Serve(tunnel SafeWriteTunneler) error {
client := http.Client{}
req, err := http.NewRequest("GET", ms.URL.String(), nil)
if err != nil {
klog.Errorf("create new metrics request error %v", err)
return err
}
req.Header = ms.Header
// 目前通道只支持Text message,这里解决了Prometheus抓取失败的问题
req.Header.Set("accept-encoding", "identity")
resp, err := client.Do(req)
if err != nil {
klog.Errorf("request metrics error %v", err)
return err
}
defer resp.Body.Close()
scan := bufio.NewScanner(resp.Body)
stop := make(chan struct{})

go func() {
for mess := range ms.ReadChan {
if mess.MessageType == MessageTypeRemoveConnect {
klog.Infof("receive remove client id %v", mess.ConnectID)
close(stop)
return
}
}
}()

defer func() {
for retry := 0; retry < 3; retry++ {
msg := NewMessage(ms.MessID, MessageTypeRemoveConnect, nil)
if err := msg.WriteTo(tunnel); err != nil {
klog.Errorf("%v send %s message error %v", ms, msg.MessageType, err)
} else {
break
}
}
}()

for scan.Scan() {
select {
case <-stop:
klog.Infof("receive stop single, so stop metrics scan ...")
return nil
default:
}
// 10 = \n
msg := NewMessage(ms.MessID, MessageTypeData, append(scan.Bytes(), 10))
err := msg.WriteTo(tunnel)
if err != nil {
klog.Errorf("write tunnel message %v error", msg)
return err
}
klog.Infof("%v write metrics data %v", ms.String(), string(scan.Bytes()))
}
return nil
}

最后一步,终于要结束啦😅!cloudstream会读取Message,解码得到用户需要的指标数据,并响应请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *Session) Serve() {
defer s.Close()

for {
t, r, err := s.tunnel.NextReader()
if err != nil {
klog.Errorf("get %v reader error %v", s, err)
return
}
if t != websocket.TextMessage {
klog.Errorf("Websocket message type must be %v type", websocket.TextMessage)
return
}
message, err := stream.ReadMessageFromTunnel(r)
if err != nil {
klog.Errorf("Read message from tunnel %v error %v", s.String(), err)
return
}

if err := s.ProxyTunnelMessageToApiserver(message); err != nil {
klog.Errorf("Proxy tunnel message [%s] to kube-apiserver error %v", message.String(), err)
continue
}
}
}

思考一下通过拦截流量实现穿透有什么弊端?

讲讲跳过的坑

讲讲我在验证和学习kubeedge网络设计时的故事(坑爹过程):

  1. Kubernetes的Node的Status.daemonEndpoints.KubeletEndpoint.Port你知道吗? 其实当我们kubectl exec时,最终发起向kubelet请求,kubelet的端口就是由该字段确定。
  2. calico复杂的iptables规则,让容器的流量变得十分怪异,最终只能使用hostNetwork模式啦🤬
  3. 你知道prometheus抓取端口时,会自动加上请求头Accept-Encoding: gzip,并且kubeedge tunnel目前不支持Binary Message吗?
  4. 国家统计局相关统计数据表明,那两天我敲的curl命令可绕地球两圈,prometheus的重装次数高达百万次!👊

总结

KubeEdge有非常良好的前景,这主要得力于云边协同的思想,虽然目前开源的KubeEdge整体的方案还不完善,但是, 随着用户的增多和社区的发展,KubeEdge将会越来越成熟,对于想要加入KubeEdge社区的朋友,目前也是最佳时机。 同时,我们基于KubeEdge所做的产品化工作目前还处于初级阶段,但是有了Kubernetes的产品化改造的经验,相关工作也在有条不紊的进行。

参考资料

除了官方仓库和文档,以下资料也十分有价值。

  1. KubeEdge-Kubernetes Native Edge Computing Framework(2019-kubecon)
  2. KubeEdge Deep Dive(2019-kubecon)
  3. kubeEdge架构解读
  4. 华为云课堂-KubeEdge实战宝典
  5. QUIC-Quick UDP Internet Connection