深入理解rust并发编程 (晁岳攀( 鸟窝)) (Z-Library)
Author: 晁岳攀(@ 鸟窝)
商业
No Description
📄 File Format:
PDF
💾 File Size:
2.7 MB
20
Views
0
Downloads
0.00
Total Donations
📄 Text Preview (First 20 pages)
ℹ️
Registered users can read the full content for free
Register as a Gaohf Library member to read the complete e-book online for free and enjoy a better reading experience.
📄 Page
1
深入理解 Rust 并发编程 从入门到放弃 看这本就够了 晁岳攀(@ 鸟窝)著 Version 0.14.1 2024年 1月 7日
📄 Page
2
(This page has no text content)
📄 Page
3
目录 1 线程 9 1.1 创建线程 . . . . . . . . . . . . . . . . . . . . . . . . . . 11 1.2 Thread Builder . . . . . . . . . . . . . . . . . . . . . . . . 12 1.3 当前的线程 . . . . . . . . . . . . . . . . . . . . . . . . . 13 1.4 并发数和当前线程数 . . . . . . . . . . . . . . . . . . . . . . 14 1.5 sleep和 park . . . . . . . . . . . . . . . . . . . . . . . . 15 1.6 scoped thread . . . . . . . . . . . . . . . . . . . . . . . . 17 1.7 ThreadLocal . . . . . . . . . . . . . . . . . . . . . . . . . 18 1.8 Move . . . . . . . . . . . . . . . . . . . . . . . . . . . . 19 1.9 控制新建的线程. . . . . . . . . . . . . . . . . . . . . . . . 20 1.10 设置线程优先级. . . . . . . . . . . . . . . . . . . . . . . . 21 1.11 设置 affinity . . . . . . . . . . . . . . . . . . . . . . . . . 23 1.12 Panic . . . . . . . . . . . . . . . . . . . . . . . . . . . . 24 1.13 crossbeam scoped thread . . . . . . . . . . . . . . . . . . . 25 1.14 Rayon scoped thread . . . . . . . . . . . . . . . . . . . . . 25 1.15 send_wrapper . . . . . . . . . . . . . . . . . . . . . . . . 26 1.16 Go风格的启动线程 . . . . . . . . . . . . . . . . . . . . . . 27 2 线程池 29 2.1 rayon线程池 . . . . . . . . . . . . . . . . . . . . . . . . . 29 2.2 threadpool库 . . . . . . . . . . . . . . . . . . . . . . . . 33 2.3 rusty_pool库 . . . . . . . . . . . . . . . . . . . . . . . . 34 2.4 fast_threadpool库 . . . . . . . . . . . . . . . . . . . . . . 37 2.5 scoped_threadpool库 . . . . . . . . . . . . . . . . . . . . . 38 2.6 scheduled_thread_pool库 . . . . . . . . . . . . . . . . . . . 39 2.7 poolite库 . . . . . . . . . . . . . . . . . . . . . . . . . . 40 2.8 executor_service库 . . . . . . . . . . . . . . . . . . . . . . 42 2.9 threadpool_executor库 . . . . . . . . . . . . . . . . . . . . 44 3 async/await 异步编程 49 3.1 异步编程综述 . . . . . . . . . . . . . . . . . . . . . . . . 49 3.2 Rust中的异步编程模型 . . . . . . . . . . . . . . . . . . . . 49 3.3 async/await语法和用法 . . . . . . . . . . . . . . . . . . . . 52
📄 Page
4
3.4 Tokio . . . . . . . . . . . . . . . . . . . . . . . . . . . . 53 3.5 futures . . . . . . . . . . . . . . . . . . . . . . . . . . . 55 3.6 futures_lite . . . . . . . . . . . . . . . . . . . . . . . . . 56 3.7 async_std . . . . . . . . . . . . . . . . . . . . . . . . . . 57 3.8 smol . . . . . . . . . . . . . . . . . . . . . . . . . . . . 57 3.9 try_join、join、select和 zip . . . . . . . . . . . . . . . . . . 58 4 容器同步原语 61 4.1 cow . . . . . . . . . . . . . . . . . . . . . . . . . . . . 61 4.2 box . . . . . . . . . . . . . . . . . . . . . . . . . . . . 63 4.3 Cell、RefCell、OnceCell、LazyCell和 LazyLock . . . . . . . . . . 64 4.3.1 Cell . . . . . . . . . . . . . . . . . . . . . . . . . 64 4.3.2 RefCell . . . . . . . . . . . . . . . . . . . . . . . . 65 4.3.3 OnceCell . . . . . . . . . . . . . . . . . . . . . . . 65 4.3.4 LazyCell、LazyLock . . . . . . . . . . . . . . . . . . 66 4.4 rc . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 67 5 基础同步原语 69 5.1 Arc. . . . . . . . . . . . . . . . . . . . . . . . . . . . . 69 5.2 互斥锁 Mutex . . . . . . . . . . . . . . . . . . . . . . . . 72 5.2.1 Lock . . . . . . . . . . . . . . . . . . . . . . . . . 73 5.2.2 try_lock . . . . . . . . . . . . . . . . . . . . . . . 74 5.2.3 Poisoning . . . . . . . . . . . . . . . . . . . . . . . 75 5.2.4 更快的释放互斥锁 . . . . . . . . . . . . . . . . . . . 76 5.3 读写锁 RWMutex . . . . . . . . . . . . . . . . . . . . . . . 78 5.4 一次初始化 Once . . . . . . . . . . . . . . . . . . . . . . . 83 5.5 屏障/栅栏 Barrier . . . . . . . . . . . . . . . . . . . . . . . 85 5.6 条件变量 Condvar. . . . . . . . . . . . . . . . . . . . . . . 87 5.7 LazyCell和 LazyLock . . . . . . . . . . . . . . . . . . . . . 88 5.8 Exclusive . . . . . . . . . . . . . . . . . . . . . . . . . . 89 5.9 mpsc . . . . . . . . . . . . . . . . . . . . . . . . . . . . 89 5.10 信号量 Semaphore . . . . . . . . . . . . . . . . . . . . . . 92 5.11 原子操作 atomic . . . . . . . . . . . . . . . . . . . . . . . 92 5.11.1 原子操作的 Ordering . . . . . . . . . . . . . . . . . . 95 5.11.2 Ordering::Relaxed . . . . . . . . . . . . . . . . . . . 96 5.11.3 Ordering::Acquire . . . . . . . . . . . . . . . . . . . 96 5.11.4 Ordering::Release . . . . . . . . . . . . . . . . . . . 97 5.11.5 Ordering::AcqRel . . . . . . . . . . . . . . . . . . . 98 5.11.6 Ordering::SeqCst . . . . . . . . . . . . . . . . . . . 99 6 并发集合 101 6.1 线程安全的 Vec . . . . . . . . . . . . . . . . . . . . . . . . 101 6.2 线程安全的 HashMap . . . . . . . . . . . . . . . . . . . . . 102 6.3 dashmap . . . . . . . . . . . . . . . . . . . . . . . . . . 104
📄 Page
5
6.4 lockfree. . . . . . . . . . . . . . . . . . . . . . . . . . . 104 6.5 cuckoofilter . . . . . . . . . . . . . . . . . . . . . . . . . 105 6.6 evmap . . . . . . . . . . . . . . . . . . . . . . . . . . . 105 6.7 arc-swap . . . . . . . . . . . . . . . . . . . . . . . . . . 106 7 进程 109 7.1 创建进程 . . . . . . . . . . . . . . . . . . . . . . . . . . 109 7.2 等待进程结束 . . . . . . . . . . . . . . . . . . . . . . . . 109 7.3 配置输入输出 . . . . . . . . . . . . . . . . . . . . . . . . 110 7.4 环境变量 . . . . . . . . . . . . . . . . . . . . . . . . . . 110 7.5 设置工作目录 . . . . . . . . . . . . . . . . . . . . . . . . 111 7.6 设置进程的 UID和 GID . . . . . . . . . . . . . . . . . . . . 111 7.7 传递给子进程打开的文件 . . . . . . . . . . . . . . . . . . . . 112 7.8 控制子进程 . . . . . . . . . . . . . . . . . . . . . . . . . 113 7.8.1 等待子进程结束 . . . . . . . . . . . . . . . . . . . . 113 7.8.2 向子进程发送信号 . . . . . . . . . . . . . . . . . . . 114 7.8.3 通过标准输入输出与子进程交互 . . . . . . . . . . . . . . 114 7.9 实现管道 . . . . . . . . . . . . . . . . . . . . . . . . . . 114 7.10 和子进程的 I/O交互 . . . . . . . . . . . . . . . . . . . . . . 115 8 channel 通道 119 8.1 mpsc . . . . . . . . . . . . . . . . . . . . . . . . . . . . 119 8.2 crossbeam-channel . . . . . . . . . . . . . . . . . . . . . . 123 8.3 flume. . . . . . . . . . . . . . . . . . . . . . . . . . . . 128 8.4 async_channel . . . . . . . . . . . . . . . . . . . . . . . . 130 8.5 futures_channel . . . . . . . . . . . . . . . . . . . . . . . 131 8.6 crossfire . . . . . . . . . . . . . . . . . . . . . . . . . . 133 8.7 kanal . . . . . . . . . . . . . . . . . . . . . . . . . . . . 135 9 定时器 137 9.1 Timer . . . . . . . . . . . . . . . . . . . . . . . . . . . 137 9.1.1 timer库 . . . . . . . . . . . . . . . . . . . . . . . 138 9.1.2 futures_timer . . . . . . . . . . . . . . . . . . . . . 140 9.1.3 async-io的 Timer . . . . . . . . . . . . . . . . . . . 141 9.1.4 tokio. . . . . . . . . . . . . . . . . . . . . . . . . 142 9.1.5 smol::Timer . . . . . . . . . . . . . . . . . . . . . . 142 9.1.6 async-timer . . . . . . . . . . . . . . . . . . . . . . 142 9.1.7 timer-kit . . . . . . . . . . . . . . . . . . . . . . . 142 9.1.8 hierarchical_hash_wheel_timer . . . . . . . . . . . . . . 142 9.2 ticker. . . . . . . . . . . . . . . . . . . . . . . . . . . . 143 9.2.1 ticker . . . . . . . . . . . . . . . . . . . . . . . . 143 9.2.2 tokio::time::interval . . . . . . . . . . . . . . . . . . 143 10parking_lot 并发库 145 10.0.1 Mutex . . . . . . . . . . . . . . . . . . . . . . . . 146
📄 Page
6
10.0.2 FairMutex . . . . . . . . . . . . . . . . . . . . . . 149 10.0.3 RwLock . . . . . . . . . . . . . . . . . . . . . . . 150 10.0.4 ReentrantMutex . . . . . . . . . . . . . . . . . . . . 151 10.0.5 Once. . . . . . . . . . . . . . . . . . . . . . . . . 152 10.0.6 Condvar . . . . . . . . . . . . . . . . . . . . . . . 153 11crossbeam 并发库 155 11.1 原子操作 . . . . . . . . . . . . . . . . . . . . . . . . . . 156 11.2 数据结构 . . . . . . . . . . . . . . . . . . . . . . . . . . 157 11.2.1 双向队列 deque . . . . . . . . . . . . . . . . . . . . 157 11.2.2 ArrayQueue . . . . . . . . . . . . . . . . . . . . . . 159 11.2.3 SegQueue . . . . . . . . . . . . . . . . . . . . . . 159 11.3 内存管理 . . . . . . . . . . . . . . . . . . . . . . . . . . 160 11.4 线程同步 . . . . . . . . . . . . . . . . . . . . . . . . . . 160 11.4.1 channel . . . . . . . . . . . . . . . . . . . . . . . 160 11.4.2 Parking . . . . . . . . . . . . . . . . . . . . . . . 165 11.4.3 ShardedLock . . . . . . . . . . . . . . . . . . . . . 166 11.4.4 WaitGroup . . . . . . . . . . . . . . . . . . . . . . 167 11.5 实用工具 . . . . . . . . . . . . . . . . . . . . . . . . . . 167 11.5.1 Backoff. . . . . . . . . . . . . . . . . . . . . . . . 168 11.5.2 CachePadded . . . . . . . . . . . . . . . . . . . . . 169 11.5.3 Scope . . . . . . . . . . . . . . . . . . . . . . . . 170 11.6 crossbeam-skiplist . . . . . . . . . . . . . . . . . . . . . . 170 12rayon 库 173 12.1 并行集合 . . . . . . . . . . . . . . . . . . . . . . . . . . 173 12.2 scope. . . . . . . . . . . . . . . . . . . . . . . . . . . . 174 12.3 Thread池 . . . . . . . . . . . . . . . . . . . . . . . . . . 175 12.4 join . . . . . . . . . . . . . . . . . . . . . . . . . . . . 176 13tokio 库 179 13.1 异步运行时 . . . . . . . . . . . . . . . . . . . . . . . . . 179 13.2 同步原语 . . . . . . . . . . . . . . . . . . . . . . . . . . 181 13.2.1 Mutex . . . . . . . . . . . . . . . . . . . . . . . . 182 13.2.2 RwLock . . . . . . . . . . . . . . . . . . . . . . . 183 13.2.3 Barrier . . . . . . . . . . . . . . . . . . . . . . . . 184 13.2.4 Notify . . . . . . . . . . . . . . . . . . . . . . . . 184 13.2.5 Semaphore . . . . . . . . . . . . . . . . . . . . . . 186 13.2.6 OnceCell . . . . . . . . . . . . . . . . . . . . . . . 187 13.3 通道 . . . . . . . . . . . . . . . . . . . . . . . . . . . . 187 13.3.1 mpsc . . . . . . . . . . . . . . . . . . . . . . . . 188 13.3.2 oneshot . . . . . . . . . . . . . . . . . . . . . . . 188 13.3.3 broadcast (mpmc) . . . . . . . . . . . . . . . . . . . 189 13.3.4 watch (spmc) . . . . . . . . . . . . . . . . . . . . . 190
📄 Page
7
13.4 时间相关 . . . . . . . . . . . . . . . . . . . . . . . . . . 192 13.4.1 Sleep . . . . . . . . . . . . . . . . . . . . . . . . 193 13.4.2 Interval . . . . . . . . . . . . . . . . . . . . . . . 193 13.4.3 Timeout . . . . . . . . . . . . . . . . . . . . . . . 194 14其它并发库 197 14.1 进程锁 . . . . . . . . . . . . . . . . . . . . . . . . . . . 197 14.2 oneshot . . . . . . . . . . . . . . . . . . . . . . . . . . . 198 14.3 map . . . . . . . . . . . . . . . . . . . . . . . . . . . . 200 14.4 一些同步原语 . . . . . . . . . . . . . . . . . . . . . . . . 203 14.5 事件通知 . . . . . . . . . . . . . . . . . . . . . . . . . . 206 14.6 队列 . . . . . . . . . . . . . . . . . . . . . . . . . . . . 208 14.7 scc . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 209 14.8 信号量 . . . . . . . . . . . . . . . . . . . . . . . . . . . 210 14.9 singleflight . . . . . . . . . . . . . . . . . . . . . . . . . 211 14.10arc_swap . . . . . . . . . . . . . . . . . . . . . . . . . . 212
📄 Page
8
(This page has no text content)
📄 Page
9
1 线程 线程(英语:thread)是操作系统能够进行运算和调度的最小单位。大部分情况下,它 被包含在进程之中,是进程中的实际运作单位,所以说程序实际运行的时候是以线程为 单位的,一个进程中可以并发多个线程,每条线程并行执行不同的任务。 线程是独立调度和分派的基本单位,并且同一进程中的多条线程将共享该进程中的全部 系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有 各自的调用栈(call stack),自己的寄存器上下文(register context),自己的线程本 地存储(thread-local storage)。 一个进程可以有很多线程来处理,每条线程并行执行不同的任务。如果进程要完成的 任务很多,这样需很多线程,也要调用很多核心,在多核或多 CPU,或支持 Hyper- threading的 CPU上使用多线程程序设计可以提高了程序的执行吞吐率。在单 CPU单 核的计算机上,使用多线程技术,也可以把进程中负责 I/O处理、人机交互而常被阻塞 的部分与密集计算的部分分开来执行,从而提高 CPU的利用率。 线程在以下几个方面与传统的多任务操作系统进程不同: • 进程通常是独立的,而线程作为进程的子集存在 • 进程携带的状态信息比线程多得多,而进程中的多个线程共享进程状态以及内存 和其他资源 • 进程具有单独的地址空间,而线程共享其地址空间 • 进程仅通过系统提供的进程间通信机制进行交互 • 同一进程中线程之间的上下文切换通常比进程之间的上下文切换发生得更快 线程与进程的优缺点包括: 9
📄 Page
10
1 线程 • 线程的资源消耗更少:使用线程,应用程序可以使用比使用多个进程时更少的资 源来运行。 • 线程简化共享和通信:与需要消息传递或共享内存机制来执行进程间通信的进程 不同,线程可以通过它们已经共享的数据,代码和文件进行通信。 • 线程可以使进程崩溃:由于线程共享相同的地址空间,线程执行的非法操作可能 会使整个进程崩溃;因此,一个行为异常的线程可能会中断应用程序中所有其他线 程的处理。 图 1.1. 进程与线程 更有一些编程语言,比如 SmallTalk、Ruby、Lua、Python等,还会有协程(英语:coroutine) 更小的调度单位。协程非常类似于线程。但是协程是协作式多任务的,而线程典型是抢 占式多任务的。这意味着协程提供并发性而非并行性。使用抢占式调度的线程也可以实 现协程,但是会失去某些好处。Go语言实现了 Goroutine的最小调度单元,虽然官方 不把它和 coroutine等同,因为 goroutine实现了独特的调度和执行机制,但是你可以 大致把它看成和协程是一类的东西。 还有一类更小的调度单元叫纤程(英语:Fiber),它是一种最轻量化的线程。它是一种 用户态线程(user thread),让应用程序可以独立决定自己的线程要如何运作。操作系 统内核不能看见它,也不会为它进行调度。就像一般的线程,纤程有自己的寻址空间。 但是纤程采取合作式多任务(Cooperative multitasking),而线程采取先占式多任务 (Pre-emptive multitasking)。应用程序可以在一个线程环境中创建多个纤程,然后手 动执行它。纤程不会被自动执行,必须要由应用程序自己指定让它执行,或换到下一个 纤程。跟线程相比,纤程较不需要操作系统的支持。实际上也有人任务认为纤程也属于 协程,因为这两个并没有一个严格的定义,或者说含义在不同的人不同的场景下也有所 区别,所以不同的人有不同的理解,比如新近 Java 19终于发布的特性,有人叫它纤程, 有人叫它协程。 不管怎么说,Rust实现并发的基本单位是线程,虽然也有一些第三方的库,比如 PingCAP 的黄旭东实现了 Stackful coroutine 库 (may) 和 coroutine,甚至有一个 RFC(RFC 2033: Experimentally add coroutines to Rust)关注它,但是目前 Rust并发实现主流 还是使用线程来实现,包括最近实现的 async/await特性,运行时还是以线程和线程池 的方式运行。所以作为 Rust并发编程的第一章,我们重点还是介绍线程的使用。 10
📄 Page
11
1.1 创建线程 1.1 创建线程 Rust标准库std::thread crate提供了线程相关的函数。正如上面所说,一个 Rust程序 执行的会启动一个进程,这个进程会包含一个或者多个线程,Rust 中的线程是纯操作 的系统的线程,拥有自己的栈和状态。线程之间的通讯可以通过 channel,就像 Go语 言中的 channel 的那样,也可以通过一些同步原语)。这个我们会在后面的章节中在做 介绍。 1 pub fn start_one_thread() { 2 let handle = thread::spawn(|| { 3 println!("Hello from a thread!"); 4 }); 5 6 handle.join().unwrap(); 7 } 这段代码我们通过 thread.spawn在当前线程中启动了一个新的线程,新的线程简单的 输出 Hello from a thread文本。 如果在 main函数中调用这个 start_one_thread函数,控制台中会正常看到这段输出 文本,但是如果注释掉 handle.join.unwrap();那一句的话,有可能期望的文本可能 不会被输出,原因是当主程序退出的时候,即使这些新开的线程也会强制退出,所以有时 候你需要通过 join等待这些线程完成。如果忽略 thread::spawn返回的 JoinHandle 值,那么这个新建的线程被称之为 detached,通过调用 JoinHandle的 join方法,调 用者就不得不等待线程的完成了。 这段代码我们直接使用 handle.join().unwrap() , 事实上 join() 返回的是 Result 类型,如果线程 panicked了,那么它会返 Err ,否则它会返回 Ok(_) ,这就有意思了, 调用者甚至可以得到线程最后的返回值: 1 pub fn start_one_thread_result() { 2 let handle = thread::spawn(|| { 3 println!("Hello from a thread!"); 4 200 5 }); 6 7 match handle.join() { 8 Ok(v) => println!("thread result: {}", v), 9 Err(e) => println!("error: {:?}", e), 10 } 11 } 下面这段代码是启动了多个线程: 1 pub fn start_two_threads() { 2 let handle1 = thread::spawn(|| { 3 println!("Hello from a thread1!"); 4 }); 5 6 let handle2 = thread::spawn(|| { 11
📄 Page
12
1 线程 7 println!("Hello from a thread2!"); 8 }); 9 10 handle1.join().unwrap(); 11 handle2.join().unwrap(); 12 } 但是如果启动 N个线程呢?可以使用一个 Vector保存线程的 handle: 1 pub fn start_n_threads() { 2 const N: isize = 10; 3 4 let handles: Vec<_> = (0..N) 5 .map(|i| { 6 thread::spawn(move || { 7 println!("Hello from a thread{}!", i); 8 }) 9 }) 10 .collect(); 11 12 for handle in handles { 13 handle.join().unwrap(); 14 } 15 } 1.2 Thread Builder 通过 Builder你可以对线程的初始状态进行更多的控制,比如设置线程的名称、栈大大 小等等。 1 pub fn start_one_thread_by_builder() { 2 let builder = thread::Builder::new() 3 .name("foo".into()) // set thread name 4 .stack_size(32 * 1024); // set stack size 5 6 let handler = builder 7 .spawn(|| { 8 println!("Hello from a thread!"); 9 }) 10 .unwrap(); 11 12 handler.join().unwrap(); 13 } 它提供了 spawn 开启一个线程,同时还提供了 spawn_scoped 开启 scoped thread (下面会讲),一个实验性的方法 spawn_unchecked ,提供更宽松的声明周期的绑定,调 用者要确保引用的对象丢弃之前线程的 join一定要被调用,或者使用 ‘static声明周 期,因为是实验性的方法,我们不做过多介绍,一个简单的例子如下: 12
📄 Page
13
1.3 当前的线程 1 #![feature(thread_spawn_unchecked)] 2 use thread; 3 4 let builder = Builder::new() 5 ; 6 let x = 1; 7 let thread_x = 8 &x; 9 let handler = unsafe { 10 builder.spawn_unchecked(move || { 11 println!("x = {}", *thread_x); 12 }).unwrap() 13 }; 14 15 // caller has to ensure ‘join()‘ is called, otherwise 16 // it is possible to access freed memory if ‘x‘ gets 17 // dropped before the thread closure is executed! 18 handler.join().unwrap(); 1.3 当前的线程 因为线程是操作系统最小的调度和运算单元,所以一段代码的执行隶属于某个线程。如 何获得当前的线程呢?通过 thread::current() 就可以获得,它会返回一个 Thread 对象,你可以通过它获得线程的 ID和 name: 1 pub fn current_thread() { 2 let current_thread = thread::current(); 3 println!( 4 "current thread: {:?},{:?}", 5 current_thread.id(), 6 current_thread.name() 7 ); 8 9 let builder = thread::Builder::new() 10 .name("foo".into()) // set thread name 11 .stack_size(32 * 1024); // set stack size 12 13 let handler = builder 14 .spawn(|| { 15 let current_thread = thread::current(); 16 println!( 17 "child thread: {:?},{:?}", 18 current_thread.id(), 19 current_thread.name() 20 ); 21 }) 22 .unwrap(); 23 24 handler.join().unwrap(); 25 } 13
📄 Page
14
1 线程 甚至,你还可以通过它的 unpark方法,唤醒被阻塞 (parked)的线程: 1 use std::thread; 2 use std::time::Duration; 3 4 let parked_thread = thread::Builder::new() 5 .spawn(|| { 6 println!("Parking thread"); 7 thread::park(); 8 println!("Thread unparked"); 9 }) 10 .unwrap(); 11 12 thread::sleep(Duration::from_millis(10)); 13 14 println!("Unpark the thread"); 15 parked_thread.thread().unpark(); 16 17 parked_thread.join().unwrap(); park和 unpark用来阻塞和唤醒线程的方法,利用它们可以有效的利用 CPU,让暂时不 满足条件的线程暂时不可执行。 1.4 并发数和当前线程数 并发能力是一种资源,一个机器能够提供并发的能力值,这个数值一般等价于计算机拥 有的 CPU数(逻辑的核数),但是在虚机和容器的环境下,程序可以使用的 CPU核数 可能受到限制。你可以通过 available_parallelism获取当前的并发数: 1 use {io, thread}; 2 3 fn main() -> Result<()> { 4 let count = thread::available_parallelism().unwrap().get(); 5 assert!(count >= 1_usize); 6 7 Ok(()) 8 } affinity (不支持 MacOS) crate可以提供当前的 CPU核数: 1 let cores: Vec<usize> = (0..affinity::get_core_num()).step_by(2).collect(); 2 println!("cores : {:?}", &cores); 更多的场景下,我们使用 num_cpus获取 CPU的核数(逻辑核): 1 use num_cpus; 2 let num = num_cpus::get(); 如果想获得当前进程的线程数,比如在一些性能监控收集指标的时候,你可以使 用 num_threads crate, 实际测试 num_threads 不支持 windows,所以你可以使用 14
📄 Page
15
1.5 sleep和 park thread-amount代替。(Rust生态圈就是这样,有很多功能相同或者类似的 crate,你可 能需要花费时间进行评估和比较, 不像 Go 生态圈,优选标准库的包,如果没有,生态 圈中一般会有一个或者几个高标准的大家公认的库可以使用。相对而言,Rust 生态圈 就比较分裂,这一点在选择异步运行时或者网络库的时候感受相当明显。) 1 let count = thread::available_parallelism().unwrap().get(); 2 println!("available_parallelism: {}", count); 3 4 if let Some(count) = num_threads::num_threads() { 5 println!("num_threads: {}", count); 6 } else { 7 println!("num_threads: not supported"); 8 } 9 10 let count = thread_amount::thread_amount(); 11 if !count.is_none() { 12 println!("thread_amount: {}", count.unwrap()); 13 } 14 15 let count = num_cpus::get(); 16 println!("num_cpus: {}", count); 1.5 sleep 和 park 有时候我们我们需要将当前的业务暂停一段时间,可能是某些条件不满足,比如实现 spinlock, 或者是想定时的执行某些业务,如 cron 类的程序,这个时候我们可以调用 thread::sleep函数: 1 pub fn start_thread_with_sleep() { 2 let handle1 = thread::spawn(|| { 3 thread::sleep(Duration::from_millis(2000)); 4 println!("Hello from a thread3!"); 5 }); 6 7 let handle2 = thread::spawn(|| { 8 thread::sleep(Duration::from_millis(1000)); 9 println!("Hello from a thread4!"); 10 }); 11 12 handle1.join().unwrap(); 13 handle2.join().unwrap(); 14 } 它至少保证当前线程 sleep 指定的时间。因为它会阻塞当前的线程,所以不要在异步 的代码中调用它。如果时间设置为 0,不同的平台处理是不一样的,Unix类的平台会立 即返回,不会调用 nanosleep 系统调用,而 Windows 平台总是会调用底层的 Sleep 系统调用。如果你只是想让渡出时间片,你不用设置时间为 0,而是调用 yield_now函 数即可: 15
📄 Page
16
1 线程 1 pub fn start_thread_with_yield_now() { 2 let handle1 = thread::spawn(|| { 3 thread::yield_now(); 4 println!("yield_now!"); 5 }); 6 7 let handle2 = thread::spawn(|| { 8 thread::yield_now(); 9 println!("yield_now in another thread!"); 10 }); 11 12 handle1.join().unwrap(); 13 handle2.join().unwrap(); 14 } 如果在休眠时间不确定的情况下,我们想让某个线程休眠,将来在某个事件发生之后, 我们再主动的唤醒它,那么就可以使用我们前面介绍的 park和 unpark方法了。 你可以认为每个线程都有一个令牌 ( token ),最初该令牌不存在: • thread::park将阻塞当前线程,直到线程的令牌可用。 此时它以原子操作的使用令牌。thread::park_timeout 执行相同的操作,但允 许指定阻止线程的最长时间。和 sleep不同,它可以还未到超时的时候就被唤醒。 • thread.upark方法以原子方式使令牌可用(如果尚未可用)。由于令牌初始不存 在,unpark会导致紧接着的 park调用立即返回。 1 pub fn thread_park2() { 2 let handle = thread::spawn(|| { 3 thread::sleep(Duration::from_millis(1000)); 4 thread::park(); 5 println!("Hello from a park thread in case of unpark first!"); 6 }); 7 8 handle.thread().unpark(); 9 10 handle.join().unwrap(); 11 } 如果先调用 unpark ,接下来的那个 park会立即返回: 1 如果预先调用一股脑的 unpark多次,然后再一股脑的调用 park行不行,如下所示: 1 ```rust 2 let handle = thread::spawn(|| { 3 thread::sleep(Duration::from_millis(1000)); 4 thread::park(); 5 thread::park(); 16
📄 Page
17
1.6 scoped thread 6 thread::park(); 7 println!("Hello from a park thread in case of unpark first!"); 8 }); 9 handle.thread().unpark(); 10 handle.thread().unpark(); 11 handle.thread().unpark(); 12 handle.join().unwrap(); 13 ``` 答案是不行。因为一个线程只有一个令牌,这个令牌或者存在或者只有一个,多次调用 unpark 也是针对一个令牌进行的的操作,上面的代码会导致新建的那个线程一直处于 parked状态。 依照官方的文档,park函数的调用并不保证线程永远保持 parked状态,调 用者应该小心这种可能性。 1.6 scoped thread thread::scope 函数提供了创建 scoped thread 的可能性。scoped thread 不 同于上面我们创建的 thread, 它可以借用 scope 外部的非 ‘static’ 数据。使用 thread::scope 函数提供的 Scope 的参数,可以创建 (spawn) scoped thread。创 建出来的 scoped thread如果没有手工调用 join ,在这个函数返回前会自动 join。 1 pub fn wrong_start_threads_without_scoped() { 2 let mut a = vec![1, 2, 3]; 3 let mut x = 0; 4 5 thread::spawn(move || { 6 println!("hello from the first scoped thread"); 7 dbg!(&a); 8 }); 9 thread::spawn(move || { 10 println!("hello from the second scoped thread"); 11 x += a[0] + a[2]; 12 }); 13 println!("hello from the main thread"); 14 15 // After the scope, we can modify and access our variables again: 16 a.push(4); 17 assert_eq!(x, a.len()); 18 } 这段代码是无法编译的,因为线程外的 a没有办法 move到两个 thread中,即使 move 到一个 thread,外部的线程也没有办法再使用它了。为了解决这个问题,我们可以使用 scoped thread: 1 pub fn start_scoped_threads() { 2 let mut a = vec![1, 2, 3]; 3 let mut x = 0; 17
📄 Page
18
1 线程 4 5 thread::scope(|s| { 6 s.spawn(|| { 7 println!("hello from the first scoped thread"); 8 dbg!(&a); 9 }); 10 s.spawn(|| { 11 println!("hello from the second scoped thread"); 12 x += a[0] + a[2]; 13 }); 14 println!("hello from the main thread"); 15 }); 16 17 // After the scope, we can modify and access our variables again: 18 a.push(4); 19 assert_eq!(x, a.len()); 20 } 这里我们调用了 thread::scope函数,并使用 s参数启动了两个 scoped thread,它 们使用了外部的变量 a和 x。因为我们对 a只是读,对 x只有单线程的写,所以不用考 虑并发问题。thread::scope返回后,两个线程已经执行完毕,所以外部的线程又可以 访问变量了。标准库的 scope 功能并没有进一步扩展,事实上我们可以看到,在新的 scoped thread, 我们是不是还可以启动新的 scope 线程? 这样实现类似 java 一样的 Fork-Join父子线程。不过如果你有这个需求,可以通过第三方的库实现。 1.7 ThreadLocal ThreadLocal 为 Rust 程 序 提 供 了 thread-local storage 的 实 现。 TLS(thread-local storage) 可以存储数据到全局变量中,每个线程都有这个存 储变量的副本,线程不会分享这个数据,副本是线程独有的,所以对它的访问不需要 同步控制。Java 中也有类似的数据结构,但是 Go 官方不建议实现 goroutine-local storage。 thread-local key拥有它的值,并且在线程退出此值会被销毁。我们使用 thread_local! 宏创建 thread-local key,它可以包含’static的值。它使用 with访问函数去访问值。如 果我们想修值,我们还需要结合 Cell和 RefCell ,这两个类型我们后面同步原语章节中 再介绍,当前你可以理解它们为不可变变量提供内部可修改性。 一个 ThreadLocal例子如下: 1 pub fn start_threads_with_threadlocal() { 2 thread_local!(static COUNTER: RefCell<u32> = RefCell::new(1)); 3 4 COUNTER.with(|c| { 5 *c.borrow_mut() = 2; 6 }); 7 8 let handle1 = thread::spawn(move || { 9 COUNTER.with(|c| { 18
📄 Page
19
1.8 Move 10 *c.borrow_mut() = 3; 11 }); 12 13 COUNTER.with(|c| { 14 println!("Hello from a thread7, c={}!", *c.borrow()); 15 }); 16 }); 17 18 let handle2 = thread::spawn(move || { 19 COUNTER.with(|c| { 20 *c.borrow_mut() = 4; 21 }); 22 23 COUNTER.with(|c| { 24 println!("Hello from a thread8, c={}!", *c.borrow()); 25 }); 26 }); 27 28 handle1.join().unwrap(); 29 handle2.join().unwrap(); 30 31 COUNTER.with(|c| { 32 println!("Hello from main, c={}!", *c.borrow()); 33 }); 34 } 在这个例子中,我们定义了一个 Thread_local key: COUNTER。在外部线程和两个子 线程中使用 with修改了 COUNTER,但是修改 COUNTER只会影响本线程。可以看 到最后外部线程输出的 COUNTER的值是 2,尽管两个子线程修改了 COUNTER的值 为 3和 4。 1.8 Move 在前面的例子中,我们可以看到有时候在调用 thread::spawn 的时候,有时候会使用 move,有时候没有使用 move。 使不使用 move依赖相应的闭包是否要获取外部变量的所有权。如果不获取外部变量的 所有权,则可以不使用 move , 大部分情况下我们会使用外部变量,所以这里 move 更 常见: 1 pub fn start_one_thread_with_move() { 2 let x = 100; 3 4 let handle = thread::spawn(move || { 5 println!("Hello from a thread with move, x={}!", x); 6 }); 7 8 handle.join().unwrap(); 9 10 let handle = thread::spawn(move || { 19
📄 Page
20
1 线程 11 println!("Hello from a thread with move again, x={}!", x); 12 }); 13 handle.join().unwrap(); 14 15 let handle = thread::spawn(|| { 16 println!("Hello from a thread without move"); 17 }); 18 handle.join().unwrap(); 19 } 当我们在线程中引用变量 x时,我们使用了 move ,当我们没引用变量,我们没使用 move 。 这里有一个问题,move 不是把 x 的所有权交给了第一个子线程了么,为什么第二个子 线程依然可以 move并使用 x呢? 这是因为 x 变量是 i32 类型的,它实现了 Copy trait, 实际 move 的时候实际复制它的 的值,如果我们把 x替换成一个未实现 Copy的类型,类似的代码就无法编译了,因为 x的所有权已经转移给第一个子线程了: 1 pub fn start_one_thread_with_move2() { 2 let x = vec![1, 2, 3]; 3 4 let handle = thread::spawn(move || { 5 println!("Hello from a thread with move, x={:?}!", x); 6 }); 7 8 handle.join().unwrap(); 9 10 let handle = thread::spawn(move|| { 11 println!("Hello from a thread with move again, x={:?}!", x); 12 }); 13 handle.join().unwrap(); 14 15 let handle = thread::spawn(|| { 16 println!("Hello from a thread without move"); 17 }); 18 handle.join().unwrap(); 19 20 } 1.9 控制新建的线程 从上面所有的例子中,我们貌似没有办法控制创建的子线程,只能傻傻等待它的执行或 者忽略它的执行,并没有办法中途停止它,或者告诉它停止。Go创建的 goroutine也有 类似的问题,但是 Go提供了 Context.WithCancel和 channel,父 goroutine可以传 递给子 goroutine信号。Rust也可以实现类似的机制,我们可以使用以后讲到的 mpsc 或者 spsc 或者 oneshot 等类似的同步原语进行控制,也可以使用这个 crate:thread- control: 20
The above is a preview of the first 20 pages. Register to read the complete e-book.