Python非同步與平行處理

由於Python採用了所謂的GIL(Global Interpreter Lock)機制,因此具有易於編寫容易實現的特性,但卻從本質上限制了Python在多工環境的應用效能。

首先,我們瞭解一下什麼是GIL。

A) Global Interpreter Lock

字面翻譯為「全域解釋器鎖」, GIL最主要的限制在於它讓Python的執行緒只能在單一CPU執行,這樣的好處是:

  • 無需擔心多執行緒在同時存取修改同一記憶體位址時可能發生的問題。
  • 降低記憶體管理上的複雜度。
  • 容易編寫,也就是更加Pythonic,讓程式碼更容易閱讀。
  • 限制於單一CPU,因此其多工方式適用於IO密集型的任務,也就是單一時間只處理某一類型工作。

其實幾乎大部份編譯式語言都採用了GIL機制,例如:

  • Ruby: Ruby的MRI(Matz’s Ruby Interpreter)解釋器也使用類似的GIL概念,限制同一時間只有一個執行緒執行Ruby代碼。然而,Ruby的其他實現,如JRuby(基於Java虛擬機)和Rubinius,可能沒有相同的限制。
  • Perl: Perl解釋器在過去的版本中也存在類似的限制,不過在新版本中這方面已經有所改進。Perl的多執行緒支援相對較新,而且在許多情況下仍然受到一些限制。
  • PHP: PHP的標準解釋器在過去也有類似的單執行緒限制,但從PHP 7開始,引入了一個全新的Zend虛擬機(Zend Engine 3.0),部分解除了這種限制,並提高了多執行緒的效能。
  • JavaScript(單執行緒的瀏覽器環境): 在瀏覽器中執行的JavaScript通常是單執行緒的,這是因為JavaScript是在瀏覽器的主執行緒上執行的,因此一次只能執行一段JavaScript代碼。然而,在Node.js等環境中,可以使用多執行緒模型。

A.1) GIL對於Python的影響

讓Phthon增加了如下的限制

  • 由於限制多執行緒只能在單一CPU執行,讓Python進行平行處理的效益不彰。
  • 由於單一時間只有一個執行緒在執行,因此不適用於計算密集型的任務,或者無法發揮多核CPU的效益。
  • 需要透過其它第三方模組,如NumPy、Cython等,並搭配multi-processing(n多進程)的作法,取代多執行緒方式。

瞭解了Python因GIL限制,因此在平行處理上有先天的劣勢之後,接下來再來瞭解Python的多進程與多執行緒,透過這兩者,可彌補Python天生因GIL所受到的限制。

B) Multi-Threads與Multi-Process

Python的多進程和多執行緒是兩種不同的並行運算方法,它們各有各的特點和用途。下表是兩者的比較:

Multi-Threads

Multi-Process

執行方式

Multi-Threads是在同一Process中運行的多個threads,雖然每個thread有自己的程式計數器和堆疊,但資源和記憶體空間是共享的。

每個Process有自己獨立的記憶體空間,且彼此之間不共享資源。

GIL限制

全域解釋器鎖(GIL)限制了同一時間只有一個thread能夠執行。因此影響multi-threads在計算密集型任務上的效能。

每個Process都有自己的Python編譯器和GIL。因此,multi-process可以充分利用多核系統,在計算密集型任務上達到更好的效能。

資源使用

同一process下不同threads之間的資源是共享的。

process之間的資源是獨立的,需要透過其它的通訊機制(例如管道、共享記憶體…等)來實現彼此的資料傳遞。

適合的task

輕量不複雜的任務,或者需要共享資源且不需要額外記憶體空間的任務。

每個process都有自己的記憶體空間,因此multi-process相對於多執行緒來說較適合資源需求較高的任務。

適合場景

I/O密集型任務:在等待I/O操作完成的時候,GIL會釋放允許其他執行緒執行。這使得multi-threads會比起multi-process更有效率。

計算密集型任務:multi-process才能充分利用多CPU/核系統。

一個process可以包含數個process,而每個process又可以包含多個threads。我們應該依據應用程式的需求和特性來選擇multi-threads或multi-process,但不代表只能兩者擇一,在一些情況下,也可能同時使用兩者來以滿足不同的需求。

C) Concurrency與Parallelism

很多人容易搞糊塗Concurrency與Parallelism之間的不同,或許我們把Concurrency翻譯為「並發」而Parallelism翻譯為「並行」,會比較好解釋和理解。

並發:指的是多個task在單CPU/核心的執行能力,這些tasks以交錯方式而非同時執行,因此在單個CPU能同時運行多個tasks,重點在於如何協調各個tasks相互切換執行。

並行:指的是多個任務在多個CPU/核心的執行能力,每個tasks會分配給不同的處理單元或核心,因此一個Parallelism task可以包含數個Concurrency的tasks。重點在於將一個job分解為數個可以同時執行的較小任務。

C.1) 多發與並行的應用時機

要採用Concurrency或Parallelism,亦或選擇multi-threads或multi-processing,取決於該任務是CPU密集型還是IO密集型,CPU密集型建議採用Parallelism(multi-processing),而IO密集型建議採用Concurrency(multi-threads)。

