引言
各大公司都有庞大的数据库资源,涉及机器轻则上千,基本都是以万计。几乎每天都有机器宕机,或者发生网络延迟,或者发生机房故障。稍有不慎便容易出现管控缺失,数据库不稳定的问题。而数据库稳定性一般都是最重要的一环。
一般数据库管控维度为分数据库种类,再细分对应的业务集群。如何设计一个数据库管控系统,其具备一些公共能力,能够对各种数据库集群进行相应的具体的管控逻辑的开发,具有一定的挑战。
首先,当前业界通用的做法是如下图所示,通过在数据库机器中内嵌相应的agent进程,该agent负责管理当前机器上的数据库进程,上报与采集内核信息,以实现相应的管控逻辑。

当然一个好的平台不仅需要强大的后端组件支持,还需要前端项目用于展示和运维人员操作的,在本文中着重讨论后端部分。
运维模块
插件系统
一些数据库,对应运维操作相当复杂,即运维管控代码庞大。若以脚本语言如python完全去写起管控逻辑,到后期几乎无法维护,因此管控平台需要能够提供原生语言的编写能力,一般管控平台以golang编写,则应当允许数据库运维开发方能够直接使用golang编写运维逻辑。
一种实现方法是,提供某种代码生成器,将业务golang的接口代码直接生成与管控平台黏合的代码,并编译成.so
文件,供动态链接,这种方法的好处是,一个mgr可以链接多个插件系统,即服务多种数据库。
任务执行
负责任务的创建、管理、下发、agent回复处理等工作
该模块是数据库管控平台最重要的模块。每一个管控逻辑,都是由一个或多个任务组成。而强大的管控平台可以提供多种任务编写方式,如shell、python、甚至是直接的golang语言。
任务必须具备可取消的能力,即超时关闭,若直接在线程中运行任务显然无法取消。一种简单的方式是,执行任务的线程,再起一个任务线程并记录pid,并不断观测任务线程结果,必要时直接杀死对应线程。
任务必须具备提取返回值的能力,这对于golang来说实现简单,因为管控平台原生语言即为golang。而对于python而言捕获返回值相当困难。一种可行的实现是python返回结果直接json化后写到某个临时文件,供golang代码后续读并解码(python任务日志也可以如此实现)。
对于shell执行器,为了记录中间的echo xxx
输出,需要捕获相应的stdout并于日志文件中。需要关心shell脚本的退出码以判断任务的成功与否。
对于python执行器,python执行器应该能够只执行某个.py
文件的某个类中的一个成员函数。一种实现是可以用shell执行器作为中间方,以shell执行器启动python_control.sh args
,将任务入参放在args
,在里面执行python control.py options=args
,再以control.py
去调用相应具体python任务。
为了持续的观测任务的执行过程,供前端展示任务的执行情况,任务创建时应该往管控数据库中插入任务信息和状态信息,任务执行完后需要修改对应任务的状态。
任务执行需要区别mgr本地执行与agent远端执行,对于agent远端执行,需要区分同步与异步两种方式。实际两种方法本质一样,均向agent调用执行任务接口,agent执行完后会调用mgr的上报接口。同步的话,mgr要hold住请求不返回即可,直到agent回复。
定时调度
定时调度在数据库管控中相当常见,比如机器宕机检测与恢复。我们可以很方便的在任务执行模块上实现定时调度。
只需要将定时调度信息存于管控数据库中。使用某种cron
包进行调度即可。
流程引擎
有的运维操作相当冗长,如数据中心级别的故障转移。若以代码硬编码将无法维护,一般是以编写多个任务以类似流水线的方式串接起来,应该管控平台需要具备流程引擎的能力。
一般实现是,前端画流程图,并将对应流程图结构化插入到管控数据库中,后端mgr只需要取出管控数据库中已经结构化的流程定义即可。
使用方式:前端画流程图,生成流程定义到数据库中。后端从数据库中取流程定义,然后执行流程,将执行结果记录到数据库中。
前端实现

