前言

leveldb对外提供了迭代器iterator方法,用于遍历数据,比如以下用法

1
2
3
4
5
6
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
cout << it->key().ToString() << ": " << it->value().ToString() << endl;
}
assert(it->status().ok()); // Check for any errors found during the scan
delete it;
1
2
3
4
5
for (it->Seek(start);
it->Valid() && it->key().ToString() < limit;
it->Next()) {
...
}

其对使用者完全屏蔽了内部极其复杂的细节,让使用者感觉就像是在一个最简单的有序数组上遍历。

实际上因为lsm-tree结构,读操作特别是扫描操作是性能消耗较大的,需要同时创建memimmlevel0的全部sst,以及level1~n的其中一个sst的迭代器。用户每次调用Next()时,实际上底层这数十个迭代器都在执行Next(),并找到其中smallest的key,当然这个smallest是将key+seq同时加入排序的,即seq大的会先遍历到。

总的来说,在代码中,迭代器层级如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DBIter : public Iterator {}; // 最外层抽象,用户直接使用

// 实际操作数据的归并迭代器
class MergingIterator : public Iterator {
...
IteratorWrapper* children_; //实际上是一个vector数组,包含了其下的所有迭代器
IteratorWrapper* current_; // 其中的,指向当前实际数据的迭代器指针。比如key 1 3 4 7分布在两个迭代器中,之前指向3,调用Next()应该指向4,那么那个指向4的迭代器指针就是这个变量
};

/* mem迭代器,用于遍历memtable,对于mem是在children的0号下标,imm在1号下标,level0次之,更高层的继续往后 */
class MemTableIterator : public Iterator {};

// 遍历sst的双层迭代器,外层是index_block用于定位block块位置,内层在block内部遍历
class TwoLevelIterator : public Iterator {};
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}
// level1~n的双层迭代器,即依次遍历sst file第一层,每个sst内部又是前文提到的两层
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator,
vset_->table_cache_, options);
}

整体逻辑解析

迭代器的创建

首先在外层创建迭代器的函数如下,即用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
// 创建MergingIterator
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
// 结合MergingIterator和用户传过来的seq创建最外层给用户的DBIter
// 因此iter创建时,实际上就已经创建了一个快照,后面扫数据是不会扫到在此之后写入的数据的
return NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
}

MergingIterator的创建过程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot,
uint32_t* seed) {
// 可以看到几乎全程持有锁,所以创建iterator实际上是比较重的操作,因为后面还涉及文件读取与解析
mutex_.Lock();
// 拿到当前的最大seq
*latest_snapshot = versions_->LastSequence();

// Collect together all needed child iterators
std::vector<Iterator*> list;
// 将mem与imm的迭代器加入数组
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != nullptr) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
// 创建sst各层的迭代器,并加入数组
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();

// 内存与数据清理函数注册,用于用户delete迭代器时,能够自动释放各级资源
IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

*seed = ++seed_;
mutex_.Unlock();
return internal_iter;
}

重点关心sst的各级迭代器如何加入的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// level0的全部sst都会创建迭代器,因为数据重叠
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
}

// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
// level1~n的双层迭代器,本文前言中有提到
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}

对于level0的sst迭代器如下,FindTable()函数只是打开与解析sst文件,比如读出各个block的信息,不属于迭代器核心流程,这里不赘述。但值得注意的是,它做了读取和解析操作,因此是比较重的性能操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Iterator* TableCache::NewIterator(const ReadOptions& options,
uint64_t file_number, uint64_t file_size,
Table** tableptr) {
if (tableptr != nullptr) {
*tableptr = nullptr;
}

Cache::Handle* handle = nullptr;
// 这里先从lru缓存中找有没有缓存此文件,如果没有会读出文件并全部解析,最终放到lru中
Status s = FindTable(file_number, file_size, &handle);
...
Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
// 最终返回值即前言提到的具体的双层级迭代器
Iterator* result = table->NewIterator(options);
// 注册清理函数,用于lru清理
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (tableptr != nullptr) {
*tableptr = table;
}
return result;
}

