MIT 6.s081 Lab8: locks

本次 lab 是要重新设计 xv6 的内存分配器和块缓存来获取更高的并行性。要增加并行性通常要设计到更改数据结构和上锁策略来减少竞态。

Memory allocator (moderate)

原本的 allocator 中只有一个单独的 freelist,也就是说 page 的分配和释放都会频繁的产生竞争。所以给每个 CPU 设置一个单独的 freelist 和一个单独的锁。所以进程在不同的 CPU 上申请和分配 page 就可以并行运行了。如果当前 CPU 上 freelist 的为空的时候,就要去别的 CPU 那里偷取 page,这个时候依旧会产生竞争。

代码实现

首先修改 CPU 结构体,在每个 CPU 中加入一个之前的 kmem,让每个 CPU 可以单独的分配内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct run {
struct run *next;
};

struct kmem {
struct spinlock lock;
struct run *freelist;
};

// Per-CPU state.
struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
struct kmem kmem;
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};

通过 NCPU 参数可以知道,有 8 个 CPU,所以在初始化 alloctor 的时候将 freepage 分成 8 份,分别放入 8 个 CPU 的 kmem 中。其实也可以初始化的时候都放到一个 CPU 里,然后其它 CPU 去偷。但是我感觉这样会有一个冷启动的过程,不如在初始化的时候直接先分配了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
kinit()
{
uint64 offset = (PHYSTOP - (uint64)end)/8;
uint64 left, right;
left = (uint64) end;
right = left + offset;
for (int i = 0; i < NCPU; i++) {
struct cpu *c = &cpus[i];
initlock(&c->kmem.lock, "kmem");
char *p;
p = (char*)PGROUNDUP(left);
for(; p + PGSIZE <= (char*)right; p += PGSIZE) {
struct run *r;

if(((uint64)p % PGSIZE) != 0 || (char*)p < end || (uint64)p >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(p, 1, PGSIZE);
r = (struct run*)p;
r->next = c->kmem.freelist;
c->kmem.freelist = r;
}
left = right;
right = right + offset;
}
}

然后修改 kfree,在释放的时候申请当前 CPU 的 kmem 锁,将 page 释放到当前 CPU 的 freelist 中即可,注意开关中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void
kfree(void *pa)
{
struct run *r;

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

push_off();

struct cpu *c = mycpu();

acquire(&c->kmem.lock);
r->next = c->kmem.freelist;
c->kmem.freelist = r;
release(&c->kmem.lock);

pop_off();
}

最后是 kalloc,在分配的时候由于当前 CPU 的 freelist 可能已经为空了,那么就需要去其它 CPU 那里偷。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void *
kalloc()
{
push_off();
struct run *r;
int cid = cpuid();
struct cpu *c = &cpus[cid];

acquire(&c->kmem.lock);
r = c->kmem.freelist;
if(r) {
c->kmem.freelist = r->next;
release(&c->kmem.lock);
} else {
release(&c->kmem.lock);
for (int i = 0; i < NCPU; i++) {
if (i == cid) continue;
struct cpu *nc = &cpus[i];
acquire(&nc->kmem.lock);
r = nc->kmem.freelist;
if (r) {
acquire(&c->kmem.lock);
nc->kmem.freelist = r->next;
release(&c->kmem.lock);
release(&nc->kmem.lock);
break;
} else {
release(&nc->kmem.lock);
continue;
}
}
}

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
pop_off();
return (void*)r;
}

去其它 CPU 那里偷的时候,由于要获取其它 CPU 的锁,所以会产生竞争。还有一种情况就是当前进程持有当前 CPU 的锁去偷其它 CPU 的时候,那个被偷的 CPU 可能也正在尝试偷当前 CPU,这样就会产生死锁。

可以选择在去别的 CPU 那里偷之前把当前 CPU 的锁释放掉,这样就不会产生死锁,破坏了死锁的必要条件。我在大佬的博客上看到说如果这个时候释放了锁,就会导致重复偷取 page。我想了一下,如果说在释放当前 CPU 的 kmem 锁之后,该进程被调度走了,另外一个进程又过来执行 kalloc,也发现当前 CPU 的 freelist 是空的,也会进行偷取。但是我感觉当前已经是关中断了,那么时钟中断也会被屏蔽,当前进程就不会被调度了,所以应该不会产生这种情况吧。这一块没有太懂,不过能通过测试,暂时先这样。

Buffer cache (hard)

这部分要做的事情跟第一部分的目的是一样的,修改 bcache 结构体,并降低锁的粒度。将原本的用一个大锁锁住整个双向链表的设计,拆分成一个 Hash Table,每个 bucket 一个锁,那么就可以使得访问不同 bucket 的进程并行访问 buffer cache。

代码实现

根据 hints,最基本的思路是很容易想到的。将 buffer 以哈希表的形式组织,bucket 的数量为 13 个,每个 bucket 需要一把锁。并且把原本的双向链表设计去掉,原本的双向链表是为了实现 LRU,但现在我们通过给每个 buffer 记录时间戳来实现 LRU,时间戳就为 kernel/trap.c 中的 ticks。

所以 bcache 结构改为以下形式:

1
2
3
4
5
6
7
struct {
struct spinlock lock;
struct buf buf[NBUF];

struct buf buckets[NBUCKET];
struct spinlock bucketlocks[NBUCKET];
} bcache;

在 buf 结构体中去掉 prev 指针,并加入 timestamp 字段,用于实现 LRU:

1
2
3
4
5
6
7
8
9
10
11
12
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
uint timestamp;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};

