数据库管控平台设计与实现-服务端

引言

各大公司都有庞大的数据库资源,涉及机器轻则上千,基本都是以万计。几乎每天都有机器宕机,或者发生网络延迟,或者发生机房故障。稍有不慎便容易出现管控缺失,数据库不稳定的问题。而数据库稳定性一般都是最重要的一环。

一般数据库管控维度为分数据库种类,再细分对应的业务集群。如何设计一个数据库管控系统,其具备一些公共能力,能够对各种数据库集群进行相应的具体的管控逻辑的开发,具有一定的挑战。

首先,当前业界通用的做法是如下图所示,通过在数据库机器中内嵌相应的agent进程,该agent负责管理当前机器上的数据库进程,上报与采集内核信息,以实现相应的管控逻辑。

当然一个好的平台不仅需要强大的后端组件支持,还需要前端项目用于展示和运维人员操作的,在本文中着重讨论后端部分。

运维模块

插件系统

一些数据库,对应运维操作相当复杂,即运维管控代码庞大。若以脚本语言如python完全去写起管控逻辑,到后期几乎无法维护,因此管控平台需要能够提供原生语言的编写能力,一般管控平台以golang编写,则应当允许数据库运维开发方能够直接使用golang编写运维逻辑。

一种实现方法是,提供某种代码生成器,将业务golang的接口代码直接生成与管控平台黏合的代码,并编译成.so文件,供动态链接,这种方法的好处是,一个mgr可以链接多个插件系统,即服务多种数据库。

任务执行

负责任务的创建、管理、下发、agent回复处理等工作

该模块是数据库管控平台最重要的模块。每一个管控逻辑,都是由一个或多个任务组成。而强大的管控平台可以提供多种任务编写方式,如shell、python、甚至是直接的golang语言。

任务必须具备可取消的能力,即超时关闭,若直接在线程中运行任务显然无法取消。一种简单的方式是,执行任务的线程,再起一个任务线程并记录pid,并不断观测任务线程结果,必要时直接杀死对应线程。

任务必须具备提取返回值的能力,这对于golang来说实现简单,因为管控平台原生语言即为golang。而对于python而言捕获返回值相当困难。一种可行的实现是python返回结果直接json化后写到某个临时文件,供golang代码后续读并解码(python任务日志也可以如此实现)。

对于shell执行器,为了记录中间的echo xxx输出,需要捕获相应的stdout并于日志文件中。需要关心shell脚本的退出码以判断任务的成功与否。

对于python执行器,python执行器应该能够只执行某个.py文件的某个类中的一个成员函数。一种实现是可以用shell执行器作为中间方,以shell执行器启动python_control.sh args,将任务入参放在args,在里面执行python control.py options=args,再以control.py去调用相应具体python任务。

为了持续的观测任务的执行过程,供前端展示任务的执行情况,任务创建时应该往管控数据库中插入任务信息和状态信息,任务执行完后需要修改对应任务的状态。

任务执行需要区别mgr本地执行与agent远端执行,对于agent远端执行,需要区分同步与异步两种方式。实际两种方法本质一样,均向agent调用执行任务接口,agent执行完后会调用mgr的上报接口。同步的话,mgr要hold住请求不返回即可,直到agent回复。

定时调度

定时调度在数据库管控中相当常见,比如机器宕机检测与恢复。我们可以很方便的在任务执行模块上实现定时调度。

只需要将定时调度信息存于管控数据库中。使用某种cron包进行调度即可。

流程引擎

有的运维操作相当冗长,如数据中心级别的故障转移。若以代码硬编码将无法维护,一般是以编写多个任务以类似流水线的方式串接起来,应该管控平台需要具备流程引擎的能力。

一般实现是,前端画流程图,并将对应流程图结构化插入到管控数据库中,后端mgr只需要取出管控数据库中已经结构化的流程定义即可。

使用方式:前端画流程图,生成流程定义到数据库中。后端从数据库中取流程定义,然后执行流程,将执行结果记录到数据库中。

前端实现


涉及的文件:

编辑流程:

  • edit_flow.html
  • flow_edit.js

流程运行:

  • instance.html
  • instance.js

后端实现

数据结构

process

