# Load modules at startup. If the server is not able to load modules
# it will abort. It is possible to use multiple loadmodule directives.
#
# loadmodule /path/to/my_module.so
# loadmodule /path/to/other_module.so
/* Module entry point: this function must be present in every Redis module.
 * It is used to register the module and its commands with the Redis server.
 *
 * ctx  - module context handed to the module for the duration of the load.
 * argv - arguments passed to the module when it is loaded.
 * argc - number of entries in argv.
 *
 * Returns REDISMODULE_ERR as soon as module initialization or a command
 * registration fails. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (RedisModule_Init(ctx,"helloworld",1,REDISMODULE_APIVER_1)
        == REDISMODULE_ERR) return REDISMODULE_ERR;

    /* Log the list of parameters passed when loading the module. */
    for (int j = 0; j < argc; j++) {
        const char *s = RedisModule_StringPtrLen(argv[j],NULL);
        printf("Module loaded with ARGV[%d] = %s\n", j, s);
    }

    if (RedisModule_CreateCommand(ctx,"hello.simple",
        HelloSimple_RedisCommand,"readonly",0,0,0) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    ...
}
/* Check if the command name is busy. */ if (lookupCommand(cmdname) != NULL) { sdsfree(cmdname); return REDISMODULE_ERR; }
/* Create a command "proxy", which is a structure that is referenced * in the command table, so that the generic command that works as * binding between modules and Redis, can know what function to call * and what the module is. * * Note that we use the Redis command table 'getkeys_proc' in order to * pass a reference to the command proxy structure. */ cp = zmalloc(sizeof(*cp)); cp->module = ctx->module; cp->func = cmdfunc; cp->rediscmd = zmalloc(sizeof(*rediscmd)); cp->rediscmd->name = cmdname; cp->rediscmd->proc = RedisModuleCommandDispatcher; cp->rediscmd->arity = -1; cp->rediscmd->flags = flags | CMD_MODULE; cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsignedlong)cp; cp->rediscmd->firstkey = firstkey; cp->rediscmd->lastkey = lastkey; cp->rediscmd->keystep = keystep; cp->rediscmd->microseconds = 0; cp->rediscmd->calls = 0; dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd); dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd); cp->rediscmd->id = ACLGetCommandID(cmdname); /* ID used for ACL. */ return REDISMODULE_OK; }
/* Set up the keyspace notification subscriber list and the static client, which is flagged as CLIENT_MODULE. */ moduleKeyspaceSubscribers = listCreate(); moduleFreeContextReusedClient = createClient(NULL); moduleFreeContextReusedClient->flags |= CLIENT_MODULE; moduleFreeContextReusedClient->user = NULL; /* root user. */
/* Set up the command filter list. */ moduleCommandFilters = listCreate();
/* Register the exported core API, then create the pipe used by module blocking commands; failing to create the pipe is fatal. */ moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, "Can't create the pipe for module blocking commands: %s", strerror(errno)); exit(1); } /* Make the pipe non-blocking. This is just a best-effort mechanism, and we do not want to block in either the read or the write half. */ anetNonBlock(NULL,server.module_blocked_pipe[0]); anetNonBlock(NULL,server.module_blocked_pipe[1]);
/* Create the timers radix tree. */ Timers = raxNew();
/* Set up the event listeners data structures. */ RedisModule_EventListeners = listCreate();
/* The GIL for our thread-safe contexts must start out already locked: it is only unlocked when it is safe to do so. */ pthread_mutex_lock(&moduleGIL); }
/* Register all the APIs we export. Keep this function at the end of the
 * file so that it is easy to find when adding new entries.
 *
 * Creates the server.moduleapi and server.sharedapi dictionaries (both of
 * type moduleAPIDictType) and then registers each exported function with
 * REGISTER_API. */
void moduleRegisterCoreAPI(void) {
    server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
    server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
    REGISTER_API(Alloc);
    REGISTER_API(Calloc);
    REGISTER_API(Realloc);
    ...
}
/* Walk the queue of modules waiting to be loaded and load each one.
 * A module that fails to load is fatal: log a warning and abort. */
