leveldb源码学习(2)-启动过程

启动函数Open()解析

demo

首先写一个如下的测试文件,打开一个全新的db,并简单写入一个key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <cassert>
#include <cstdio>

#include "leveldb/db.h"

int main() {
leveldb::DB* db;

{
leveldb::Options options;
options.create_if_missing = true;
options.compression = leveldb::kNoCompression;
leveldb::Status status = leveldb::DB::Open(options, "testdb", &db);
assert(status.ok());
}

{
leveldb::WriteOptions options;
leveldb::Status status = db->Put(options, "[Key]", "[Value]");
assert(status.ok());
}

delete db;
}

执行完毕后,可以看到数据库目录 testdb 下生成了五个文件,其含义已在前一篇文章中叙述。

1
2
3
4
5
6
7
8
/build/testdb (main *%)
$ ll
total 16
-rw-r--r--. 1 x 34 Nov 26 14:33 000003.log
-rw-r--r--. 1 x 16 Nov 26 14:33 CURRENT
-rw-r--r--. 1 x 0 Nov 26 14:33 LOCK
-rw-r--r--. 1 x 144 Nov 26 14:33 LOG
-rw-r--r--. 1 x 50 Nov 26 14:33 MANIFEST-000002

主流程走读

下文引用的源码都做了大量精简(省略处以 ... 表示),只保留核心逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Abridged excerpt of DB::Open(); elided parts are marked with "...".
//
// Flow shown here:
//   1. Construct the DBImpl and run Recover(), which fills `edit` and tells
//      us through `save_manifest` whether a manifest must be written.
//   2. If recovery left no memtable, create a new log file together with a
//      fresh memtable.
//   3. If requested, persist `edit` through VersionSet::LogAndApply().
//   4. Remove obsolete files and possibly schedule a compaction.
//
// options: settings for the database; options.env is used to create the log.
// dbname:  name of the database, used to derive the log file name.
// dbptr:   out-parameter for the opened database (its assignment is in the
//          elided tail of the function).
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBImpl* impl = new DBImpl(options, dbname);
  VersionEdit edit;
  // Set by Recover() when a manifest has to be written afterwards.
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  ...
}

recover

new DBImpl 这个构造函数只是做了内部成员变量的初始化,并生成相应的日志文件(这里应指 info 日志 LOG 文件,而非 WAL;WAL 要到后面 Open() 中才创建)而已。

重点在于 impl->Recover(&edit, &save_manifest) 这一段:对于空数据库(目录下没有 CURRENT 文件),在 create_if_missing 为 true 时会新建一个数据库;对于已经存在的数据库,则执行 recover 过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Abridged excerpt of DBImpl::Recover(); elided parts are marked with "...".
//
// Locks the database, creates a new database if there is no CURRENT file and
// create_if_missing is set, recovers the version state from the manifest,
// checks that every file the manifest expects exists, and finally replays
// the relevant log files.
//
// edit:          passed through to RecoverLogFile().
// save_manifest: passed through to VersionSet::Recover() and RecoverLogFile().
// Returns the first non-OK status encountered, Corruption if expected files
// are missing, or OK.
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
...
// Take the lock on the LOCK file so that only one process can access the
// database at a time.
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
...

// No CURRENT file: there is no database yet, so create a new one.
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
// Creating a new db: build a VersionEdit that describes the current empty
// database (log number, etc.) and write it to a manifest, then create the
// CURRENT file pointing at that manifest.
s = NewDB();
}
...
}
...
// For an empty database this recovers from the empty manifest just written;
// otherwise from the real manifest. In essence this step parses the manifest.
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);

const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
// List every file in the database directory.
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
// After the manifest recovery above we know which files must exist.
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
// Parse every file name: tick off the files we expected and collect the WAL
// files that have to be replayed (they are sorted further below).
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
// Only logs at or after min_log, or the log numbered prev_log, are replayed.
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
// Anything still in `expected` is missing on disk, so opening must fail.
if (!expected.empty()) {
char buf[50];
std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Replay the WAL files in ascending file-number order.
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
// The second argument is true only for the last log in the list.
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
...
}
...
return Status::OK();
}

