2.2.加強篇
其實以前的 linux中是沒有線程這個概念的, windows程序員經常使用線程,這一看~方便啊,然后可能是當時程序員偷懶了,就把進程模塊改了改(這就是為什么之前說linux下的多進程編程其實沒有win下那么“重量級”),弄了個精簡版進程==> 線程(內核是分不出 進程和線程的,反正 pcb個數都是一樣)
多線程和多進程最大的不同在于,多進程中,同一個變量,各自有一份拷貝存在于每個進程中,互不影響,而多線程中,所有變量都由所有線程共享(全局變量和堆 ==> 線程間共享。進程的棧 ==> 線程平分而獨占)
還記得通過 current_thread()獲取的線程信息嗎?難道線程也沒個id啥的?一起看看:(通過 ps-Lfpid來查看LWP)

回顧:進程共享的內容:(回顧:http://www.cnblogs.com/dotnetcrazy/p/9363810.html)
代碼(.text)文件描述符(fd)內存映射(mmap)2.2.1.線程同步~互斥鎖Lock
線程之間共享數據的確方便,但是也容易出現數據混亂的現象,來看個例子:
立即學習“Python免費學習筆記(深入)”;
代碼語言:JavaScript代碼運行次數:0運行復制
from multiprocessing.dummy import Threadingnum = 0 # def global numdef test(i): print(f"子進程:{i}") global num for i in range(100000): num += 1def main(): p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) # 應該是500000,發生了數據混亂,結果少了很多if __name__ == '__main__': main()
輸出:(應該是 500000,發生了數據混亂,只剩下 358615)
代碼語言:javascript代碼運行次數:0運行復制
子進程:0子進程:1子進程:2子進程:3子進程:4452238
Lock案例
共享資源+CPU調度==>數據混亂==解決==>線程同步 這時候 Lock就該上場了
互斥鎖是實現線程同步最簡單的一種方式,讀寫都加鎖(讀寫都會串行)
先看看上面例子怎么解決調:
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import threading, Locknum = 0 # def global numdef test(i, lock): print(f"子進程:{i}") global num for i in range(100000): with lock: num += 1def main(): lock = Lock() p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num)if __name__ == '__main__': main()
輸出: time python31.thread.2.py
代碼語言:javascript代碼運行次數:0運行復制
子進程:0子進程:1子進程:2子進程:3子進程:4500000real 0m2.846suser 0m1.897ssys 0m3.159s
優化下
lock設置為全局或者局部,性能幾乎一樣。循環換成map后性能有所提升(測試案例在Code中)
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import Pool as ThreadPool, Locknum = 0 # def global numlock = Lock()def test(i): print(f"子進程:{i}") global num global lock for i in range(100000): with lock: num += 1def main(): p = ThreadPool() p.map_async(test, list(range(5))) p.close() p.join() print(num)if __name__ == '__main__': main()
輸出:
time Python31.thread.2.py
代碼語言:javascript代碼運行次數:0運行復制
子進程:0子進程:1子進程:3子進程:2子進程:4500000real 0m2.468suser 0m1.667ssys 0m2.644s
本來多線程訪問共享資源的時候可以并行,加鎖后就部分串行了(沒獲取到的線程就阻塞等了)
【項目中可以多次加鎖,每次加鎖只對修改部分加(盡量少的代碼) 】(以后會說協程和Actor模型)
補充:以前都是這么寫的,現在支持 with托管了(有時候還會用到,所以了解下):【net是直接 lock大括號包起來】
代碼語言:javascript代碼運行次數:0運行復制
#### 以前寫法:lock.acquire() # 獲取鎖try: num += 1finally: lock.release() # 釋放鎖#### 等價簡寫with lock: num += 1
擴展知識:(GIL在擴展篇會詳說)
GIL的作用:多線程情況下必須存在資源的競爭,GIL是為了保證在解釋器級別的線程唯一使用共享資源(cpu)。同步鎖的作用:為了保證解釋器級別下的自己編寫的程序唯一使用共享資源產生了同步鎖
2.2.2.線程同步~遞歸鎖RLock
看個場景:小明欠小張2000,欠小周5000,現在需要同時轉賬給他們:(規定:幾次轉賬加幾次鎖)

小明啥也沒管,直接擼起袖子就寫Code了:(錯誤Code示意)
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明轉賬2000給小張 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明轉賬5000給小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
小明寫完代碼就出去了,這可把小周和小張等急了,打了N個電話來催,小明心想啥情況?
一看代碼楞住了,改了改代碼,輕輕松松把錢轉出去了:
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000# 小明轉賬2000給小張def a_to_b(lock): global xiaoming global xiaozhang with lock: xiaoming -= 2000 xiaozhang += 2000# 小明轉賬5000給小周def a_to_c(lock): global xiaoming global xiaozhou with lock: xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(a_to_b, args=(lock, )) p.apply_async(a_to_c, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
輸出:
代碼語言:javascript代碼運行次數:0運行復制
[還錢前]小明8000,小張3000,小周5000[還錢后]小明1000,小張5000,小周10000
就這么算了嗎?不不不,不符合小明性格,于是小明研究了下,發現~還有個遞歸鎖 RLock呢,正好解決他的問題:
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把這邊換了下xiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明轉賬2000給小張 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明轉賬5000給小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = RLock() # 就把這邊換了下 p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
RLock內部維護著一個 Lock和一個counter變量, counter記錄了acquire的次數,從而使得資源可以被多次 require。直到一個線程所有的 acquire都被release,其他的線程才能獲得資源
2.2.3.死鎖引入1.多次獲取導致死鎖
小明想到了之前說的(互斥鎖 Lock讀寫都加鎖)就把代碼拆分研究了下:
代碼語言:javascript代碼運行次數:0運行復制
print("[開始]小明轉賬2000給小張")lock.acquire() # 獲取鎖xiaoming -= 2000xiaozhang += 2000print("[開始]小明轉賬5000給小周")lock.acquire() # 獲取鎖(互斥鎖第二次加鎖)xiaoming -= 5000xiaozhou += 5000lock.release() # 釋放鎖print("[結束]小明轉賬5000給小周")lock.release() # 釋放鎖print("[開始]小明轉賬2000給小張")
輸出發現:(第二次加鎖的時候,變成阻塞等了【死鎖】)
代碼語言:javascript代碼運行次數:0運行復制
[還錢前]小明8000,小張3000,小周5000[開始]小明轉賬2000給小張[開始]小明轉賬5000給小周
這種方式,Python提供的RLock就可以解決了
2.常見的死鎖
看個場景:小明和小張需要流水帳,經常互刷~ 小明給小張轉賬1000,小張給小明轉賬1000
一般來說,有幾個共享資源就加幾把鎖(小張、小明就是兩個共享資源,所以需要兩把 Lock)
先描述下然后再看代碼:
正常流程 小明給小張轉1000:小明自己先加個鎖==>小明-1000==>獲取小張的鎖==>小張+1000==>轉賬完畢
死鎖情況 小明給小張轉1000:小明自己先加個鎖==>小明-1000==>準備獲取小張的鎖。可是這時候小張準備轉賬給小明,已經把自己的鎖獲取了,在等小明的鎖(兩個人相互等,于是就一直死鎖了)
代碼模擬一下過程:
代碼語言:javascript代碼運行次數:0運行復制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的鎖z_lock = Lock() # 小張的鎖# 小明轉賬1000給小張def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock with m_lock: xiaoming -= 1000 sleep(0.01) with z_lock: xiaozhang += 1000# 小張轉賬1000給小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang}")if __name__ == '__main__': main()
輸出:(卡在這邊了)
代碼語言:javascript代碼運行次數:0運行復制
[轉賬前]小明5000,小張8000
項目中像這類的情況,一般都是這幾種解決方法:(還有其他解決方案,后面會繼續說)
按指定順序去訪問共享資源在訪問其他鎖的時候,先把自己鎖解了trylock的重試機制得不到全部鎖就先放棄已經獲取的資源
比如上面的情況,我們如果規定,不管是誰先轉賬,先從小明開始,然后再小張,那么就沒問題了。或者誰錢多就誰(權重高的優先)
代碼語言:javascript代碼運行次數:0運行復制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的鎖z_lock = Lock() # 小張的鎖# 小明轉賬1000給小張def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock # 以上次代碼為例,這邊只修改了這塊 with z_lock: # 小張權重高,大家都先獲取小張的鎖 xiaozhang += 1000 sleep(0.01) with m_lock: xiaoming -= 1000# 小張轉賬1000給小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[轉賬前]小明{xiaoming},小張{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[轉賬后]小明{xiaoming},小張{xiaozhang}")if __name__ == '__main__': main()
輸出:
代碼語言:javascript代碼運行次數:0運行復制
[轉賬前]小明5000,小張8000[轉賬后]小明5000,小張8000
2.2.4.線程同步~條件變量Condition
條件變量一般都不是鎖,只能能阻塞線程,從而減少不必要的競爭,Python內置了 RLock(不指定就是RLock)
看看源碼:
代碼語言:javascript代碼運行次數:0運行復制
class Condition: """ 實現條件變量的類。????條件變量允許一個或多個線程等到另一個線程通知它們為止????如果給出了lock參數而不是None,那必須是Lock或RLock對象作底層鎖。 否則,一個新的RLock對象被創建并用作底層鎖。 """ def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock # 設置lock的acquire()和release()方法 self.acquire = lock.acquire self.release = lock.release
再看看可不可以進行with托管:(支持)
代碼語言:javascript代碼運行次數:0運行復制
def __enter__(self): return self._lock.__enter__()def __exit__(self, *args): return self._lock.__exit__(*args)
看個生產消費者的簡單例子:(生產完就通知消費者)
代碼語言:javascript代碼運行次數:0運行復制
from multiprocessing.dummy import Pool as ThreadPool, Conditions_list = []con = Condition()def Shop(i): global con global s_list # 加鎖保護共享資源 for x in range(5): with con: s_list.append(x) print(f"[生產者{i}]生產商品{x}") con.notify_all() # 通知消費者有貨了def User(i): global con global s_list while True: with con: if s_list: print(f"列表商品:{s_list}") name = s_list.pop() # 消費商品 print(f"[消費者{i}]消費商品{name}") print(f"列表剩余:{s_list}") else: con.wait()def main(): p = ThreadPool() # 兩個生產者 p.map_async(Shop, range(2)) # 五個消費者 p.map_async(User, range(5)) p.close() p.join()if __name__ == '__main__': main()
輸出:(list之類的雖然可以不加global標示,但是為了后期維護方便,建議加上)
代碼語言:javascript代碼運行次數:0運行復制
[生產者0]生產商品0[生產者0]生產商品1列表商品:[0, 1][消費者0]消費商品1列表剩余:[0]列表商品:[0][消費者0]消費商品0列表剩余:[][生產者0]生產商品2列表商品:[2][消費者1]消費商品2列表剩余:[][生產者0]生產商品3[生產者1]生產商品0[生產者0]生產商品4列表商品:[3, 0, 4][消費者1]消費商品4列表剩余:[3, 0][生產者1]生產商品1[生產者1]生產商品2[生產者1]生產商品3[生產者1]生產商品4列表商品:[3, 0, 1, 2, 3, 4][消費者2]消費商品4列表剩余:[3, 0, 1, 2, 3]列表商品:[3, 0, 1, 2, 3][消費者0]消費商品3列表剩余:[3, 0, 1, 2]列表商品:[3, 0, 1, 2][消費者1]消費商品2列表剩余:[3, 0, 1]列表商品:[3, 0, 1][消費者3]消費商品1列表剩余:[3, 0]列表商品:[3, 0][消費者3]消費商品0列表剩余:[3]列表商品:[3][消費者3]消費商品3列表剩余:[]
通知方法:
notify() :發出資源可用的信號,喚醒任意一條因 wait()阻塞的進程notifyAll() :發出資源可用信號,喚醒所有因wait()阻塞的進程
2.2.5.線程同步~信號量Semaphore(互斥鎖的高級版)
記得當時在分析 multiprocessing.Queue源碼的時候,有提到過(點我回顧)
同進程的一樣, semaphore管理一個內置的計數器,每當調用 acquire()時內置函數 -1,每當調用 release()時內置函數 +1
通俗講就是:在互斥鎖的基礎上封裝了下,實現一定程度的并行
舉個例子,以前使用互斥鎖的時候:(廁所就一個坑位,必須等里面的人出來才能讓另一個人上廁所)

使用信號量之后:廁所坑位增加到5個(自己指定),這樣可以5個人一起上廁所了==>實現了一定程度的并發
舉個例子:(Python在語法這點特別爽,不用你記太多異同,功能差不多基本上代碼也就差不多)
代碼語言:javascript代碼運行次數:0運行復制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Semaphoresem = Semaphore(5) # 限制最大連接數為5def goto_wc(i): global sem with sem: print(f"[線程{i}]上廁所") sleep(0.1)def main(): p = ThreadPool() p.map_async(goto_wc, range(50)) p.close() p.join()if __name__ == '__main__': main()
輸出:

可能看了上節回顧的會疑惑:源碼里面明明是 BoundedSemaphore,搞啥呢?
其實 BoundedSemaphore就比 Semaphore多了個在調用 release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將拋出一個異常