C.1.1) CPU密集型(CPU bounded):如果增加CPU/核心數可增加執行效率,則此類的任務為CPU密集型,此類任務可分成N個jobs(或tasks)同時運作於不同的CPU/cores,並同將總執行時間減少至1/N。屬於CPU密集型的任務類型如下:

C.1.2) IO密集型(I/O bounded):對於 IO 密集型任務來說,當某個任務在等待取得IO資源時,若能將CPU時間挪給其他需要的task而非等待,將可以提高執行的效率。

D) ASYNCIO支援並發/並行處理的Package

Asyncio是Python內建的一個標準函式庫,用於設計非同步(asynchronous)的程式。它提供了一個基於協程(coroutine)的非同步 I/O 框架,支援mulit-threads以及multi-processing的非同步處理。因此,無論我們的程式是I/O bounded亦或CPU bounded,都可以利用AsyncIO來處理及控制非同步的程式。

上述中的協程(coroutine),指的是是一種特殊函式,可以暫停執行並交出控制權讓其他協程有機會執行。在非同步 I/O 中,協程通常用於表示非同步任務,例如網路通信、文件操作等。在Python程式中,我們常看到的asyncawait便是用於定義和執行協程。

範例一

from time import sleep

def hello_world():
print("Hello")
sleep(1)
print("World")

hello_world()
hello_world()
hello_world()

上面的範例中,hello_word()會先印出hello,然後等待一秒後再印出World。

範例中連續執行了三次hello_word(),所花的時間為三倍,執行結果為:

Hello
World
Hello
World
Hello
World

我們可以把程式修改一下,使用asyncio一次同時間執行三個hello_word()

import asyncio

async def hello_world():
print("Hello")
# 模擬一個非同步 I/O 操作,這裡使用 asyncio.sleep 來模擬等待操作
await asyncio.sleep(1)
print("World")

async def main():
# 使用 asyncio.gather 同時執行多個協程
await asyncio.gather(hello_world(), hello_world(), hello_world())

# 創建事件循環
loop = asyncio.get_event_loop()

# 使用 run_until_complete 啟動 main 協程
loop.run_until_complete(main())

# 關閉事件循環
loop.close()

執行結果如下:

Hello
Hello
Hello
World
World
World

上述的範例中,async的使用方式:

  • 使用async定義了兩個協程 hello_world 和 main。
  • hello_world ()中的sleep要改為使用非同步 I/O 操作的asyncio.sleep
  • main() 使用asyncio.gather來同時啟動多個hello_world協程。
  • 將main()置於asyncio.get_event_loop()執行,最後要close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

範例中可看到asyncio的基本使用方式,我們可以把要非同步的程式放在協程中執行,而使用Event loop的目的是協調非同步程式的操作及執行。

D.1) await的作用

await它會等待一個耗時的協程,例如網路請求、文件讀寫、資料庫查詢等,在這段await時間內,程式並不會受到await影響仍可以處理其他任務,從而提高程式的效能。待協程結束後,執行流程會轉到await該行繼續執行後續的程式碼。

我們可以把await想像成,它會將非同步程式的控制權轉交給同步程式,而且在等待的這段時間,整體程式並不會阻塞可以執行其它任務。

E) 應用Asyncio於影像檔案的寫入

在實際的應用中,asyncio可以用於處理大量並發的 I/O 操作,提高應用程式的效能。例如下方範例會播放一段影片,使用者可在播放時按下「P」鍵來捕捉目前的畫面存成圖片檔。程式執行時,會發現當我們如何頻繁的按P鍵來捕捉畫面,影片播放速度仍然保持順暢,不因圖片儲存工作的影響而而稍微delay的狀況。

import asyncio
import aiofiles
import cv2
import numpy as np

cam_id = r'C:\Users\ch.tseng\Videos\tcar01.mp4'
cam_id = cam_id.replace('\\', '/')
video_size = (1920,1080)
camera = cv2.VideoCapture(cam_id)
camera.set(cv2.CAP_PROP_FRAME_WIDTH, video_size[0])
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, video_size[1])
width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)) # float
height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) # float
camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
camera.set(cv2.CAP_PROP_FPS, 30)

fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')

print("USB Camera's resolution is: %d x %d" % (width, height))

async def take_photo():
print("Taking photo...")
(grabbed, frame) = camera.read()
print("Photo taken")
return frame

async def write_to_file(photo_data):
# 將圖片寫入文件
async with aiofiles.open('photo.jpg', 'wb') as file:
print("Writing to file...")
await file.write(cv2.imencode('.jpg', photo_data)[1])
print("Write to file complete")

async def main():
(grabbed, frame) = camera.read()
while grabbed is True:
cv2.imshow("Frame", frame)
k = cv2.waitKey(1)
if k == ord('p'):
# 啟動 take_photo 協程,並獲取拍照後的圖片數據
if grabbed is True:
photo_data = await take_photo()

# 使用 async with 開始非同步寫入文件操作
await write_to_file(photo_data)

(grabbed, frame) = camera.read()

# 創建事件循環
loop = asyncio.get_event_loop()

# 使用 run_until_complete 啟動 main 協程
loop.run_until_complete(main())

# 關閉事件循環
loop.close()