當前位置:
首頁 > 知識 > Python多進程編程

Python多進程編程


Linux編程

點擊右側關注,免費入門到精通!

作者丨未知


https://www.cnblogs.com/kaituorensheng/p/4445418.html



序. multiprocessing

python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。藉助這個包,可以輕鬆完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。



1. Process



創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。 方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。


屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且自己不能產生新進程,必須在start()之前設置。



例1.1:創建函數並將其作為單個進程


import

 multiprocessing

import

 time

def

 

worker

(interval)

:


    n = 

5


    

while

 n > 

0

:
        print(

"The time is {0}"

.format(time.ctime()))
        time.sleep(interval)
        n -= 

1

if

 __name__ == 

"__main__"

:
    p = multiprocessing.Process(target = worker, args = (

3

,))
    p.start()
    

print

 

"p.pid:"

, p.pid
    

print

 

"p.name:"

, p.name
    

print

 

"p.is_alive:"

, p.is_alive()



結果

p

.pid

: 8736

p

.name

Process-1


p

.is_alive

True


The

 

time

 

is

 

Tue

 

Apr

 21 20

:55:12

 2015

The

 

time

 

is

 

Tue

 

Apr

 21 20

:55:15

 2015

The

 

time

 

is

 

Tue

 

Apr

 21 20

:55:18

 2015

The

 

time

 

is

 

Tue

 

Apr

 21 20

:55:21

 2015

The

 

time

 

is

 

Tue

 

Apr

 21 20

:55:24

 2015



例1.2:創建函數並將其作為多個進程


import

 multiprocessing

import

 time

def

 

worker_1

(interval)

:


    

print

 

"worker_1"


    time.sleep(interval)
    

print

 

"end worker_1"

def

 

worker_2

(interval)

:


    

print

 

"worker_2"


    time.sleep(interval)
    

print

 

"end worker_2"

def

 

worker_3

(interval)

:


    

print

 

"worker_3"


    time.sleep(interval)
    

print

 

"end worker_3"

if

 __name__ == 

"__main__"

:
    p1 = multiprocessing.Process(target = worker_1, args = (

2

,))
    p2 = multiprocessing.Process(target = worker_2, args = (

3

,))
    p3 = multiprocessing.Process(target = worker_3, args = (

4

,))

    p1.start()
    p2.start()
    p3.start()

    print(

"The number of CPU is:"

 + str(multiprocessing.cpu_count()))
    

for

 p 

in

 multiprocessing.active_children():
        print(

"child   p.name:"

 + p.name + 

" p.id"

 + str(p.pid))
    

print

 

"END!!!!!!!!!!!!!!!!!"




結果


The

 

number

 

of

 

CPU

 

is

:4


child

   

p

.name

:Process-3

    

p

.id7992


child

   

p

.name

:Process-2

    

p

.id4204


child

   

p

.name

:Process-1

    

p

.id6380


END

!!!!!!!!!!!!!!!!!

worker_1


worker_3


worker_2


end

 

worker_1


end

 

worker_2


end

 

worker_3




例1.3:將進程定義為類


import multiprocessing
import time

class

 

ClockProcess

(

multiprocessing

.

Process

):


    

def

 

__init__

(

self

, interval)

:
        multiprocessing.Process.__init_

_

(

self

)
        

self

.interval = interval

    

def

 

run

(

self

)

:
        n = 

5


        

while

 n > 

0

:
            print(

"the time is {0}"

.format(time.ctime()))
            time.sleep(

self

.interval)
            n -= 

1

if

 __name_

_

 == 

"__main__"

:
    p = ClockProcess(

3

)
    p.start()      



註:進程p調用start()時,自動調用run()



結果


the

 

time

 

is

 

Tue

 

Apr

 21 20

:31:30

 2015

the

 

time

 

is

 

Tue

 

Apr

 21 20

:31:33

 2015

the

 

time

 

is

 

Tue

 

Apr

 21 20

