aerospike - the SSD engine

Introduction - starting from how an SSD works

After digging into it, I gradually realized that aerospike's core module is its SSD engine: an IO engine built and tuned specifically for SSDs, with throughput close to the theoretical limit of the hardware. This SSD engine is the single biggest reason for aerospike's low latency and high throughput.

aerospike's in-memory index, the swb structure that caches wblock data, and the on-disk layout are all designed around the SSD's block structure. They fully exploit the SSD's physical characteristics, such as near-free random addressing combined with whole-block erase and whole-block write. Because it drives the SSD directly, the drive's own firmware barely needs to do any garbage collection, and the engine takes full advantage of the SSD's internal parallelism, squeezing out its entire IO capability.

Because reads and writes access blocks at essentially random addresses, this design is far less attractive on traditional spinning disks. But given that today's SSDs vastly outperform HDDs and keep getting cheaper, and most companies already use SSDs as their primary storage, aerospike's architecture is a forward-looking one.

This SSD-engine design is also a natural storage foundation for a hash-based engine.

A standard SSD is organized roughly as shown in the figure below, for example:

  • Page size: 4 KB
  • Block size: 128 KB

An SSD reads and writes in units of pages and erases in units of blocks. Addressing time is practically negligible.

The operating system reads and writes pages by passing page numbers to the SSD's firmware. Inside the SSD, each page may only be written once; writing it again requires relocating the data to another page and erasing the entire block that contained the original page.
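As a toy model of that rule (purely illustrative, not how any real FTL is implemented): rewriting a logical page always allocates a fresh physical page and invalidates the old one, and those invalid pages are what eventually force a whole-block erase.

#include <stdint.h>
#include <stdio.h>

#define N_PAGES 8

static int32_t l2p[N_PAGES];    // logical page -> physical page (-1 = unmapped)
static int     valid[N_PAGES];  // does this physical page hold live data?
static int32_t next_free = 0;   // next never-written physical page

// Write a logical page: always goes to a fresh physical page; the old
// physical page (if any) becomes garbage that must be erased later.
static void write_logical_page(uint32_t lpn) {
	if (l2p[lpn] >= 0) {
		valid[l2p[lpn]] = 0;    // old copy is now invalid
	}
	l2p[lpn] = next_free;
	valid[next_free] = 1;
	next_free++;                // a real FTL would pick pages block by block
}

int main(void) {
	for (int i = 0; i < N_PAGES; i++) l2p[i] = -1;

	write_logical_page(3);      // first write of logical page 3
	write_logical_page(3);      // rewrite: relocated to a new physical page
	printf("logical 3 -> physical %d (one garbage page left behind)\n", l2p[3]);
	return 0;
}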

Traditional LSM engines such as rocksdb use an append-only write pattern that was originally designed for spinning disks, where it eliminates seek time, the dominant cost. To a degree it is also SSD-friendly: large sequential appends encourage the OS to hand the SSD contiguous page numbers, and the SSD's FTL tries to place them in the same block, so each page is written only once and the cost of page relocation is reduced.

But this clearly has shortcomings. First, the upper layer cannot control how many contiguous page numbers the OS actually issues. For example, a block may hold 32 pages, yet the address range passed down may map to discontiguous pages - say pages 3, 7 and 9 - resulting in three separate IO operations against the SSD. In practice the OS optimizes this fairly well: as long as you do not call sync explicitly, the kernel decides when to flush the page cache, so the impact is limited. Second, you cannot control whether a file straddles blocks. Compaction deletes files, and deleting one file may erase blocks shared with other files, forcing the SSD to relocate pages internally.

Furthermore, LSM inevitably introduces read and write amplification. On the read side, if the data is not in memory a single read may touch SST files across multiple levels. On the write side, each write can end up amplified more than tenfold in write IO, wasting a great deal of SSD bandwidth.

Sequential-write file models also share a common limitation: the WAL can only be written by a single thread. In rocksdb, for instance, one writer leader is elected to issue the write() system call for a batch of write_batches and then sync it to disk. aerospike, by contrast, can write the device from multiple threads and maximize the SSD's write bandwidth.

In short, LSM trees perform well but are not optimal for SSDs, roughly for the following reasons:

  1. The number of accesses to the SSD hardware is uncontrollable
  2. There are redundant block erases and page relocations
  3. It unavoidably depends on the opaque, long IO path of the operating system
  4. Compaction inevitably consumes a large share of SSD bandwidth, causing latency jitter
  5. Severe read and write amplification wastes SSD bandwidth (the biggest performance factor)
  6. The sequential-write file model allows only a single WAL writer thread

aerospike's SSD engine solves or alleviates these problems. Points 1-2 are inferred rather than directly observable, but I ran detailed fio tests across scenarios - through the filesystem versus the raw device, with direct IO on and off, with 4 KB versus 128 KB reads and writes - and they largely confirm the reasoning above.
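For reference, one such scenario can be reproduced with a command along these lines (illustrative parameters only, not the exact invocation used): raw device, direct IO, 128 KB random writes.

fio --name=raw-randwrite --filename=/dev/nvme0n1 --ioengine=libaio --direct=1 --rw=randwrite --bs=128k --iodepth=32 --numjobs=4 --runtime=60 --time_based --group_reporting

Swapping --bs=4k, --direct=0, or a file path in place of the raw device covers the other combinations mentioned above.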

Overview

This section explains the SSD management model from an architectural point of view.

aerospike's SSD management matches the SSD's own block erase/write behavior. Disk-based database engines (such as rocksdb) are almost always IO-bound, so by relying on its SSD engine aerospike achieves high qps with consistently very low latency: there is no filesystem path through the OS, latency is spent purely on reading and writing SSD blocks and waiting for the hardware to respond, and because the code controls the concurrency of block reads and writes, latency is predictable and controllable. If a request hits the buf cache (the number of buffers is configurable), latency is lower still.

Because it talks to the SSD directly, the number of write threads can be tuned to the device, for example to the number of PCIe lanes on an NVMe drive. With traditional filesystem IO through the OS, the opaque kernel path makes it hard to choose sensible read/write IO thread counts.

First, block organization: aerospike lays out an SSD device as shown in the figure below. It only ever overwrites the SSD in units of a configurable wblock_size (1 MB by default); it never modifies just part of a wblock, nor overwrites it with zeros.

Of course, when a logical block is overwritten, the SSD hardware still erases the old block and writes into a fresh one. But because the database never has to zero out blocks itself, it effectively saves block IO. And because IO is issued in whole multiples of the SSD's underlying block size, the SSD never needs to garbage-collect (i.e. relocate) partially valid pages - something that cannot be guaranteed when going through the OS's IO path.

That said, even when the upper layer issues a contiguous block address with direct IO - say a pwrite of 0-128 KB - the OS still passes LBA logical block numbers to the SSD driver, aligned to the minimum IO size such as 4 KB. After the SSD's FTL, this does not necessarily correspond to a single physical block, so the claim that aerospike reduces garbage collection is an inference; in practice it depends on the FTL algorithm, i.e. whether the hardware really maps this run of contiguous logical page numbers onto contiguous physical pages.

The smallest unit aerospike manages is the rblock (fixed at 16 bytes); the index locates data by rblock_id. For wblocks, an in-memory wblock_state structure tracks the state of every wblock on the SSD.
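A minimal sketch of the address arithmetic this implies (the function names mirror the macros that appear in the excerpts below; the 16-byte rblock and 1 MB wblock sizes are the defaults quoted in this article):

#include <stdint.h>
#include <stdio.h>

#define RBLOCK_SIZE       16u          // smallest allocation unit, in bytes
#define WRITE_BLOCK_SIZE  (1u << 20)   // wblock size, 1 MB by default

// Byte offset on the device for a given rblock_id.
static uint64_t rblock_id_to_offset(uint64_t rblock_id) {
	return rblock_id * RBLOCK_SIZE;
}

// Which wblock a device offset falls into.
static uint32_t offset_to_wblock_id(uint64_t offset) {
	return (uint32_t)(offset / WRITE_BLOCK_SIZE);
}

int main(void) {
	uint64_t rblock_id = 70000;   // hypothetical value from an index entry
	uint64_t offset = rblock_id_to_offset(rblock_id);
	printf("offset=%llu wblock_id=%u\n",
			(unsigned long long)offset, offset_to_wblock_id(offset));
	return 0;
}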

When a record is deleted, aerospike simply removes its node from the in-memory index tree (a red-black tree) and decrements that wblock's inuse size in wblock_state. Note that no IO is performed at all!

  • Once inuse drops to 0, the wblock is marked free and pushed onto the wblock free queue, ready for direct reuse. Reuse means a later overwrite, so no erase is issued here.
  • Once fragmentation exceeds the threshold (50% by default), the wblock is handed to the defrag thread, which reads out all of its records and migrates them into a new wblock.

During defrag migration, the list of destination wblocks the source migrates into is recorded. The source wblock can only be reclaimed once the swb buffers of all destination wblocks have been flushed to disk. In most cases the records migrate into a single destination wblock.

Take two low-utilization wblocks as an example: the defrag thread migrates their records into some destination wblock; once the flush thread persists that wblock to disk, the two low-utilization wblocks are reclaimed.

Consequently, deletes in aerospike are not durable. After a crash and restart, unless master/replica state is synchronized, deleted records come back into the in-memory index.

After a flush, the swb data buffer associated with a wblock_state is kept if the current number of cached buffers is below the configured limit (256 by default); only above that limit is the swb reset. This avoids frequent defrag work or SSD read IO and exploits locality of wblock access.

aerospike's default wblock data cache is small: with 1 MB wblocks, only 256 wblocks are cached in memory by default. Caching happens by simply not releasing the buffer after it is flushed, as handled in

ssd_post_write(drv_ssd *ssd, ssd_write_buf *swb)

where the swb is not released immediately after flushing; it is only released once the number of cached buffers exceeds the configured count.

For record reads, this cache is checked first; only on a miss does the engine read the record from the SSD.

A nice detail here: the engine does not read the entire wblock (which would amplify the read). Instead it aligns the address to the SSD's minimum IO size and reads the integral number of pages that cover the record, then uses record_size and the start address to extract just the part it needs.

So reads, too, follow the SSD's page-read granularity: exactly as much as needed is read, with no read amplification. The data read this way does not populate any cache; the memory is freed as soon as it has been used.
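A small sketch of that alignment, assuming io_min_size is a power of two (4096 here); it mirrors the BYTES_DOWN_TO_IO_MIN / BYTES_UP_TO_IO_MIN logic visible later in ssd_read_record(), with hypothetical record values:

#include <stdint.h>
#include <stdio.h>

int main(void) {
	uint64_t io_min_size   = 4096;      // device minimum IO size
	uint64_t record_offset = 1051576;   // hypothetical record start on the device
	uint32_t record_size   = 2000;      // hypothetical record size

	uint64_t read_offset = record_offset & ~(io_min_size - 1);              // round down to a page
	uint64_t read_end    = (record_offset + record_size + io_min_size - 1)
			& ~(io_min_size - 1);                                           // round up to a page
	uint64_t buf_indent  = record_offset - read_offset;                     // record start inside the buffer

	// For these values: reads 8192 bytes (two pages) covering the 2000-byte record.
	printf("read %llu bytes at offset %llu, record at buffer offset %llu\n",
			(unsigned long long)(read_end - read_offset),
			(unsigned long long)read_offset,
			(unsigned long long)buf_indent);
	return 0;
}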

The complete wblock state transition diagram is shown below.

aerospike's write bottleneck is the flush threads: once more than 64 MB (by default) is queued for flushing, new writes are rejected; once the flush queue exceeds 192 MB, the defrag thread sleeps and waits.
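A rough sketch of that back-pressure rule, using the defaults quoted above (the names are illustrative, not Aerospike's):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define WBLOCK_SIZE         (1ull << 20)            // 1 MB write blocks
#define MAX_WRITE_CACHE     (64ull * (1ull << 20))  // reject client writes above 64 MB queued
#define DEFRAG_SLEEP_CACHE  (192ull * (1ull << 20)) // defrag backs off above 192 MB queued

// write_q_len is the number of swb buffers waiting in the flush queue.
static bool should_reject_writes(uint64_t write_q_len) {
	return write_q_len * WBLOCK_SIZE > MAX_WRITE_CACHE;
}

static bool defrag_should_sleep(uint64_t write_q_len) {
	return write_q_len * WBLOCK_SIZE > DEFRAG_SLEEP_CACHE;
}

int main(void) {
	printf("65 queued wblocks: reject writes=%d\n", should_reject_writes(65));   // 65 MB > 64 MB
	printf("65 queued wblocks: defrag sleeps=%d\n", defrag_should_sleep(65));
	return 0;
}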

Read/write threading model and the SSD

In aerospike's SSD mode, the default number of service threads is five times the CPU count. Service threads have the following characteristics (a minimal illustration follows the list):

  • Each thread owns one epoll instance that manages the fds of a batch of client connections
  • Each thread handles a connection's TCP reads and writes, wire-protocol parsing, the full read/write path, and the reply - the entire lifecycle of the connection
  • Connections are distributed to service threads round-robin
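A minimal illustration of that per-thread event loop (generic Linux epoll code for one service thread, not Aerospike's actual implementation):

#include <stdio.h>
#include <sys/epoll.h>

#define MAX_EVENTS 64

// One service thread: it owns its own epoll instance and the client fds
// assigned to it (round-robin), and handles each request end to end.
void *run_service_thread(void *arg) {
	(void)arg;
	int epfd = epoll_create1(0);
	if (epfd < 0) {
		perror("epoll_create1");
		return NULL;
	}

	// Client fds handed to this thread would be registered roughly like this:
	// struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = client_fd } };
	// epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);

	struct epoll_event events[MAX_EVENTS];

	for (;;) {
		int n = epoll_wait(epfd, events, MAX_EVENTS, 1000);
		for (int i = 0; i < n; i++) {
			// Read the request, parse the wire protocol, run the read/write
			// against the SSD engine, and send the reply - all on this thread.
			(void)events[i];
		}
	}
	return NULL;
}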

Take a read: if the record's wblock already has a cached buffer in memory (one of the default 256 post-flush buffers mentioned earlier), the result is returned directly from it; otherwise only the required pages are read from the SSD and the memory is released immediately afterwards.

A write, for a new record, proceeds as follows:

  1. Get or create the record's in-memory index entry.
  2. Get an swb (ssd_write_buffer; each SSD has at most 3 write buffers at a time) and append the record to it; if the record fills the swb, push the swb onto the flush queue for the flush thread to write out.
  3. If the SSD write succeeds, point the in-memory index at the new location.
  4. If the write replaced an older record, decrease the old wblock's used size in wblock_state; if the old wblock's fragmentation is now high, hand it to the defrag thread.

SSD partitioning

One point worth noting: in steady state almost all device writes come from the flush threads. For an NVMe device with 4 PCIe lanes, aerospike recommends splitting the device into multiple partitions (at least 4; best practice is fewer than the number of CPUs - benchmark it), because the code then creates one write thread per partition and can flush in parallel, maximizing IO. If a single NVMe device is configured as one raw device, only one write thread is created; under full load iostat will show 100% disk utilization, yet possibly only one PCIe lane is saturated.

https://aerospike.com/docs/database/manage/planning/ssd/setup/#partition-your-flash-devices
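For example, splitting one device into four equal partitions might look like this (illustrative and destructive commands - adjust the device path and sizes to your environment):

parted -s /dev/nvme0n1 mklabel gpt
parted -s /dev/nvme0n1 mkpart p1 0% 25%
parted -s /dev/nvme0n1 mkpart p2 25% 50%
parted -s /dev/nvme0n1 mkpart p3 50% 75%
parted -s /dev/nvme0n1 mkpart p4 75% 100%

The resulting /dev/nvme0n1p1 ... /dev/nvme0n1p4 are then listed as separate device entries in the namespace's storage-engine block, as in the configuration shown further below.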

I tested the effect of partitioning.

First with the whole SSD unpartitioned. The test command is below, with 100% writes.

./asbench -h infra-bjx-e20-139.idchb1az1.hb1.kwaidc.com -n test -s aerospike -b f -k 100000000 -o S200 -w RU,0 -z 64 -t 600 -prefix_str test -R

The iostat output below shows the device at full utilization. Steady-state tps was about 787K (about 1.04M with 100% reads), with roughly 3.3 GB/s of SSD (read) bandwidth and about 228 MB/s of write bandwidth.

iostat -x -d 5

Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
nvme0n1 0.00 0.00 770913.40 640.80 3351662.40 228760.80 9.28 3.80 0.15 0.15 1.01 0.00 100.02

Then I split the SSD into 4 partitions and changed the configuration as follows.

namespace test {
replication-factor 2
storage-engine device {
# Use one or more lines like those below with actual device paths.
device /dev/nvme0n1p1
device /dev/nvme0n1p2
device /dev/nvme0n1p3
device /dev/nvme0n1p4
cold-start-empty true
# write-block-size 128K
}
}

Running the test again: tps reached 1.115M (1.123M with 100% reads - almost identical), read bandwidth rose to 3.4 GB/s and write bandwidth to 408 MB/s. That is a full 400K more tps, and double the write bandwidth!

The improvement is substantial.

iostat -x -d 5
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
nvme0n1 0.00 0.00 734149.00 1102.20 3425954.40 408572.80 10.43 5.09 0.17 0.16 1.12 0.00 100.00

Analysis: simply very impressive - read and write workloads deliver the same throughput, and a single NVMe SSD reaches over a million qps.

Initialization

At startup, aerospike initializes the SSDs. The main job is to scan all data on the SSD and populate the corresponding partition indexes, going through the following steps:

  1. Hardware information and initialization
    • Read the configured write block size (1 MB by default) and set up the associated write-queue buffering.
    • Determine the SSD's minimum IO size, e.g. 4096 bytes
    • Determine the SSD's total size, aligned to the block size
    • Initialize the structure that records each block's state, marking every block unused
    • Initialize the queues used to talk to the SSD
  2. Header read and validation
    • Read the first 8 MB of the SSD and check that it matches aerospike's header format; if not, the SSD is unformatted and the process exits
    • Load the header into its in-memory structure; the header records a series of metadata such as the namespace name and the partition list
    • Initialize the partition index trees from the header
  3. Full-device scan to build the in-memory index
    • Because of the block design, data sits in the lowest-numbered block_ids, so the scan goes from low to high block_id and stops after hitting 10 consecutive unused blocks, which is taken to mean the end of the data.
    • On a 2 TB SSD that is 30% full, the scan takes roughly 2 hours

Below are annotated excerpts of the core SSD initialization source (all code off the critical path has been removed).

The first SSD entry point reached from main is as_storage_init_ssd, which does everything except the full-device scan.

// SSD initialization
as_storage_init_ssd(as_namespace *ns) {
drv_ssds *ssds;
ssd_init_devices(ns, &ssds);
...
/*
ns->storage_max_write_cache = 64M, the maximum write buffer
ns->storage_write_block_size = 1M, the SSD block size per operation
ns->storage_max_write_q = 64, i.e. at most 64 block buffers
*/
ns->storage_max_write_q = (uint32_t)
(ssds->n_ssds * ns->storage_max_write_cache /
ns->storage_write_block_size);

// Defrag threshold percentage, 50% by default; garbage above 50% triggers SSD garbage collection
ns->defrag_lwm_size =
(ns->storage_write_block_size * ns->storage_defrag_lwm_pct) / 100;
...
// Reserve DRV_HEADER_SIZE (8M) for the device header, so the first wblock id is 8
uint32_t first_wblock_id = DRV_HEADER_SIZE / ns->storage_write_block_size;

// Initialize the drv_ssd structures
for (int i = 0; i < ssds->n_ssds; i++) {
drv_ssd *ssd = &ssds->ssds[i];

ssd->ns = ns;
ssd->file_id = i;
...

ssd->running = true;
...

// Plain assignments into the structure
ssd->write_block_size = ns->storage_write_block_size;
ssd->first_wblock_id = first_wblock_id;
ssd->pristine_wblock_id = first_wblock_id;
/* Allocate a wblock_state entry for every wblock on the SSD to track its state */
ssd_wblock_init(ssd);

// Note: free_wblock_q, defrag_wblock_q created after loading devices.

cf_pool_int32_init(&ssd->fd_pool, MAX_POOL_FDS, -1);
cf_pool_int32_init(&ssd->fd_cache_pool, MAX_POOL_FDS, -1);


// Queue initialization: the write queue and the free queue; both store pointers.
ssd->swb_write_q = cf_queue_create(sizeof(void*), true);
ssd->swb_free_q = cf_queue_create(sizeof(void*), true);
// This queue's purpose is not obvious here (it is the post-write cache used by ssd_post_write, see later)
ssd->post_write_q = cf_queue_create(sizeof(void*),
ns->storage_commit_to_device);
...
}

// Read each SSD's header block and initialize the header structure. This also detects whether the SSD has been formatted (non-zero header) and retrieves its metadata.
ssd_init_synchronous(ssds);
}

/* Obtain each SSD device's metadata */
void
ssd_init_devices(as_namespace *ns, drv_ssds **ssds_p)
{
size_t ssds_size = sizeof(drv_ssds) +
(ns->n_storage_devices * sizeof(drv_ssd));
drv_ssds *ssds = cf_malloc(ssds_size);

memset(ssds, 0, ssds_size);
ssds->n_ssds = (int)ns->n_storage_devices;
ssds->ns = ns;

// Initialize every SSD device and collect its metadata
for (uint32_t i = 0; i < ns->n_storage_devices; i++) {
drv_ssd *ssd = &ssds->ssds[i];

ssd->name = ns->storage_devices[i];

// Defaults: read-write, direct IO, synchronous (O_DSYNC) mode
ssd->open_flag = O_RDWR | O_DIRECT |
(ns->storage_disable_odsync ? 0 : O_DSYNC);

int fd = open(ssd->name, ssd->open_flag);
...
// Determine the usable SSD size: subtract the header and align to the block size
uint64_t size = 0;
ioctl(fd, BLKGETSIZE64, &size); // gets the number of byte
ssd->file_size = check_file_size(ns, size, "usable device");
// Determine the device's minimum IO size
ssd->io_min_size = find_io_min_size(fd, ssd->name);
// If cold-start is configured to wipe the SSD on every start, zero out the header
if (ns->cold_start && ns->storage_cold_start_empty) {
ssd_empty_header(fd, ssd->name);
}

close(fd);

ns->drives_size += ssd->file_size; // increment total storage size
}

*ssds_p = ssds;
}

The wblock initialization and SSD header reading code referenced above is as follows.

void
ssd_wblock_init(drv_ssd *ssd)
{
uint32_t n_wblocks = (uint32_t)(ssd->file_size / ssd->write_block_size);

cf_info(AS_DRV_SSD, "%s has %u wblocks of size %u", ssd->name, n_wblocks,
ssd->write_block_size);

ssd->n_wblocks = n_wblocks;
ssd->wblock_state = cf_malloc(n_wblocks * sizeof(ssd_wblock_state));

// Initialize each wblock_state: mark it free with inuse_sz 0
for (uint32_t i = 0; i < n_wblocks; i++) {
ssd_wblock_state * p_wblock_state = &ssd->wblock_state[i];

p_wblock_state->inuse_sz = 0;
cf_mutex_init(&p_wblock_state->LOCK);
p_wblock_state->swb = NULL;
p_wblock_state->state = WBLOCK_STATE_FREE;
p_wblock_state->n_vac_dests = 0;
}
}

void
ssd_init_synchronous(drv_ssds *ssds)
{
uint64_t random = 0;

while (random == 0) {
random = cf_get_rand64();
}

int n_ssds = ssds->n_ssds;
as_namespace *ns = ssds->ns;

drv_header *headers[n_ssds];
int first_used = -1;

for (int i = 0; i < n_ssds; i++) {
drv_ssd *ssd = &ssds->ssds[i];
/* Read the metadata from the SSD's header block, including:
1. the driver version
2. the namespace name
3. the write_block_size
*/
headers[i] = ssd_read_header(ssd);
// NULL means this is a brand-new SSD
if (! headers[i]) {
headers[i] = ssd_init_header(ns, ssd);
}
else if (first_used < 0) {
first_used = i;
}
}
// If every SSD is brand new we end up here: initialize the headers and write them to the SSDs
if (first_used < 0) {
...
ssds->generic = cf_valloc(ROUND_UP_GENERIC);
memcpy(ssds->generic, &headers[0]->generic, ROUND_UP_GENERIC);

ssds->generic->prefix.n_devices = n_ssds;
ssds->generic->prefix.random = random;

for (int i = 0; i < n_ssds; i++) {
headers[i]->unique.device_id = (uint32_t)i;
}
ssd_adjust_versions(ns, ssds->generic->pmeta);
ssd_flush_header(ssds, headers);
...

ssds->all_fresh = true; // won't need to scan devices
return;
}

...

// Copy the header into ssds->generic, update it, then write it back to the SSD header blocks
ssds->generic = cf_valloc(ROUND_UP_GENERIC);
memcpy(ssds->generic, &headers[first_used]->generic, ROUND_UP_GENERIC);

ssds->generic->prefix.n_devices = n_ssds; // may have added fresh drives
ssds->generic->prefix.random = random;
ssds->generic->prefix.flags &= ~DRV_HEADER_FLAG_TRUSTED;

if (fresh_drive || n_ssds < prefix_first->n_devices ||
(ns->dirty_restart && non_commit_drive)) {
ssd_adjust_versions(ns, ssds->generic->pmeta);
}

ssd_flush_header(ssds, headers);
ssd_flush_final_cfg(ns);

...

// Read each partition's tree id
for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
drv_pmeta *pmeta = &ssds->generic->pmeta[pid];

ssds->get_state_from_storage[pid] =
as_partition_version_has_data(&pmeta->version);
ns->partitions[pid].tree_id = pmeta->tree_id;
}
...

// On cold start, build the index tree for every partition the disks say is present
for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
if (ssds->get_state_from_storage[pid]) {
as_partition* p = &ns->partitions[pid];

p->tree = as_index_tree_create(&ns->tree_shared, p->tree_id,
as_partition_tree_done, (void*)p);

as_set_index_create_all(ns, p->tree);
}
}
// Cold start timestamp
ns->cold_start_now = now;
}

Full-device scan

The full-device scan creates a new thread running run_ssd_cold_start to do the work (in fact aerospike keeps a thread pool and takes a thread from it).

void
start_loading_records(drv_ssds *ssds, cf_queue *complete_q)
{
as_namespace *ns = ssds->ns;

ns->loading_records = true;

// p (complete_rc) exists only to count the SSDs via its reference count, so the final cleanup runs once every SSD has finished loading.
void *p = cf_rc_alloc(1);
for (int i = 1; i < ssds->n_ssds; i++) {
cf_rc_reserve(p);
}

for (int i = 0; i < ssds->n_ssds; i++) {
drv_ssd *ssd = &ssds->ssds[i];
ssd_load_records_info *lri = cf_malloc(sizeof(ssd_load_records_info));

lri->ssds = ssds;
lri->ssd = ssd;
lri->complete_q = complete_q;
lri->complete_rc = p;
// Create a transient thread (taken from the thread pool)
cf_thread_create_transient(run_ssd_cold_start, (void*)lri);
}
}

void *
run_ssd_cold_start(void *udata)
{
ssd_load_records_info *lri = (ssd_load_records_info*)udata;
drv_ssd *ssd = lri->ssd;
drv_ssds *ssds = lri->ssds;
cf_queue *complete_q = lri->complete_q;
void *complete_rc = lri->complete_rc;

cf_free(lri);
// Core entry point
ssd_cold_start_sweep(ssds, ssd);
// Log the statistics of this full scan
cf_info(AS_DRV_SSD, "device %s: read complete: UNIQUE %lu (REPLACED %lu) (OLDER %lu) (EXPIRED %lu) (EVICTED %lu) records",
ssd->name, ssd->record_add_unique_counter,
ssd->record_add_replace_counter, ssd->record_add_older_counter,
ssd->record_add_expired_counter, ssd->record_add_evicted_counter);
// Notify the main thread that the scan is complete
if (cf_rc_release(complete_rc) == 0) {
as_namespace* ns = ssds->ns;
ns->loading_records = false;
...
void *_t = NULL;
cf_queue_push(complete_q, &_t);
cf_rc_free(complete_rc);
}

return NULL;
}

The core logic of the full-device scan:

void
ssd_cold_start_sweep(drv_ssds *ssds, drv_ssd *ssd)
{
size_t wblock_size = ssd->write_block_size;
uint8_t *buf = cf_valloc(wblock_size);

const char *read_ssd_name = ssd->name;
int fd = ssd_fd_get(ssd);
int write_fd = -1;

// Loop over all wblocks, unless we encounter 10 contiguous unused wblocks.

ssd->sweep_wblock_id = ssd->first_wblock_id;

uint64_t file_offset = DRV_HEADER_SIZE;
uint32_t n_unused_wblocks = 0;


while (file_offset < ssd->file_size && n_unused_wblocks < 10) {
pread_all(fd, buf, wblock_size, (off_t)file_offset)

uint32_t indent = 0; // current offset within wblock, in bytes
while (indent < wblock_size) {
as_flat_record *flat = (as_flat_record*)&buf[indent];

// Look for record magic.
if (flat->magic != AS_FLAT_MAGIC) {
// Should always find a record at beginning of used wblock. if
// not, we've likely encountered the unused part of the device.
if (indent == 0) {
n_unused_wblocks++;
break; // try next wblock
}
// else - nothing more in this wblock, but keep looking for
// magic - necessary if we want to be able to increase
// write-block-size across restarts.

indent += RBLOCK_SIZE;
continue; // try next rblock
}
...
// N_RBLOCKS_TO_SIZE converts an rblock count to bytes (each rblock is 16 bytes)
uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks);
uint32_t next_indent = indent + record_size;

// Found a record - try to add it to the index.
ssd_cold_start_add_record(ssds, ssd, flat,
OFFSET_TO_RBLOCK_ID(file_offset + indent), record_size);

indent = next_indent;
}

file_offset += wblock_size;
ssd->sweep_wblock_id++;
}

ssd->pristine_wblock_id = ssd->sweep_wblock_id - n_unused_wblocks;

ssd->sweep_wblock_id = (uint32_t)(ssd->file_size / wblock_size);
...
cf_free(buf);
}

For each record found, the function that parses it and adds it to the in-memory index is as follows.

// Add a record just read from drive to the index, if all is well.
void
ssd_cold_start_add_record(drv_ssds* ssds, drv_ssd* ssd,
const as_flat_record* flat, uint64_t rblock_id, uint32_t record_size)
{
// Compute the partition id from the digest
uint32_t pid = as_partition_getid(&flat->keyd);
// The SSD header records which partitions this node owns; skip records in partitions we don't care about
if (! ssds->get_state_from_storage[pid]) {
return;
}

as_namespace* ns = ssds->ns;
as_partition* p_partition = &ns->partitions[pid];

// Parse the optional metadata: key, set name, number of bins, etc.
const uint8_t* end = (const uint8_t*)flat + record_size - END_MARK_SZ;
as_flat_opt_meta opt_meta = { { 0 } };
const uint8_t* p_read = as_flat_unpack_record_meta(flat, end, &opt_meta);
...

const uint8_t* cb_end = NULL;
// Validate each bin's structure and obtain a pointer to the end
const uint8_t* exact_end = as_flat_check_packed_bins(p_read, end,
opt_meta.n_bins);
// Validate the end marker (a value derived from the key's digest) to catch unexpected corruption
if (! drv_check_end_mark(cb_end == NULL ? exact_end : cb_end, flat)) {
cf_warning(AS_DRV_SSD, "bad end marker for %pD", &flat->keyd);
return;
}
...

// Get/create the record from/in the appropriate index tree.
as_index_ref r_ref;
int rv = as_record_get_create(p_partition->tree, &flat->keyd, &r_ref, ns);
bool is_create = rv == 1;
as_index* r = r_ref.r;

// If this is not a newly created node, the key already exists: compare old and new to decide which record survives.
if (! is_create) {
// Record already existed. Ignore this one if existing record is newer.
if (prefer_existing_record(ns, flat, opt_meta.void_time, r)) {
// The record we just read is older, so we end up here and discard it
ssd_cold_start_adjust_cenotaph(ns, flat, opt_meta.void_time, r);
as_record_done(&r_ref, ns);
ssd->record_add_older_counter++;
return;
}
}
// The record we're now reading is the latest version (so far) ...

// Expired: delete
if (opt_meta.void_time != 0 && ns->cold_start_now > opt_meta.void_time) {
if (! is_create) {
// Note - no sindex to adjust.
as_set_index_delete_live(ns, p_partition->tree, r, r_ref.r_h);
}

as_index_delete(p_partition->tree, &flat->keyd);
as_record_done(&r_ref, ns);
ssd->record_add_expired_counter++;
return;
}

// Evicted: delete
if (opt_meta.void_time != 0 && ns->evict_void_time > opt_meta.void_time &&
drv_is_set_evictable(ns, &opt_meta)) {
if (! is_create) {
// Note - no sindex to adjust.
as_set_index_delete_live(ns, p_partition->tree, r, r_ref.r_h);
}

as_index_delete(p_partition->tree, &flat->keyd);
as_record_done(&r_ref, ns);
ssd->record_add_evicted_counter++;
return;
}

// At this point the record is kept and its index node has been allocated
drv_apply_opt_meta(r, ns, &opt_meta);

// Set/reset the record's last-update-time, generation, and void-time.
r->last_update_time = flat->last_update_time;
r->generation = flat->generation;
r->void_time = opt_meta.void_time;

// Update maximum void-time.
as_setmax_uint32(&p_partition->max_void_time, r->void_time);

if (is_create) {
ssd->record_add_unique_counter++;
}
// If this key was read before but the current record is newer, undo the old record's storage accounting, i.e. free all of its rblocks; the new copy lives where we just found it.
else if (STORAGE_RBLOCK_IS_VALID(r->rblock_id)) {
// Replacing an existing record, undo its previous storage accounting.
ssd_block_free(&ssds->ssds[r->file_id], r->rblock_id, r->n_rblocks,
"record-add");
ssd->record_add_replace_counter++;
}
else {
cf_warning(AS_DRV_SSD, "replacing record with invalid rblock-id");
}

// Insert into the index tree here
ssd_cold_start_transition_record(ns, flat, &opt_meta, p_partition->tree,
&r_ref, is_create);

uint32_t wblock_id = RBLOCK_ID_TO_WBLOCK_ID(ssd, rblock_id);
// Maintain the wblock_state accounting
ssd->inuse_size += record_size;
ssd->wblock_state[wblock_id].inuse_sz += record_size;

// Set/reset the record's storage information.
r->file_id = ssd->file_id;
r->rblock_id = rblock_id;

as_namespace_adjust_set_data_used_bytes(ns, as_index_get_set_id(r),
DELTA_N_RBLOCKS_TO_SIZE(flat->n_rblocks, r->n_rblocks));

r->n_rblocks = flat->n_rblocks;
// Release the lock
as_record_done(&r_ref, ns);
}

The code that frees a run of rblocks is as follows.

ssd_block_free(&ssds->ssds[r->file_id], r->rblock_id, r->n_rblocks,
"record-add");

/* Note that in most cases this function only updates the wblock_state structure, decrementing inuse_sz; it never modifies any data on the SSD.
When inuse_sz reaches 0, the wblock_id is pushed onto free_q for reclamation. When inuse_sz drops below the defrag threshold (50% by default), it is pushed onto defrag_q for defragmentation.
*/
void
ssd_block_free(drv_ssd *ssd, uint64_t rblock_id, uint32_t n_rblocks, char *msg)
{
// Determine which wblock we're reducing used size in.
uint64_t start_offset = RBLOCK_ID_TO_OFFSET(rblock_id);
uint32_t size = N_RBLOCKS_TO_SIZE(n_rblocks);
uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset);
uint32_t end_wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset + size - 1);
...

as_add_uint64(&ssd->inuse_size, -(int64_t)size);

ssd_wblock_state *p_wblock_state = &ssd->wblock_state[wblock_id];

cf_mutex_lock(&p_wblock_state->LOCK);

int64_t resulting_inuse_sz =
(int32_t)as_aaf_uint32(&p_wblock_state->inuse_sz, -(int32_t)size);


if (p_wblock_state->state == WBLOCK_STATE_USED) {
if (resulting_inuse_sz == 0) {
as_incr_uint64(&ssd->n_wblock_direct_frees);
push_wblock_to_free_q(ssd, wblock_id);
}
else if (resulting_inuse_sz < ssd->ns->defrag_lwm_size) {
push_wblock_to_defrag_q(ssd, wblock_id);
}
}
else if (p_wblock_state->state == WBLOCK_STATE_EMPTYING) {
if (resulting_inuse_sz == 0) {
push_wblock_to_free_q(ssd, wblock_id);
}
}

cf_mutex_unlock(&p_wblock_state->LOCK);
}

A completely emptied wblock simply has its id reclaimed and its wblock_state marked free. This means that when a wblock needs to be allocated, one can be taken straight from ssd->free_wblock_q and overwritten, avoiding erase work.

// Put a wblock on the free queue for reuse.
static inline void
push_wblock_to_free_q(drv_ssd *ssd, uint32_t wblock_id)
{
// During the full-device scan this queue does not exist yet; right after the scan, the wblock_states are swept once explicitly anyway
if (ssd->free_wblock_q == NULL) {
return;
}
ssd->wblock_state[wblock_id].state = WBLOCK_STATE_FREE;
cf_queue_push(ssd->free_wblock_q, &wblock_id);
}

Blocks whose fragmentation has reached the high-water mark are handed to the defrag code, which empties them out.

// Put a wblock on the defrag queue.
static inline void
push_wblock_to_defrag_q(drv_ssd *ssd, uint32_t wblock_id)
{
if (ssd->defrag_wblock_q) { // null until devices are loaded at startup
ssd->wblock_state[wblock_id].state = WBLOCK_STATE_DEFRAG;
cf_queue_push(ssd->defrag_wblock_q, &wblock_id);
as_incr_uint64(&ssd->n_defrag_wblock_reads);
}
}

Garbage collection and defragmentation

Once the full-device scan completes, the wblock_state[] array has been populated and the used size of every wblock is known. The engine then immediately sweeps this array, creates the free queue (empty-block reclamation) and the defrag queue (fragment consolidation), and places each wblock on the appropriate queue.

The free queue is simple: when a wblock needs to be allocated, a block from it is reused directly.

The defrag queue requires defragmentation: all data in the block is migrated away until it is empty, and it then joins the free queue.

// The next top-level entry point, called right after the full-device scan
void
as_storage_activate_ssd(as_namespace *ns)
{
drv_ssds *ssds = (drv_ssds*)ns->storage_private;
// Tally the wblocks and initialize the queues
ssd_load_wblock_queues(ssds);

// Start the per-SSD threads.
// The maintenance thread only logs periodic statistics about SSD reads/writes, defrag, etc.
ssd_start_maintenance_threads(ssds);
//
ssd_start_write_threads(ssds);
ssd_start_defrag_threads(ssds);
}
// ssd_load_wblock_queues -> run_load_queues
void*
run_load_queues(void *pv_data)
{
drv_ssd *ssd = (drv_ssd*)pv_data;
// Create the queues
ssd->free_wblock_q = cf_queue_create(sizeof(uint32_t), true);
ssd->defrag_wblock_q = cf_queue_create(sizeof(uint32_t), true);

as_namespace *ns = ssd->ns;
uint32_t lwm_pct = ns->storage_defrag_lwm_pct;
uint32_t lwm_size = ns->defrag_lwm_size;
// One pen per percentage point; essentially just for the log output later.
defrag_pen pens[lwm_pct];

for (uint32_t n = 0; n < lwm_pct; n++) {
defrag_pen_init(&pens[n]);
}

uint32_t first_id = ssd->first_wblock_id;
uint32_t end_id = ssd->pristine_wblock_id;
// Walk every wblock_state and queue it for defrag or reclamation
for (uint32_t wblock_id = first_id; wblock_id < end_id; wblock_id++) {
uint32_t inuse_sz = ssd->wblock_state[wblock_id].inuse_sz;

if (inuse_sz == 0) {
// Faster than using push_wblock_to_free_q() here...
cf_queue_push(ssd->free_wblock_q, &wblock_id);
}
else if (inuse_sz < lwm_size) {
defrag_pen_add(&pens[(inuse_sz * lwm_pct) / lwm_size], wblock_id);
}
else {
ssd->wblock_state[wblock_id].state = WBLOCK_STATE_USED;
}
}

defrag_pens_dump(pens, lwm_pct, ssd->name);

for (uint32_t n = 0; n < lwm_pct; n++) {
// Transfer all wblock_ids from the pens into the defrag queue
defrag_pen_transfer(&pens[n], ssd);
defrag_pen_destroy(&pens[n]);
}
// Record the current length of the defrag queue
ssd->n_defrag_wblock_reads = (uint64_t)cf_queue_sz(ssd->defrag_wblock_q);

return NULL;
}

Now focus on what the defrag thread does: how it consolidates highly fragmented wblocks.

// ssd_start_defrag_threads(ssds) -> run_defrag
void*
run_defrag(void *pv_data)
{
drv_ssd *ssd = (drv_ssd*)pv_data;
as_namespace *ns = ssd->ns;
uint32_t wblock_id;
uint8_t *read_buf = cf_valloc(ssd->write_block_size);
// Sleep logic omitted (e.g. it sleeps when flush pressure is high)
while (true) {
// Pop a wblock id from the defrag queue and process it
cf_queue_pop(ssd->defrag_wblock_q, &wblock_id, CF_QUEUE_FOREVER);
ssd_defrag_wblock(ssd, wblock_id, read_buf);
...
}

return NULL;
}

The whole wblock is read from the SSD, every record in it is walked, and each record is migrated away.

int
ssd_defrag_wblock(drv_ssd *ssd, uint32_t wblock_id, uint8_t *read_buf)
{
int record_count = 0;

ssd_wblock_state* p_wblock_state = &ssd->wblock_state[wblock_id];
// This assert requires that the wblock being defragged has no outstanding destination wblocks
cf_assert(p_wblock_state->n_vac_dests == 0, AS_DRV_SSD,
"n-vacations not 0 beginning defrag wblock");

// Make sure this can't decrement to 0 while defragging this wblock.
p_wblock_state->n_vac_dests = 1;
...

int fd = ssd_fd_get(ssd);
uint64_t file_offset = WBLOCK_ID_TO_OFFSET(ssd, wblock_id);

// Read the entire wblock into read_buf
pread_all(fd, read_buf, ssd->write_block_size, (off_t)file_offset)
ssd_fd_put(ssd, fd);
...
uint32_t indent = 0; // current offset within the wblock, in bytes

// Parse the wblock record by record
while (indent < ssd->write_block_size &&
as_load_uint32(&p_wblock_state->inuse_sz) != 0) {
as_flat_record *flat = (as_flat_record*)&read_buf[indent];

if (! prefetch) {
ssd_decrypt(ssd, file_offset + indent, flat);
}

if (flat->magic != AS_FLAT_MAGIC) {
// First block must have magic.
if (indent == 0) {
cf_warning(AS_DRV_SSD, "%s: no magic at beginning of used wblock %d",
ssd->name, wblock_id);
break;
}

// Later blocks may have no magic, just skip to next block.
indent += RBLOCK_SIZE;
continue;
}

uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks);
uint32_t next_indent = indent + record_size;
...

// A record has been parsed; migrate it away
int rv = ssd_record_defrag(ssd, wblock_id, flat,
OFFSET_TO_RBLOCK_ID(file_offset + indent));

if (rv == 0) {
record_count++;
}

indent = next_indent;
}

Finished:
// The block has been fully emptied; release the associated wblock
ssd_release_vacated_wblock(ssd, wblock_id, p_wblock_state);
return record_count;
}

The logic for migrating a single record is as follows.

int ssd_record_defrag(drv_ssd *ssd, uint32_t wblock_id, as_flat_record *flat,
uint64_t rblock_id)
{
...
int rv;
as_index_ref r_ref;
// Look up the record in the index tree, holding its lock
bool found = 0 == as_record_get(rsv.tree, &flat->keyd, &r_ref);

if (found) {
as_index *r = r_ref.r;

if (r->file_id == ssd->file_id && r->rblock_id == rblock_id) {
...
defrag_move_record(ssd, wblock_id, flat, r);
rv = 0; // record was in index tree and current - moved it
}
else {
rv = -1; // record was in index tree - presumably was overwritten
}
// Release the lock
as_record_done(&r_ref, ns);
}
else {
rv = -2; // not found in the index tree - presumably it was deleted
}
...

return rv;
}

void
defrag_move_record(drv_ssd *src_ssd, uint32_t src_wblock_id,
as_flat_record *flat, as_index *r)
{
uint64_t old_rblock_id = r->rblock_id;
uint32_t old_n_rblocks = r->n_rblocks;

drv_ssds *ssds = (drv_ssds*)src_ssd->ns->storage_private;

drv_ssd *ssd = &ssds->ssds[ssd_get_file_id(ssds, &flat->keyd)];

uint32_t ssd_n_rblocks = flat->n_rblocks;
uint32_t write_size = N_RBLOCKS_TO_SIZE(ssd_n_rblocks);

cf_mutex_lock(&ssd->defrag_lock);
// This step deserves attention. The SSD normally keeps the swb that defrag is currently migrating into; it may be NULL if the previous one filled up.
// In that case a brand-new wblock is allocated (from ssd->free_wblock_q, or a pristine one) and attached to a fresh swb.
ssd_write_buf *swb = ssd->defrag_swb;
if (! swb) {
swb = swb_get(ssd, true);
ssd->defrag_swb = swb;

if (! swb) {
cf_warning(AS_DRV_SSD, "defrag_move_record: couldn't get swb");
cf_mutex_unlock(&ssd->defrag_lock);
return;
}
}

// This handles the case where the SSD's current defrag swb is full: switch to a new wblock to migrate into
if (write_size > ssd->write_block_size - swb->pos) {
// Full: push it onto the write queue; the flush thread (run_write) will write it out
push_wblock_to_write_q(ssd, swb);
as_incr_uint64(&ssd->n_defrag_wblock_writes);

// Get the new buffer.
while ((swb = swb_get(ssd, true)) == NULL) {
...
usleep(10 * 1000);
}

ssd->defrag_swb = swb;
}

memcpy(swb->buf + swb->pos, (const uint8_t*)flat, write_size);

uint64_t write_offset = WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id) + swb->pos;

ssd_encrypt(ssd, write_offset, (as_flat_record *)(swb->buf + swb->pos));

r->file_id = ssd->file_id;
r->rblock_id = OFFSET_TO_RBLOCK_ID(write_offset);
r->n_rblocks = ssd_n_rblocks;

swb->pos += write_size;

as_add_uint64(&ssd->inuse_size, (int64_t)write_size);
as_add_uint32(&ssd->wblock_state[swb->wblock_id].inuse_sz,
(int32_t)write_size);

// Record this destination wblock in the source wblock's destination list
if (swb_add_unique_vacated_wblock(swb, src_ssd->file_id, src_wblock_id)) {
ssd_wblock_state* p_wblock_state =
&src_ssd->wblock_state[src_wblock_id];

as_incr_uint32(&p_wblock_state->n_vac_dests);
}

cf_mutex_unlock(&ssd->defrag_lock);
// As described earlier, this only updates the inuse accounting; no IO is performed
ssd_block_free(src_ssd, old_rblock_id, old_n_rblocks, "defrag-write");
}

The code that obtains a usable wblock is as follows.

ssd_write_buf *
swb_get(drv_ssd *ssd, bool use_reserve)
{
...
ssd_write_buf *swb;
// Take a cached swb from the queue, or allocate a new one
if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_free_q, &swb, CF_QUEUE_NOWAIT)) {
swb = swb_create(ssd);
swb->rc = 0;
swb->n_writers = 0;
swb->dirty = false;
swb->use_post_write_q = false;
swb->flush_pos = 0;
swb->ssd = ssd;
swb->wblock_id = STORAGE_INVALID_WBLOCK;
swb->pos = 0;
}

// Take a wblock from the free queue, or a brand-new (pristine) one, i.e. advance pristine_wblock_id
if (cf_queue_pop(ssd->free_wblock_q, &swb->wblock_id, CF_QUEUE_NOWAIT) !=
CF_QUEUE_OK && ! pop_pristine_wblock_id(ssd, &swb->wblock_id)) {
cf_queue_push(ssd->swb_free_q, &swb);
return NULL;
}
...
swb->rc = 1;

ssd_wblock_state* p_wblock_state = &ssd->wblock_state[swb->wblock_id];

uint32_t inuse_sz = as_load_uint32(&p_wblock_state->inuse_sz);
...
p_wblock_state->swb = swb;
p_wblock_state->state = WBLOCK_STATE_RESERVED;

return swb;
}

The ssd-write flush thread keeps popping buffers awaiting flush from swb_write_q and writes them to the device.

// ssd_start_write_threads(ssds) -> run_write()
// Worth noting: a completed flush is the signal for block release - only after an swb is flushed can the source blocks it was defragged from be reused, and only then can the current block transition to the in-use state.
void *
run_write(void *arg)
{
drv_ssd *ssd = (drv_ssd*)arg;

while (ssd->running || cf_queue_sz(ssd->swb_write_q) != 0) {
ssd_write_buf *swb;

if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_write_q, &swb, 100)) {
continue;
}
...
// Flush to the device.
ssd_flush_swb(ssd, swb);

// If this swb was a defrag destination, release the sources.
// This swb may have been the defrag destination for several wblocks, so update the metadata of each source wblock
swb_release_all_vacated_wblocks(swb);

// Based on the swb's state, decide the wblock_state: queue for defrag or mark it in use
ssd_post_write(ssd, swb);
// One fewer wblock waiting to be flushed
as_decr_uint32(&ssd->ns->n_wblocks_to_flush);

} // infinite event loop waiting for block to write

return NULL;
}

Each flushed swb corresponds to one wblock; its pos field records the end of the valid data awaiting flush.

void
ssd_flush_swb(drv_ssd *ssd, ssd_write_buf *swb)
{
// Clean the end of the buffer before flushing.
// Zero the unused tail
memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos);

// Wait for all writers to finish.
// If writers are still appending to this swb, wait until none remain before flushing
while (swb->n_writers != 0) {
as_arch_pause();
}

as_fence_acq();

int fd = ssd_fd_get(ssd);
// Compute the device offset from the wblock_id
off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id);

uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0;
// Note that the entire wblock is written
pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)
ssd_fd_put(ssd, fd);
}

After an swb is flushed, if the number of cached swbs exceeds the limit (256 wblocks by default), the swb is reinitialized and detached from its wblock.

void
ssd_post_write(drv_ssd *ssd, ssd_write_buf *swb)
{
// Keep the swb in post_write_q instead of releasing it, effectively caching the wblock to avoid frequent defrag work or SSD reads
if (swb->use_post_write_q && ssd->ns->storage_post_write_queue != 0) {
// Transfer swb to post-write queue.
cf_queue_push(ssd->post_write_q, &swb);
}
else {
swb_dereference_and_release(ssd, swb);
}

if (ssd->post_write_q) {
// Release post-write queue swbs if we're over the limit.
while (cf_queue_sz(ssd->post_write_q) >
ssd->ns->storage_post_write_queue) {
ssd_write_buf* cached_swb;

if (CF_QUEUE_OK != cf_queue_pop(ssd->post_write_q, &cached_swb,
CF_QUEUE_NOWAIT)) {
// Should never happen.
cf_warning(AS_DRV_SSD, "device %s: post-write queue pop failed",
ssd->name);
break;
}

swb_dereference_and_release(ssd, cached_swb);
}
}
}

The wblock data cache

As described in the overview, aerospike caches only a small amount of wblock data by default - 256 wblocks, with 1 MB wblocks - by simply not releasing the swb after ssd_post_write() flushes it; only when the cache exceeds that count is the swb released. Record reads check this cache first and fall back to the SSD on a miss: rather than reading the whole wblock, the engine aligns the address to the SSD's minimum IO size, reads just the pages covering the record, extracts the needed bytes using record_size and the start offset, and frees the buffer immediately without populating any cache.

The code that reads a record from the SSD is as follows, with annotations.

int
ssd_read_record(as_storage_rd *rd, bool pickle_only)
{
as_namespace *ns = rd->ns;
as_record *r = rd->r;
drv_ssd *ssd = rd->ssd;

uint64_t record_offset = RBLOCK_ID_TO_OFFSET(r->rblock_id);
uint32_t record_size = N_RBLOCKS_TO_SIZE(r->n_rblocks);
uint64_t record_end_offset = record_offset + record_size;

uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, record_offset);
...
uint8_t *read_buf = NULL;
as_flat_record *flat = NULL;

ssd_write_buf *swb = NULL;
// Check whether the wblock_state still holds an swb (i.e. it was not released after flushing); if so, take a reference to hold it and use it directly
swb_check_and_reserve(&ssd->wblock_state[wblock_id], &swb);

// Cached data exists: use it directly
if (swb != NULL) {
// Data is in write buffer, so read it from there.
as_incr_uint32(&ns->n_reads_from_cache);

read_buf = cf_malloc(record_size);
flat = (as_flat_record*)read_buf;

int swb_offset = record_offset - WBLOCK_ID_TO_OFFSET(ssd, wblock_id);
memcpy(read_buf, swb->buf + swb_offset, record_size);
swb_release(swb);

ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat);
...
}
// No cached data: align to the SSD's page size, read the covering pages, then extract the record
else {
// Statistics: this read came from the device, not the cache
as_incr_uint32(&ns->n_reads_from_device);
// Page-level address alignment on the SSD, as required for direct IO
uint64_t read_offset = BYTES_DOWN_TO_IO_MIN(ssd, record_offset);
uint64_t read_end_offset = BYTES_UP_TO_IO_MIN(ssd, record_end_offset);
size_t read_size = read_end_offset - read_offset;
uint64_t record_buf_indent = record_offset - read_offset;

read_buf = cf_valloc(read_size);

int fd = rd->read_page_cache ? ssd_fd_cache_get(ssd) : ssd_fd_get(ssd);

uint64_t start_ns = ns->storage_benchmarks_enabled ? cf_getns() : 0;
uint64_t start_us = as_health_sample_device_read() ? cf_getus() : 0;
// Note that only the needed pages are read, not the whole wblock
if (! pread_all(fd, read_buf, read_size, (off_t)read_offset)) {
... // error handling omitted
return -1;
}
...
as_health_add_device_latency(ns->ix, r->file_id, start_us);

if (rd->read_page_cache) {
ssd_fd_cache_put(ssd, fd);
}
else {
ssd_fd_put(ssd, fd);
}

flat = (as_flat_record*)(read_buf + record_buf_indent);
ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat);
...
}
// Everything below is record parsing and can be ignored here
rd->flat = flat;
rd->read_buf = read_buf; // no need to free read_buf on error now

as_flat_opt_meta opt_meta = { { 0 } };

// Includes round rblock padding, so may not literally exclude the mark.
// (Is set exactly to mark below, if skipping or decompressing bins.)
rd->flat_end = (const uint8_t*)flat + record_size - END_MARK_SZ;
rd->flat_bins = as_flat_unpack_record_meta(flat, rd->flat_end, &opt_meta);
rd->flat_n_bins = (uint16_t)opt_meta.n_bins;

if (pickle_only) {
if (! as_flat_skip_bins(&opt_meta.cm, rd)) {
cf_warning(AS_DRV_SSD, "{%s} read %s: digest %pD bad bin data",
ns->name, ssd->name, &r->keyd);
return -1;
}

return 0;
}

if (! as_flat_decompress_bins(&opt_meta.cm, rd)) {
cf_warning(AS_DRV_SSD, "{%s} read %s: digest %pD bad compressed data",
ns->name, ssd->name, &r->keyd);
return -1;
}

if (opt_meta.key != NULL) {
rd->key_size = opt_meta.key_size;
rd->key = opt_meta.key;
}
// else - if updating record without key, leave rd (msg) key to be stored.

return 0;
}

Annotated log excerpts

The following logs, from startup through steady-state operation, are excerpted to help follow the whole process.

# Full-device scan phase; device-pcts shows what percentage of the device has been scanned.
Mar 31 2025 05:39:03 GMT: INFO (drv_ssd): (drv_ssd.c:3812) {test} loaded: objects 1196746282 device-pcts (30)
Mar 31 2025 05:39:08 GMT: INFO (drv_ssd): (drv_ssd.c:3812) {test} loaded: objects 1196747454 device-pcts (30)
# Scan summary: UNIQUE counts distinct records, REPLACED counts duplicates where the newer copy won, OLDER counts duplicates where the copy read was older
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:2840) device /dev/nvme0n1: read complete: UNIQUE 1196749999 (REPLACED 342376538) (OLDER 838422240) (EXPIRED 0) (EVICTED 0) records
# Start disk garbage collection and defragmentation
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:1024) {test} loading free & defrag queues
# Defrag profile per percentage point: how many wblocks need defrag at each fill level; e.g. 856 blocks hold (0-1)% live data.
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:958) /dev/nvme0n1 init defrag profile: 856,1008,1030,1290,3630,2226,2053,1882,2037,3100,3111,3383,3809,3671,4282,4426,5254,5339,5828,6068,6420,7045,7405,7633,7933,7734,8015,7956,8149,7982,8152,7543,7523,7557,7351,7075,6918,6457,6267,6216,6306,6129,6199,6555,6815,7419,8191,8917,9542,10305
# Current totals: free-q=11517 blocks to reclaim, defrag-q=287992 blocks to defragment
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:1043) /dev/nvme0n1 init wblocks: pristine-id 632993 pristine 1464159 free-q 11517, defrag-q 287992

# From here on, the threads that serve client traffic are started
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:2305) {test} starting device maintenance threads
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:1654) {test} starting write threads
Mar 31 2025 05:39:12 GMT: INFO (drv_ssd): (drv_ssd.c:878) {test} starting defrag threads
Mar 31 2025 05:39:12 GMT: INFO (as): (as.c:384) initializing services...
Mar 31 2025 05:39:12 GMT: INFO (service): (service.c:166) starting 320 service threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:791) updated fabric published address list to {10.52.0.230:3001}
Mar 31 2025 05:39:12 GMT: INFO (partition): (partition_balance.c:204) {test} 4096 partitions: found 0 absent, 4096 stored
Mar 31 2025 05:39:12 GMT: INFO (smd): (smd.c:2346) no file '/opt/aerospike/smd/UDF.smd' - starting empty
Mar 31 2025 05:39:12 GMT: INFO (batch): (batch.c:821) starting 64 batch-index-threads
Mar 31 2025 05:39:12 GMT: INFO (health): (health.c:327) starting health monitor thread
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:416) starting 8 fabric send threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:430) starting 16 fabric rw channel recv threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:430) starting 4 fabric ctrl channel recv threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:430) starting 4 fabric bulk channel recv threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:430) starting 4 fabric meta channel recv threads
Mar 31 2025 05:39:12 GMT: INFO (fabric): (fabric.c:442) starting fabric accept thread
Mar 31 2025 05:39:12 GMT: INFO (hb): (hb.c:7165) initializing multicast heartbeat socket: 239.1.99.222:9918
Mar 31 2025 05:39:12 GMT: INFO (fabric): (socket.c:825) Started fabric endpoint 0.0.0.0:3001
Mar 31 2025 05:39:12 GMT: INFO (socket): (socket.c:1582) Joining multicast group: 239.1.99.222
Mar 31 2025 05:39:12 GMT: INFO (hb): (hb.c:7199) mtu of the network is 1500
Mar 31 2025 05:39:12 GMT: INFO (hb): (socket.c:1618) Started multicast heartbeat endpoint 0.0.0.0:9918
Mar 31 2025 05:39:12 GMT: INFO (nsup): (nsup.c:197) starting namespace supervisor threads
Mar 31 2025 05:39:12 GMT: INFO (service): (service.c:943) starting reaper thread
Mar 31 2025 05:39:12 GMT: INFO (service): (socket.c:825) Started client endpoint 0.0.0.0:3000
Mar 31 2025 05:39:12 GMT: INFO (service): (service.c:198) starting accept thread
Mar 31 2025 05:39:12 GMT: INFO (as): (as.c:423) service ready: soon there will be cake!
Mar 31 2025 05:39:13 GMT: INFO (nsup): (nsup.c:1009) {test} collecting ttl & object size info ...
Mar 31 2025 05:39:14 GMT: INFO (clustering): (clustering.c:6357) principal node - forming new cluster with succession list: bb9603b99ef9408
Mar 31 2025 05:39:14 GMT: INFO (clustering): (clustering.c:5797) applied new cluster key 248829370ef3
Mar 31 2025 05:39:14 GMT: INFO (clustering): (clustering.c:5799) applied new succession list bb9603b99ef9408
Mar 31 2025 05:39:14 GMT: INFO (clustering): (clustering.c:5801) applied cluster size 1
Mar 31 2025 05:39:14 GMT: INFO (exchange): (exchange.c:2345) data exchange started with cluster key 248829370ef3
Mar 31 2025 05:39:14 GMT: INFO (exchange): (exchange.c:2728) exchange-compatibility-id: self 14 cluster-min 0 -> 14 cluster-max 0 -> 14
Mar 31 2025 05:39:14 GMT: INFO (exchange): (exchange.c:3296) received commit command from principal node bb9603b99ef9408
Mar 31 2025 05:39:14 GMT: INFO (exchange): (exchange.c:3259) data exchange completed with cluster key 248829370ef3
Mar 31 2025 05:39:14 GMT: INFO (partition): (partition_balance.c:1046) {test} replication factor is 1
Mar 31 2025 05:39:14 GMT: INFO (partition): (partition_balance.c:1019) {test} rebalanced: expected-migrations (0,0,0) fresh-partitions 0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:167) NODE-ID bb9603b99ef9408 CLUSTER-SIZE 1 CLUSTER-NAME cakery
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:239) cluster-clock: skew-ms 0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:260) system: total-cpu-pct 164 user-cpu-pct 125 kernel-cpu-pct 39 free-mem-kbytes 52202552 free-mem-pct 39 thp-mem-kbytes 30720
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:282) process: cpu-pct 157 threads (8,60,401,401) heap-kbytes (75726946,75743760,75774464) heap-efficiency-pct 100.0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:292) in-progress: info-q 0 rw-hash 0 proxy-hash 0 tree-gc-q 0 long-queries 0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:316) fds: proto (0,0,0) heartbeat (0,0,0) fabric (0,0,0)
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:325) heartbeat-received: self 67 foreign 0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:351) fabric-bytes-per-second: bulk (0,0) ctrl (0,0) meta (0,0) rw (0,0)
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:410) {test} objects: all 1196749999 master 1196749999 prole 0 non-replica 0
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:474) {test} migrations: complete
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:504) {test} index-usage: used-bytes 76591999936
Mar 31 2025 05:39:22 GMT: INFO (info): (ticker.c:566) {test} data-usage: used-bytes 330368494752 avail-pct 70 cache-read-pct 0.00
# free-wblocks: number of free (usable) wblocks; write (41,2.0): 41 swb buffers flushed in total, at a current rate of 2.0/s; defrag-q: current defrag queue backlog; defrag-read (287992,14399.6): total wblocks ever queued for defrag and the current queueing rate
Mar 31 2025 05:39:32 GMT: INFO (drv_ssd): (drv_ssd.c:2035) {test} /dev/nvme0n1: used-bytes 330368494752 free-wblocks 1478345 write-q 0 write (41,2.0) defrag-q 285258 defrag-read (287992,14399.6) defrag-write (41,2.0)