重点看看 manifest 是怎么解析的,也就是 VersionSet::Recover():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Abridged excerpt of VersionSet::Recover(); elided parts are marked "...".
//
// Reads the manifest named by the CURRENT file, decodes and applies every
// VersionEdit stored in it, and installs the resulting Version as current.
//
// save_manifest: set to true on the success path shown here.
// Returns the status of the first failing step, or OK.
Status VersionSet::Recover(bool* save_manifest) {
...

// Read the name of the current manifest from the CURRENT file.
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
...
// Drop the last character (presumably the trailing newline; the check is
// elided in this excerpt).
current.resize(current.size() - 1);

std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
...

// Values of the most recent edit that carried each field.
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
int read_records = 0;

{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/,
0 /*initial_offset*/);
Slice record;
std::string scratch;
// The manifest uses the same record format as the WAL (.log) files; read
// every VersionEdit from it in order.
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
++read_records;
VersionEdit edit;
s = edit.DecodeFrom(record);

...
if (s.ok()) {
// Apply the VersionEdits one after another to build up the current Version.
builder.Apply(&edit);
}

if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}

if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}

if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = nullptr;
...

if (s.ok()) {
// Materialise the accumulated state as the new current version.
Version* v = new Version(this);
builder.SaveTo(v);
Finalize(v);
// Insert it into the doubly linked list of versions and make the current_
// version pointer refer to it.
AppendVersion(v);
// next_file becomes the manifest's file number; numbering continues after it.
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;

...
// A new manifest will be generated later on, which is why the directory
// ends up containing MANIFEST-000002.
*save_manifest = true;
}
...
return s;
}

再看看重放 wal 的过程做了什么。注意对于新数据库,目录下实际上还没有 wal 文件,logs 为空,因此 RecoverLogFile() 不会被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Abridged excerpt of DBImpl::RecoverLogFile(); elided parts are marked "...".
//
// Replays one log file: every record is decoded as a WriteBatch and inserted
// into a memtable, which is written out through WriteLevel0Table() whenever
// it exceeds write_buffer_size and once more at the end.
//
// log_number:    number of the log file to replay.
// last_log:      not used in the portion shown here.
// save_manifest: set to true whenever a level-0 table is written.
// edit:          passed to WriteLevel0Table().
// max_sequence:  raised to the largest sequence number seen in the log.
// Returns the status of the last operation performed.
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
...
// Open the log file
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
...
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
...

// Read every record and apply it to the memtable.
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
// Records shorter than 12 bytes are reported as corruption and skipped.
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
// The memtable is created here on the first record.
if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
// Insert the decoded batch into the memtable.
status = WriteBatchInternal::InsertInto(&batch, mem);
...
// Sequence number of the last entry in this batch.
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
// When the memtable uses too much memory, flush it to a level-0 SST.
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
...
}
}
delete file;

...
// During recovery the memtable is always emptied in the end, i.e. whatever is
// left is flushed to a level-0 SST.
if (mem != nullptr) {
// mem did not get reused; compact it.
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
mem->Unref();
}

return status;
}

recover后续

重新回到Open()函数,对于空数据库,之前是没有初始化memtable指针的,这里会做初始化,并创建新的wal文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Fragment of DB::Open(): runs only when recovery succeeded and left no
// memtable behind.
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
// Record the new log number in the edit and wire the log file, its writer
// and a fresh memtable into the DBImpl.
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}

前面已经看到 save_manifest 最终会被置为 true,因此会执行以下逻辑:

1
2
3
4
5
// Fragment of DB::Open(): when a manifest has to be saved, complete the edit
// with the log numbers and hand it to LogAndApply().
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}

重点关注LogAndApply(),应用edit,生成新version

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Abridged excerpt of VersionSet::LogAndApply(); elided parts are marked "...".
//
// Builds a new Version from current_ plus `edit`, appends the edit to the
// manifest (creating the manifest first if none is open), updates CURRENT
// when a new manifest was created, and on success installs the new Version.
//
// edit: the change to apply and to record in the manifest.
// mu:   unlocked while the manifest is written and locked again afterwards.
// Returns the status of the first failing step, or OK.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
...
// Combine the edit with the current version (empty at startup) to produce a
// new version.
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);

std::string new_manifest_file;
Status s;
// At startup descriptor_log_ is nullptr, so a new manifest is created here.
if (descriptor_log_ == nullptr) {
...
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
// Write the current version into the manifest.
s = WriteSnapshot(descriptor_log_);
}
}

{
// The mutex is released for the duration of the manifest write below.
mu->Unlock();

// Then append this newest VersionEdit to the manifest.
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
...
}

// Update the CURRENT file (only when a new manifest was created above).
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

// Make the current-version pointer refer to the newest version.
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
...
}
return s;
}

最后删除过期文件(也就是 manifest 中没有记录的文件),并判断是否需要触发 compaction。逻辑简单,不再详细叙述。

1
2
// Fragment of DB::Open(): final cleanup after a successful open.
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();