ch8 并发 (这章感觉没啥要补充的,简单总结一下得了)
本文是参加2025春夏季开源操作系统训练营 时对第二阶段文档第7章做的笔记。
文档:https://learningos.cn/rCore-Camp-Guide-2025S
完整版文档:https://rcore-os.cn/rCore-Tutorial-Book-v3
引言 操作系统通过在不同时间片调度进程,实现了进程间的并发,进程内部还没有并发性的体现。为了提高一个进程内的并发性,出现了线程(thread)。把一个进程内的多个可并行执行任务通过一种更细粒度的方式让操作系统进行调度,这种细粒度的调度对象就是线程。
内核态的线程管理 有了线程后,进程是线程的资源容器,线程是程序的基本执行实体。每个线程有自己的线程上下文,包括通用寄存器、用户栈、局部变量等,同时所有线程共享该进程的资源。在进程中的线程没有父子关系,但创建进程时建立的第一个线程比较重要。
引入线程之后,需要修改一些系统调用,在通过 fork
创建进程其实也意味着要单独建立一个主线程来使用处理器,并为以后创建新的线程建立相应的线程控制块向量。
每个线程的执行状态和线程上下文均保存在一个线程控制块 (TCB, Task Control Block) 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 pub struct TaskControlBlock { pub process: Weak<ProcessControlBlock>, pub kernel_stack: KernelStack, inner: UPSafeCell<TaskControlBlockInner>, } pub struct TaskControlBlockInner { pub trap_cx_ppn: PhysPageNum, pub task_cx: TaskContext, pub task_status: TaskStatus, pub exit_code: Option <i32 >, pub res: Option <TaskUserRes>, }
对线程的管理,例如创建、退出、调度切换等,与没有线程的时候对进程的管理类似。
锁机制 在一个时间段内,会有多个线程在执行,这就是并发。多个线程可能会读写相同的数据,写操作一般都会对应好几条汇编指令,如果一个线程修改到一半,被中断打断执行,切换到别的线程执行,对共享数据的操作可能会出错。这需要加入锁作为保障机制,保证对拥有锁的线程,可以独占地对共享数据进行读写,从而能够得到正确的共享数据结果。
锁有很多种实现方式,包括用户态软件、机器指令硬件、内核态操作系统等。锁大概可以分为 自旋锁(spin lock) 和 睡眠锁(blocking lock) 。这里介绍一种内核态操作系统级方法实现锁。
在线程的眼里,互斥是一种每个线程能看到的资源,可以存在多个不同互斥资源,所以我们可以把所有的互斥资源放在一起让进程来管理, mutex_list: Vec<Option<Arc<dyn Mutex>>>
表示的是实现了 Mutex
trait 的一个互斥资源的向量。操作系统需要显式地施加某种控制,来确定当一个线程释放锁时,等待的线程谁将能抢到锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 pub struct ProcessControlBlock { pub pid: PidHandle, inner: UPSafeCell<ProcessControlBlockInner>, } pub struct ProcessControlBlockInner { ... pub mutex_list: Vec <Option <Arc<dyn Mutex>>>, } pub trait Mutex : Sync + Send { fn lock (&self ); fn unlock (&self ); } pub struct MutexBlocking { inner: UPSafeCell<MutexBlockingInner>, } pub struct MutexBlockingInner { locked: bool , wait_queue: VecDeque<Arc<TaskControlBlock>>, }
Mutex
trait 可能的实现包括上面提到的两种锁,然后,再为 Mutex
trait实现 lock 和 unlock 函数,主要逻辑为判断锁现在的状态,然后调用调度器的函数切换线程。以睡眠锁为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 impl Mutex for MutexBlocking { fn lock (&self ) { let mut mutex_inner = self .inner.exclusive_access (); if mutex_inner.locked { mutex_inner.wait_queue.push_back (current_task ().unwrap ()); drop (mutex_inner); block_current_and_run_next (); } else { mutex_inner.locked = true ; } } fn unlock (&self ) { let mut mutex_inner = self .inner.exclusive_access (); assert! (mutex_inner.locked); if let Some (waking_task) = mutex_inner.wait_queue.pop_front () { add_task (waking_task); } else { mutex_inner.locked = false ; } } }
信号量机制 通过锁,可以让线程在临界区执行时,独占临界资源。但是如果同时允许N个线程访问临界资源,可以使用信号量。
信号量是对互斥锁的一种扩展,包括 up(V)和 down(P)操作。信号量除了用于临界资源共享,还可以用于实现同步(synchronization)。把信号量的初始值设置为 0 ,当一个线程 A 对此信号量执行一个 P 操作,该线程立即会被阻塞睡眠。之后有另外一个线程 B 对此信号量执行一个 V 操作,就会将线程 A 唤醒。这样线程 B 中执行 V 操作之前的代码序列 B-stmts 和线程 A 中执行 P 操作之后的代码 A-stmts 序列之间就形成了一种确定的同步执行关系。实现信号量的操作与上面的互斥锁类似。
条件变量机制 在有些情况下,线程需要检查某一条件满足之后,才会继续执行。如果使用忙等,对处理器资源的浪费比较多;如果使用互斥锁,实现睡眠等待,线程可能会持有锁进入睡眠状态,会出现死锁。对于这个问题,引入了管程(Monitor) 和 条件变量(Condition Variables) 。
条件变量允许一个或多个线程在某个条件不满足时挂起等待,并在条件满足时被唤醒。它通常与互斥锁(Mutex)一起使用,广泛应用于生产者-消费者模型、任务队列等场景。管程将共享数据、对数据的操作过程、以及同步机制(如互斥和条件变量)封装在一起,形成一个抽象的数据结构,类似于面向对象中的对象 ,更抽象、安全、结构化。
以下是一个模拟“生产者-消费者”问题的例子,使用 Rust 标准库中的 Mutex
和 Condvar
实现一个简化的管程结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 use std::collections::VecDeque;use std::sync::{Arc, Condvar, Mutex};use std::thread;use std::time::Duration;const BUFFER_CAPACITY: usize = 5 ;struct MonitorBuffer <T> { queue: Mutex<VecDeque<T>>, not_full: Condvar, not_empty: Condvar, } impl <T> MonitorBuffer<T> { fn new () -> Self { Self { queue: Mutex::new (VecDeque::new ()), not_full: Condvar::new (), not_empty: Condvar::new (), } } fn produce (&self , item: T) { let mut queue = self .queue.lock ().unwrap (); while queue.len () == BUFFER_CAPACITY { queue = self .not_full.wait (queue).unwrap (); } queue.push_back (item); self .not_empty.notify_one (); } fn consume (&self ) -> T { let mut queue = self .queue.lock ().unwrap (); while queue.is_empty () { queue = self .not_empty.wait (queue).unwrap (); } let item = queue.pop_front ().unwrap (); self .not_full.notify_one (); item } } fn main () { let buffer = Arc::new (MonitorBuffer::new ()); let producer = { let buffer = Arc::clone (&buffer); thread::spawn (move || { for i in 0 ..10 { println! ("Producing: {}" , i); buffer.produce (i); thread::sleep (Duration::from_millis (100 )); } }) }; let consumer = { let buffer = Arc::clone (&buffer); thread::spawn (move || { for _ in 0 ..10 { let val = buffer.consume (); println! ("Consumed: {}" , val); thread::sleep (Duration::from_millis (150 )); } }) }; producer.join ().unwrap (); consumer.join ().unwrap (); }
其中,MonitorBuffer<T>
是一个管程封装体,queue
是受保护的共享资源,produce
和 consume
是临界区方法,多个线程通过调用 produce()
和 consume()
进行安全通信。