程序执行模型。 并发及其缺陷。 作为并发执行单元的线程。 Rust如何提供线程安全性。 Rust中的并发基础知识。 其他用于处理并发的程序库。
程序执行模型
即使应用程序在后台连接网络,桌面应用程序仍然可以继续正常工作。 某个游戏程序同时更新数千个实体的状态,可在后台播放音乐并保持一致的帧速率。 一个科学的、计算量庞大的程序会分割计算过程,以充分利用计算机硬件中的所有内核。 Web服务器一次处理多个请求以最大化吞吐量。
并发
程序同时管理多个事务从而让人以为它们是同时发生的能力被称为并发,这种程序被称为并发程序。 并发和构建同时管理多个事务的程序有关,而并行是指将该程序交给多个内核处理,以增加它在一定时间内完成的工作量。
并发方法 Processes:在这种方法中,我们可以通过生成自己的独立副本来运行程序的不同部分。在Linux上,这可以通过调用fork系统来实现。要向生成的进程传递任何数据,可以使用各种进程间通信(Inter Process Communica-tion,IPC)工具,例如管道和FIFO。 Threads:底层的进程只是线程,具体来说就是主线程。进程可以启动或生成一个甚至多个线程。线程是最小的可调度执行单元。每个进程都是以一个主线程开始的。除此之外,它还可以使用操作系统提供的API生成其他线程。为了允许程序员使用线程,大多数程序语言都在其标准库中附带了线程API。与进程相比,它们是轻量级的。线程与其父进程共享相同的地址空间,它们不需要在内核中的进程控制块拥有单独的条目,每次生成新进程时都会自动更新。在一个进程中管理其中的多个线程是一项挑战,因为与进程不同,它们与父进程和其他子线程共享地址空间,并且由于线程的调度由操作系统决定,我们不能依赖线程执行的顺序,以及它们从中读取或写入内存的顺序。当我们从单线程程序转到多线程程序时,这些操作会突然变得难以理解。
用户级别 基于进程和线程的并发性受限于我们可以生成多少个进程或线程。更轻量级和更有效的替代方案是使用用户空间线程,通常也称绿色线程。 用户空间线程会作为语言运行的一部分进行管理和调度。运行时是在运行每个程序时执行的任何额外启动或管理代码。 在程序中利用并发需要遵循多个步骤。首先,我们需要识别出任务中能够独立运行的部分。其次,我们需要找到协调线程的方法,这些线程被拆分为多个子任务一起实现共同的目标。在此过程中,线程可能还需要共享数据,并且需要同步才能访问或写入共享数据。
缺陷
条件竞争:由于线程是由操作系统调度的,所以我们对它以什么顺序执行,以及如何访问共享数据没有发言权。多线程代码中的常见用例是从多个线程中更新全局状态。这需要经过3个步骤——读取、修改、写入。如果这3个操作不是由线程原子执行的,那么我们最终可能会遇到条件竞争问题。
数据竞争:当多个线程尝试将数据写入内存的某个位置,并且当两个线程同时对上述位置执行写入时,将很难预测会写入哪些值,内存中的最终结果可能是垃圾值。数据竞争是条件竞争导致的,因为读取、修改、更新操作必须由线程原子执行,以确保任何线程读取或写入数据的一致性。
Rust中的并发
线程基础 每个程序都以一个主线程开始启动。要从程序中的任意位置创建独立的执行点,主线程可以生成一个新线程,该线程将成为其子线程,子线程可以进一步产生自己的线程。
use std::thread;
fn main() {
thread::spawn(|| {
println!("Thread!");
"Much concurrent, such wow!".to_string()
});
print!("Hello ");
}
在子线程有机会运行代码之前,程序抵达“print!(“Hello”);”语句时从main函数返回并退出。因此,子线程中的代码根本不会执行。为了允许子线程执行代码,我们需要等待子线程。
use std::thread;
fn main() {
let child = thread::spawn(|| {
println!("Thread!");
String::from("Much concurrent, such wow!") });
print!("Hello ");
let value = child.join().expect("Failed joining child thread");
println!("{}", value);
}
自定义线程
use std::thread::Builder;
fn main() {
let my_thread = Builder::new().name("Worker Thread".to_string()) .stack_size(1024 * 4);
let handle = my_thread.spawn(|| {
panic!("Oops!");
});
let child_status = handle.unwrap().join();
println!("Child status: {}", child_status);
}
访问线程中的数据
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3, 4];
for n in 0..5 {
thread::spawn(|| {
println!("{}", nums[n]);
});
}
}
nums来自主线程,当我们生成一个线程时,它不能保证在父线程之前退出,并且有可能比父线程存续时间更长。当父线程返回时,nums变量已经失效,它指向的Vec也会被释放。如果Rust允许前面的代码通过编译,那么子线程可能已经访问了主线程返回后包含一些垃圾值的nums,并且可能导致分段错误。
use std::thread;
fn main() {
let my_str = String::from("Damn you borrow checker!");
let _ = thread::spawn(move || {
println!("In thread: {}", my_str);
});
println!("In main: {}", my_str);
}
使用关键字move,即使我们只是从子线程中读取my_str,该数据也不再有效。当然,我们还需要感谢编译器。如果子线程释放数据,并且我们从main函数中访问my_str,那么我们将会访问一个已被释放的值。
线程的并发模型
Rust并不会倾向于使用任何固有的并发模型,允许开发者使用自己的模型,并根据需要使用第三方软件包来解决自己的问题。因此还有其他并发模型供用户选择,其中包括actix软件包中实现为程序库的actor模型,rayon软件包实现的工作窃取(work stealing)并发模型。还包括crossbeam软件包实现的模型,它们允许并发线程从其父堆栈帧上共享数据,并保证在父堆栈被释放之前返回。
Rust内置了两种流行的并发模型:通过同步共享数据和通过消息传递共享数据。
状态共享模型
use std::thread;
use std::rc::Rc;
fn main() {
let nums = Rc::new(vec![0, 1, 2, 3, 4]);
let mut childs = vec![];
for n in 0..5 {
let ns = nums.clone();
let c = thread::spawn(|| {
println!("{}", ns[n]);
});
childs.push(c);
}
for c in childs {
c.join().unwrap();
}
}
Rc类型并不是线程安全的,因为引用计数更新操作不是原子的。我们只能在单线程代码中使用Rc类型。如果我们想在多线程环境中共享相同类型的所有权,那么可以使用Arc类型,它和Rc类型类似,但是具有原子引用计数功能。
通过Arc类型共享所有权
use std::thread;
use std::sync::Arc;
fn main() {
let nums = Arc::new(vec![0, 1, 2, 3, 4]);
let mut childs = vec![];
for n in 0..5 {
let ns = Arc::clone(&nums);
let c = thread::spawn(move || {
println!("{}", ns[n]);
});
childs.push(c);
}
for c in childs {
c.join().unwrap();
}
}
修改线程中的共享数据
use std::thread;
use std::sync::Arc;
fn main() {
let mut nums = Arc::new(vec![]);
for n in 0..5 {
let mut ns = nums.clone();
thread::spawn(move || {
nums.push(n);
});
}
}
要改变来自多线程的数据,我们需要使用一种提供共享可变性的类型,就像RefCell那样。但与Rc类似,RefCell不能跨多个线程使用。因此,我们需要使用它们的线程安全的变体,例如Mutex或RwLock包装器类型。接下来让我们来探讨它们。
互斥
当需要安全地对共享资源进行可变访问时,可以通过互斥来提供访问。互斥锁(mutex)是mutual和exclusion的缩写,是一种广泛使用的同步原语,用于确保一段代码一次只能有一个线程执行。通常,互斥锁是一个守护对象,线程获取该对象以保护要由多个线程共享或修改的数据。它的工作原理是通过锁定值来禁止一次访问多个线程中的值。
use std::sync::Mutex;
use std::thread;
fn main() {
let m = Mutex::new(0);
let c = thread::spawn(move || {
{
*m.lock().unwrap() += 1;
}
let updated = *m.lock().unwrap();
updated
});
let updated = c.join().unwrap();
println!("{:?}", updated);
}
但是当多个线程尝试访问该值时,这将无效,因为Mutex类型不提供共享可变性。为了允许Mutex类型中的值支持在多线程环境下被修改,我们需要将它包装成Arc类型。
通过Arc和Mutex实现共享可变性
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let vec = Arc::new(Mutex::new(vec![]));
let mut childs = vec![];
for i in 0..5 {
let mut v = vec.clone();
let t = thread::spawn(move || {
let mut v = v.lock().unwrap();
v.push(i);
});
childs.push(t);
}
for c in childs {
c.join().unwrap();
}
println!("{:?}", vec);
}
在互斥锁上执行锁定将阻止其他线程调用锁定,直到锁定消失为止。因此,以一种细粒度的方式构造代码是很重要的。还有一种与互斥锁类似的替代方法,即RwLock类型,它对类型的锁定更敏感,并且在读取比写入更频繁的情况下性能更好。
RwLock
?read:提供对线程的读取访问权限;可以存在多个读取调用。 ?write:提供对线程的独占访问,以便将数据写入包装类型;从RwLock实例到线程只允许有一个写入访问权限。
use std::sync::RwLock;
use std::thread;
fn main() {
let m = RwLock::new(5);
let c = thread::spawn(move || {
{
*m.write().unwrap() += 1;
}
let updated = *m.read().unwrap();
updated
});
let updated = c.join().unwrap();
println!("{:?}", updated);
}
通过消息传递进行通信
?channel:这是一个异步的无限缓冲通道。 ?sync_channel:这是一个同步的有界缓冲通道。
异步通道
use std::thread;
use std::sync::mpsc::channel;
fn main() {
let (tx, rx) = channel();
let join_handle = thread::spawn(move || {
while let Ok(n) = rx.recv() {
println!("Received {}", n);
}
});
for i in 0..10 {
tx.send(i).unwrap();
}
join_handle.join().unwrap();
}
同步通道
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::sync_channel(1);
let tx_clone = tx.clone();
let _ = tx.send(0);
thread::spawn(move || {
let _ = tx.send(1);
});
thread::spawn(move || {
let _ = tx_clone.send(2);
});
println!("Received {} via the channel", rx.recv().unwrap());
println!("Received {} via the channel", rx.recv().unwrap());
println!("Received {} via the channel", rx.recv().unwrap());
println!("Received {:?} via the channel", rx.recv());
}
Rust中的线程安全
spawn是一个包含F和T的泛型函数,并且会接收一个参数f,返回的泛型是JoinHan-dle。随后的where子句指定了多个特征边界。 F:FnOnce() ->T:这表示F实现了一个只能被调用一次的闭包。换句话说,f是一个闭包,通过值获取所有内容并移动从环境中引用的项。 F:Send+ 'static:这表示闭包必须是发送型(Send),并且必须具有’static的生命周期,同时执行环境中闭包内引用的任何类型必须是发送型,必须在程序的整个生命周期内存活。 T:Send + 'static:来自闭包的返回类型T必须实现Send+'static特征。
Send Send类型可以安全地发送到多个线程,这表明该类型是一种移动类型。非Send类型的是指针类型,例如&T,除非T是Sync类型。 pub unsafe auto trait Send { }
Sync pub unsafe auto trait Sync { } 这表示实现此特征的类型可以安全地在线程之间共享。如果某些类型是Sync类型,那么指向它的引用,换句话说相关的&T是Send类型。这意味着我们可以将对它的引用传递给多线程。
使用actor模型实现并发
Actor:这是actor模型中的核心元素。每个actor都包含其地址,使用该地址我们可以将消息发送到某个actor和邮箱,这只是一个存储它收到的消息的队列。 FIFO:队列通常是先进先出(First In FirstOut,FIFO)。actor的地址是必需的,以便其他actor可以向其发送消息。超级actor能够创建其他子actor的子actor。 Messages:actor之间仅通过消息进行通信,它们由actor异步处理。actix-web框架为异步包装器中的同步操作提供了一个很好的包装器。
[dependencies] actix = “0.7.9” futures = “0.1.25” tokio = “0.1.15”
use actix::prelude::*;
use tokio::timer::Delay;
use std::time::Duration;
use std::time::Instant;
use futures::future::Future;
use futures::future;
struct Add(u32, u32);
impl Message for Add {
type Result = Result<u32, ()>;
}
struct Adder;
impl Actor for Adder {
type Context = SyncContext<Self>;
}
impl Handler<Add> for Adder {
type Result = Result<u32, ()>;
fn handle(&mut self, msg: Add, _: &mut Self::Context) -> Self::Result {
let sum = msg.0 + msg.0;
println!("Computed: {} + {} = {}",msg.0, msg.1, sum);
Ok(msg.0 + msg.1) }
}
fn main() {
System::run(|| {
let addr = SyncArbiter::start(3, || Adder);
for n in 5..10 {
addr.do_send(Add(n, n+1));
}
tokio::spawn(futures::lazy(|| {
Delay::new(Instant::now() + Duration::from_secs(1)).then(|_| {
System::current().stop();
future::ok::<(),()>(())
})
}));
});
}
|