redis-module原理学习

前言

redis的module能力十分强大,是其作为开源软件建立开源生态的重要护城河,在此基础上出现了各种附加的数据结构,如过滤器、咆哮位图(Roaring Bitmap)等。更强大的是,开发者做一个module非常简单,redis内核部分做了足够的抽象,也提供了众多对外暴露的api给开发者使用。

module基本用于两种用途

  1. 拼接redis命令,做一定的处理,实现类似lua的能力
  2. 实现新的数据结构

其大致有三种加载方法:

  1. 配置文件,如以下注释,给出.so的文件路径即可加载,可指定多个
    1
    2
    3
    4
    5
    # Load modules at startup. If the server is not able to load modules
    # it will abort. It is possible to use multiple loadmodule directives.
    #
    # loadmodule /path/to/my_module.so
    # loadmodule /path/to/other_module.so
  2. 命令行参数
  3. 执行命令热加载与卸载

redis官方也给了很丰富的实例,这里不给出详细例子,给出资料位置

最近公司内部自研缓存kcache要做一个module,内核老员工写了一个版本,非常复杂,还用了python做编译器,写了十几个文件,总觉得过于复杂,因此趁这次机会深入看看redis内核怎么实现的module。

代码走读

module使用

先看看给开发者提供的头文件,了解module具体是怎样调用redis内部api的。

用一个比较简单的例子,可以看到核心的数据结构:RedisModuleCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Command handler: pushes argv[2] onto the tail of the list at key argv[1]
 * and replies with the value length reported for that key afterwards.
 * Replies with an arity error unless exactly three arguments are given. */
int HelloPushNative_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    if (argc != 3) {
        return RedisModule_WrongArity(ctx);
    }

    /* The key is opened for both reading and writing. */
    RedisModuleKey *list = RedisModule_OpenKey(ctx, argv[1],
                                               REDISMODULE_READ | REDISMODULE_WRITE);
    RedisModule_ListPush(list, REDISMODULE_LIST_TAIL, argv[2]);

    /* The length must be read while the key is still open. */
    size_t length = RedisModule_ValueLength(list);
    RedisModule_CloseKey(list);

    RedisModule_ReplyWithLongLong(ctx, length);
    return REDISMODULE_OK;
}

这些api实际上是函数指针。在后续的代码走读中可以看到,这些指针指向的实际函数是通过查找内核中的字典获取的。

1
2
// redismodule.h
// Module-side declaration of RedisModule_Alloc: a function pointer that takes
// a byte count and returns void *.
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;

函数指针在此处初始化,这个函数内核会在加载module时调用,重点关注void *getapifuncptr = ((void**)ctx)[0];,ctx的首地址处记录了获取api函数的函数,所以在一系列的REDISMODULE_GET_API()后,所有函数指针便指向了内核中的具体函数,因为RedisModule_GetApi()在内核中的实现就是通过api名字查字典来获取对应函数。

在Init结尾,使用RedisModule_SetModuleAttribs将Module的相关信息保存了起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server.
*
* ctx is the context handed in by the loader; argv/argc are the arguments
* given when the module was loaded. Returns REDISMODULE_ERR as soon as
* RedisModule_Init() or a command registration fails. The remainder of the
* body is elided ("...") in this excerpt. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* Initialise the module as "helloworld", version 1, API version
* REDISMODULE_APIVER_1. */
if (RedisModule_Init(ctx,"helloworld",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;

/* Log the list of parameters passing loading the module. */
for (int j = 0; j < argc; j++) {
const char *s = RedisModule_StringPtrLen(argv[j],NULL);
printf("Module loaded with ARGV[%d] = %s\n", j, s);
}

/* Register "hello.simple" with the "readonly" flag string and
* firstkey/lastkey/keystep all set to 0. */
if (RedisModule_CreateCommand(ctx,"hello.simple",
HelloSimple_RedisCommand,"readonly",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
...
}

/* Module-side initialisation: obtains the API lookup function from ctx,
* resolves the RedisModule_* function pointers through it, then records the
* module's name, version and API version via RedisModule_SetModuleAttribs().
* Returns REDISMODULE_OK. Further REDISMODULE_GET_API() calls are elided
* ("...") in this excerpt. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
/* The first pointer-sized slot of ctx is read as the API lookup function. */
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
/* Expands to a lookup of "RedisModule_Alloc" stored into RedisModule_Alloc. */
REDISMODULE_GET_API(Alloc);
...
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
return REDISMODULE_OK;
}

/* Calls RedisModule_GetApi() with the string "RedisModule_<name>" and the
* address of the RedisModule_<name> function pointer, so the lookup result is
* written into that pointer. */
#define REDISMODULE_GET_API(name) \
RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))