:31:36

 2015

the

 

time

 

is

 

Tue

 

Apr

 21 20

:31:39

 2015

the

 

time

 

is

 

Tue

 

Apr

 21 20

:31:42

 2015   



例1.4:daemon程序對比結果1.4-1 不加daemon屬性


import multiprocessing
import 

time

def worker(interval):
    

print

(

"work start:{0}"

.

format

(

time

.ctime()));
    

time

.sleep(interval)
    

print

(

"work end:{0}"

.

format

(

time

.ctime()));

if

 __name__ == 

"__main__"

:
    p = multiprocessing.Process(target = worker, args = (

3

,))
    p.start()
    

print

 

"end!"




結果


end

!

work

 

start

:Tue

 

Apr

 21 21

:29:10

 2015

work

 

end

:Tue

 

Apr

 21 21

:29:13

 2015



1.4-2 加上daemon屬性


import

 multiprocessing

import

 time

def

 

worker

(interval)

:


    print(

"work start:{0}"

.format(time.ctime()));
    time.sleep(interval)
    print(

"work end:{0}"

.format(time.ctime()));

if

 __name__ == 

"__main__"

:
    p = multiprocessing.Process(target = worker, args = (

3

,))
    p.daemon = 

True


    p.start()
    

print

 

"end!"




結果


end

!



註:因子進程設置了daemon屬性,主進程結束,它們就隨著結束了。



1.4-3 設置daemon執行完結束的方法


import

 multiprocessing

import

 time

def

 

worker

(interval)

:


    print(

"work start:{0}"

.format(time.ctime()));
    time.sleep(interval)
    print(

"work end:{0}"

.format(time.ctime()));

if

 __name__ == 

"__main__"

:
    p = multiprocessing.Process(target = worker, args = (

3

,))
    p.daemon = 

True


    p.start()
    p.join()
    

print

 

"end!"




結果


work

 

start

:Tue

 

Apr

 21 22

:16:32

 2015

work

 

end

:Tue

 

Apr

 21 22

:16:35

 2015

end

!



2. Lock



當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的衝突。


import multiprocessing
import sys

def 

worker_with

(

lock

, f

):
    with 

lock

:
        fs 

= open(f, 

"a+"

)
        n = 

10


        

while

 n > 

1

:
            fs.write(

"Lockd acquired via with
"

)
            n -= 

1


        fs.close()

def 

worker_no_with

(

lock

, f

):
    

lock

.

acquire

()
    

try

:
        fs 

= open(f, 

"a+"

)
        n = 

10


        

while

 n > 

1

:
            fs.write(

"Lock acquired directly
"

)
            n -= 

1


        fs.close()
    

finally

:
        

lock

.release()

if

 __name__ == 

"__main__"

:
    

lock

 = multiprocessing.Lock()
    f = 

"file.txt"


    w = multiprocessing.Process(target = worker_with, args=(

lock

, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(

lock

, f))
    w.start()
    nw.start()
    print 

"end"




結果(輸出文件)


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lockd acquired via 

with


Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly



3. Semaphore



Semaphore用來控制對共享資源的訪問數量,例如池的最大連接數。


import

 multiprocessing

import

 time

def

 

worker

(s, i)

:


    s.acquire()
    print(multiprocessing.current_process().name + 

"acquire"

);
    time.sleep(i)
    print(multiprocessing.current_process().name + 

"release
"

);
    s.release()

if

 __name__ == 

"__main__"

:
    s = multiprocessing.Semaphore(

2

)
    

for

 i 

in

 range(

5

):
        p = multiprocessing.Process(target = worker, args=(s, i*

2

))
        p.start()



結果


Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-5acquire
Process-3release

Process-4acquire
Process-5release

Process-4release



4. Event



Event用來實現進程間同步通信。


import

 multiprocessing

import

 time

def

 

wait_for_event

(e)

:


    print(

"wait_for_event: starting"

)
    e.wait()
    print(

"wairt_for_event: e.is_set()->"

 + str(e.is_set()))

def

 

wait_for_event_timeout

(e, t)

:


    print(

"wait_for_event_timeout:starting"

)
    e.wait(t)
    print(

"wait_for_event_timeout:e.is_set->"

 + str(e.is_set()))

if

 __name__ == 

"__main__"

:
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = 

"block"

,
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = 

"non-block"

,
            target = wait_for_event_timeout,
            args = (e, 

2

))
    w1.start()
    w2.start()

    time.sleep(

3

)

    e.set()
    print(

"main: event is set"

)



