MIT 6.s081 Lab7: Multithreading

本次实验要实现用户级线程之间的切换,使用多线程来加速程序,并实现屏障。

在听课的时候感觉这次实验是要实现内核级线程,让一个进程中的多个线程可以同时运行在多个 CPU 上面,感觉有点害怕。不过看实验说明书的时候才知道是实现的用户级线程,其实应该叫做协程,而且很多代码都已经给出来了,做之前要读一遍 user/uthread.c。每个用户线程对应一个内核线程是 Optional chanllenge 部分的任务了,有时间再做吧。这样一来本次的实验还是很简单的。

Uthread: switching between (moderate)

这部分是实现用户级线程的切换。在课上教授已经讲过了内核级线程是如何切换的了。大部分内容都可以借鉴。

在切换线程时,要把当前运行线程的 callee registers 保存下来。

thread_switch needs to save/restore only the callee-save registers.

我对这里的理解是,用户级线程的切换一定是手动调用 thread_schedule 然后调用 thread_switch 进行切换的。所以对用户级线程来说,恢复现场就是在之前调用 thread_switch 函数后调用 ret 返回到 thread_schedule 函数中,那么由于 C 编译器会在调用函数的时候把 caller registers 保存在线程的栈上,所以我们只需要记下 ra 来定位返回的位置,还有 sp 指出线程栈的位置,并且把所有的 callee register 保存即可(跟内核级线程的切换 Context 是一样的)。

这部分要做得任务只有两个,在 thread_create 函数中添加代码。要做到线程的切换,我们必须初始化它的 ra 和 sp,这样才可以使得其它线程调用 thread_switch 通过 ret 返回跳到 ra 所在位置执行。在创建的时候设置 ra 和 sp 相当于伪造一个执行现场,返回到那里。

我选择是直接把寄存器存在 thread 结构体中,这样可以直接把 swtch.S 里的内容复制过来就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct thread {
uint64 ra;
uint64 sp;

// callee saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;

char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */

};

将线程的 ra 初始化为 func 的起始地址,并且设置线程栈。注意栈是从高地址扩展到低地址的,所以初始化的时候应该将指针指向栈的最高地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
void 
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->ra = (uint64)func;
t->sp = (uint64)t->stack + STACK_SIZE - 1;
}

最后在 thread_schedule 调用 thread_switch 就可以了。注意 t 才是当前线程,而 current 是要被调度的线程。

Using threads (moderate)

这部分跟 xv6 无关了,就是用 pthread + lock 来控制并发访问一个 hash table。

先看一下 hash table 的实现。本质上是一个 entry 类型的数组,里面有 NBUCKET 个链表,key 通过摸 NBUCKET 来决定插入到哪个链表的表头。

所以回答第一个问题:

Why are there missing keys with 2 threads, but not with 1 thread? Identify a sequence of events with 2 threads that can lead to a key being missing.

当两个线程同时调用 insert 函数的时候,两个线程看到的链表头都是同一个,那么在插入的时候,后插入的会把先插入的给覆盖掉。

然后是实现并发控制,其实就是给每个 bucket 都分配一把锁,这样不同的线程在访问不同的 bucket 的时候就可以并发执行了。

1
pthread_mutex_t bucketlocks[NBUCKET]; // 声明锁

在 main 函数中调用 pthread_mutex_init 进行初始化。

1
2
3
for (int i = 0; i < NBUCKET; i++) {
pthread_mutex_init(&bucketlocks[i], NULL);
}

在 put 中上锁即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static 
void put(int key, int value)
{
int i = key % NBUCKET;

// is the key already present?
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if(e){
// update the existing key.
e->value = value;
} else {
// the new is new.
pthread_mutex_lock(&bucketlocks[i]);
insert(key, value, &table[i], table[i]);
pthread_mutex_unlock(&bucketlocks[i]);
}
}

Barrier (moderate)

这部分要通过 pthread_cond 来实现一个 barrier。关于 barrier 和 waitgroup 的区别:

屏障是一种同步原语,它会阻止多个线程或 goroutine 的运行,直到它们都到达某个点,在该点它们都被同时释放。 屏障可用于确保仅在所有线程完成其计算的特定阶段后才执行某些操作。

另一方面,WaitGroup 是一种同步原语,用于等待一组 goroutine 完成它们的执行,然后再继续。 WaitGroup 用一个计数初始化,每个 goroutine 在开始工作之前递增计数,并在完成时递减计数。 主线程(或另一个 goroutine)在 WaitGroup 上等待,直到计数达到零,这表明所有 goroutine 都已完成。 这允许主线程与并行任务的完成同步。

直接实现 barrier 函数即可。需要注意的是 barrier 函数也可能出现竞态。比如说线程 1 进入 barrier,将 bstate.nthread 加 1,然后比较一下还没有达到 nthread,正打算休眠,结果线程 2 进入 barrier,将 bstate.nthread 加 1,比较一下发现已经达到了 nthread,于是执行了一次唤醒操作,接着线程 1 又休眠了,那么线程 1 无法被唤醒,下一轮也就永远无法到达 nthread 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static void 
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
pthread_mutex_lock(&bstate.barrier_mutex);
bstate.nthread++;
if (bstate.nthread < nthread) {
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
} else {
bstate.nthread = 0;
bstate.round++;
pthread_cond_broadcast(&bstate.barrier_cond);
}
pthread_mutex_unlock(&bstate.barrier_mutex);
}

所以通过上锁将增加 bstate.nthread++、比较、休眠或唤醒合成一个原子操作。

总结

这次 lab 由于跟 xv6 相关的部分只有一个,并且跟教授上课讲得内核级线程的切换几乎一样,只是省去了先切换到调度线程这一步,所以实现起来很简单。后续有时间可以回来实现一下内核级线程。