listRewind(server.loadmodule_queue,&li);
while((ln = listNext(&li))) {
    struct moduleLoadQueueEntry *loadmod = ln->value;
    if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
        == C_ERR)
    {
        serverLog(LL_WARNING,
            "Can't load module from %s: server aborting",
            loadmod->path);
        exit(1);
    }
}
}
/* Load a module and initialize it. On success C_OK is returned, otherwise * C_ERR is returned. */ intmoduleLoad(constchar *path, void **module_argv, int module_argc){ int (*onload)(void *, void **, int); void *handle; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.client = moduleFreeContextReusedClient; selectDb(ctx.client, 0);
structstat st; if (stat(path, &st) == 0) { // this check is best effort if (!(st.st_mode & (S_IXUSR | S_IXGRP | S_IXOTH))) { serverLog(LL_WARNING, "Module %s failed to load: It does not have execute permissions.", path); return C_ERR; } }
handle = dlopen(path,RTLD_NOW|RTLD_LOCAL); if (handle == NULL) { serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror()); return C_ERR; } onload = (int (*)(void *, void **, int))(unsignedlong) dlsym(handle,"RedisModule_OnLoad"); if (onload == NULL) { dlclose(handle); serverLog(LL_WARNING, "Module %s does not export RedisModule_OnLoad() " "symbol. Module not loaded.",path); return C_ERR; } if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { if (ctx.module) { moduleUnregisterCommands(ctx.module); moduleUnregisterSharedAPI(ctx.module); moduleUnregisterUsedAPI(ctx.module); moduleFreeModuleStructure(ctx.module); } dlclose(handle); serverLog(LL_WARNING, "Module %s initialization failed. Module not loaded",path); return C_ERR; }
/* Redis module loaded! Register it. */ dictAdd(modules,ctx.module->name,ctx.module); ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); /* Fire the loaded modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, REDISMODULE_SUBEVENT_MODULE_LOADED, ctx.module);
/* Release every object tracked in the auto-memory queue of 'ctx', then
 * free the queue itself and reset its bookkeeping fields.
 *
 * Does nothing when the context does not have the
 * REDISMODULE_CTX_AUTO_MEMORY flag set. */
void autoMemoryCollect(RedisModuleCtx *ctx) {
    if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
    /* Clear the AUTO_MEMORY flag from the context, otherwise the functions
     * we call to free the resources, will try to scan the auto release
     * queue to mark the entries as freed. */
    ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;
    int j;
    for (j = 0; j < ctx->amqueue_used; j++) {
        void *ptr = ctx->amqueue[j].ptr;
        /* Each entry type has its own release function. */
        switch(ctx->amqueue[j].type) {
        case REDISMODULE_AM_STRING: decrRefCount(ptr); break;
        case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break;
        case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break;
        case REDISMODULE_AM_DICT: RM_FreeDict(NULL,ptr); break;
        case REDISMODULE_AM_INFO: RM_FreeServerInfo(NULL,ptr); break;
        }
    }
    /* Restore the flag that was cleared above. */
    ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
    zfree(ctx->amqueue);
    ctx->amqueue = NULL;
    ctx->amqueue_len = 0;
    ctx->amqueue_used = 0;
}
/* Free the context after the user function was called. The first step is
 * to handle any pending command propagation for this context. */
void moduleFreeContext(RedisModuleCtx *ctx) {
    moduleHandlePropagationAfterCommandCallback(ctx);
    ...
}
/* We don't need to do anything here if the context was never used in order to propagate commands. */ if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
/* Nothing to do either when the client carries the CLIENT_LUA flag. */ if (c->flags & CLIENT_LUA) return;
/* Handle the replication of the final EXEC, since whatever a command emits is always wrapped around MULTI/EXEC. */ alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL);
/* Only at this point are all the accumulated operations actually propagated, in one go, each to its own target (e.g. the replicas). This is skipped when the context has the REDISMODULE_CTX_MODULE_COMMAND_CALL flag set. */ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) && server.also_propagate.numops) { for (int j = 0; j < server.also_propagate.numops; j++) { redisOp *rop = &server.also_propagate.ops[j]; int target = rop->target; if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } redisOpArrayFree(&server.also_propagate); /* Restore the previous oparray in case of nested use of the API. */ server.also_propagate = ctx->saved_oparray; /* We're done with saved_oparray, let's invalidate it. */ redisOpArrayInit(&ctx->saved_oparray); } }