1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
/*
 * Split the multi-key request r into one sub-request per backend index.
 *
 * Every key in r->keys is mapped to an index with msg_backend_idx(); keys
 * that map to the same index are appended to the same sub-message. When
 * key_step is not 1, one more bulk is copied from r into the sub-message
 * after each key. Each non-empty sub-message then gets a command header
 * matching r->type prepended and is appended to frag_msgq.
 *
 * r         - request to fragment; this function writes r->frag_seq,
 *             r->frag_id, r->nfrag and r->frag_owner, and moves the pos
 *             cursor of r's first mbuf.
 * nserver   - number of slots in the temporary per-backend table; every
 *             index returned by msg_backend_idx() is asserted to be below it.
 * frag_msgq - tail queue that receives the finished sub-messages.
 * key_step  - arguments consumed per key; 1 means keys only, any other
 *             value takes the branch that copies an extra bulk per key.
 *
 * Returns NC_OK on success, NC_ENOMEM when msg_get() fails, or the failing
 * status of redis_append_key(), redis_copy_bulk() or msg_prepend_format().
 *
 * NOTE(review): this listing contains "..." elisions after both
 * allocations below, so part of the body is not visible here.
 * NOTE(review): every visible error return frees only the sub_msgs table;
 * sub-messages already obtained from msg_get() and r->frag_seq are not
 * released in this function - confirm that something else reclaims them.
 */
static rstatus_t redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq, uint32_t key_step) { struct mbuf *mbuf; struct msg **sub_msgs; uint32_t i; rstatus_t status; struct array *keys = r->keys;
/* Sanity check: the key count must equal (narg - 1) / key_step. */
ASSERT(array_n(keys) == (r->narg - 1) / key_step);
/*
 * sub_msgs: zero-initialised table with one slot per backend index.
 * r->frag_seq: one slot per key, filled in the key loop below.
 * NOTE(review): no check of either allocation result is visible; the
 * checks may be what the "..." elisions hide - confirm against the full
 * source.
 */
sub_msgs = nc_zalloc(nserver * sizeof(*sub_msgs)); ... ASSERT(r->frag_seq == NULL); r->frag_seq = nc_alloc(array_n(keys) * sizeof(*r->frag_seq)); ...
/* Rewind the read cursor of r's first mbuf to the start of its data. */
mbuf = STAILQ_FIRST(&r->mhdr); mbuf->pos = mbuf->start;
/*
 * Advance the cursor past three '\n' characters - presumably the
 * "*<narg>\r\n$<len>\r\n<command>\r\n" prefix, which would leave pos at the
 * first key; verify against the parser.
 * NOTE(review): the scan has no visible upper bound, so it relies on all
 * three newlines being present in this first mbuf.
 */
for (i = 0; i < 3; i++) { for (; *(mbuf->pos) != '\n';) { mbuf->pos++; } mbuf->pos++; }
/* Tag r as the owner of a new fragment group; nfrag is counted up below. */
r->frag_id = msg_gen_frag_id(); r->nfrag = 0; r->frag_owner = r;
/* For each key: pick its backend index from the key bytes [start, end). */
for (i = 0; i < array_n(keys); i++) { struct msg *sub_msg; struct keypos *kpos = array_get(keys, i); uint32_t idx = msg_backend_idx(r, kpos->start, kpos->end - kpos->start); ASSERT(idx < nserver);
/*
 * Create the sub-message for this index on first use, and record in
 * frag_seq which sub-message key i was routed to.
 */
if (sub_msgs[idx] == NULL) { sub_msgs[idx] = msg_get(r->owner, r->request, r->redis); if (sub_msgs[idx] == NULL) { nc_free(sub_msgs); return NC_ENOMEM; } } r->frag_seq[i] = sub_msg = sub_msgs[idx];
/* Count the key as one argument of the sub-message and append its bytes. */
sub_msg->narg++; status = redis_append_key(sub_msg, kpos->start, kpos->end - kpos->start); if (status != NC_OK) { nc_free(sub_msgs); return status; }
/*
 * Keys-only commands are done with this key. Otherwise call
 * redis_copy_bulk() with a NULL destination first - presumably this only
 * consumes the key bulk from r without copying it; confirm in
 * redis_copy_bulk().
 */
if (key_step == 1) { continue; } else { status = redis_copy_bulk(NULL, r); if (status != NC_OK) { nc_free(sub_msgs); return status; }
/* Copy the next bulk of r (presumably the key's value) into the sub-message. */
status = redis_copy_bulk(sub_msg, r); if (status != NC_OK) { nc_free(sub_msgs); return status; }
/* The copied bulk counts as one more argument of the sub-message. */
sub_msg->narg++; } }
/* Finalise every index that received at least one key; skip empty slots. */
for (i = 0; i < nserver; i++) { struct msg *sub_msg = sub_msgs[i]; if (sub_msg == NULL) { continue; }
/*
 * Prepend the array header and command name for r->type. The count is
 * narg + 1 because the command-name bulk written by the same format string
 * is part of the array. Any other request type is treated as unreachable.
 * NOTE(review): "%d" is given a narg + 1 expression that looks unsigned -
 * confirm the field type and the conversion msg_prepend_format() expects.
 */
if (r->type == MSG_REQ_REDIS_MGET) { status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmget\r\n", sub_msg->narg + 1); } else if (r->type == MSG_REQ_REDIS_DEL) { status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n", sub_msg->narg + 1); } else if (r->type == MSG_REQ_REDIS_MSET) { status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n", sub_msg->narg + 1); } else if (r->type == MSG_REQ_REDIS_TOUCH) { status = msg_prepend_format(sub_msg, "*%d\r\n$5\r\ntouch\r\n", sub_msg->narg + 1); } else if (r->type == MSG_REQ_REDIS_UNLINK) { status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nunlink\r\n", sub_msg->narg + 1); } else { NOT_REACHED(); } if (status != NC_OK) { nc_free(sub_msgs); return status; }
/*
 * Give the sub-message r's type and fragment identity, queue it on
 * frag_msgq, and count it in r->nfrag.
 */
sub_msg->type = r->type; sub_msg->frag_id = r->frag_id; sub_msg->frag_owner = r->frag_owner; TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe); r->nfrag++; }
/* Only the lookup table is freed here; the sub-messages are now on frag_msgq. */
nc_free(sub_msgs); return NC_OK; }
|