結果


wait_for_event: starting


wait_for_event_timeout:starting


wait_for_event_timeout:e.is_set->False


main: event is set


wairt_for_event: e.is_set()->True




5. Queue



Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。



get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。Queue的一段示例代碼:


import

 multiprocessing

def

 

writer_proc

(q)

:

      
    

try

:         
        q.put(

1

, block = 

False


    

except

:         
        

pass

   

def

 

reader_proc

(q)

:

      
    

try

:         
        

print

 q.get(block = 

False


    

except

:         
        

pass

if

 __name__ == 

"__main__"

:
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()



結果


1



6. Pipe



Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1隻負責接受消息,conn2隻負責發送消息。



send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會拋出EOFError。


import

 multiprocessing

import

 time

def

 

proc1

(pipe)

:


    

while

 

True

:
        

for

 i 

in

 xrange(

10000

):
            

print

 

"send: %s"

 %(i)
            pipe.send(i)
            time.sleep(

1

)

def

 

proc2

(pipe)

:


    

while

 

True

:
        

print

 

"proc2 rev:"

, pipe.recv()
        time.sleep(

1

)

def

 

proc3

(pipe)

:


    

while

 

True

:
        

print

 

"PROC3 rev:"

, pipe.recv()
        time.sleep(

1

)

if

 __name__ == 

"__main__"

:
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[

0

],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[

1

],))
    

#p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    

#p3.start()

    p1.join()
    p2.join()
    

#p3.join()




結果



7. Pool



在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。 Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來它。



例7.1:使用進程池(非阻塞)


#coding: utf-8


import

 multiprocessing

import

 time

def

 

func

(msg)

:


    

print

 

"msg:"

, msg
    time.sleep(

3

)
    

print

 

"end"

if

 __name__ == 

"__main__"

:
    pool = multiprocessing.Pool(processes = 

3

)
    

for

 i 

in

 xrange(

4

):
        msg = 

"hello %d"

 %(i)
        pool.apply_async(func, (msg, ))   

#維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去

    

print

 

"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"


    pool.close()
    pool.join()   

#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束


    

print

 

"Sub-process(es) done."




一次執行結果


mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 

0

msg: hello 

1


msg: hello 

2


end


msg: hello 

3


end


end


end


Sub

-process(es) done.



函數解釋:



apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)close() 關閉pool,使其不在接受新的任務。terminate() 結束工作進程,不在處理未完成的任務。join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。



執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事後才空出一個進程處理對象3,所以會出現輸出「msg: hello 3」出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環後直接輸出「mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~」,主程序在pool.join()處等待各個進程的結束。



例7.2:使用進程池(阻塞)


#coding: utf-8


import

 multiprocessing

import

 time

def

 

func

(msg)

:


    

print

 

"msg:"

, msg
    time.sleep(

3

)
    

print

 

"end"

if

 __name__ == 

"__main__"

:
    pool = multiprocessing.Pool(processes = 

3

)
    

for

 i 

in

 xrange(

4

):
        msg = 

"hello %d"

 %(i)
        pool.apply(func, (msg, ))   

#維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去

    

print

 

"Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"


    pool.close()
    pool.join()   

#調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束


    

print

 

"Sub-process(es) done."




一次執行的結果


msg: hello 

0


end


msg: hello 

1


end


msg: hello 

2


end


msg: hello 

3


end


Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~

Sub

