| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 
 | 
 static rstatus_t
 redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq,
 uint32_t key_step)
 {
 struct mbuf *mbuf;
 struct msg **sub_msgs;
 uint32_t i;
 rstatus_t status;
 struct array *keys = r->keys;
 
 ASSERT(array_n(keys) == (r->narg - 1) / key_step);
 
 
 sub_msgs = nc_zalloc(nserver * sizeof(*sub_msgs));
 ...
 ASSERT(r->frag_seq == NULL);
 
 r->frag_seq = nc_alloc(array_n(keys) * sizeof(*r->frag_seq));
 ...
 
 mbuf = STAILQ_FIRST(&r->mhdr);
 mbuf->pos = mbuf->start;
 
 
 for (i = 0; i < 3; i++) {
 for (; *(mbuf->pos) != '\n';) {
 mbuf->pos++;
 }
 mbuf->pos++;
 }
 
 r->frag_id = msg_gen_frag_id();
 r->nfrag = 0;
 r->frag_owner = r;
 
 
 for (i = 0; i < array_n(keys); i++) {
 struct msg *sub_msg;
 struct keypos *kpos = array_get(keys, i);
 uint32_t idx = msg_backend_idx(r, kpos->start, kpos->end - kpos->start);
 ASSERT(idx < nserver);
 
 if (sub_msgs[idx] == NULL) {
 sub_msgs[idx] = msg_get(r->owner, r->request, r->redis);
 if (sub_msgs[idx] == NULL) {
 nc_free(sub_msgs);
 return NC_ENOMEM;
 }
 }
 r->frag_seq[i] = sub_msg = sub_msgs[idx];
 
 sub_msg->narg++;
 status = redis_append_key(sub_msg, kpos->start, kpos->end - kpos->start);
 if (status != NC_OK) {
 nc_free(sub_msgs);
 return status;
 }
 
 if (key_step == 1) {
 continue;
 } else {
 status = redis_copy_bulk(NULL, r);
 if (status != NC_OK) {
 nc_free(sub_msgs);
 return status;
 }
 
 status = redis_copy_bulk(sub_msg, r);
 if (status != NC_OK) {
 nc_free(sub_msgs);
 return status;
 }
 
 sub_msg->narg++;
 }
 }
 
 
 
 
 
 for (i = 0; i < nserver; i++) {
 struct msg *sub_msg = sub_msgs[i];
 
 if (sub_msg == NULL) {
 continue;
 }
 
 if (r->type == MSG_REQ_REDIS_MGET) {
 status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmget\r\n",
 sub_msg->narg + 1);
 } else if (r->type == MSG_REQ_REDIS_DEL) {
 status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n",
 sub_msg->narg + 1);
 } else if (r->type == MSG_REQ_REDIS_MSET) {
 status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n",
 sub_msg->narg + 1);
 } else if (r->type == MSG_REQ_REDIS_TOUCH) {
 status = msg_prepend_format(sub_msg, "*%d\r\n$5\r\ntouch\r\n",
 sub_msg->narg + 1);
 } else if (r->type == MSG_REQ_REDIS_UNLINK) {
 status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nunlink\r\n",
 sub_msg->narg + 1);
 } else {
 NOT_REACHED();
 }
 if (status != NC_OK) {
 nc_free(sub_msgs);
 return status;
 }
 
 sub_msg->type = r->type;
 sub_msg->frag_id = r->frag_id;
 sub_msg->frag_owner = r->frag_owner;
 
 TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
 r->nfrag++;
 }
 
 nc_free(sub_msgs);
 return NC_OK;
 }
 
 |