这样一来就不需要维护双向链表了,binit 的实现也很简单,只需要初始化锁并且将 bcache.buf 中的所有 buffer 都放进 bcache.buckets[0],并初始化每个 buf 的 sleeplock 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
binit(void)
{
struct buf *b;

initlock(&bcache.lock, "bcache");

int i;
for (i = 0; i < NBUCKET; i++) {
initlock(&bcache.bucketlocks[i], "bcache");
}

for (b = bcache.buf; b < bcache.buf+NBUF; b++) {
initsleeplock(&b->lock, "buffer");
b->next = bcache.buckets[0].next;
bcache.buckets[0].next = b;
}
}

在我最开始的实现中,bcache.lock 是用不上的(所以也没跑出正确答案)。但是根据 Hints:

Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.

是需要上两把锁的,具体使用场景到后面再看。

由于我们现在不需要维护双向链表,在 brelse 的时候直接释放 buf 的 sleep 的 lock,然后再获取对应的 bucket 的 lock,将 refcnt– 即可,如果 refcnt == 0,则将当前的 ticks 设置为 buf 的 timestamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

int key = HASH_DEV_BLOCKNO(b->dev, b->blockno);
acquire(&bcache.bucketlocks[key]);
b->refcnt--;
if (b->refcnt == 0) {
b->timestamp = ticks;
}

release(&bcache.bucketlocks[key]);
}

同样的修改 bpin 和 bunpin,这里就不贴出来了。

bget

整个 bget 是最折磨的地方,多线程场景下的问题还是太难发现了。

最开始我没有意识到问题的严重性,按照最初的思路,写了第一版代码,逻辑如下:

  1. 获取 dev 和 blockno 对应的 bucket 的锁;
  2. 如果已经有缓存了就直接返回 buf,否则执行 3;
  3. 在所有的 bucket 中找出一个 refcnt 为 0,并且最近最久未使用的 buf。在这个过程中需要对 bucket 上锁,遍历完一个 bucket 之后,释放它的锁;
  4. 如果找到了这样的 buf,就申请它所在的 bucket 锁并进行 eviction,将它从原本的 bucket 中移除,否则直接 panic;
  5. 将找到的 buf 插入 dev 和 blockno 对应的 bucket 中,并设置 buf 的值;
  6. 释放 dev 和 blockno 对应的 bucket 的锁;
  7. 获取找到的 buf 的 sleeplock;
  8. 返回 buf;

这样乍一看是没什么问题,但是事实并非如此。

首先一个容易注意到的点就是,会产生死锁。由于我们在发现没有缓存的时候,并没有释放刚刚获取的 bucket 的锁,然后就开始在所有 bucket 中找一个可以被 evict 的 buf,这时候要获取其它 bucket 的锁。那么如果一个进程 A 持有 bucket1 的锁,又去获取 bucket 2 的锁,但是进程 B 又持有 bucket2 的锁去获取 bucket1 的锁时,就产生了死锁。

所以在发现我们要找的 buf 并没有在缓存中时,要先释放当前持有的 bucket 的锁,再去进行 eviction。

第二个问题就是,当我们找到了那个可以被 evict 的 buf 时,将锁释放掉了,正式进行 evict 时,又去申请锁,但是在释放锁到重新申请锁的这个间隙,可能有其它进程又引用了刚刚找出来的那个 buf,使得它的 refcnt 不为 1,这个时候将其 evict 掉就会发生错误。

解决方案就是,在找出可以 evict 的 buf 后,不释放对应 bucket 的锁,而是直到 evict 之后再释放。那么其它进程在刚开始获取缓存的时候就会阻塞,因为它获取不到这个 bucket 的锁,待到 ecivt 结束后,它才能去查看 bucket,这时候它就看不到那个被 evict 的 buf 了。

完成以上两个修改之后,bcachetest 已经能够通过了。但是如果执行 usertests,第一个 manywrites 就无法通过,报的错误是 panic: freeing free block,即释放了一个原本已经释放的 block cache。而这种重复释放的原因,肯定就是同一个 block 被缓存了多次。

