从sdk出发

之后的所有源码解析均基于社区版。

社区版不支持但商业版支持的功能有

  • 强一致性
  • 快速启动(索引落盘)
  • 持久删除
  • 跨数据中心同步
  • 索引与数据均用ssd(社区版只支持索引放内存)

学习完aerospike的官方文档,接下来就是查看其支持的功能,最全最好的方法就是直接看sdk提供的接口。

scan

  1. 支持多种scan范围,如按整个集群或partition
  2. 不支持按key顺序scan,支持按key的hash值按序按量scan

单key的写操作

  1. 计算key的hash摘要,共20字节,取出前16位(实际其中的低12位),计算出分区id
  2. 根据分区id,查路由表,得到将要发送的目标server
  3. 根据设置的policy,决定是否只发摘要或者摘要与key值都发。write_buf包括: policy(写策略标识)+namespace+set+digest+key值(可选)

    写策略有如:持久删除、写命令类型(删除),commit时机(是否要所有从节点都回复)

sdk使用过程

对一行记录的,简单行操作如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 连接as,得到as结构题
aerospike as;
/* 初始化该连接的配置,此配置可以设置,比如修改写策略policy,实现多种能力。
如: 是否持久化写操作(默认否)、是否需要所有副本回复(默认是)、是否用key的元数据版本控制写入(默认否)。
*/
as_config config;
as_config_init(&config)
aerospike_init(as, &config);
aerospike_connect(as, &err)

// 1.申明key、ns、set
as_key key;
as_key_init_str(&key, "ns", "set", "key");
// 3.声明一行待插入的记录,包括bin名和bin值(类型与具体值)
as_record rec;
as_record_inita(&rec, 2);
as_record_set_int64(&rec, "test-bin-1", 1234);
as_record_set_str(&rec, "test-bin-2", "test-bin-2-data");
// 4.执行put
aerospike_key_put(&as, &err, NULL, &g_key, &rec)
// 5.可以get出来看结果,也可以select其中几个bin
as_record* p_rec = NULL;
aerospike_key_get(&as, &err, NULL, &g_key, &p_rec)

static const char* bins_1_3[] = { "test-bin-1", "test-bin-3", NULL };
aerospike_key_select(&as, &err, NULL, &g_key, bins_1_3, &p_rec)

对于复杂数据类型,以ordered_map举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 1.同样是连接as,指定默认配置
aerospike as;
as_config config;
as_config_init(&config)
aerospike_init(as, &config);
aerospike_connect(as, &err)
// 2. 创建hashmap,声明为2个key-value,value值为int
as_hashmap scores;
as_hashmap_init(&scores, 2);
as_string mkey1;
as_integer mval1;
as_string_init(&mkey1, "Bob", false);
as_integer_init(&mval1, 55);
as_hashmap_set(&scores, (as_val*)&mkey1, (as_val*)&mval1);
as_string mkey2;
as_integer mval2;
as_string_init(&mkey2, "Jim", false);
as_integer_init(&mval2, 98);
as_hashmap_set(&scores, (as_val*)&mkey2, (as_val*)&mval2);
// 3.创建写策略,默认策略是unordered,我们可以修改为有序。有序既可以声明为key有序,也可以先按key再按value。
as_map_policy map_policy;
as_map_policy_init(&map_policy);
map_policy.attributes = AS_MAP_KEY_VALUE_ORDERED; //here
// 4.声明put操作
as_operations ops;
as_operations_inita(&ops, 1); //1表示一个操作
as_operations_add_map_put_items(&ops, "bin名", &map_policy, (as_map*)&scores);
// 5.开始put
as_record* rec = NULL;
aerospike_key_operate(&as, &err, NULL, &g_key, &ops, &rec)
// 6.也可以对map中的单key做操作,比如增加"Bob"的值25
as_operations_inita(&ops, 1);

as_string_init(&mkey1, "Bob", false);
as_integer_init(&mval1, 25);
as_operations_add_map_increment(&ops, map_bin_name, &map_policy, (as_val*)&mkey1, (as_val*)&mval1);
aerospike_key_operate(&as, &err, NULL, &g_key, &ops, &rec)
// 7. 可以按分数顺序查询,比如取分数最高的两个值
as_operations_inita(&ops, 1);
as_operations_add_map_get_by_rank_range(&ops, map_bin_name, -2, 2, AS_MAP_RETURN_KEY_VALUE);
aerospike_key_operate(&as, &err, NULL, &g_key, &ops, &rec)

sdk代码实现

