memtable解析

外部使用

在主流程recover阶段会重放wal。

重点观察wal的一条record的值的格式,注意每个record由count个kv组成一个WriteBatch。可以看到由seq+类型+key+value组成(del记录没有val)。

WriteBatch中记录的第一个操作使用记录的seq,后面的操作依次seq++。

1
2
3
4
5
6
7
8
9
10
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring |
// kTypeDeletion varstring
// varstring :=
// len: varint32
// data: uint8[len]

往memtable里面插入一个WriteBatch的函数如下,就是插入或者删除而已

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}

Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
...
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
...
}

class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;

void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};

数据结构

整个数据结构代码摘录如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);

MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;

// Increase reference count.
void Ref() { ++refs_; }

// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}

// Returns an estimate of the number of bytes of data in use by this
// data structure. It is safe to call when MemTable is being modified.
size_t ApproximateMemoryUsage();

// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator* NewIterator();

// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value);

// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);

private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;

struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* a, const char* b) const;
};

// 可以看到核心存储数据结构是跳表,key是"const char*"类型
typedef SkipList<const char*, KeyComparator> Table;

~MemTable(); // private的析构函数,保证不允许创建在栈上,并且外界调用delete

KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
};

整个memtable其实都是围绕跳表在操作,而且可以观察到,没有删除接口,只有AddGet可以用,因为删除操作实际上是Addtype==kTypeDeletion的key值。

看下memtable的Add的实现,结合其比较器的实现,会发现跳表排序的key,实际上是key+seq+type,相同的key,还会按seq排序(当然一定不存在相同的seq)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
// 重点关注,发现排序的key包括了tag部份,也就是seq+type部份。
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
table_.Insert(buf);
}

int MemTable::KeyComparator::operator()(const char* aptr,
const char* bptr) const {
// Internal keys are encoded as length-prefixed strings.
// 拿出上面函数提到的internal_key的Slice
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
return comparator.Compare(a, b);
}

// 发现是按key的字典序升序(或者用户定义顺序),然后key相同的情况下按seq_num降序
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}

此时可以发现,一个memtable会同时存在一个key的多个版本(seq),并且多个版本同时参与排序,对于key的部份按字典序升序排列,对于seq的部份,是反序的,即seq大的在前面!

这里有一个好处,当用迭代器去Seek()的时候,因为Seek()返回的是第一个大于等于搜索key的指针,显然就会返回最大的Seq的key,也就是最新的key。

从memtable的Get()函数就能发现这一点,其只定位了一次,无需遍历这个key的全部版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
// 找到了这个key,返回true且置值
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
// 找到了删除的历史记录
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
// 任何版本都没找到,返回false
return false;
}

memtable刷level0的过程

入口

即解析函数WriteLevel0Table, 在主流程中的recover阶段的最后,会调用该函数并清空memtable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long)meta.number);

Status s;
{
mutex_.Unlock();
// 核心函数,遍历memtable,写sst!
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long)meta.number, (unsigned long long)meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);

int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
...
// 往本次的VersionEdit增加新sst文件,level是0,包括该sst的key范围和文件大小
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
}
// 供compaction统计使用,获取此次生成sst文件耗时信息
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);
return s;
}

解析sst的构建过程

这里我使用了AI作为辅助阅读构建sst以及刷盘过程,我觉得它已经讲的非常好了,这里大多数是摘录+自己删减

LevelDB中MemTable刷写到SST文件的过程(也称为MemTable Flush)是将内存中有序的键值对持久化为磁盘上的SSTable(Sorted String Table)的核心流程,由BuildTable函数驱动,TableBuilder负责具体的SST文件结构构建。

一、整体流程概览

MemTable是内存中的有序跳表,当它达到阈值(如写满、Immutable MemTable切换)时,后台线程会触发Flush:

  1. 遍历MemTable的有序键值对;
  2. 通过TableBuilder将数据组织为SST的标准结构(数据块→过滤器块→元索引块→索引块→Footer);
  3. 将构建好的内容写入磁盘文件,更新元数据(FileMetaData)。

BuildTable是流程入口,TableBuilder是SST结构的“构造器”,负责逐块构建并写入文件。

二、BuildTable:Flush的顶层驱动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst(); // 定位到MemTable的第一个键值对(MemTable的Iterator是有序的)