因此MergingIterator只是包含了全部迭代器的一个vector封装,用于操作所有的child迭代器,并找出其中的smallest返回给上层。

最外层迭代器DBIter也只是MergingIterator迭代器+seq的封装,用于跳过大于传入seq的数据,以及del的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
Iterator* internal_iter, SequenceNumber sequence,
uint32_t seed) {
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
}

DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
uint32_t seed)
: db_(db),
user_comparator_(cmp),
iter_(iter),
sequence_(s),
direction_(kForward),
valid_(false),
rnd_(seed),
bytes_until_read_sampling_(RandomCompactionPeriod()) {}

迭代器的具体实现

Next()的实现

从最外层迭代器调用Next()开始往下查看实现,为了简单,只关心一直一个方向的遍历,不能反向转正向遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void DBIter::Next() {
assert(valid_);
...
// 将当前遍历到的key保存到saved_key_中,主要是判断下一个key是不是跟当前key是一样的,比如del操作和更老版本的key
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);

// 直接调用其MergingIterator的Next
iter_->Next();
if (!iter_->Valid()) {
valid_ = false;
saved_key_.clear();
return;
}
// 检查是不是真的是Next-key
FindNextUserEntry(true, &saved_key_);
}

检查即跳过删除操作,重复的key等,不断的调用iter_->Next(),直到找到真的用户所要的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(direction_ == kForward);
do {
ParsedInternalKey ikey;
// seq比传入大的,都会直接跳过
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
switch (ikey.type) {
case kTypeDeletion:
// 跳过所有del的key
SaveKey(ikey.user_key, skip);
skipping = true;
break;
case kTypeValue:
// 跳过所有重复的key
if (skipping &&
user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
} else {
valid_ = true;
saved_key_.clear();
return;
}
break;
}
}
iter_->Next();
} while (iter_->Valid());
saved_key_.clear();
valid_ = false;
}

最外层的DBIterNext()逻辑基本清楚了,深入到底层MergingIteratorNext(),代码相当简单,即推进current_指向的迭代器的Next()即可,因为current_指向的迭代器的key值,是当前所有迭代器中最小的,因此只需要推进最小的那个值即可,类似归并排序。

1
2
3
4
5
6
7
8
class MergingIterator : public Iterator {
void Next() override {
assert(Valid());
...
current_->Next();
FindSmallest();
}
};

每次推进current_后都需要更新最新的current_指向当前最小的key的迭代器。所以可以注意到,key可能重复也可能带有del操作,不过这些都由上层迭代器DBIter处理了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void MergingIterator::FindSmallest() {
IteratorWrapper* smallest = nullptr;
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child->Valid()) {
if (smallest == nullptr) {
smallest = child;
} else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
smallest = child;
}
}
}
current_ = smallest;
}

Prev()的实现

MergingIteratorPrev()实现跟Next()一模一样,不赘述。

对于DBiterPrev()实现摘录如下,比较简单,不赘述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void DBIter::Prev() {
...
FindPrevUserEntry();
}
void DBIter::FindPrevUserEntry() {
assert(direction_ == kReverse);

ValueType value_type = kTypeDeletion;
if (iter_->Valid()) {
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
// 找到了前一个非del的比当前key小的key了,可以返回
if ((value_type != kTypeDeletion) &&
user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
// We encountered a non-deleted value in entries for previous keys,
break;
}
value_type = ikey.type;
if (value_type == kTypeDeletion) {
saved_key_.clear();
ClearSavedValue();
} else {
Slice raw_value = iter_->value();
if (saved_value_.capacity() > raw_value.size() + 1048576) {
std::string empty;
swap(empty, saved_value_);
}
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
saved_value_.assign(raw_value.data(), raw_value.size());
}
}
iter_->Prev();
} while (iter_->Valid());
}

if (value_type == kTypeDeletion) {
// End
valid_ = false;
saved_key_.clear();
ClearSavedValue();
direction_ = kForward;
} else {
valid_ = true;
}
}

最后提一句,为什么说Prev()的性能比Next()稍差。主要还是memtable的跳表结构没有反向指针,其执行prev的时候,实际上会从头找,时间复杂度是log(n)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = nullptr;
}
}
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}