当前端编辑完,保存后,会将流程的数据结构放于mongo表process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Process 流程
type Process struct {
// 基础信息
Name string `json:"name" bson:"name"` // 流程名
ChName string `json:"ch_name" bson:"ch_name"` // 流程中文名
// 属性信息
ParamList []ValueDefinition `json:"param_list" bson:"param_list"` // 全局入参
ResultList []ValueDefinition `json:"result_list" bson:"result_list"` // 全局结果
// 节点信息,记录了流程图一个节点的全部信息,如task,其入参信息等等
ActivityList []Activity `json:"activity_list" bson:"activity_list"`
SequenceList []Sequence `json:"sequence_list" bson:"sequence_list"` // activity连线列表
Author string `json:"author" bson:"author"` // 创建人
CreatedAt int64 `json:"created_at" bson:"created_at"` // 创建时间
UpdatedAt int64 `json:"updated_at" bson:"updated_at"` // 更新时间
Position map[string]string `json:"position" bson:"position"` // activity的位置信息,{node_1: "top:xx;left:xx"}
ConnectionDire map[string]string `json:"connection_dire" bson:"connection_dire"` // 连线的方向信息,{node_1-node_2: "rig-left"}
Labels []ValueDefinition `json:"labels" json:"labels"` // 流程拥有标签
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Activity 活动节点的定义
type Activity struct {
// 基础信息
Name string `json:"name" bson:"name"` // 名称,自动生成
ChName string `json:"ch_name" bson:"ch_name"` // 中文名称,选填,为空时按如下顺序赋值:任务/process
Type ActivityType `json:"type" bson:"type"` // 类型,对应ActivityType。比如为task或者start
// 属性信息
ParamList []ValueDefinition `json:"param_list" bson:"param_list"` // 节点入参
ResultList []ValueDefinition `json:"result_list" bson:"result_list"` // 输出结果
InputAnchor string `json:"input_anchor" bson:"input_anchor"` // 入参锚点
OutputAnchor string `json:"output_anchor" bson:"output_anchor"` // 出参锚点
Timeout int64 `json:"timeout" bson:"timeout"` // 超时时间,单位秒
LoopUntil string `json:"loop_until" bson:"loop_until"` // 循环等待
LoopInterval int64 `json:"loop_interval" bson:"loop_interval"` // 循环间隔,单位秒
SuccFlag string `json:"succ_flag" bson:"succ_flag"` // 任务执行成功与否的标识
// 内容
Content string `json:"content" bson:"content"` // 包含的任务或流程
Remote bool `json:"remote" bson:"remote"` // 针对作业平台任务,标识是否远程任务,默认false,如果为true,则入参默认增加target_hosts, 出参增加succ_hosts
}

1
2
3
4
5
type Sequence struct {
Source string `json:"source" bson:"source"` // 源activity name
Target string `json:"target" bson:"target"` // 目标activity name
Condition string `json:"condition" bson:"condition"` // 条件表达式
}
ProcessInstance

开始运行流程时,会根据全局传参和默认参数,合成一个最终流程全局入参,创建一个processInstance对象,并将该对象插入mongo中process_instance表内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ProcessInstance 流程实例
type ProcessInstance struct {
ID string `json:"id" bson:"_id"` // 流程实例ID
ParentID string `json:"parent_id" bson:"parent_id"` // 父流程实例ID
Name string `json:"name" bson:"name"` // 流程名
Params map[string]interface{} `json:"params" bson:"params"` // 流程的入参
Result map[string]interface{} `json:"result" bson:"result"` // 流程从执行结果
Status Status `json:"status" bson:"status"` // 状态
RunStack []string `json:"run_stack" bson:"run_stack"` // 执行栈
RunIndex int `json:"run_index" bson:"run_index"` // 指向执行栈的索引,从0开始,譬如停留在3,表明0、1、2已经顺利执行,3未执行或未彻底完成
Runner string `json:"runner" bson:"runner"` // 执行人
CreatedAt int64 `json:"created_at" bson:"created_at"` // 创建时间
UpdatedAt int64 `json:"updated_at" bson:"updated_at"` // 更新时间
Labels map[string]interface{} `json:"labels" bson:"labels"` // 流程实例标签
Audit bool `json:"audit" bson:"audit"` // 是否审计
StartEventStack []string `json:"start_event_stack" bson:"start_event_stack"` // 开始执行事件堆栈
StartEventRunIndex int `json:"start_event_run_index" bson:"start_event_run_index"` // 开始事件执行栈的索引
ExitEventStack []string `json:"exit_event_stack" bson:"exit_event_stack"` // 退出事件执行栈
ExitEventRunIndex int `json:"exit_event_run_index" bson:"exit_event_run_index"` // 退出执行栈索引
Event bool `json:"event" bson:"event"` // 是否触发事件的开关
}

执行流程

  1. 如果有onstart节点,先执行该节点。否则先找到start节点,开始执行
  2. 一直调用next,获取下一个节点,直到执行完毕
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Exec 执行流程实例
func (sm *StateMachine) exec() {
sm.log.Info("[stateMachine] 开始执行流程...")
var err error
// step 1 根据流程id,补全流程实例、流程定义、构建流程入参
if err = sm.initBaseInfo(); err != nil {
sm.log.Error("[stateMachine] 初始化流程基础信息失败. ", err)
return
}
defer sm.syncToDb()
// 每次执行都解析一次标签
sm.parseLabels()
...
needTriggerOnStart := false
if sm.processInstance.Status == data_meta.StatusInit || sm.processInstance.Status == data_meta.StatusFail || sm.processInstance.Status == data_meta.StatusWarn {
needTriggerOnStart = true
sm.processInstance.Event = true
}
sm.processInstance.Status = data_meta.StatusRunning
sm.syncToDb()
...
// 这里触发OnStart事件的执行
if needTriggerOnStart {
// 每次执行,onstart的所有任务都执行一遍
sm.processInstance.StartEventStack = make([]string, 0)
sm.processInstance.StartEventRunIndex = 0

//!!重要!!这里会全局入参插入sm.varSpace,一个参数栈,全局入参在栈底
if err = sm.buildHistoryVarSpaceOnStart(); err != nil {
sm.processInstance.Status = data_meta.StatusFail
return
}
if sm.triggerOnStartEvent() == false {
return
}
}
...
// step 3 循环执行
for true {
...
// 获取待执行的活动节点
sm.log.Info("[stateMachine] 获取待执行的节点...")
activity, activityInstance, err := sm.next()
if err != nil {
sm.log.Error("[stateMachine] 获取待执行节点失败. ", err)
sm.processInstance.Status = data_meta.StatusFail
return
}
// 增加end节点判断,主要针对失败重跑场景下,刚好runIndex指向end节点,此时获取下一个节点则为nil
if activity == nil || activityInstance == nil {
if sm.hasWarn {
sm.processInstance.Status = data_meta.StatusWarn
} else {
sm.processInstance.Status = data_meta.StatusFinish
}
sm.log.Info("[stateMachine] 达到END节点,流程执行结束.")
return
}
// 这里保存一次db,主要是run_stack会新增数据,需保存一次,避免重启丢失
sm.syncToDb()
// 开始执行
sm.log.Info("[stateMachine] 开始执行节点", activity.Name)
err = sm.activityService.Exec(sm.ctx, sm.processID, activity, activityInstance, sm.processInstance.Runner)
if err != nil {
sm.log.Error("[stateMachine] 节点执行失败 id: ", activityInstance.ID, " err:", err)
sm.processInstance.Status = data_meta.StatusFail
return
}
sm.log.Info("[stateMachine] 节点", activity.Name, "执行完成,状态:", activityInstance.Status)
// 节点执行成功,则根据状态区别对待
switch activityInstance.Status {
case data_meta.StatusWarn:
sm.hasWarn = true
sm.processInstance.RunIndex++
sm.appendVarSpace(activity.Name, activityInstance.Result)
case data_meta.StatusFinish:
sm.processInstance.RunIndex++
sm.appendVarSpace(activity.Name, activityInstance.Result)
if activity.Type == data_meta.ActivityTypeEnd {
sm.processInstance.Result = activityInstance.Result
if sm.hasWarn {
sm.processInstance.Status = data_meta.StatusWarn
} else {
sm.processInstance.Status = data_meta.StatusFinish
}
sm.log.Info("[stateMachine] 达到END节点,流程执行结束.")
return
}
case data_meta.StatusFail:
sm.processInstance.Status = data_meta.StatusFail
sm.log.Error("[stateMachine] 节点执行后状态为fail id: ", activityInstance.ID)
return
case data_meta.StatusPause:
sm.processInstance.Status = data_meta.StatusPause
sm.log.Info("[stateMachine] 流程暂停,等待手工触发执行")
return
default:
// 这里如果中途stop了,那么需要取消掉sleep
if sm.stop == true {
activityInstance.Status = data_meta.StatusFail
sm.activityService.syncToDb(activityInstance)
sm.processInstance.Status = data_meta.StatusPause
sm.log.Info("[stateMachine] 流程本应挂起,但有stop标识,流程退出执行")
} else {
sm.processInstance.Status = data_meta.StatusSleep
sm.log.Info("[stateMachine] 流程挂起,等待下次调度执行")
}
return
}
// 执行完成也保存一次,主要是run_index会有变化,避免进程重启
sm.syncToDb()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// 获取下一个待执行的activity定义和实例ID。简化下逻辑,exec维护runningIndex,next只获取runningIndex指向的节点
func (sm *StateMachine) next() (*data_meta.Activity, *data_meta.ActivityInstance, error) {
// 这里设置锁,为了和停止流程冲突
sm.stopLocker.Lock()
defer sm.stopLocker.Unlock()
if len(sm.process.ActivityList) == 0 {
return nil, nil, errors.New("流程没有配置活动节点")
}
var activityPtr *data_meta.Activity
var activityInstancePtr *data_meta.ActivityInstance
var err error
// 流程实例没有被执行过,这是首次执行,返回start节点
if len(sm.processInstance.RunStack) == 0 {
sm.log.Info("[stateMachine] [next]首次执行,查找start节点")
for _, a := range sm.process.ActivityList {
if a.Type == data_meta.ActivityTypeStart {
sm.processInstance.RunIndex = 0
activityPtr = &a
break
}
}
if activityPtr == nil {
return nil, nil, errors.New("流程没有start节点")
}
sm.log.Info("[stateMachine] [next]找到start节点")
} else /*流程实例之前执行过,除了start节点,都会来到这*/ {
sm.log.Info("[stateMachine] [next]获取runIndex=", sm.processInstance.RunIndex, "待执行节点")
// RunIndex: 下一个要执行的节点索引,有可能还未初始化
// 针对失败重跑的场景,runIndex指向的索引已经实例化过,直接获取runIndex指向的节点即可
if sm.processInstance.RunIndex < len(sm.processInstance.RunStack) {
sm.log.Info("[stateMachine] [next]runIndex=", sm.processInstance.RunIndex, "节点实例已存在")
inst, err := sm.activityService.helperPtr.GetActivityInstance(sm.processInstance.RunStack[sm.processInstance.RunIndex])
if err != nil {
return nil, nil, err
}
activityInstancePtr = &inst
activityPtr = sm.getActivityByName(activityInstancePtr.Name)
} else /* runIndex的索引位置还没有初始化,需要寻找节点并初始化 */ {
// 需要获取新的节点
sm.log.Info("[stateMachine] [next]runIndex=", sm.processInstance.RunIndex, "为新节点,开始寻找")
currActivityInstance, err := sm.activityService.helperPtr.GetActivityInstance(sm.processInstance.RunStack[sm.processInstance.RunIndex-1])
if err != nil {
sm.log.Error("[stateMachine] [next]查询最后1个执行的节点实例失败. ", err)
return nil, nil, err
}
currActivity := sm.getActivityByName(currActivityInstance.Name)

nextActivityName, err := sm.getNextActivityFromSequence(currActivity)
if err != nil {
sm.log.Error("[stateMachine] [next]获取下一个待执行节点失败. ", err.Error())
return nil, nil, err
}
// 这里增加end节点判断
if nextActivityName == "" {
return nil, nil, nil
}
activityPtr = sm.getActivityByName(nextActivityName)
if activityPtr == nil {
sm.log.Error("[stateMachine] [next]获取待执行节点定义失败:", nextActivityName)
return nil, nil, errors.New("没有在流程定义中找到activity " + nextActivityName)
}
}
}
// 到这里,找到待执行节点没有初始化,初始化实例
if activityPtr == nil {
return nil, nil, errors.New("没有找到待执行的节点")
}
sm.log.Info("[stateMachine] [next]待执行节点为: ", activityPtr.Name)
if activityInstancePtr == nil {
// 没有初始化过,进行初始化
sm.log.Info("[stateMachine] [next]初始化待执行节点")
activityInstancePtr, err = sm.initActivity(activityPtr)
if err != nil {
return nil, nil, err
}
// 新初始化的需要加入runStack
sm.processInstance.RunStack = append(sm.processInstance.RunStack, activityInstancePtr.ID)
} else {
// 已经初始化过的,每次执行都重新渲染一遍入参
sm.log.Info("[stateMachine] [next]待执行节点重新渲染参数")
params, err := sm.parseParameter(activityPtr.ParamList)
if err != nil {
sm.log.Error("[stateMachine] [next]重新渲染参数失败.", err)
return nil, nil, err
}
activityInstancePtr.Params = params
}
return activityPtr, activityInstancePtr, nil
}

根据activity节点生成执行实例ActivityInstance,并插入数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 初始化活动实例
func (sm *StateMachine) initActivity(activity *data_meta.Activity) (*data_meta.ActivityInstance, error) {
// 先根据定义,解析参数
sm.log.Info("[stateMachine] 初始化实例:", activity.Name)
params, err := sm.parseParameter(activity.ParamList)
if err != nil {
return nil, err
}
// sm.log.Info("[stateMachine] 参数为: ", params)
inst, err := sm.activityService.InitActivityInstance(sm.processID, activity.Name, params)
return &inst, err
}

func (as *ActivityService) InitActivityInstance(processID string, activityName string, params map[string]interface{}) (data_meta.ActivityInstance, error) {
instance := data_meta.ActivityInstance{
ID: "",
ProcessID: processID,
Name: activityName,
Params: params,
Result: nil,
Status: data_meta.StatusInit,
ContentIDs: make([]string, 0),
CreatedAt: 0,
UpdatedAt: 0,
}
// 往数据库添加活动实例一行
id, err := as.helperPtr.AddActivityInstance(&instance)
if err != nil {
as.log.Error("[activity] 初始化实例失败. ", activityName, " ", err.Error())
return instance, err
}
instance.ID = id
as.log.Debug("[activity] 实例初始化完成,id: ", id)
return instance, nil
}

解析参数,解析获取节点的入参,从栈顶往栈底找(栈底即全局入参),如果为常量直接取,否则采用jsonpath语法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// 解析参数,使用jsonpath语法。
// 解析规则优先级: 常量和指定具体取值域的表达式(多个取值域时离当前节点最近的取值域优先) -> 未指定取值域和取值域为global的表达式
func (sm *StateMachine) parseParameter(paramsList []data_meta.ValueDefinition) (map[string]interface{}, error) {
sm.log.Debug("[stateMachine] 解析参数映射...")
paramsMap := make(map[string]interface{})
succFlags := make(map[string]int)
// 初始化标记
for _, v := range paramsList {
succFlags[v.Key] = 0
}
// 开始遍历每一个参数映射表达式,优先处理指定节点取值(非global)和常量值
for _, one := range paramsList {
// 如果已经解析过,不再解析
// if succFlags[one.Key] == 1 {
// sm.log.Info("[stateMachine]", one.Key, "@", one.Location, " ", one.Expression, " 该key已解析过,跳过...")
// continue
// }
// 开始解析
if one.Const { // 常量
sm.log.Info("[stateMachine]常量值: ", one.Key, "@", one.Location, " ", one.Expression, ", value: ", one.Value)
// 这里要判断一下深度
// 首次解析,直接登记
if succFlags[one.Key] == 0 {
paramsMap[one.Key] = one.Value
succFlags[one.Key] = 1 // 记录解析的索引值
sm.log.Info("[stateMachine]直接赋值")
} else {
// 之前已经解析过,如果本次的索引值比之前的大,说明离当前节点更近,这里应该不会执行到
if 1 > succFlags[one.Key] {
paramsMap[one.Key] = one.Value
succFlags[one.Key] = 1
sm.log.Info("[stateMachine] 比已解析的值更新,替换之")
} else {
sm.log.Info("[stateMachine] 比已解析的值旧,忽略")
}
}
continue
}
// 取值域为global的先跳过
if one.Location == "" || strings.ToLower(one.Location) == "global" {
continue
}
// 非常量,则需要应用jsonpath动态取值
// step1 倒序查找最相邻的取值域,index=0的是global
for i := len(sm.varSpace) - 1; i > 0; i-- {
if sm.varSpace[i].Owner == one.Location {
val, err := jsonpath.JsonPathLookup(sm.varSpace[i].DataMap, one.Expression)
if err != nil {
// return nil, errors.New(one.Key + "@" + one.Location + " " + one.Expression + "," + err.Error())
sm.log.Warn("[stateMachine]", one.Key, "@", one.Location, " ", one.Expression, " err: ", err.Error())
//return nil, err
} else {
sm.log.Info("[stateMachine]解析成功: ", one.Key, "@", one.Location, " ", one.Expression)
// 首次解析,直接登记
if succFlags[one.Key] == 0 {
paramsMap[one.Key] = val
succFlags[one.Key] = i // 记录解析的索引值
} else {
// 之前已经解析过,如果本次的索引值比之前的大,说明离当前节点更近
if i > succFlags[one.Key] {
paramsMap[one.Key] = val
succFlags[one.Key] = i
sm.log.Info("[stateMachine] 比已解析的值更新,替换之")
} else {
sm.log.Info("[stateMachine] 比已解析的值旧,忽略")
}
}
}
break
}
}
}
// 开始解析取值域为global,且没有同名指定节点的
for _, one := range paramsList {
// 如果已经解析过,不再解析
if succFlags[one.Key] > 0 {
sm.log.Info("[stateMachine]", one.Key, "@", one.Location, " ", one.Expression, " 该key已解析过,跳过...")
continue
}
// 取值域非global的跳过
if one.Location == "" || strings.ToLower(one.Location) == "global" {
val, err := jsonpath.JsonPathLookup(sm.varSpace[0].DataMap, one.Expression)
if err != nil {
// return nil, errors.New(one.Key + "@" + one.Location + " " + one.Expression + "," + err.Error())
sm.log.Error("[stateMachine]", one.Key, "@global ", one.Expression, " err: ", err.Error())
} else {
paramsMap[one.Key] = val
succFlags[one.Key] = 1
sm.log.Info("[stateMachine]解析成功: ", one.Key, "@global ", one.Expression)
}
}
}
// 校验参数解析的完整度
errKeys := make([]string, 0)
for k, v := range succFlags {
if v == 0 {
errKeys = append(errKeys, k)
}
}
if len(errKeys) == 0 {
return paramsMap, nil
}
return nil, errors.New("解析key失败: " + strings.Join(errKeys, ","))
}

真实执行任务的地方,会将执行结果放在activity_instance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Exec 执行活动节点
func (as *ActivityService) Exec(ctx context.Context, processID string, activity *data_meta.Activity, activityInstance *data_meta.ActivityInstance, runner string) error {
...
// 透传进来的任务有三类:状态不为finish的普通任务、有部分成功特性、子流程
defer as.syncToDb(activityInstance)
// 状态标记为执行状态,因为涉及失败重跑、部分失败执行等等各种执行情况,这里不做区分,统一按照正常的执行逻辑来
activityInstance.Status = data_meta.StatusRunning
activityInstance.OpStatus = "" // 只要执行,那手工标记的状态就清空掉
as.syncToDb(activityInstance)

// 构建本次需要执行的参数,主要针对部分成功的任务,过滤掉已经执行成功的对象
as.log.Info("[activity] 构建执行参数...")
needRun, params, err := as.buildRunningParameter(activity.InputAnchor, activity.OutputAnchor, activityInstance.Params, activityInstance.Result)
if err != nil {
as.log.Error("[activity] 构建执行参数失败. err:", err)
activityInstance.Status = data_meta.StatusFail
return err
}
// 无需执行,说明活动节点是完成状态
if !needRun {
as.log.Info("[activity] 无需执行")
activityInstance.Status = data_meta.StatusFinish
return nil
}
as.log.Info("[activity] 执行参数: ", params)
// 这里需要执行活动节点了,区分各种节点类型,非err的状态由各个节点执行方法内部维护
switch activity.Type {
case data_meta.ActivityTypeStart:
err = as.execStartNode(ctx, activityInstance)
case data_meta.ActivityTypeClock:
//err = as.execClockNode(ctx, activityInstance)
err = as.execClockNodeV2(ctx, activityInstance)
case data_meta.ActivityTypeEnd:
err = as.execEndNode(ctx, activityInstance)
case data_meta.ActivityTypeTask:
err = as.execTaskNode(ctx, activity, activityInstance, params, runner)
case data_meta.ActivityTypeProcess:
// 这里需要使用全量的参数,由流程内部自行去过滤。使用增量参数会导致子流程内部某些非部分成功节点取到的参数有缺失
err = as.execSubProcess(ctx, activity, activityInstance, processID, activityInstance.Params, runner)
case data_meta.ActivityTypeUser:
err = as.execUserNode(ctx, activity, activityInstance, params, runner)
case data_meta.ActivityTypeOnStart:
err = as.execOnStartNode(ctx, activityInstance)
case data_meta.ActivityTypeOnExit:
err = as.execOnExitNode(ctx, activityInstance)
default:
err = errors.New("不支持节点类型")
}
...
return nil
}

执行完毕后会将结果压栈至变量空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
switch activityInstance.Status {
case data_meta.StatusFinish:
sm.processInstance.RunIndex++
sm.appendVarSpace(activity.Name, activityInstance.Result)
if activity.Type == data_meta.ActivityTypeEnd {
sm.processInstance.Result = activityInstance.Result
if sm.hasWarn {
sm.processInstance.Status = data_meta.StatusWarn
} else {
sm.processInstance.Status = data_meta.StatusFinish
}
sm.log.Info("[stateMachine] 达到END节点,流程执行结束.")
return
}
}

基础模块

选主与切换

manager作为管控中心,本身存在leader与follower,leader会遇到宕机的风险,有时也需要主动切主从使得leader部署到某个机房。因此必须具备选主的能力。

为了实现manager某种程度的无状态,一般将集群信息与主从信息记录到外部分布式数据库中。agent可直接查询数据库获得通信的主manager。

一般agent只需要与主mgr进行交流,若存在主mgr性能瓶颈,大多是因为数据库运维设置的定时调度运行过于频繁,经测试,一台manager管理数千台agent是没有压力的。

自身版本升级

在数据库管控中,升级操作一般就是下掉原来的进程,上一个新的进程。对于mgr同样可以如此做,而对于agent,甚至只需要重启即可,当然重启前并不是直接杀死再启动进程,需要设计一种信号机制,让agent捕获该信号,在处理完当前任务后,平滑退出。如使用go的context管理上下文协程。

在实际实现中,这里的设计较为复杂。

机器资源监控

agent组件实现机器资源的定期探测和上报是必不可少的。这对后续部署数据库节点的判断具有决定性的意义,也能够给前端直接展示相应的资源使用会存量,以供运维人员或自动化运维工具判断。

日志模块

需要设计与选型好日志组件,因为管控过程中有相当多不同的逻辑流,如执行任务,执行流程,定时调度,具体的数据库管控插件日志,为了方便后续定位位置,需要对不同的逻辑流写不同的滚动日志文件。

外部管理模块

文件管理

为了方便管理,我们可让具体的数据库管控脚本在mgr和agent同步,这种好处在于我们写了一个新的管控脚本,只需要下发给对应的mgr,便可以直接使用,无需在手动分发给各个agent。

这个模块实现文件夹同步,需要定期检测同步文件夹,具备压缩能力,具备md5计算能力防止重复发送。一般具体数据库管控版本不升级,这里只会调用发送md5检查的接口。

文件下发底层具体实现,直接用gin或者http即可

配置下发

数据库节点部署,配置文件的管理与下发是难点之一,一般是将当前集群的多版本配置文件存在管控数据库中,供前端展示和编写。前端负责管理相应的模版,和特异化的参数,负责渲染拼接,再存于数据库中,记录对应配置版本。mgr需要提供相应的接口将对应的配置文件下发到agent指定位置,供相应进程启动使用。

其与文件管理不同的点在于,配置是mgr从数据库中拉下来下发给agent的,不经过磁盘。

公共功能

分布式锁

在写数据库运维逻辑时,必定面对多运维操作并发的风险,如同时发起了对一个集群做扩容和缩容的流程,这种长时间运行的较重量级的流程不会允许并行。因此需要在Manager实现分布式锁,该锁需要考虑容量,因为有的操作是允许同时进行多个,如节点部署。

该锁的实现一种简单的方法是,将锁与其过期信息存于分布式数据库中(mongodb等,mysql主从强一致),加锁时检查数据库即可。