/* Allocates the RedisModule record for the module being loaded, fills in its
 * name, version and API version, gives it empty lists and zeroed state, and
 * attaches it to ctx. Does nothing if ctx already has a module attached. */
void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
    if (ctx->module != NULL) {
        return;
    }

    RedisModule *mod = zmalloc(sizeof(*mod));

    /* Identity supplied by the module. */
    mod->name = sdsnew((char*)name);
    mod->ver = ver;
    mod->apiver = apiver;

    /* Every list starts out empty. */
    mod->types = listCreate();
    mod->usedby = listCreate();
    mod->using = listCreate();
    mod->filters = listCreate();

    /* Remaining state starts at zero. */
    mod->in_call = 0;
    mod->in_hook = 0;
    mod->options = 0;
    mod->info_cb = 0;

    ctx->module = mod;
}

注册命令的过程RedisModule_CreateCommand()对应于内核函数如下,即将对应的命令组成redisCommand结构放置到命令字典里。

注意命令处理函数为RedisModuleCommandDispatcher,其执行命令前创建一个全新的上下文变量ctx,然后记录上下文,执行绑定在上面的RedisModuleCmdFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/* Registers a module command called 'name' whose handler is 'cmdfunc'.
 *
 * 'strflags' is an optional flags string (NULL means no flags); 'firstkey',
 * 'lastkey' and 'keystep' are copied into the command entry as given.
 *
 * Returns REDISMODULE_ERR when the flags string cannot be parsed, when the
 * command carries CMD_MODULE_NO_CLUSTER while cluster mode is enabled, or
 * when the name is already taken. Otherwise the command is added to both
 * server.commands and server.orig_commands and REDISMODULE_OK is returned. */
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
    int64_t flags = 0;
    if (strflags) {
        flags = commandFlagsFromString((char*)strflags);
    }
    if (flags == -1) {
        return REDISMODULE_ERR;
    }
    if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled) {
        return REDISMODULE_ERR;
    }

    /* Refuse to shadow a command that already exists. */
    sds cmdname = sdsnew(name);
    if (lookupCommand(cmdname) != NULL) {
        sdsfree(cmdname);
        return REDISMODULE_ERR;
    }

    /* The proxy ties the command-table entry back to the owning module and
     * to the module's handler function. */
    RedisModuleCommandProxy *proxy = zmalloc(sizeof(*proxy));
    proxy->module = ctx->module;
    proxy->func = cmdfunc;

    struct redisCommand *cmd = zmalloc(sizeof(*cmd));
    proxy->rediscmd = cmd;

    cmd->name = cmdname;
    cmd->proc = RedisModuleCommandDispatcher;
    cmd->arity = -1;
    cmd->flags = flags | CMD_MODULE;
    /* The 'getkeys_proc' slot is reused to carry the proxy pointer, which is
     * how the generic dispatcher finds the proxy again. */
    cmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)proxy;
    cmd->firstkey = firstkey;
    cmd->lastkey = lastkey;
    cmd->keystep = keystep;
    cmd->microseconds = 0;
    cmd->calls = 0;

    /* Each table gets its own copy of the name as key. */
    dictAdd(server.commands, sdsdup(cmdname), cmd);
    dictAdd(server.orig_commands, sdsdup(cmdname), cmd);

    cmd->id = ACLGetCommandID(cmdname); /* ID used for ACL. */
    return REDISMODULE_OK;
}

/* Command proc installed for every module command by RM_CreateCommand().
* It recovers the command proxy stored in the command's getkeys_proc slot,
* builds a fresh context for this call, invokes the module's handler with the
* client's arguments and then frees the context. The rest of the body is
* elided ("...") in this excerpt. */
void RedisModuleCommandDispatcher(client *c) {
/* getkeys_proc carries the RedisModuleCommandProxy pointer, not a function. */
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;

ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
ctx.module = cp->module;
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
...
}

初始化

出于简单,只阅读配置文件加载module的方式,按时间顺序走读。

初始化内核管理module模块必要的数据结构。