下面的思路来自于Miigon

假设当前有两个进程同时访问同样缓存块,在第一个进程获取到对应的 bucket 锁后,发现不存在缓存,就释放了锁,进入寻找可 evict 的 buf 阶段。这时,第二个进程同样能够获取对应 bucket 的锁,并同样发现缓存不存在,也进入寻找可 evict 的 buf 阶段。

这个时候完全有可能发生的是,它们找到了两个不同的 buf,并且将它们 evict 掉之后都插入了它们要找的 block 对应的 bucket 中。这时就出现了同一块 block 有多个 cache 的情况。最后也就会触发 freeing free block。

这时候前文提到的那个 hint 就要回收了,再看一遍:

Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.

再结合前一条:

It is OK to serialize eviction in bget (i.e., the part of bget that selects a buffer to re-use when a lookup misses in the cache).

现在想想其实就是暗示我们在寻找并 evict 可用 buf 的时候将整个流程串行化,保留最开始查看是否有缓存的并行。并且缓存丢失的概率一般来说都是非常低的,所以后面的串行化造成的性能损失其实是可以接受的。

所以现在利用上 bcache.lock,在发现缓存不存在释放掉对应 bucket 的锁之后,立刻获取 bcache.lock 这把大锁。由于在释放对应 bucket 锁到获取 bcache.lock 期间,可能有别的进程已经完成了对我们要找的 block 设置 cache,所以在获取完 bcache.lock 后,再进行一次查找缓存,如果发现已经存在了,就直接返回对应的 buf。

这样一来,就算有多个进程同时进入 bget,也只有第一个进程可以获取到 bcache.lock,并完成缓存的设置,后面的进程都会被获取完锁后的第二轮查找缓存给拦住。这样就避免了一个 block 被缓存多次。

下面是完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

int key = HASH_DEV_BLOCKNO(dev, blockno);

acquire(&bcache.bucketlocks[key]);

// Is the block already cached?
for (b = bcache.buckets[key].next; b; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
b->refcnt++;
release(&bcache.bucketlocks[key]);
acquiresleep(&b->lock);
return b;
}
}
release(&bcache.bucketlocks[key]);
// 一定要先释放再去获取 bcache.lock
// 否则如果另一个进程持有 bcache.lock,再在下面获取 bucketlocks[key] 就会死锁
acquire(&bcache.lock);

for (b = bcache.buckets[key].next; b; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
acquire(&bcache.bucketlocks[key]);
b->refcnt++;
release(&bcache.bucketlocks[key]);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
int i, bidx = -1, flag = 0;
struct buf *tmp = 0;
for (i = 0; i < NBUCKET; i++) {
acquire(&bcache.bucketlocks[i]);
for (b = &bcache.buckets[i]; b->next; b = b->next) {
if (b->next->refcnt == 0 && (!tmp || b->next->timestamp > tmp->next->timestamp)) {
tmp = b;
flag = 1;
}
}
if (!flag) {
release(&bcache.bucketlocks[i]); // 如果没有找到 buf 就释放锁
} else {
// 如果找到了新 buf,那就释放之前找到的 buf 的 bucket 锁,并保留当前 bucket 的锁
flag = 0;
if (bidx != -1) release(&bcache.bucketlocks[bidx]);
bidx = i;
}
}

if (!tmp) {
panic("bget: no buffers");
}

b = tmp->next;
tmp->next = b->next;
release(&bcache.bucketlocks[bidx]); // 进行 evict 后再释放锁

acquire(&bcache.bucketlocks[key]);
b->next = bcache.buckets[key].next;
bcache.buckets[key].next = b;
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.bucketlocks[key]);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}

测试结果

总结

多线程场景下的问题真的很难搞,感觉在业务场景下,用一些常见的模式还没有那么难。到系统编程的场景下,这些问题都要自己考虑,解决一个问题之后可能又产生另一个问题,我自己对于这方面的能力还是太弱了。

引用一下Miigon大佬的总结:

锁竞争优化一般有几个思路:

  • 只在必须共享的时候共享(对应为将资源从 CPU 共享拆分为每个 CPU 独立)
  • 必须共享时,尽量减少在关键区中停留的时间(对应“大锁化小锁”,降低锁的粒度)

第一种思路就是本次 lab 的第一部分,只有当前 CPU 的 freelist 已经为空时,才去和其它的 CPU 共享 freelist,其它情况下都是并行执行的。

第二种思路就是本次 lab 的第二部分,bcache 是没有办法单独划分给每个 CPU 的,属于必须共享,所以只能通过缩小临界区,缩小锁的粒度来实现。

后续这方面还是得多多加强。