前言 leveldb对外提供了迭代器iterator方法,用于遍历数据,比如以下用法
1 2 3 4 5 6 leveldb::Iterator* it = db->NewIterator (leveldb::ReadOptions ()); for (it->SeekToFirst (); it->Valid (); it->Next ()) { cout << it->key ().ToString () << ": " << it->value ().ToString () << endl; } assert (it->status ().ok ()); delete it;
1 2 3 4 5 for (it->Seek (start); it->Valid () && it->key ().ToString () < limit; it->Next ()) { ... }
其对使用者完全屏蔽了内部极其复杂 的细节,让使用者感觉就像是在一个最简单的有序数组上遍历。
实际上因为lsm-tree结构,读操作特别是扫描操作是性能消耗较大的,需要同时创建mem、imm、level0的全部sst,以及level1~n的其中一个sst的迭代器。用户每次调用Next()时,实际上底层这数十个迭代器都在执行Next(),并找到其中smallest的key,当然这个smallest是将key+seq同时加入排序的,即seq大的会先遍历到。
总的来说,在代码中,迭代器层级如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class DBIter : public Iterator {}; class MergingIterator : public Iterator { ... IteratorWrapper* children_; IteratorWrapper* current_; }; class MemTableIterator : public Iterator {};class TwoLevelIterator : public Iterator {};Iterator* Table::NewIterator (const ReadOptions& options) const { return NewTwoLevelIterator ( rep_->index_block->NewIterator (rep_->options.comparator), &Table::BlockReader, const_cast <Table*>(this ), options); } Iterator* Version::NewConcatenatingIterator (const ReadOptions& options, int level) const { return NewTwoLevelIterator ( new LevelFileNumIterator (vset_->icmp_, &files_[level]), &GetFileIterator, vset_->table_cache_, options); }
整体逻辑解析 迭代器的创建 首先在外层创建迭代器的函数如下,即用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Iterator* DBImpl::NewIterator (const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; Iterator* iter = NewInternalIterator (options, &latest_snapshot, &seed); return NewDBIterator (this , user_comparator (), iter, (options.snapshot != nullptr ? static_cast <const SnapshotImpl*>(options.snapshot) ->sequence_number () : latest_snapshot), seed); }
MergingIterator的创建过程如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Iterator* DBImpl::NewInternalIterator (const ReadOptions& options, SequenceNumber* latest_snapshot, uint32_t * seed) { mutex_.Lock (); *latest_snapshot = versions_->LastSequence (); std::vector<Iterator*> list; list.push_back (mem_->NewIterator ()); mem_->Ref (); if (imm_ != nullptr ) { list.push_back (imm_->NewIterator ()); imm_->Ref (); } versions_->current ()->AddIterators (options, &list); Iterator* internal_iter = NewMergingIterator (&internal_comparator_, &list[0 ], list.size ()); versions_->current ()->Ref (); IterState* cleanup = new IterState (&mutex_, mem_, imm_, versions_->current ()); internal_iter->RegisterCleanup (CleanupIteratorState, cleanup, nullptr ); *seed = ++seed_; mutex_.Unlock (); return internal_iter; }
重点关心sst的各级迭代器如何加入的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void Version::AddIterators (const ReadOptions& options, std::vector<Iterator*>* iters) { for (size_t i = 0 ; i < files_[0 ].size (); i++) { iters->push_back (vset_->table_cache_->NewIterator ( options, files_[0 ][i]->number, files_[0 ][i]->file_size)); } for (int level = 1 ; level < config::kNumLevels; level++) { if (!files_[level].empty ()) { iters->push_back (NewConcatenatingIterator (options, level)); } } }
对于level0的sst迭代器如下,FindTable()函数只是打开与解析sst文件,比如读出各个block的信息,不属于迭代器核心流程,这里不赘述。但值得注意的是,它做了读取和解析操作,因此是比较重的性能操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Iterator* TableCache::NewIterator (const ReadOptions& options, uint64_t file_number, uint64_t file_size, Table** tableptr) { if (tableptr != nullptr ) { *tableptr = nullptr ; } Cache::Handle* handle = nullptr ; Status s = FindTable (file_number, file_size, &handle); ... Table* table = reinterpret_cast <TableAndFile*>(cache_->Value (handle))->table; Iterator* result = table->NewIterator (options); result->RegisterCleanup (&UnrefEntry, cache_, handle); if (tableptr != nullptr ) { *tableptr = table; } return result; }
因此MergingIterator只是包含了全部迭代器的一个vector封装,用于操作所有的child迭代器,并找出其中的smallest返回给上层。
最外层迭代器DBIter也只是MergingIterator迭代器+seq的封装,用于跳过大于传入seq的数据,以及del的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Iterator* NewDBIterator (DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, SequenceNumber sequence, uint32_t seed) { return new DBIter (db, user_key_comparator, internal_iter, sequence, seed); } DBIter (DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s, uint32_t seed) : db_ (db), user_comparator_ (cmp), iter_ (iter), sequence_ (s), direction_ (kForward), valid_ (false ), rnd_ (seed), bytes_until_read_sampling_ (RandomCompactionPeriod ()) {}
迭代器的具体实现 Next()的实现 从最外层迭代器调用Next()开始往下查看实现,为了简单,只关心一直一个方向的遍历,不能反向转正向遍历
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void DBIter::Next () { assert (valid_); ... SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); iter_->Next (); if (!iter_->Valid ()) { valid_ = false ; saved_key_.clear (); return ; } FindNextUserEntry (true , &saved_key_); }
检查即跳过删除操作,重复的key等,不断的调用iter_->Next(),直到找到真的用户所要的key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void DBIter::FindNextUserEntry (bool skipping, std::string* skip) { assert (iter_->Valid ()); assert (direction_ == kForward); do { ParsedInternalKey ikey; if (ParseKey (&ikey) && ikey.sequence <= sequence_) { switch (ikey.type) { case kTypeDeletion: SaveKey (ikey.user_key, skip); skipping = true ; break ; case kTypeValue: if (skipping && user_comparator_->Compare (ikey.user_key, *skip) <= 0 ) { } else { valid_ = true ; saved_key_.clear (); return ; } break ; } } iter_->Next (); } while (iter_->Valid ()); saved_key_.clear (); valid_ = false ; }
最外层的DBIter的Next()逻辑基本清楚了,深入到底层MergingIterator的Next(),代码相当简单,即推进current_指向的迭代器的Next()即可,因为current_指向的迭代器的key值,是当前所有迭代器中最小的,因此只需要推进最小的那个值即可,类似归并排序。
1 2 3 4 5 6 7 8 class MergingIterator : public Iterator { void Next () override { assert (Valid ()); ... current_->Next (); FindSmallest (); } };
每次推进current_后都需要更新最新的current_指向当前最小的key的迭代器。所以可以注意到,key可能重复也可能带有del操作,不过这些都由上层迭代器DBIter处理了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void MergingIterator::FindSmallest () { IteratorWrapper* smallest = nullptr ; for (int i = 0 ; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child->Valid ()) { if (smallest == nullptr ) { smallest = child; } else if (comparator_->Compare (child->key (), smallest->key ()) < 0 ) { smallest = child; } } } current_ = smallest; }
Prev()的实现 MergingIterator的Prev()实现跟Next()一模一样,不赘述。
对于DBiter的Prev()实现摘录如下,比较简单,不赘述
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 void DBIter::Prev () { ... FindPrevUserEntry (); } void DBIter::FindPrevUserEntry () { assert (direction_ == kReverse); ValueType value_type = kTypeDeletion; if (iter_->Valid ()) { do { ParsedInternalKey ikey; if (ParseKey (&ikey) && ikey.sequence <= sequence_) { if ((value_type != kTypeDeletion) && user_comparator_->Compare (ikey.user_key, saved_key_) < 0 ) { break ; } value_type = ikey.type; if (value_type == kTypeDeletion) { saved_key_.clear (); ClearSavedValue (); } else { Slice raw_value = iter_->value (); if (saved_value_.capacity () > raw_value.size () + 1048576 ) { std::string empty; swap (empty, saved_value_); } SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); saved_value_.assign (raw_value.data (), raw_value.size ()); } } iter_->Prev (); } while (iter_->Valid ()); } if (value_type == kTypeDeletion) { valid_ = false ; saved_key_.clear (); ClearSavedValue (); direction_ = kForward; } else { valid_ = true ; } }
最后提一句,为什么说Prev()的性能比Next()稍差。主要还是memtable的跳表结构没有反向指针,其执行prev的时候,实际上会从头找,时间复杂度是log(n)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 template <typename Key, class Comparator >inline void SkipList<Key, Comparator>::Iterator::Prev () { assert (Valid ()); node_ = list_->FindLessThan (node_->key); if (node_ == list_->head_) { node_ = nullptr ; } } template <typename Key, class Comparator >typename SkipList<Key, Comparator>::Node*SkipList<Key, Comparator>::FindLessThan (const Key& key) const { Node* x = head_; int level = GetMaxHeight () - 1 ; while (true ) { assert (x == head_ || compare_ (x->key, key) < 0 ); Node* next = x->Next (level); if (next == nullptr || compare_ (next->key, key) >= 0 ) { if (level == 0 ) { return x; } else { level--; } } else { x = next; } } }