涉及的文件:
编辑流程:
- edit_flow.html
- flow_edit.js
流程运行:
- instance.html
- instance.js
后端实现
数据结构
process
当前端编辑完,保存后,会将流程的数据结构放于mongo表process
中

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type Process struct { Name string `json:"name" bson:"name"` ChName string `json:"ch_name" bson:"ch_name"` ParamList []ValueDefinition `json:"param_list" bson:"param_list"` ResultList []ValueDefinition `json:"result_list" bson:"result_list"` ActivityList []Activity `json:"activity_list" bson:"activity_list"` SequenceList []Sequence `json:"sequence_list" bson:"sequence_list"` Author string `json:"author" bson:"author"` CreatedAt int64 `json:"created_at" bson:"created_at"` UpdatedAt int64 `json:"updated_at" bson:"updated_at"` Position map[string]string `json:"position" bson:"position"` ConnectionDire map[string]string `json:"connection_dire" bson:"connection_dire"` Labels []ValueDefinition `json:"labels" json:"labels"` }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| type Activity struct { Name string `json:"name" bson:"name"` ChName string `json:"ch_name" bson:"ch_name"` Type ActivityType `json:"type" bson:"type"` ParamList []ValueDefinition `json:"param_list" bson:"param_list"` ResultList []ValueDefinition `json:"result_list" bson:"result_list"` InputAnchor string `json:"input_anchor" bson:"input_anchor"` OutputAnchor string `json:"output_anchor" bson:"output_anchor"` Timeout int64 `json:"timeout" bson:"timeout"` LoopUntil string `json:"loop_until" bson:"loop_until"` LoopInterval int64 `json:"loop_interval" bson:"loop_interval"` SuccFlag string `json:"succ_flag" bson:"succ_flag"` Content string `json:"content" bson:"content"` Remote bool `json:"remote" bson:"remote"` }
|

