Introduction

This post looks at how leveldb maximizes the throughput of concurrent writes, i.e. what leveldb does internally when multiple threads call Put at the same time.

The conclusion up front: leveldb uses group commit, flushing a batch of writes to the WAL and the memtable together. Incoming writes line up in a queue, and the writer at the front becomes the leader of the batch. Holding the lock, the leader merges the queued WriteBatches into one; it then releases the lock and writes the merged WriteBatch to the WAL and the memtable in a single pass. Finally it re-acquires the lock, marks each flushed write as done, pops it from the queue, and wakes it; if any writers remain that have not yet been written, it wakes the first of them to act as the leader of the next group commit.
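Before diving into the real code, the queue/leader protocol just described can be sketched in a self-contained toy. Everything below (`GroupLog`, its `Writer` struct, the `std::string` standing in for the WAL + memtable) is hypothetical illustration code, not leveldb's actual classes:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical sketch of leader-based group commit. Each caller enqueues a
// Writer; the queue head becomes the leader, merges all pending payloads,
// performs the expensive write outside the lock, then marks the followers
// it wrote for as done and wakes them.
class GroupLog {
 public:
  void Append(const std::string& payload) {
    Writer w{payload};
    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    while (!w.done && &w != writers_.front()) w.cv.wait(lock);
    if (w.done) return;  // an earlier leader already wrote our payload

    // We are the leader: merge every currently queued payload into one batch.
    std::string batch;
    Writer* last = nullptr;
    for (Writer* p : writers_) { batch += p->payload; last = p; }

    lock.unlock();   // the slow write happens outside the lock,
    log_ += batch;   // so new writers can keep queueing up behind us
    lock.lock();

    // Pop everyone we wrote for, mark them done, and wake them.
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) { ready->done = true; ready->cv.notify_one(); }
      if (ready == last) break;
    }
    // Wake the next leader, if anyone queued up while we were writing.
    if (!writers_.empty()) writers_.front()->cv.notify_one();
  }

  std::string log() {
    std::lock_guard<std::mutex> lock(mu_);
    return log_;
  }

 private:
  struct Writer {
    std::string payload;
    bool done = false;
    std::condition_variable cv;
  };
  std::mutex mu_;
  std::deque<Writer*> writers_;
  std::string log_;  // stands in for the WAL + memtable
};
```

The structure mirrors DBImpl::Write below: wait until done or at the front, merge under the lock, write outside it, then clean up the queue and hand off leadership.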

Source Code Walkthrough

Pay close attention to the mutex: where it is acquired and where it is released.

Start with the entry point. Each write is wrapped in a Writer and pushed onto the deque writers_; every writer that is not at the front of the queue waits on its own condition variable.

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  // Wrap this write in a Writer
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;
  // Like std::lock_guard: lock before touching the deque writers_
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // A new request that is not at the front simply waits
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }

  if (w.done) {
    return w.status;
  }

  // Check whether the current memtable still has room. If not, turn it into
  // imm_, create a fresh memtable, and trigger a compaction.
  // This function runs entirely under the lock.
  Status status = MakeRoomForWrite(updates == nullptr);

  uint64_t last_sequence = versions_->LastSequence();
  // w is this group's leader (the queue front)
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {
    // Merge all queued writes into one batch; last_writer is left pointing
    // at the last write included in the group
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // Maintain the sequence number
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Write the merged batch to the WAL and the memtable. The lock is
    // released during the write, so new writers can enqueue at the tail.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      // Re-acquire the lock for the queue cleanup below
      mutex_.Lock();
      ...
    }
    // Reset the temporary batch built for this group
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }
  // Wake each writer whose work was completed by this group
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // If writers remain, wake the front one as the next group's leader
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

The function that makes room before writing the memtable is below. Focus on the case where the memtable becomes imm_ and a compaction is triggered.

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    // L0 is getting close to the hard limit: delay this write by 1 ms (at
    // most once per write) instead of stalling for seconds at the hard limit
    if (allow_delay && versions_->NumLevelFiles(0) >=
                           config::kL0_SlowdownWritesTrigger) {
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    }
    // The memtable still has room: nothing to do
    else if (!force &&
             (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      break;
    }
    // imm_ is non-null and (per the check above) the memtable is full:
    // writes are outpacing compaction, so block until it finishes
    else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    }
    // Too many level-0 SST files: block until compaction catches up
    else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    }
    else {
      // Turn mem_ into imm_ and trigger a compaction
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;

      s = logfile_->Close();
      ...
      delete logfile_;

      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
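The main outcomes of this loop can be distilled into a tiny decision function. The code below is purely illustrative (`Decide` and `WriteDecision` are made-up names, and the slowdown/error branches are omitted); the constants mirror leveldb's defaults of a 4 MB `write_buffer_size` and `config::kL0_StopWritesTrigger = 12`:

```cpp
#include <cstddef>

// Illustrative distillation of MakeRoomForWrite's main branches; this is
// not leveldb code, just the decision order written as a pure function.
enum class WriteDecision {
  kProceed,         // memtable has room: the write goes ahead
  kWaitForCompact,  // imm_ still compacting, or too many L0 files: block
  kSwitchMemtable,  // memtable full, imm_ slot free: rotate mem_ -> imm_
};

constexpr std::size_t kWriteBufferSize = 4 << 20;  // default write_buffer_size
constexpr int kL0_StopWritesTrigger = 12;          // config::kL0_StopWritesTrigger

WriteDecision Decide(std::size_t mem_usage, bool imm_in_progress, int l0_files) {
  if (mem_usage <= kWriteBufferSize) return WriteDecision::kProceed;
  if (imm_in_progress) return WriteDecision::kWaitForCompact;
  if (l0_files >= kL0_StopWritesTrigger) return WriteDecision::kWaitForCompact;
  return WriteDecision::kSwitchMemtable;
}
```

Reading the branches this way makes the ordering visible: a full memtable only causes a rotation when the imm_ slot is free and level 0 is not overloaded; otherwise the writer blocks until the background compaction signals.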

The function that decides whether to schedule a compaction is below. It is simple, but one point is worth stressing: leveldb keeps the implementation minimal and spawns exactly one background thread of its own to run compactions; there is no way for the user to supply a thread pool.

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    // Note that a non-null imm_ alone is enough to schedule a compaction
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

The function that builds the merged WriteBatch is below. It is straightforward: copy each queued WriteBatch into a temporary batch in order, maintaining last_writer as it goes.

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
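The size cap is worth working through numerically. A small function reproducing just that computation (illustrative; `MaxGroupSize` is a made-up name, the constants come straight from the code above):

```cpp
#include <cstddef>

// Reproduces BuildBatchGroup's group-size cap: a group may grow to 1 MB,
// but when the leader's own batch is small (<= 128 KB) the cap shrinks to
// leader_size + 128 KB, so one tiny Put is never held back by the latency
// of flushing a huge group on its behalf.
std::size_t MaxGroupSize(std::size_t leader_batch_size) {
  std::size_t max_size = 1 << 20;              // 1 MB absolute cap
  if (leader_batch_size <= (128 << 10)) {      // leader's batch is "small"
    max_size = leader_batch_size + (128 << 10);
  }
  return max_size;
}
```

So a 1 KB Put leads a group of at most ~129 KB, while a 512 KB batch may absorb followers up to the full 1 MB cap.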

Takeaways

Leader-based group commit has become more or less the standard way to implement durable writes. I have seen the same pattern in my company's in-house hash engine, and rocksdb's write path is built on the same idea. Overall it maximizes write throughput without the latency cost of polling: queued writers never busy-wait; they block on condition variables and are woken the moment their batch has been written.