std::string fname = TableFileName(dbname, meta->number); // 生成SST文件名(如000001.sst)
if (iter->Valid()) { // 若MemTable非空
WritableFile* file;
s = env->NewWritableFile(fname, &file); // 创建可写文件句柄
if (!s.ok()) return s;

TableBuilder* builder = new TableBuilder(options, file); // 创建SST构造器
meta->smallest.DecodeFrom(iter->key()); // 记录SST的最小key(用于后续查找)
Slice key;
for (; iter->Valid(); iter->Next()) { // 遍历MemTable的所有键值对
key = iter->key();
builder->Add(key, iter->value()); // 逐个添加到TableBuilder
}
if (!key.empty()) {
meta->largest.DecodeFrom(key); // 记录SST的最大key
}

// 完成SST构建,获取文件大小
s = builder->Finish();
delete builder;
...
}
return s;
}

核心作用

  • 遍历MemTable的有序数据;
  • 创建SST文件和TableBuilder
  • 触发TableBuilder的构建流程;
  • 完成文件持久化并更新SST的元数据(最小/最大key、文件大小)。

三、TableBuilder:SST结构的具体构建

TableBuilder通过内部的Rep结构体维护构建状态,核心职责是将键值对组织为SST的标准块结构,并写入文件。

1. TableBuilder初始化(Rep结构体)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0), // 文件当前写入偏移
data_block(&options), // 数据块构建器(BlockBuilder)
index_block(&index_block_options), // 索引块构建器
num_entries(0),
closed(false),
filter_block(opt.filter_policy == nullptr ? nullptr : new FilterBlockBuilder(opt.filter_policy)), // 过滤器块(如布隆过滤器)
pending_index_entry(false) { // 是否有待处理的索引项
index_block_options.block_restart_interval = 1; // 索引块的重启点间隔为1(每个索引项都是重启点,加速查找)
}
// ...其他成员:last_key(最后添加的key)、compressed_output(压缩缓存)等
};

TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0); // 初始化过滤器块(第一个数据块的偏移为0)
}
}

初始化关键

  • 创建数据块、索引块、过滤器块的构建器;
  • 初始化文件偏移offset(记录当前写入位置);

2. Add:添加键值对到数据块

Add是将单个键值对加入构建流程的核心方法,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
// 确保key是递增的(MemTable的Iterator保证有序)
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}

// 处理“待处理的索引项”:为上一个数据块生成索引
if (r->pending_index_entry) {
assert(r->data_block.empty()); // 数据块已刷空(Flush后)
// 生成last_key和当前key的“最短分隔符”(优化索引键长度)
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding); // 序列化上一个数据块的BlockHandle(偏移+大小)
r->index_block.Add(r->last_key, Slice(handle_encoding)); // 索引块添加项:[短分隔符key → BlockHandle]
r->pending_index_entry = false;
}

// 向过滤器块添加key(用于后续快速过滤不存在的key)
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}

r->last_key.assign(key.data(), key.size()); // 更新最后一个key
r->num_entries++;
r->data_block.Add(key, value); // 将键值对添加到当前数据块(BlockBuilder处理前缀压缩)

// 若数据块达到阈值(默认4KB),触发Flush(刷写到文件)
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}

核心细节

  • 索引项延迟添加pending_index_entry为true时,说明上一个数据块已Flush,需为其生成索引项。通过FindShortestSeparator生成“最短分隔符key”(如上一个key是the quick,当前key是the who,分隔符是the r),减少索引块的存储空间。
  • 数据块前缀压缩data_block.AddBlockBuilder处理,对连续key的前缀进行压缩(如user:1:nameuser:1:age共享user:1:前缀),降低数据块大小。
  • 过滤器块更新:若配置了FilterPolicy(如布隆过滤器),每个key会被加入过滤器块,用于后续查询时快速判断key是否存在。

3. Flush:刷写当前数据块到文件

当数据块达到阈值(block_size),Add会触发Flush,将数据块写入文件并准备下一个数据块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return; // 数据块为空则跳过
assert(!r->pending_index_entry);

WriteBlock(&r->data_block, &r->pending_handle); // 将数据块写入文件,获取BlockHandle
if (ok()) {
r->pending_index_entry = true; // 标记有“待处理的索引项”(后续为该数据块生成索引)
r->status = r->file->Flush(); // 刷写文件缓冲区(确保数据写入OS缓存)
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset); // 过滤器块记录下一个数据块的偏移
}
}

4. WriteBlock:数据块的压缩与写入