对于一行记录的put,经历的整个过程细节如下,主要关注需要传递给server哪些信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// key初始化函数,生成as_key
as_key* as_key_init_str(as_key* key, const char* ns, const char* set, const char* value);
//key的结构体定义。含ns、set、value、digest
typedef struct as_key_s {
as_namespace ns;
as_set set;
/**
* The key value.
*/
as_key_value value;
/**
* The key value pointer.
* It can point to as_key.value or a different value.
*/
as_key_value* valuep;
// hash值
as_digest digest;

} as_key

// 生成发送的buffer
as_status
aerospike_key_put(
aerospike* as, as_error* err, const as_policy_write* policy, const as_key* key, as_record* rec
)
{
// 计算key对应的分区(digest的前16位)
as_partition_info pi;
as_status status = as_command_prepare_write(as, err, &policy->base, key, &pi);

if (status != AEROSPIKE_OK) {
return status;
}
// 分配内存空间,用于发送整条put请求,每一个bin一个buffer
as_queue buffers;
as_queue_inita(&buffers, sizeof(as_buffer), rec->bins.size);
as_put put;
status = as_put_init(&put, policy, key, rec, &buffers, err);

as_command cmd;
as_command_init_write(&cmd, as->cluster, &policy->base, policy->replica, key, put.size, &pi,
as_command_parse_header, NULL);
// 发送命令。as_put_write函数将所有待发送信息放到tcp用户缓存buffer中
status = as_command_send(&cmd, err, compression_threshold, as_put_write, &put);
return status;
}

static as_status
as_put_init(
as_put* put, const as_policy_write* policy, const as_key* key, as_record* rec,
as_queue* buffers, as_error* err
)
{
put->policy = policy;
put->key = key;
put->rec = rec;
// 每一个buffer对应一个bin
put->buffers = buffers;
// 计算key所需的空间
put->size = as_command_key_size(&policy->base, policy->key, key, true, &put->tdata);
put->n_bins = rec->bins.size;

as_bin* bins = rec->bins.entries;

for (uint16_t i = 0; i < put->n_bins; i++) {
// 这里计算size,同时填入了bin的值,根据bin的类型有不同的序列化方式
as_status status = as_command_bin_size(&bins[i], buffers, &put->size, err);
}
return AEROSPIKE_OK;
}

对于map的operator操作,实现细节如下。

解析如下过程函数:

1
2
3
4
as_map_policy map_policy;
as_map_policy_init(&map_policy);
as_operations_add_map_put_items(&ops, map_bin_name, &map_policy, (as_map*)&scores);
aerospike_key_operate(&as, &err, NULL, &g_key, &ops, &rec)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
bool as_operations_add_map_put_items(
as_operations* ops, const char* name, as_map_policy* policy, as_map* items
)
{
return as_operations_map_put_items(ops, name, NULL, policy, items);
}
bool
as_operations_map_put_items(
as_operations* ops, const char* name, as_cdt_ctx* ctx, as_map_policy* policy,
as_map* items
)
{
as_packer pk = as_cdt_begin();
// 命令的序列化过程。将命令类型(PUT_ITEMS)、实际items键值、排序类型(哪种有序或无序)打包进as_pack结构
as_cdt_pack_header(&pk, ctx, policy->items_command, 2);
as_pack_val(&pk, (as_val*)items);
as_pack_uint64(&pk, policy->attributes);

as_cdt_end(&pk);
as_map_destroy(items);
// 这里将bin的名字、对bin的操作类型和上一步的整体数据一起打包进ops中
return as_cdt_add_packed(&pk, ops, name, AS_OPERATOR_MAP_MODIFY);
}

as_status
aerospike_key_operate(
aerospike* as, as_error* err, const as_policy_operate* policy, const as_key* key,
const as_operations* ops, as_record** rec
)
{
// 多少个bin操作
uint32_t n_operations = ops->binops.size;
// 每个bin操作一个buffer
as_queue buffers;
as_queue_inita(&buffers, sizeof(as_buffer), n_operations);

as_policy_operate policy_local;
as_operate oper;

as_status status = as_operate_init(&oper, as, policy, &policy_local, key, ops, &buffers, err);

policy = oper.policy;

as_partition_info pi;
status = as_command_prepare(as->cluster, err, &policy->base, key, &pi);

as_operate_size(&oper);

as_command_parse_result_data data;
data.record = rec;
data.deserialize = policy->deserialize;

as_command cmd;

if (oper.write_attr & AS_MSG_INFO2_WRITE) {
as_command_init_write(&cmd, as->cluster, &policy->base, policy->replica, key, oper.size, &pi,
as_command_parse_result, &data);
}
...
// 用的as_operate_write函数构造tcp-buffer
status = as_command_send(&cmd, err, compression_threshold, as_operate_write, &oper);

return status;
}