server.loadmodule_queue用双向链表存放所有要加载的module。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// server.c
// Pay particular attention to moduleRegisterCoreAPI(), which is called from
// moduleInitModulesSystem().
int main() {
...
moduleInitModulesSystem();
...
}

/* Sets up the global state of the modules subsystem: the unblocked-clients
* list, the load queue, the modules dictionary, the keyspace subscriber list,
* the reusable module client, the command filter list, the exported API
* tables, the blocked-clients pipe, the timers radix tree and the event
* listener list. Finishes by locking moduleGIL.
*
* Logs a warning and exits the process with status 1 if the pipe cannot be
* created. */
void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL);

/* Set up the keyspace notification subscriber list and static client */
moduleKeyspaceSubscribers = listCreate();
moduleFreeContextReusedClient = createClient(NULL);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleFreeContextReusedClient->user = NULL; /* root user. */

/* Set up filter list */
moduleCommandFilters = listCreate();

/* Populate the API dictionary before any module is loaded. */
moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half. */
anetNonBlock(NULL,server.module_blocked_pipe[0]);
anetNonBlock(NULL,server.module_blocked_pipe[1]);

/* Create the timers radix tree. */
Timers = raxNew();

/* Setup the event listeners data structures. */
RedisModule_EventListeners = listCreate();

/* Our thread-safe contexts GIL must start with already locked:
* it is just unlocked when it's safe. */
pthread_mutex_lock(&moduleGIL);
}

server.moduleapi用字典存放对module开发者暴露的api,如api名字RedisModule_Alloc对应的处理函数RM_Alloc

server.sharedapi用于module之间通信共享函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries.
*
* Creates the server.moduleapi and server.sharedapi dictionaries, then
* registers each exported function with REGISTER_API(). Further registrations
* are elided ("...") in this excerpt. */
void moduleRegisterCoreAPI(void) {
server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
REGISTER_API(Alloc);
REGISTER_API(Calloc);
REGISTER_API(Realloc);
...
}

/* Stores 'funcptr' in server.moduleapi under the key 'funcname' and returns
 * whatever dictAdd() reports. */
int moduleRegisterApi(const char *funcname, void *funcptr) {
    char *key = (char*)funcname;
    int status = dictAdd(server.moduleapi, key, funcptr);
    return status;
}

/* Registers the function RM_<name> under the string key "RedisModule_<name>"
* by calling moduleRegisterApi(). */
#define REGISTER_API(name) \
moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)

/* Exported allocation API: forwards the request for 'bytes' bytes to
 * zmalloc() and returns its result. */
void *RM_Alloc(size_t bytes) {
    void *mem = zmalloc(bytes);
    return mem;
}

在解析配置文件时,开始解析module相关配置。注意这里argc不一定等于2,即可以给module传递参数,如loadmodule /path/to/my_module.so arg1 arg2。解析后会将module相关信息(路径、参数等)保存到双向链表server.loadmodule_queue中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Called from main:
loadServerConfig(configfile,options);
// which calls:
loadServerConfigFromString(config);
// which handles the "loadmodule" directive: argv[1] is passed as the module
// path and the remaining argc-2 arguments are passed along with it.
else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) {
queueLoadModule(argv[1],&argv[2],argc-2);
}
/* Appends a load request to server.loadmodule_queue. The request holds a
 * copy of 'path' plus the 'argc' arguments in 'argv', each wrapped in a raw
 * string object. */
void queueLoadModule(sds path, sds *argv, int argc) {
    struct moduleLoadQueueEntry *entry = zmalloc(sizeof(*entry));

    entry->argv = zmalloc(sizeof(robj*)*argc);
    entry->path = sdsnew(path);
    entry->argc = argc;

    int idx = 0;
    while (idx < argc) {
        sds arg = argv[idx];
        entry->argv[idx] = createRawStringObject(arg, sdslen(arg));
        idx++;
    }

    listAddNodeTail(server.loadmodule_queue, entry);
}

加载module

紧接着main函数开始加载module,可以看到是迭代列表逐个加载的,将之前解析得到的参数传给了实际加载函数。加载函数先通过dlopen打开动态库,再通过dlsym(handle,"RedisModule_OnLoad")找到入口函数,传入一个用REDISMODULE_CTX_INIT初始化的ctx,执行module开发者编写的RedisModule_OnLoad初始化函数。