WriteBlock是将BlockBuilder构建好的数据块(含前缀压缩)处理为SST的标准块格式(数据+类型+CRC),并写入文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
Slice raw = block->Finish(); // 完成数据块构建(生成带重启点的原始数据)

Slice block_contents;
CompressionType type = r->options.compression;
// 根据配置选择压缩方式(Snappy/Zstd/无压缩)
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
// 若压缩率>12.5%则使用压缩数据,否则用原始数据
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
block_contents = raw;
type = kNoCompression;
}
break;
}
// ...Zstd压缩逻辑类似
}

WriteRawBlock(block_contents, type, handle); // 写入块内容+trailer(类型+CRC)
r->compressed_output.clear();
block->Reset(); // 重置BlockBuilder,准备下一个数据块
}

void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
// 记录该数据块的BlockHandle(偏移+大小)
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
// 写入块内容到文件
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
// 写入块尾部(trailer):1字节压缩类型 + 4字节CRC校验
char trailer[kBlockTrailerSize]; // kBlockTrailerSize=5
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // CRC覆盖压缩类型
EncodeFixed32(trailer + 1, crc32c::Mask(crc)); // 掩码CRC防止误判
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize; // 更新文件偏移
}
}
}

SST块格式
每个数据块在文件中存储为:
[块内容(压缩/原始)] + [压缩类型(1字节)] + [CRC校验(4字节)]

  • 压缩类型:标记块是否压缩(0=无,1=Snappy,2=Zstd);
  • CRC校验:用于验证块内容的完整性。

5. Finish:完成SST构建,写入元信息

当所有键值对添加完毕后,BuildTable会调用Finish,完成剩余块的写入并生成SST的最终结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush(); // 刷写最后一个数据块
assert(!r->closed);
r->closed = true;

BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

// 1. 写入过滤器块(如布隆过滤器)
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression, &filter_block_handle);
}

// 2. 写入元索引块(Meta Index Block)
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// 元索引项:[filter.过滤器名称 → 过滤器块的BlockHandle]
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
WriteBlock(&meta_index_block, &metaindex_block_handle); // 写入元索引块
}

// 3. 写入索引块(Index Block)
if (ok()) {
if (r->pending_index_entry) { // 处理最后一个数据块的索引项
r->options.comparator->FindShortSuccessor(&r->last_key); // 生成最后一个key的后继key
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, handle_encoding);
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle); // 写入索引块
}

// 4. 写入Footer(文件尾部,固定48字节)
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle); // 元索引块的BlockHandle
footer.set_index_handle(index_block_handle); // 索引块的BlockHandle
std::string footer_encoding;
footer.EncodeTo(&footer_encoding); // 序列化Footer
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size(); // 更新最终文件大小
}
}
return r->status;
}

SST最终结构
通过Finish,SST文件会形成标准结构(与你之前问的文件格式一致):
[数据块1] → [数据块2] → ... → [过滤器块] → [元索引块] → [索引块] → [Footer]

  • Footer:固定48字节,存储元索引块和索引块的BlockHandle,以及魔数(验证文件合法性);
  • 元索引块:指向过滤器块等元数据块;
  • 索引块:指向所有数据块,用于快速定位key所在的数据块。

跳表的并发读写

leveldb实际上用标准库的原子变量实现了无锁的并发读写的跳表。

首先强调,其支持的是1写多读的访问模型,因为由leader统一写入确保了只有一个写线程可以对跳表进行操作,但其读操作是完全无锁的可以多个线程同时读跳表。

分析一下其插入逻辑,其跳表节点的每一层的指针,都是std::atomic<Node*>的,在生成新节点后,对每一层level,先用std::memory_order_relaxed内存序设置好本节点的指针指向下一个节点,再用std::memory_order_release内存序确保其之前的所有写操作不会排到其后面,以保证顺序(即一定是先低层再高层,每一层内部也一定是先设置本节点再设置prev节点)。并逐步对每一层做好设置。

编译器实际上可以这样重排顺序,因为release语义只会保证其之前的读写操作不会排其后面,所以编译器完全可能先设置完本节点所有层的指针指向,再依次从level0到level_max的上一个指针的指向,显然这种对于跳表而言是安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Implementation details follow
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}

Key const key;

// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}

// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

private:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
};

再者,读操作去查找key时采用的是Next(),内存序是std::memory_order_acquire,所以能确保其看到的跳表视图一定是安全的。