- 先從 Unix/Linux 系統的 fork 調用來看,MacOS 是類 Unix 所以 fork 調用是支持的,在 Python 可以直接用
os
module 創建子程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
import os
import time
print(f'程序 ({os.getpid()}) 開始...')
print("開始呼叫 os.fork()")
print("======================")
pid = os.fork()
print(f"呼叫 os.fork() 後的結果,現在 return : {pid}")
if pid == 0:
print("pid == 0, 子程序返回: ", pid)
print(f'子程序說話: 我是子程序 ({os.getpid()}) 然後我的父程序是 ({os.getppid()}).')
else:
print("pid != 0, 父程序返回子程序 pid: ", pid)
print(f'原本程序說話: 我是程序 ({os.getpid()}) 我創造了子程序 ({pid}).')
print("======父程序睡五秒=======")
time.sleep(5)
print("========最後一行, 現在是:", pid, "=============")
# 輸出會像這樣
"""
程序 (19810) 開始...
開始呼叫 os.fork()
======================
呼叫 os.fork() 後的結果,現在 return : 19826
pid != 0, 父程序返回子程序 pid: 19826
原本程序說話: 我是程序 (19810) 我創造了子程序 (19826).
======父程序睡五秒=======
呼叫 os.fork() 後的結果,現在 return : 0
pid == 0, 子程序返回: 0
子程序說話: 我是子程序 (19826) 然後我的父程序是 (19810).
========最後一行, 現在是: 0 =============
"""
# 並在等待一下後,才會出現
"""
========最後一行, 現在是: 19826 =============
"""
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# 假設我們在呼叫 os.fork() 後 time.sleep(20) 看一下 `ps -f`
# 會發現實際真的有 父程序和子程序這兩個 pid
import os
import time
print(f'程序 ({os.getpid()}) 開始...')
pid = os.fork()
time.sleep(20)
if pid == 0:
print("pid == 0, 子程序返回: ", pid)
print(f'子程序說話: 我是子程序 ({os.getpid()}) 然後我的父程序是 ({os.getppid()}).')
else:
print("pid != 0, 父程序返回子程序 pid: ", pid)
print(f'原本程序說話: 我是程序 ({os.getpid()}) 我創造了子程序 ({pid}).')
print("========最後一行, 現在是:", pid, "=============")
# 輸出會像這樣
"""
程序 (83196) 開始...
pid == 0, 子程序返回: 0
子程序說話: 我是子程序 (83216) 然後我的父程序是 (83196).
========最後一行, 現在是: 0 =============
pid != 0, 父程序返回子程序 pid: 83216
原本程序說話: 我是程序 (83196) 我創造了子程序 (83216).
========最後一行, 現在是: 83216 =============
"""
|
ps -f
:
UID PID PPID C STIME TTY TIME
501 83196 11135 0 3:55PM ttys017 0:00.04
501 83216 83196 0 3:55PM ttys017 0:00.00
- 雖然 MacOS 有 fork 調用,但在 windows 沒有 fork 調用,那如何做到跨平台,使用 python 的標準函式庫裡的
multiprocessing
模組就可以支援:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
"""啟動一個子程序(進程)(process)並等待其結束然後再 print 出 "子程序結束"
"""
from multiprocessing import Process
import os
# 子程序要做的事
def child_procss(name):
print("跑子程序", name, "pid 是:", os.getpid())
print(1)
print(2)
print(3)
if __name__ == '__main__':
print("主程序", os.getpid())
p = Process(target=child_procss, args=('print123',))
print("子程序將要開始")
p.start()
p.join() # 加 join 是因為如此的話 可以等待子程序(進程)結束後再繼續往下運行
print("子程序結束")
"""
主程序 67494
子程序將要開始
跑子程序 print123 pid 是: 67511
1
2
3
子程序結束
"""
|
- 同樣也是標準函式庫
multiprocessing
使用裡面的 Pool()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
"""啟動大量程序(進程)(process), 使用 Pool
"""
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('跑任務 %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('任務 %s 跑了 %0.2f 秒' % (name, (end - start)))
if __name__=='__main__':
print('主程序開始 %s.' % os.getpid())
p = Pool(35)
for i in range(35):
p.apply_async(long_time_task, args=(i,))
print('apply_async() 非同步, 正在等待所有子程序從運行直到完畢...')
p.close() # join 前要加 close
p.join() # 加 join 是因為如此的話,可以等待全部子程序(進程)結束後再繼續往下運行
print('所有子程序結束。')
|
close() 必須加,不加的話會 raise error
raise ValueError("Pool is still running")
ValueError: Pool is still running
加了的話意思就是避免其他 Task 再進來這個 Pool, 同時確保所有 Job 做完後,子程序都會關閉。
join() 必須加,為了確保主程序關閉之前,子程序都要跑完,所以加這個。
不加的話,一旦主程序結束,子程序也就沒了。所以通常前面一會放 close() 或是你想直接終止的時候就會放 terminate(),總之 join() 就是確保等待子程序關閉。
p = Pool(8)
可以發現我設定 p = Pool(8)
表示設定我這個 Pool 可以同時跑 8 個 process
,如果我不設定 by default 就是當下 CPU 守備範圍內的核心,通常看 CPU 核心就能推估。或著用 python 標準函式庫裡的 module 可以看目前電腦核心數量:
1
2
3
4
5
6
7
|
import multiprocessing
cpu_amount = multiprocessing.cpu_count()
print(cpu_amount)
import os
cpu_amount = os.cpu_count()
print(cpu_amount)
|
1
2
3
|
p = Pool()
p.map(func, arg) # 會確保子程序都結束,才會跑下面一行,也就是阻塞在這
print("阻塞") # 這行會在子程序都結束後才會印出來
|
1
2
3
4
|
# 假設有 return,可直接拿到 因為 map 是 return `list`
p = Pool()
results = p.map(func, arg)
print(results)
|
1
2
3
4
5
|
p = Pool()
p.map_async(func, arg) # 不會停在這,也就是說下一行不會等子程序結束
print("不阻塞") #直接印出來
pool.close()
pool.join()
|
1
2
3
4
5
6
|
# 假設有 return,要呼叫 get(), 因為 map_async 是 return `multiprocessing.pool.MapResult`
p = Pool()
results = p.map_async(func, arg)
print(results.get())
pool.close()
pool.join()
|
1
2
3
4
|
p = Pool()
p.starmap(func, arg) # 這裡的 arg 必須是 iterable, 如果是 list,則裡面的 element 也必須是 iterable, 而 map() 的話 list 裡面不用是 iterable
pool.close()
pool.join()
|
- chunksize : 每個子程序可以處理的任務數量
1
2
|
p = Pool()
p.map(func, arg, chunksize=10)
|
1
2
3
4
5
6
7
8
|
def print_func(pool_outputs):
print("callback", pool_outputs)
p = Pool()
pool_outputs = p.map_async(func, arg, callback=print_func)
print(pool_outputs.get())
pool.close()
pool.join()
|
- maxtasksperchild : 設定每個子程序完成多少任務後,就會再重啟新的子程序
1
|
p = Pool(5, maxtasksperchild=50)
|
ex1:
1
2
3
4
5
6
7
|
import subprocess
print('$ curl https://www.google.com')
r = subprocess.call(['curl', 'https://www.google.com'])
print('Exit code:', r)
# 效果和 `curl https://www.google.com` 一樣
|
ex2:
1
2
3
4
5
6
7
|
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
# 效果和 `nslookup www.python.org` 一樣
|
ex1:
1
2
3
4
5
6
7
8
|
import subprocess
print('$ date -R')
p = subprocess.Popen(['date', '-R'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate()
print(output)
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
|
ex2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\twitter.com\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
"""
$ nslookup
> set q=mx
twitter.com
exit
"""
# 輸出
"""
$ nslookup
Server: 192.168.1.1
Address: 192.168.1.1#53
Non-authoritative answer:
twitter.com mail exchanger = 20 alt2.aspmx.l.google.com.
twitter.com mail exchanger = 30 aspmx2.googlemail.com.
twitter.com mail exchanger = 10 aspmx.l.google.com.
twitter.com mail exchanger = 30 aspmx3.googlemail.com.
twitter.com mail exchanger = 20 alt1.aspmx.l.google.com.
Authoritative answers can be found from:
Exit code: 0
"""
|
subprocess.run() 是建立在 subprocess.Popen() 之上
1
2
3
4
5
6
|
import subprocess
with open('output.txt', 'w') as f:
subprocess.run(['date', '-R'], stdout=f)
# 如此 output.txt 就會產生
|
1
2
3
4
5
6
7
8
|
import subprocess
result = subprocess.run(['date', '-R'], stdout=subprocess.PIPE)
print(result)
print("------")
print(result.stdout)
print("------")
print(result.stdout.decode('utf-8'))
|
subprocess.check_output() 更簡化了 subprocess.run()
1
2
3
4
5
|
import subprocess
result = subprocess.check_output(['date', '-R'])
print(result)
print(result.decode("utf-8"))
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
from multiprocessing import Process, Queue
import os
import time
# 資料寫進 Queue
def write(q):
print(os.getpid(), "子程序寫入")
for value in ['A', 'B', 'C']:
print(f'放 {value} 到 queue...')
q.put(value)
time.sleep(1)
# 從 Queue 拿資料
def read(q):
print(os.getpid(), "子程序讀取")
while True:
value = q.get(True)
print(f'拿到 {value} 來自 queue.')
if __name__ == '__main__':
# 主程序創造 Queue, 傳給兩個子程序
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子程序pw,寫入:
pw.start()
# 啟動子程序pr,讀取:
pr.start()
# 等待 pw 結束:
pw.join()
# pr 程序是 無窮loop,所以等待 pw 結束後,就把它終止:
pr.terminate()
# 輸出
"""
92372 子程序寫入
放 A 到 queue...
92373 子程序讀取
拿到 A 來自 queue.
放 B 到 queue...
拿到 B 來自 queue.
放 C 到 queue...
拿到 C 來自 queue.
"""
|
以上,結束。
有參考網路一些例子,在這裡針對基本使用統整紀錄一下。
結束!