1 2 3 4 5
| type Sequence struct { Source string `json:"source" bson:"source"` Target string `json:"target" bson:"target"` Condition string `json:"condition" bson:"condition"` }
|
ProcessInstance
开始运行流程时,会根据全局传参和默认参数,合成一个最终流程全局入参,创建一个processInstance
对象,并将该对象插入mongo中process_instance
表内

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| type ProcessInstance struct { ID string `json:"id" bson:"_id"` ParentID string `json:"parent_id" bson:"parent_id"` Name string `json:"name" bson:"name"` Params map[string]interface{} `json:"params" bson:"params"` Result map[string]interface{} `json:"result" bson:"result"` Status Status `json:"status" bson:"status"` RunStack []string `json:"run_stack" bson:"run_stack"` RunIndex int `json:"run_index" bson:"run_index"` Runner string `json:"runner" bson:"runner"` CreatedAt int64 `json:"created_at" bson:"created_at"` UpdatedAt int64 `json:"updated_at" bson:"updated_at"` Labels map[string]interface{} `json:"labels" bson:"labels"` Audit bool `json:"audit" bson:"audit"` StartEventStack []string `json:"start_event_stack" bson:"start_event_stack"` StartEventRunIndex int `json:"start_event_run_index" bson:"start_event_run_index"` ExitEventStack []string `json:"exit_event_stack" bson:"exit_event_stack"` ExitEventRunIndex int `json:"exit_event_run_index" bson:"exit_event_run_index"` Event bool `json:"event" bson:"event"` }
|
执行流程
- 如果有onstart节点,先执行该节点。否则先找到start节点,开始执行
- 一直调用next,获取下一个节点,直到执行完毕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| func (sm *StateMachine) exec() { sm.log.Info("[stateMachine] 开始执行流程...") var err error if err = sm.initBaseInfo(); err != nil { sm.log.Error("[stateMachine] 初始化流程基础信息失败. ", err) return } defer sm.syncToDb() sm.parseLabels() ... needTriggerOnStart := false if sm.processInstance.Status == data_meta.StatusInit || sm.processInstance.Status == data_meta.StatusFail || sm.processInstance.Status == data_meta.StatusWarn { needTriggerOnStart = true sm.processInstance.Event = true } sm.processInstance.Status = data_meta.StatusRunning sm.syncToDb() ... if needTriggerOnStart { sm.processInstance.StartEventStack = make([]string, 0) sm.processInstance.StartEventRunIndex = 0
if err = sm.buildHistoryVarSpaceOnStart(); err != nil { sm.processInstance.Status = data_meta.StatusFail return } if sm.triggerOnStartEvent() == false { return } } ... for true { ... sm.log.Info("[stateMachine] 获取待执行的节点...") activity, activityInstance, err := sm.next() if err != nil { sm.log.Error("[stateMachine] 获取待执行节点失败. ", err) sm.processInstance.Status = data_meta.StatusFail return } if activity == nil || activityInstance == nil { if sm.hasWarn { sm.processInstance.Status = data_meta.StatusWarn } else { sm.processInstance.Status = data_meta.StatusFinish } sm.log.Info("[stateMachine] 达到END节点,流程执行结束.") return } sm.syncToDb() sm.log.Info("[stateMachine] 开始执行节点", activity.Name) err = sm.activityService.Exec(sm.ctx, sm.processID, activity, activityInstance, sm.processInstance.Runner) if err != nil { sm.log.Error("[stateMachine] 节点执行失败 id: ", activityInstance.ID, " err:", err) sm.processInstance.Status = data_meta.StatusFail return } sm.log.Info("[stateMachine] 节点", activity.Name, "执行完成,状态:", activityInstance.Status) switch activityInstance.Status { case data_meta.StatusWarn: sm.hasWarn = true sm.processInstance.RunIndex++ sm.appendVarSpace(activity.Name, activityInstance.Result) case data_meta.StatusFinish: sm.processInstance.RunIndex++ sm.appendVarSpace(activity.Name, activityInstance.Result) if activity.Type == data_meta.ActivityTypeEnd { sm.processInstance.Result = activityInstance.Result if sm.hasWarn { sm.processInstance.Status = data_meta.StatusWarn } else { sm.processInstance.Status = data_meta.StatusFinish } sm.log.Info("[stateMachine] 达到END节点,流程执行结束.") return } case data_meta.StatusFail: sm.processInstance.Status = data_meta.StatusFail sm.log.Error("[stateMachine] 节点执行后状态为fail id: ", activityInstance.ID) return case data_meta.StatusPause: sm.processInstance.Status = data_meta.StatusPause sm.log.Info("[stateMachine] 流程暂停,等待手工触发执行") return default: if sm.stop == true { activityInstance.Status = data_meta.StatusFail sm.activityService.syncToDb(activityInstance) sm.processInstance.Status = data_meta.StatusPause sm.log.Info("[stateMachine] 流程本应挂起,但有stop标识,流程退出执行") } else { sm.processInstance.Status = data_meta.StatusSleep sm.log.Info("[stateMachine] 流程挂起,等待下次调度执行") } return } sm.syncToDb() } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| func (sm *StateMachine) next() (*data_meta.Activity, *data_meta.ActivityInstance, error) { sm.stopLocker.Lock() defer sm.stopLocker.Unlock() if len(sm.process.ActivityList) == 0 { return nil, nil, errors.New("流程没有配置活动节点") } var activityPtr *data_meta.Activity var activityInstancePtr *data_meta.ActivityInstance var err error if len(sm.processInstance.RunStack) == 0 { sm.log.Info("[stateMachine] [next]首次执行,查找start节点") for _, a := range sm.process.ActivityList { if a.Type == data_meta.ActivityTypeStart { sm.processInstance.RunIndex = 0 activityPtr = &a break } } if activityPtr == nil { return nil, nil, errors.New("流程没有start节点") } sm.log.Info("[stateMachine] [next]找到start节点") } else { sm.log.Info("[stateMachine] [next]获取runIndex=", sm.processInstance.RunIndex, "待执行节点") if sm.processInstance.RunIndex < len(sm.processInstance.RunStack) { sm.log.Info("[stateMachine] [next]runIndex=", sm.processInstance.RunIndex, "节点实例已存在") inst, err := sm.activityService.helperPtr.GetActivityInstance(sm.processInstance.RunStack[sm.processInstance.RunIndex]) if err != nil { return nil, nil, err } activityInstancePtr = &inst activityPtr = sm.getActivityByName(activityInstancePtr.Name) } else { sm.log.Info("[stateMachine] [next]runIndex=", sm.processInstance.RunIndex, "为新节点,开始寻找") currActivityInstance, err := sm.activityService.helperPtr.GetActivityInstance(sm.processInstance.RunStack[sm.processInstance.RunIndex-1]) if err != nil { sm.log.Error("[stateMachine] [next]查询最后1个执行的节点实例失败. ", err) return nil, nil, err } currActivity := sm.getActivityByName(currActivityInstance.Name)
nextActivityName, err := sm.getNextActivityFromSequence(currActivity) if err != nil { sm.log.Error("[stateMachine] [next]获取下一个待执行节点失败. ", err.Error()) return nil, nil, err } if nextActivityName == "" { return nil, nil, nil } activityPtr = sm.getActivityByName(nextActivityName) if activityPtr == nil { sm.log.Error("[stateMachine] [next]获取待执行节点定义失败:", nextActivityName) return nil, nil, errors.New("没有在流程定义中找到activity " + nextActivityName) } } } if activityPtr == nil { return nil, nil, errors.New("没有找到待执行的节点") } sm.log.Info("[stateMachine] [next]待执行节点为: ", activityPtr.Name) if activityInstancePtr == nil { sm.log.Info("[stateMachine] [next]初始化待执行节点") activityInstancePtr, err = sm.initActivity(activityPtr) if err != nil { return nil, nil, err } sm.processInstance.RunStack = append(sm.processInstance.RunStack, activityInstancePtr.ID) } else { sm.log.Info("[stateMachine] [next]待执行节点重新渲染参数") params, err := sm.parseParameter(activityPtr.ParamList) if err != nil { sm.log.Error("[stateMachine] [next]重新渲染参数失败.", err) return nil, nil, err } activityInstancePtr.Params = params } return activityPtr, activityInstancePtr, nil }
|
根据activity节点生成执行实例ActivityInstance
,并插入数据库

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (sm *StateMachine) initActivity(activity *data_meta.Activity) (*data_meta.ActivityInstance, error) { sm.log.Info("[stateMachine] 初始化实例:", activity.Name) params, err := sm.parseParameter(activity.ParamList) if err != nil { return nil, err } inst, err := sm.activityService.InitActivityInstance(sm.processID, activity.Name, params) return &inst, err }
func (as *ActivityService) InitActivityInstance(processID string, activityName string, params map[string]interface{}) (data_meta.ActivityInstance, error) { instance := data_meta.ActivityInstance{ ID: "", ProcessID: processID, Name: activityName, Params: params, Result: nil, Status: data_meta.StatusInit, ContentIDs: make([]string, 0), CreatedAt: 0, UpdatedAt: 0, } id, err := as.helperPtr.AddActivityInstance(&instance) if err != nil { as.log.Error("[activity] 初始化实例失败. ", activityName, " ", err.Error()) return instance, err } instance.ID = id as.log.Debug("[activity] 实例初始化完成,id: ", id) return instance, nil }
|
解析参数,解析获取节点的入参,从栈顶往栈底找(栈底即全局入参),如果为常量直接取,否则采用jsonpath语法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
func (sm *StateMachine) parseParameter(paramsList []data_meta.ValueDefinition) (map[string]interface{}, error) { sm.log.Debug("[stateMachine] 解析参数映射...") paramsMap := make(map[string]interface{}) succFlags := make(map[string]int) for _, v := range paramsList { succFlags[v.Key] = 0 } for _, one := range paramsList { if one.Const { sm.log.Info("[stateMachine]常量值: ", one.Key, "@", one.Location, " ", one.Expression, ", value: ", one.Value) if succFlags[one.Key] == 0 { paramsMap[one.Key] = one.Value succFlags[one.Key] = 1 sm.log.Info("[stateMachine]直接赋值") } else { if 1 > succFlags[one.Key] { paramsMap[one.Key] = one.Value succFlags[one.Key] = 1 sm.log.Info("[stateMachine] 比已解析的值更新,替换之") } else { sm.log.Info("[stateMachine] 比已解析的值旧,忽略") } } continue } if one.Location == "" || strings.ToLower(one.Location) == "global" { continue } for i := len(sm.varSpace) - 1; i > 0; i-- { if sm.varSpace[i].Owner == one.Location { val, err := jsonpath.JsonPathLookup(sm.varSpace[i].DataMap, one.Expression) if err != nil { sm.log.Warn("[stateMachine]", one.Key, "@", one.Location, " ", one.Expression, " err: ", err.Error()) } else { sm.log.Info("[stateMachine]解析成功: ", one.Key, "@", one.Location, " ", one.Expression) if succFlags[one.Key] == 0 { paramsMap[one.Key] = val succFlags[one.Key] = i } else { if i > succFlags[one.Key] { paramsMap[one.Key] = val succFlags[one.Key] = i sm.log.Info("[stateMachine] 比已解析的值更新,替换之") } else { sm.log.Info("[stateMachine] 比已解析的值旧,忽略") } } } break } } } for _, one := range paramsList { if succFlags[one.Key] > 0 { sm.log.Info("[stateMachine]", one.Key, "@", one.Location, " ", one.Expression, " 该key已解析过,跳过...") continue } if one.Location == "" || strings.ToLower(one.Location) == "global" { val, err := jsonpath.JsonPathLookup(sm.varSpace[0].DataMap, one.Expression) if err != nil { sm.log.Error("[stateMachine]", one.Key, "@global ", one.Expression, " err: ", err.Error()) } else { paramsMap[one.Key] = val succFlags[one.Key] = 1 sm.log.Info("[stateMachine]解析成功: ", one.Key, "@global ", one.Expression) } } } errKeys := make([]string, 0) for k, v := range succFlags { if v == 0 { errKeys = append(errKeys, k) } } if len(errKeys) == 0 { return paramsMap, nil } return nil, errors.New("解析key失败: " + strings.Join(errKeys, ",")) }
|
真实执行任务的地方,会将执行结果放在activity_instance
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| func (as *ActivityService) Exec(ctx context.Context, processID string, activity *data_meta.Activity, activityInstance *data_meta.ActivityInstance, runner string) error { ... defer as.syncToDb(activityInstance) activityInstance.Status = data_meta.StatusRunning activityInstance.OpStatus = "" as.syncToDb(activityInstance)
as.log.Info("[activity] 构建执行参数...") needRun, params, err := as.buildRunningParameter(activity.InputAnchor, activity.OutputAnchor, activityInstance.Params, activityInstance.Result) if err != nil { as.log.Error("[activity] 构建执行参数失败. err:", err) activityInstance.Status = data_meta.StatusFail return err } if !needRun { as.log.Info("[activity] 无需执行") activityInstance.Status = data_meta.StatusFinish return nil } as.log.Info("[activity] 执行参数: ", params) switch activity.Type { case data_meta.ActivityTypeStart: err = as.execStartNode(ctx, activityInstance) case data_meta.ActivityTypeClock: err = as.execClockNodeV2(ctx, activityInstance) case data_meta.ActivityTypeEnd: err = as.execEndNode(ctx, activityInstance) case data_meta.ActivityTypeTask: err = as.execTaskNode(ctx, activity, activityInstance, params, runner) case data_meta.ActivityTypeProcess: err = as.execSubProcess(ctx, activity, activityInstance, processID, activityInstance.Params, runner) case data_meta.ActivityTypeUser: err = as.execUserNode(ctx, activity, activityInstance, params, runner) case data_meta.ActivityTypeOnStart: err = as.execOnStartNode(ctx, activityInstance) case data_meta.ActivityTypeOnExit: err = as.execOnExitNode(ctx, activityInstance) default: err = errors.New("不支持节点类型") } ... return nil }
|
执行完毕后会将结果压栈至变量空间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| switch activityInstance.Status { case data_meta.StatusFinish: sm.processInstance.RunIndex++ sm.appendVarSpace(activity.Name, activityInstance.Result) if activity.Type == data_meta.ActivityTypeEnd { sm.processInstance.Result = activityInstance.Result if sm.hasWarn { sm.processInstance.Status = data_meta.StatusWarn } else { sm.processInstance.Status = data_meta.StatusFinish } sm.log.Info("[stateMachine] 达到END节点,流程执行结束.") return } }
|
基础模块
选主与切换
manager作为管控中心,本身存在leader与follower,leader会遇到宕机的风险,有时也需要主动切主从使得leader部署到某个机房。因此必须具备选主的能力。
为了实现manager某种程度的无状态,一般将集群信息与主从信息记录到外部分布式数据库中。agent可直接查询数据库获得通信的主manager。
一般agent只需要与主mgr进行交流,若存在主mgr性能瓶颈,大多是因为数据库运维设置的定时调度运行过于频繁,经测试,一台manager管理数千台agent是没有压力的。
自身版本升级
在数据库管控中,升级操作一般就是下掉原来的进程,上一个新的进程。对于mgr同样可以如此做,而对于agent,甚至只需要重启即可,当然重启前并不是直接杀死再启动进程,需要设计一种信号机制,让agent捕获该信号,在处理完当前任务后,平滑退出。如使用go的context管理上下文协程。
在实际实现中,这里的设计较为复杂。
机器资源监控
agent组件实现机器资源的定期探测和上报是必不可少的。这对后续部署数据库节点的判断具有决定性的意义,也能够给前端直接展示相应的资源使用会存量,以供运维人员或自动化运维工具判断。
日志模块
需要设计与选型好日志组件,因为管控过程中有相当多不同的逻辑流,如执行任务,执行流程,定时调度,具体的数据库管控插件日志,为了方便后续定位位置,需要对不同的逻辑流写不同的滚动日志文件。
外部管理模块
文件管理
为了方便管理,我们可让具体的数据库管控脚本在mgr和agent同步,这种好处在于我们写了一个新的管控脚本,只需要下发给对应的mgr,便可以直接使用,无需在手动分发给各个agent。
这个模块实现文件夹同步,需要定期检测同步文件夹,具备压缩能力,具备md5计算能力防止重复发送。一般具体数据库管控版本不升级,这里只会调用发送md5检查的接口。
文件下发底层具体实现,直接用gin或者http即可
配置下发
数据库节点部署,配置文件的管理与下发是难点之一,一般是将当前集群的多版本配置文件存在管控数据库中,供前端展示和编写。前端负责管理相应的模版,和特异化的参数,负责渲染拼接,再存于数据库中,记录对应配置版本。mgr需要提供相应的接口将对应的配置文件下发到agent指定位置,供相应进程启动使用。
其与文件管理不同的点在于,配置是mgr从数据库中拉下来下发给agent的,不经过磁盘。
公共功能
分布式锁
在写数据库运维逻辑时,必定面对多运维操作并发的风险,如同时发起了对一个集群做扩容和缩容的流程,这种长时间运行的较重量级的流程不会允许并行。因此需要在Manager实现分布式锁,该锁需要考虑容量,因为有的操作是允许同时进行多个,如节点部署。
该锁的实现一种简单的方法是,将锁与其过期信息存于分布式数据库中(mongodb等,mysql主从强一致),加锁时检查数据库即可。