/build/testdb (main *%) $ ll
total 16
-rw-r--r--. 1 x  34 Nov 26 14:33 000003.log
-rw-r--r--. 1 x  16 Nov 26 14:33 CURRENT
-rw-r--r--. 1 x   0 Nov 26 14:33 LOCK
-rw-r--r--. 1 x 144 Nov 26 14:33 LOG
-rw-r--r--. 1 x  50 Nov 26 14:33 MANIFEST-000002
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest){ ... // 创建LOCK文件,保证整个机器只允许一个进程访问 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); ...
// 如果没有文件,那么新创一个db if (!env_->FileExists(CurrentFileName(dbname_))) { if (options_.create_if_missing) { // 新创db的过程,即按照当前的空数据库的现状(如logNumber等),生成一个VersionEdit,并写入manifest // 再创建CURRENT文件指向刚刚的manifest s = NewDB(); } ... } ... // 对于空数据库,显然就是按刚刚的空的manifest恢复,否则按实际的manifest恢复 // 本质上这一步是解析manifest s = versions_->Recover(save_manifest); if (!s.ok()) { return s; } SequenceNumber max_sequence(0);
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, VersionEdit* edit, SequenceNumber* max_sequence){ ... // Open the log file std::string fname = LogFileName(dbname_, log_number); SequentialFile* file; Status status = env_->NewSequentialFile(fname, &file); ... log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); ...
// 读取所有的记录,并应用到memtable std::string scratch; Slice record; WriteBatch batch; int compactions = 0; MemTable* mem = nullptr; while (reader.ReadRecord(&record, &scratch) && status.ok()) { if (record.size() < 12) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); continue; } WriteBatchInternal::SetContents(&batch, record); // 第一次进入时,在此处创建memtable if (mem == nullptr) { mem = newMemTable(internal_comparator_); mem->Ref(); } // 往memtable里面插入此条解析记录 status = WriteBatchInternal::InsertInto(&batch, mem); ... const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; if (last_seq > *max_sequence) { *max_sequence = last_seq; } // memtable占用内存太多时,刷盘到sst level0 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; status = WriteLevel0Table(mem, edit, nullptr); mem->Unref(); mem = nullptr; ... } } delete file;
... // recover阶段,都会把memtable最终清空,也就是刷sst level0 if (mem != nullptr) { // mem did not get reused; compact it. if (status.ok()) { *save_manifest = true; status = WriteLevel0Table(mem, edit, nullptr); } mem->Unref(); }
// Runs only when the preceding steps succeeded and no memtable is installed
// yet (impl->mem_ is null): allocate a new file number, open a writable log
// file under that number, and install the log writer plus an empty memtable.
if (s.ok() && impl->mem_ == nullptr) {
  // Create new log and a corresponding memtable.
  uint64_t new_log_number = impl->versions_->NewFileNumber();
  WritableFile* lfile;
  s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                   &lfile);
  if (s.ok()) {
    // Record the new log number in the pending edit.
    edit.SetLogNumber(new_log_number);
    impl->logfile_ = lfile;
    impl->logfile_number_ = new_log_number;
    impl->log_ = new log::Writer(lfile);
    // `new MemTable`, not the fused identifier `newMemTable`.
    impl->mem_ = new MemTable(impl->internal_comparator_);
    // Take a reference on the freshly created memtable on behalf of impl.
    impl->mem_->Ref();
  }
}
前面已经分析过,save_manifest 最终会为 true,因此会执行以下逻辑:
1 2 3 4 5
// When the previous steps succeeded and save_manifest is set, finish filling
// in the edit and hand it to the version set via LogAndApply.
if (s.ok() && save_manifest) {
  edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
  // Point the edit at the log file currently in use by impl.
  edit.SetLogNumber(impl->logfile_number_);
  s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu){ ... // 将edit和当前的version情况结合(启动时为空),生成一个新version Version* v = newVersion(this); { Builder builder(this, current_); builder.Apply(edit); builder.SaveTo(v); } Finalize(v);
std::string new_manifest_file; Status s; // 启动时,这里就是nullptr,会直接生成新的manifest if (descriptor_log_ == nullptr) { ... new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); if (s.ok()) { descriptor_log_ = new log::Writer(descriptor_file_); // 将当前的version写入manifest s = WriteSnapshot(descriptor_log_); } }
{ mu->Unlock();
// 再把最新的这条VersionEdit写入 if (s.ok()) { std::string record; edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record); if (s.ok()) { s = descriptor_file_->Sync(); } ... }
// 更新CURRENT文件 if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); }