Last Updated: 2023-09-15 05:22:31 Friday
-- TOC --
我一直认为我对多线程编程可以信手拈来,直到我开始总结这篇博文。
Race Conditions
are a result of uncontrolled access to shared resource. When the wrong access pattern happens, something unexpected results. (condition这里不是条件,而是a state of a particular time,例如:heart condition
)The term “process runs” which is used fairly often, is inaccurate. Processes don’t run – processes manage. Threads are the ones that execute code and technically run.
import threading
在Python中创建多线程,使用标准库中的threading模块,在创建线程的时候,有一个非常鬼魅的参数,叫daemon,它默认为False,官方的解释如下:
daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False. (我们当然也不能改变main thread的daemon属性值)
这段说明并没有解释daemon线程到底是啥?表面上的测试分析,可以得出下面的结论:
当daemon=True时,主线程结束,会导致所有daemon=True的线程一并强制结束。反之,当daemon=False时,主线程结束,Python进程不会结束,解释器不会退出,直到所有的daemon=False的线程自己执行结束。
我对daemon的理解:
以上Python多线程的行为规则,是建立在daemon这个词的含义所表达的设计理念上的。我理解daemon表示一直在后台运行的线程,一般用于辅助主线程的执行,它一般不会频繁启停,启动后不会轻易退出,由于其辅助的定位,main线程如果执行结束,daemon线程也自然没有存在的必要了;而非daemon线程,就是那些不需要一直在后台辅助运行的worker线程,它干完活后自己就结束,非daemon线程有自己的特殊使命,它不会因为main线程执行结束而退出,因此Python解释器要等待非daemon线程自己完成使命后,才会退出。
不管是否daemon,在python中,这些线程都可以join!threading模块中,没有joinable这样的判断接口。
C/C++中的线程,没有daemon这个概念,但有detach概念,detach后的线程,同样会随主线程的退出而终止,但不可以再join。
A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument. (当只剩下daemon线程时,Python程序结束,daemon线程很可能“死得难看”)
Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
一段来自MIT教程对daemon进程的解释
Most computers have a series of processes that are always running in the background rather than waiting for a user to launch them and interact with them. These processes are called daemons and the programs that run as daemons often end with a
d
to indicate so. For example,sshd
,systemd
.
调试线程时,容易出现RuntimeError。
一个常见的原因是:主线程退出后,相关资源已经被释放了,但是其它daemon线程的退出滞后了,或者非daemon线程还在运行中,这时线程如果要访问已经被主线程释放了的资源,可能就会出现这个异常。常见于tkinter GUI程序。
在看windows内核相关资料的时候,读到:mutex的另一个名称是mutant,mutex is pessimistic,only one thread can get it!
互斥量mutex,可能是最简单也最常用的线程间同步方式了。
Python使用互斥量的方法如下:
import threading
mutex = threading.Lock()
...
if mutex.locked():
...
mutex.acquire(blocking=True, timeout=-1) # default parameter
# critical section
mutex.release()
# or
with mutex:
# critical section
【Python使用全局变量的坑】mutex一般都是全局变量,或处于一个相关线程都能够直接访问的namespace。对于mutex的操作仅仅是调用其接口,不是赋值,因此直接在线程代码中访问时安全的。Python的动态特性,带来了使用变量而无需申明,因此对于普通的全局变量,在函数或线程中赋值时,必须要先global申明,否则就编程了创建一个新的local变量!这里容易出bug...如果不是赋值操作,而仅仅是访问其值,就不存在这个问题。
如果acquire
设置为blocking=False, It is forbidden to specify a timeout when blocking is False. 此时如果无法lock mutex,acquire return False。
多mutex防止deadlock的经验
多业务流程需要多个mutex的时候,注释代码申请多个mutex的顺序,如果每个线程都能够按照一个固定的顺序申请这多个mutex,可以有效防止deadlock!
可重入的mutex,当一个存在递归的线程需要获取mutex的时候,此时就要用RLock,因为recursion会造成同一个线程重复acqure这个mutex。在此线程中,acquire的次数要与最后release的次数相同。
import time
import threading
rlock = threading.RLock()
def test_rlock():
rlock.acquire()
rlock.acquire()
rlock.release()
rlock.acquire()
rlock.release()
rlock.release()
print('done')
th = threading.Thread(target=test_rlock, args=())
th.start()
th.join()
正式的recursive接口,应该不会出现上面测试代码中的acquire和release pattern。
计算机科学史上最古老的同步机制,著名的PV操作操作对象,就是semaphore信号量。semaphore与mutex最大的区别,在于前者有一个counter,对应了某个资源的数量,每次acquire,counter--,如果acquire时counter==0,默认block。而release会让counter++(也可以+=n)。
sem = threading.Semaphore(5) # defalut value is 1, just like a mutex
sem.acquire(blocking=True, timeout=None) # default parameters
...
sem.release(n=1) #
release
接口有个参数n
,含义:Release a semaphore, incrementing the internal counter by n. When it was zero on entry and other threads are waiting for it to become larger than zero again, wake up n of those threads. 可以启动同时释放n个资源,同时唤醒n个等待的线程。
同样,当blocking=False的时候,timeout无意义。(未测试)
普通semaphore在代码执行过程中中,counter的值可以超过初始值,BoundedSemaphore对象表示,counter的值不能操作初始值,否则raise。
多个线程可以wait某个event,一旦event被set,这些wait的线程就开始运行。
wait2go = threading.Event() # default status is unset, so can be wait
wait2go.is_set() # True is set
wait2go.set() # set the event
wait2go.clear() # make event to be unset
wait2go.wait(timeout=None) # return True only if event is set
表示经过一个特定的时间后,线程开始执行。
import time
import threading
def hello(a,b,c,*,d=9):
print("hello",a,b,c,d)
# threading.Timer(interval, function, args=None, kwargs=None)
t = threading.Timer(3, hello, (1,2,3),{'d':4} )
t.start() # after 3 seconds, "hello" and a,b,c,d will be printed
#time.sleep(1)
#t.cancel()
time.sleep(10)
cancel
接口可以取消还处于等待状态的timer线程,如果timer线程已经开始执行,调用cancel将没有任何效果。
线程间复杂交互,就要用到条件变量了。跟mutex相比,condition有如下特点:
示例1
下面是一个猜数字的双线程示例:
import random
import threading
what = -1
cond = threading.Condition()
def guess():
global what
while True:
with cond:
if what == 100:
break
what = random.randint(0,9)
print('guess', what)
cond.notify()
cond.wait()
def checker():
global what
num = random.randint(0,9)
while True:
with cond:
if what == num:
print("bingo! num: ", num)
what = 100
cond.notify()
break
print("not correct")
cond.notify()
cond.wait()
t1 = threading.Thread(target=guess, args=())
t2 = threading.Thread(target=checker, args=())
t2.start()
t1.start()
t1.join()
t2.join()
一次运气较好的测试输出:
$ python3 test.py
guess 5
not correct
guess 2
not correct
guess 4
not correct
guess 2
not correct
guess 8
bingo! num: 8
这段代码,特意让t2先start,有what == -1
这部分的判断,所有很OK。同时,特别注意两个线程中的cond.wait()
调用,如果没有wait,这两个线程的调用是随机的,由于都是死循环,会出现很多空转的情况,浪费CPU!
示例2
下面是一个M个生产者和N个消费者的示例:
import threading
from time import sleep
product = -1
con = threading.Condition()
class Producer(threading.Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
global product
with con:
while True:
if product == -1:
product = 200
print(self.name, 'init to 200')
elif product < 100:
product += 100
print(self.name, '+100')
con.notify_all()
con.wait()
class Consumer(threading.Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
global product
while True:
with con:
if product <= 60:
con.notify_all()
con.wait()
else:
product -= 10
print(self.name, '-10')
sleep(1)
for i in range(2):
p = Producer('producer [%d]' %i)
p.start()
for i in range(4):
c = Consumer('consumer [%d]' %i)
c.start()
sleep(0.6)
这段代码用的全是notify_all()
,因为condition的机制不能指定唤醒具体哪一个线程,代码逻辑必须保证在producer中要唤醒consumer,在consumer中要唤醒producer,这会带来一点点性能浪费,但有更好的方式吗?
每个barrier在创建时,会设置一个线程数量(parties)
,只有调用barrier.wait的线程数量达到这个数字时,所有wait的线程才会同时开闸放行。比如,有一个线程负责初始化,初始化完成后才调用wait,而其它线程一开始就wait住。比如下面的测试代码:
import threading
barrier = threading.Barrier(5)
def init():
global value
value = 9
i = barrier.wait()
print('init', i)
def worker():
i = barrier.wait()
print(i, value)
for i in range(4):
th = threading.Thread(target=worker, args=())
th.start()
th = threading.Thread(target=init, args=())
th.start()
barrier.wait
的返回值是一个integer,0 -- parties-1,每个线程都不同,可以利用这个数字来做点特别的事情,比如设定某个数字对应的线程,干点只需要在一个线程中做就可以的事情。
场景也可以反过来,某一个线程一定要等到其它线程都执行结束之后,才能开始执行。Barrier objects in python are used to wait for a fixed number of thread to complete execution before any particular thread can proceed forward with the execution of the program. 比如下面的测试代码:
import threading
barrier = threading.Barrier(101)
mutex = threading.Lock()
value = 0
def add():
global value
with mutex:
value += 1
barrier.wait()
def show():
barrier.wait()
print(value)
th = threading.Thread(target=show, args=())
th.start()
for i in range(100):
th = threading.Thread(target=add, args=())
th.start()
还有的场景,穿过屏障的条件可能没有成功创建,此时,abort
接口可以将barrier的状态设置为broken,此时wait接口会抛出异常(threading.BrokenBarrierError),若要恢复barrier,需reset
接口,重新开始,此接口也会让wait抛出异常。
下面这个测试,设定一个parties为2的barrier,在多个init线程中,只有有一个init得到随机数0,就表示init失败,然后abort,重新来过:
import random
import threading
barrier = threading.Barrier(2)
print(barrier.parties)
def init():
if random.randint(0,9) == 0:
barrier.abort()
def go():
while True:
try:
barrier.wait()
print('ok')
break
except threading.BrokenBarrierError:
print('barrier aborted...')
continue
th = threading.Thread(target=go, args=())
th.start()
init_num =12
while True:
for i in range(init_num):
th = threading.Thread(target=init, args=())
th.start()
for i in range(init_num):
th.join()
if barrier.broken == True:
barrier.reset()
continue
barrier.wait()
break
运气好的一次输出:
2
barrier aborted...
barrier aborted...
barrier aborted...
barrier aborted...
ok
import threading
t_local = threading.local()
def add():
for i in range(100000):
t_local.x += 1
def go():
if not hasattr(t_local,'x'):
print('no t_local.x, set to 0')
t_local.x = 0
add()
print(t_local.x)
for i in range(10):
t = threading.Thread(target=go, args=())
t.start()
在每个线程的执行序列中,t_local都是一个全新的thread local对象,没有x这个属性。每个线程中的t_local.x,都是自己的,使用时不需要互斥,相互之间也没有关联。
输出:
no t_local.x, set to 0
no t_local.x, set to 0
no t_local.x, set to 0
100000
100000
no t_local.x, set to 0
100000
no t_local.x, set to 0
no t_local.x, set to 0
no t_local.x, set to 0
no t_local.x, set to 0
100000
no t_local.x, set to 0
100000
no t_local.x, set to 0
100000
100000
100000
100000
100000
模块concurrent.futures
可以创建一个ThreadPoolExecutor
。
Python的threading模块没有提供读写锁(允许多个读线程并行,但只能有一个写线程),那就自己来搞吧。读写锁的应用场景:读取线程很多,只是偶尔写一下,使用读写锁可以提高性能,但写入时可能会稍慢,因为要等待所有读取线程释放锁。
下面是来自网络上的一个读写锁的简易实现:
import threading
class RWLock_ReadFirst():
def __init__(self):
self._lock = threading.Lock()
self._extra = threading.Lock()
self.read_num = 0
def read_acquire(self):
with self._extra:
self.read_num += 1
if self.read_num == 1:
self._lock.acquire()
def read_release(self):
with self._extra:
self.read_num -= 1
if self.read_num == 0:
self._lock.release()
def write_acquire(self):
self._lock.acquire()
def write_release(self):
self._lock.release()
这个实现的缺陷是:写线程会因为一直存在读线程而长时间得不到调用。这叫做读优先
。
我希望当出现写线程后,读线程就不能再获取锁,读线程的引用计数不能再增加,只能释放,减小其引用计数到0,然后写线程完成写操作,之后才可以继续读。这叫写优先
,如果一直有写线程,读取进程就得不到调用,写操作如此频繁,就已经不适合使用读写锁了。实现如下:
import time
import threading
class RWLock_WriteFirst():
def __init__(self):
self._lock = threading.Lock()
self._read = threading.Lock()
self._write = threading.Lock()
self.read_num = 0
self.write_num = 0
def read_acquire(self):
while True:
with self._write:
if self.write_num != 0:
flag = 1
else:
flag = 0
with self._read:
self.read_num += 1
if self.read_num == 1:
self._lock.acquire()
if flag:
time.sleep(0.1)
else:
break
def read_release(self):
with self._read:
self.read_num -= 1
if self.read_num == 0:
self._lock.release()
def write_acquire(self):
with self._write:
self.write_num += 1
self._lock.acquire()
def write_release(self):
self._lock.release()
with self._write:
self.write_num -= 1
# test code
a = 1
rwlock = RWLock_WriteFirst()
def read_thread(n):
while True:
rwlock.read_acquire()
print(a,n)
rwlock.read_release()
time.sleep(0.1)
def write_thread(n):
global a
while True:
time.sleep(1)
rwlock.write_acquire()
a += 1
print('+1', n)
time.sleep(1)
rwlock.write_release()
for i in range(32):
th = threading.Thread(target=read_thread, args=(i,))
th.start()
time.sleep(0.1)
for i in range(1):
th = threading.Thread(target=write_thread, args=(i,))
th.start()
time.sleep(0.1)
上面的测试代码,在release之后sleep一下下,否则会导致其它线程得不到调度。
看起来这个实现也存在问题,读进程要通过sleep和循环来等待,空耗CPU,用condition variable应该更好。下面是一个实现:
import time
import threading
class RWLock_WriteFirst():
def __init__(self):
self._lock = threading.Lock()
self._read = threading.Lock()
self._write = threading.Lock()
self._condition = threading.Condition()
self.read_num = 0
self.write_num = 0
def read_acquire(self):
while True:
with self._write:
if self.write_num != 0:
flag = 1
else:
flag = 0
with self._read:
self.read_num += 1
if self.read_num == 1:
self._lock.acquire()
if flag:
with self._condition:
self._condition.wait()
else:
break
def read_release(self):
with self._read:
self.read_num -= 1
if self.read_num == 0:
self._lock.release()
def write_acquire(self):
with self._write:
self.write_num += 1
self._lock.acquire()
def write_release(self):
self._lock.release()
with self._write:
self.write_num -= 1
if self.write_num == 0:
with self._condition:
self._condition.notify_all()
3个mutex,1个condition,看起来好吓人....当多个读线程和一个写线程在同时竞争_write这个lock的时候,谁能拿到就要看Python的线程调度了,一旦写线程拿到,就开始“清场”...
C语言的标准中没有提供多线程库,需要操作系统提供多线程库的支持,比如POSIX系统中的pthread
库。
Linux下有两套POSIX threads的实现,早期的LinuxThreads
,和现在的NPTL(Native POSIX Threads Library)
。他们接口相同,基本都是pthread_*
命名,现在基本只使用NPTL实现,连接时要指定-lpthread
。
Both of these are so-called 1:1
implementations, meaning that each thread maps to a kernel scheduling entity. 这两种POSIX threads都采用1:1线程模型,即每个创建的线程,在kernel中,都是一个可以独立参与调度的task。
A single process can contain multiple threads, all of which are executing the same program. These threads share the same global memory (data and heap segments), but each thread has its own stack (automatic variables). 线程间共享全局变量,共享heap区域
(线程中的malloc,都在统一的heap中进行),但拥有自己的stack
和thread local data
。
查看系统中NPTL的版本:
$ getconf GNU_LIBPTHREAD_VERSION
NPTL 2.35
关于NPTL更多信息,man 7 pthreads
来一段基础测试代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
int thread_rtv;
void func(char **args) {
printf("child thread is running...\n");
printf("get args: %s, %d\n", args[0], *(int*)args[1]);
thread_rtv = 99;
pthread_exit((void*)&thread_rtv);
}
int main(void) {
char *p = "abcde";
int q = 12345;
char *args[] = {p, (char*)&q};
int *prtv;
pthread_t tid;
pthread_create(&tid, NULL, (void*)&func, args);
pthread_join(tid, (void*)&prtv);
printf("thread exit code: %d\n", (int)*prtv);
return 0;
}
这段测试代码,创建了一个线程,给线程传了两个入参,调用join等待线程执行结束,并且获取线程的exit code。输出如下:
$ gcc -Wall -Wextra test.c -o test -lpthread
$ ./test
child thread is running...
get args: abcde, 12345
thread exit code: 99
说明:
很迷惑人的接口是pthread_detach
,这个接口有啥作用:可以不用join了!但主线程结束后,这些detach的线程照样完蛋。
When a detached thread terminates, its resources are automatically released back to the system without the need for another thread to join with the terminated thread.
The detached attribute merely determines the behavior of the system when the thread terminates; it does not prevent the thread from being terminated if the process terminates using exit(3) (or equivalently, if the main thread returns).
我是这样理解的:detach的线程,其占用的系统资源可以自动被回收;而非detach的线程资源,在join的时候回收。(join的时候可以得到线程的exit code)
Python中daemon线程的概念,在pthread中是没有的!detach与daemon是不同的概念。这意味着,pthread线程,不管是否detach,主线程结束,就立即嗝屁!
C11标准增加了<threads.h>
头文件,从语言层面支持了多线程,可提高代码的可移植性。在Linux下,新的接口就是pthread接口的封装。(貌似Windows的VS不支持这个header file,Windows下要用C++)。
看到Github上有人自己封装的threads.h。
C++编程时,可以使用与C相同的多线程库,还可以用C++标准库中的<thread>
库(从C++11开始)。
与C一样,C++的多线程,也没有daemon这个概念,有detach和join!。
测试代码:
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
void func(int a, string s) {
while (1) {
cout << "in func...";
cout << a << " " << s << endl << flush;
sleep(1);
}
}
int main(void) {
thread th(func, 123, "abc");
th.join();
return 0;
}
C++的thread库,使用比pthread要简单。
我一直感觉C++比C要严格,在多线程的时候,C++的thread与pthread有一个区别,C++非detach的线程必须join,否则异常。以上代码,如果去掉join,就会aborted!
那些没法或不需要join的线程,在C++中,都要detach!
C++的复杂,在于对象。线程入口传递参数,显然也可以传递复杂对象,或复杂对象的引用,或指向复杂对象的指针。而线程入口参数的传递,特殊的地方,在于线程有自己独立的stack,对象参数传递默认走copy+move的方式。
测试代码:
#include <cstdio>
#include <iostream>
#include <thread>
using namespace std;
struct xyz {
xyz(void) = default;
xyz(xyz&) {
cout << "copy\n";
}
xyz(xyz&&) {
cout << "move\n";
}
};
void func(xyz x) {
printf("%p\n", &x);
}
int main(void) {
xyz x;
printf("%p\n", &x);
thread th(func, x);
th.join();
return 0;
}
输出如下:
$ g++ -Wall -Wextra test.cpp -o test
$ ./test
0x7ffcda2f943f
copy
move
0x7f4f7cc0bd7f
在线程入口直接传递对象,copy --> move
从代码流程看,C++线程创建即执行,没有start这类调用。新线程和创建线程的执行序列开始并发执行,因此对象先copy到别处,以免创建线程的执行序列修改了对象的状态,再move进线程stack。
一般函数调用传对象,只有copy。
如果需要在线程入口传递对象引用,需要使用std::ref
这个调用。请看如下测试代码:
#include <cstdio>
#include <iostream>
#include <thread>
using namespace std;
struct xyz {
xyz(void) = default;
xyz(xyz&) {
cout << "copy\n";
}
xyz(xyz&&) {
cout << "move\n";
}
};
void func(xyz& x) { //!!
printf("%p\n", &x);
}
int main(void) {
xyz x;
printf("%p\n", &x);
thread th(func, ref(x)); //!!
th.join();
return 0;
}
线程入口参数申明的修改,是毫无疑问的。而线程入口传递对象引用时,需要调用std::ref
,否则编译报错。这与一般函数接口间传递引用不一样,一般就是直接写对象名称即可。从输出可以看出,这是同一个对象:
0x7ffd3a51af47
0x7ffd3a51af47
线程入口传递对象引用,大多也是为性能考虑,但要特别注意对象在什么地方被销毁!因为传递之后,对象就可以在至少两个执行序列中被使用。如果要避免同一个对象潜在地在多个执行序列中被使用,就要使用move,请继续看测试代码:
#include <cstdio>
#include <iostream>
#include <thread>
using namespace std;
struct xyz {
xyz(void) = default;
xyz(xyz&) {
cout << "copy\n";
}
xyz(xyz&&) {
cout << "move\n";
}
};
void func(xyz x) { //!!
printf("%p\n", &x);
}
int main(void) {
xyz x;
printf("%p\n", &x);
thread th(func, move(x)); //!!
th.join();
return 0;
}
输出:
0x7ffe39480cbf
move
move
0x7f9eccbb5d7f
连续两次move,原执行序列中的对象,不可再使用。
其实,传递对象指针可能是比较省心的做法,只是需要注意对象销毁的位置,这要根据场景来确定。下面这段测试代码,就是在线程中销毁对象,而原执行序列,仅仅是将指针赋值为nullptr,这确保了对象只能在线程中使用,也只能而且必须在线程中销毁。
#include <cstdio>
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
struct xyz {
~xyz(void) {
cout << "die\n";
}
};
void func(xyz* x) {
delete x; //!!
}
int main(void) {
xyz* x { new xyz };
thread th(func, x);
x = nullptr; //!!
th.detach();
sleep(1); // wait to see print out in detached thread
return 0;
}
成员函数的特点:第1个隐藏的参数是this指针!因此,在以类成员函数作为线程入口并创建线程时,第1个参数是this。
void you::sending_loop(){...}
you::you(){
std::thread th {&you::sending_loop, this}; // & is must
th.detach();
}
或者显式地传入对象的地址:
you y;
std::thread th {&you::sending_loop, &y};
#include <mutex>
mutex
lock()
,线程调用该函数会发生下面3种情况:(1). 如果该互斥量当前没有被lock,则调用线程将该互斥量锁住,直到调用unlock。(2). 如果当前互斥量已被其他线程lock,则当前线程被阻塞。(3). 如果当前互斥量已被当前线程lock,则会产生死锁deadlock。unlock()
,释放对互斥量的所有权。 try_lock()
,尝试锁住互斥量,如果互斥量已被其他线程占有,则当前线程也不会被阻塞。线程调用该函数会出现下面3种情况,(1). 如果当前互斥量没有被其他线程lock,则该线程锁住互斥量,直到unlock。(2). 如果当前互斥量已被其他线程lock,则当前调用线程返回false,并不会被阻塞。(3). 如果当前互斥量已被当前调用线程锁住,则会产生死锁deadlock。recursive_mutex,可重入锁
同一个线程,可以多次lock recursive mutex,而不会出现deadlock!在设计某些thread-safe对象时,这种互斥量非常有用。多次lock,一定要有对应的至少那么多次的unlock。
timed_mutex,recursive_timed_mutex
本文链接:https://cs.pynote.net/sf/202210211/
-- EOF --
-- MORE --