最终将module名字与RedisModule结构(其handle字段保存了dlopen返回的动态库句柄)的映射放入了内核的这个字典里: static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/。以便后续卸载module。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/* Excerpt of main(): surrounding code is elided ("..."). The call shown here
* loads every module queued in server.loadmodule_queue. */
int main() {
...
moduleLoadFromQueue();
...
}

/* Walks server.loadmodule_queue from the head and loads each queued module
 * with its saved arguments. If any load returns C_ERR, a warning is logged
 * and the process exits with status 1. */
void moduleLoadFromQueue(void) {
    listIter iter;
    listNode *node;

    listRewind(server.loadmodule_queue, &iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        struct moduleLoadQueueEntry *entry = node->value;
        int status = moduleLoad(entry->path, (void **)entry->argv, entry->argc);

        if (status != C_ERR) {
            continue;
        }

        serverLog(LL_WARNING,
                  "Can't load module from %s: server aborting",
                  entry->path);
        exit(1);
    }
}

/* Load a module and initialize it. On success C_OK is returned, otherwise
* C_ERR is returned.
*
* path is the shared object to open; module_argv/module_argc are forwarded
* unchanged to the module's RedisModule_OnLoad().
*
* C_ERR is returned when the file can be stat'ed but has no execute bit,
* when dlopen() fails, when the RedisModule_OnLoad symbol is missing, or when
* RedisModule_OnLoad() returns REDISMODULE_ERR. */
int moduleLoad(const char *path, void **module_argv, int module_argc) {
int (*onload)(void *, void **, int);
void *handle;
/* OnLoad runs with the shared module client, selected on DB 0. */
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);

/* A failing stat() is not treated as an error here; only a successful
* stat() showing no execute permission aborts the load. */
struct stat st;
if (stat(path, &st) == 0)
{ // this check is best effort
if (!(st.st_mode & (S_IXUSR | S_IXGRP | S_IXOTH))) {
serverLog(LL_WARNING, "Module %s failed to load: It does not have execute permissions.", path);
return C_ERR;
}
}

handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
if (handle == NULL) {
serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
return C_ERR;
}
/* Locate the module's entry point by its fixed symbol name. */
onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
if (onload == NULL) {
dlclose(handle);
serverLog(LL_WARNING,
"Module %s does not export RedisModule_OnLoad() "
"symbol. Module not loaded.",path);
return C_ERR;
}
if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
/* OnLoad failed: if a module structure was already attached to ctx, undo
* its registrations and free it before closing the library.
* NOTE(review): this path returns without calling moduleFreeContext(&ctx),
* unlike the success path below - confirm that is intended. */
if (ctx.module) {
moduleUnregisterCommands(ctx.module);
moduleUnregisterSharedAPI(ctx.module);
moduleUnregisterUsedAPI(ctx.module);
moduleFreeModuleStructure(ctx.module);
}
dlclose(handle);
serverLog(LL_WARNING,
"Module %s initialization failed. Module not loaded",path);
return C_ERR;
}

/* Redis module loaded! Register it. */
/* NOTE(review): ctx.module is dereferenced unconditionally here, so this
* assumes OnLoad attached a module structure to ctx - confirm. */
dictAdd(modules,ctx.module->name,ctx.module);
ctx.module->blocked_clients = 0;
/* Keep the dlopen() handle on the module structure. */
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
/* Fire the loaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_LOADED,
ctx.module);

moduleFreeContext(&ctx);
return C_OK;
}

注意这一行很重要,阐述了module开发者怎么获得的暴露的api,也就是查找moduleapi字典

1
2
3
4
5
6
7
8
9
10
/* Every context is created from this initialiser. */
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;

/* Initialiser for RedisModuleCtx. Its first member is set to the address of
* RM_GetApi; the remaining members are NULL or zero. */
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}}

/* Looks up 'funcname' in server.moduleapi. When found, the registered value
 * is written to *targetPtrPtr and REDISMODULE_OK is returned; otherwise
 * *targetPtrPtr is left untouched and REDISMODULE_ERR is returned. */
int RM_GetApi(const char *funcname, void **targetPtrPtr) {
    dictEntry *entry = dictFind(server.moduleapi, funcname);

    if (entry) {
        *targetPtrPtr = dictGetVal(entry);
        return REDISMODULE_OK;
    }
    return REDISMODULE_ERR;
}

总结

redis的module模块实现水到渠成、理所应当:内核层面的实现十分简洁,对开发者而言也非常友好,主要是利用宏定义的名字拼接作为动态库和内核的桥梁。