-process(es) done.



例7.3:使用進程池,並關注結果


import

 multiprocessing

import

 time

def

 

func

(msg)

:


    

print

 

"msg:"

, msg
    time.sleep(

3

)
    

print

 

"end"


    

return

 

"done"

 + msg

if

 __name__ == 

"__main__"

:
    pool = multiprocessing.Pool(processes=

4

)
    result = []
    

for

 i 

in

 xrange(

3

):
        msg = 

"hello %d"

 %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    

for

 res 

in

 result:
        

print

 

":::"

, res.get()
    

print

 

"Sub-process(es) done."




一次執行結果


msg: hello 0


msg: hello 1


msg: hello 2


end
end
end

::: donehello 0


::: donehello 1


::: donehello 2


Sub-process(es) done.



例7.4:使用多個進程池


#coding: utf

-8


import multiprocessing
import 

os

time

random

def Lee():
    

print

 

"
Run task Lee-%s"

 %(

os

.getpid()) #

os

.getpid()獲取當前的進程的ID
    start = 

time

.

time

()
    

time

.sleep(

random

.

random

() * 

10

) #

random

.

random

()隨機生成

0-1

之間的小數
    

end

 = 

time

.

time

()
    

print

 

"Task Lee, runs %0.2f seconds."

 %(

end

 - start)

def Marlon():
    

print

 

"
Run task Marlon-%s"

 %(

os

.getpid())
    start = 

time

.

time

()
    

time

.sleep(

random

.

random

() * 

40

)
    

end

=

time

.

time

()
    

print

 

"Task Marlon runs %0.2f seconds."

 %(

end

 - start)

def Allen():
    

print

 

"
Run task Allen-%s"

 %(

os

.getpid())
    start = 

time

.

time

()
    

time

.sleep(

random

.

random

() * 

30

)
    

end

 = 

time

.

time

()
    

print

 

"Task Allen runs %0.2f seconds."

 %(

end

 - start)

def Frank():
    

print

 

"
Run task Frank-%s"

 %(

os

.getpid())
    start = 

time

.

time

()
    

time

.sleep(

random

.

random

() * 

20

)
    

end

 = 

time

.

time

()
    

print

 

"Task Frank runs %0.2f seconds."

 %(

end

 - start)

if

 __name__==

"__main__"

:
    function_list=  [Lee, Marlon, Allen, Frank] 
    

print

 

"parent process %s"

 %(

os

.getpid())

    pool=multiprocessing.Pool(

4

)
    

for

 func 

in

 function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中

    

print

 

"Waiting for all subprocesses done..."


    pool.

close

()
    pool.join()    #調用join之前,一定要先調用

close

() 函數,否則會出錯, 

close

()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    

print

 

"All subprocesses done."




一次執行結果


parent

 

process

 7704

Waiting

 

for

 

all

 

subprocesses

 

done

...

Run

 

task

 

Lee-6948

Run

 

task

 

Marlon-2896

Run

 

task

 

Allen-7304

Run

 

task

 

Frank-3052


Task

 

Lee

runs

 1

.59

 

seconds

.

Task

 

Marlon

 

runs

 8

.48

 

seconds

.

Task

 

Frank

 

runs

 15

.68

 

seconds

.

Task

 

Allen

 

runs

 18

.08

 

seconds

.

All

 

subprocesses

 

done

.



https://www.cnblogs.com/kaituorensheng/p/4445418.html


 推薦↓↓↓ 






??

16個技術公眾號

】都在這裡!


涵蓋:程序員大咖、源碼共讀、程序員共讀、數據結構與演算法、黑客技術和網路安全、大數據科技、編程前端、Java、Python、Web編程開發、Android、iOS開發、Linux、資料庫研發、幽默程序員等。

萬水千山總是情,點個 「

好看

」 行不行

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

如何測試你是否放下一個人?
單身怕什麼,總有一天你會遇見你喜歡的人

TAG:Python開發 |