• Python中的多线程编程
  • 发布于 1个月前
  • 81 热度
    1 评论
  • 林栗锦
  • 0 粉丝 2 篇博客
  •   
并行处理是一种在同一台计算机的多个处理器中同时运行任务的工作模式。 这种工作模式的目的就是减少总的任务处理时间。 在本篇文章中,您将了解使用python多进程模块对任何类型的逻辑过程进行并行处理的流程。

1. 简介
并行处理是一种在同一台计算机的多个处理器中同时运行任务的工作模式。 这种工作模式的目的就是减少总的任务处理时间。但是进程之间的通信会有额外的开销,因此对小的任务而言,总的任务时间会有所增加而不是减少。
在Python语言中,multiprocessing模块通过使用子进程(而不是线程)来运行独立的并行进程。 它可以让您利用机器上的多个处理器(Windows和Unix),也就是说,多个进程可以完全独立的在内存中运行。
学习了本教程的内容之后,您将了解:
.在使用multiprocessing进行并行处理时,如何理解语法并组织代码?
.如何实现同步和异步并行处理?
.使用multiprocessing.Pool()接口完成3个不同的用例。


2. 最多可以进行多少个并行处理?  

您一次可以运行的最大进程数受计算机中处理器数量的限制。 如果您不知道机器中有多少处理器,可以使用multiprocessing模块中的cpu_count()函数进行显示。
Python代码:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

3. 同步执行和异步执行?
在并行处理中,有两种执行类型:同步和异步。
同步执行就是各个进程按照启动的先后,顺序完成。 这是通过锁定主程序直到相应的进程运行完毕来实现的。
而异步执行,换句话说,进程的执行不涉及锁定。这样做的结果就是,进程结果返回的顺序可能会混淆,但通常情况下,异步执行会更快完成。
multiprocessing 模块中有两个对象是用来实现函数并行执行的:Pool 类和Process 类。
Pool Class
Synchronous execution
Pool.map() and Pool.starmap()
Pool.apply()
Asynchronous execution
Pool.map_async() and Pool.starmap_async()
Pool.apply_async())
Process Class

接下来,我们讨论一个典型的问题,并使用上述技术实现并行处理。在本教程中,我们将重点使用Pool类,因为它使用起来很方便,并可以满足几乎所有的并行处理需求。

4. 问题讨论:计算每行中给定数值范围内的元素个数

第一个问题:给定一个二维矩阵(或者列表和多维列表),计算每行中给定数值范围内的元素个数。我们可以在下面的列表基础上开始工作。
import numpy as np
from time import time

# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()
data[:5]

不使用并行处理的参考代码:

我们先看看不用并行计算它需要多长时间。为此,我们对函数howmany_within_range()(如下)进行重复以检查在范围内的数有多少个并返回计数。
# Solution Without Paralleization

def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
5. 如何对函数进行并行化处理?

对代码进行并行处理通常的做法是取出其中可以多次运行的特定函数,将其放在不同的处理器上并行运行。

要做到这一点,就需要使用 Pool类对数目为n的处理器进行初始化,之后将想要并行运行的函数传递给Pool类中的并行方法。

multiprocessing.Pool() 中提供了 apply(), map() 和 starmap() 等方法对传入的函数并行运行。

这简直太完美了!那么apply()和 map()之间又有什么区别呢?

apply()和 map()都是把要进行并行化的函数作为主要参数。但是不同的是, apply()接受args参数, 通过args将各个参数传送给被并行化处理的函数,而map 仅将一个迭代器作为参数。

因此,对于简单的可迭代的操作,使用map()进行并行处理更适合,而且能更快完成工作。

当我们看到如何使用apply()和map()对函数howmany_within_range()进行并行化处理之后,我们还会介绍starmap()。

5.1. Pool.apply() 进行并行化处理

我们来使用multiprocessing.Pool(),对howmany_within_range() 函数进行并行化处理。
# Parallelizing using Pool.apply()
import multiprocessing as mp
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]
# Step 3: Don't forget to close
pool.close()    
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
5.2. Parallelizing using Pool.map()

Pool.map()仅接受一个迭代器参数。 为了变通起见,我把howmany_within_range函数做了修改,为 minimum 和 maximum参数设定了缺省值,并另存为新的函数 howmany_within_range_rowonly(),这个函数可以只接受行数据列表迭代器作为输入。我知道这种做法不是map()的一个最好的用法,但它已经清楚地显示出它与apply()的不同之处。
# Parallelizing using Pool.map()
import multiprocessing as mp
# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count
pool = mp.Pool(mp.cpu_count())
results = pool.map(howmany_within_range_rowonly, [row for row in data])
pool.close()
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
5.3. 使用Pool.starmap() 进行并行化

在前面的例子中,我们必须重新定义howmany_within_range函数,让其中的一对参数使用默认值。 而使用starmap(),您就能避免这样做。 你怎么问?

与Pool.map()一样,Pool.starmap()也只仅接受一个迭代器参数,但在starmap()中,迭代器种的每一个元件也是一个迭代器。你可以通过这个内部迭代器向被并行化处理的函数传递参数,在执行时再顺序解开,只要传递和解开的顺序一致就可以。

实际上,Pool.starmap()就像是一个接受参数的Pool.map()版本。
# Parallelizing with Pool.starmap()
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])
pool.close()
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
6. 异步并行处理

和同步并行处理对等的异步并行处理函数 apply_async(),map_async()和starmap_async()允许您以异步方式并行执行进程,即下一个进程可以在前一个进程完成时立即启动,而不考虑启动顺序。 因此,无法保证结果与输入的顺序相同。

6.1 使用Pool.apply_async()进行并行化

apply_async()的使用与apply()非常相似,只是你需要提供一个回调函数来告诉如何存储计算结果。

但是,使用apply_async()时需要注意的是,结果中的数字顺序会混乱,表明进程没有按照启动的顺序完成。

变通的办法就是,我们重新定义一个新的howmany_within_range2(),接受并返回迭代序号(i),然后对最终结果进行排序。
# Parallel processing with Pool.apply_async()

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)


# Step 3: Use loop to parallelize
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 4: Close Pool and let all the processes complete    
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# Step 5: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
使用apply_async()时,不提供回调函数也是可以的。只是这时候,如果您不提供回调函数,那么您将获得pool.ApplyResult对象的列表,其中包含来自每个进程的计算输出值。 从这里,您需要使用pool.ApplyResult.get()方法来得到所需的最终结果。
# Parallel processing with Pool.apply_async() without callback function
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []
# call apply_async() without callback
result_objects = [pool.apply_async(howmany_within_range2, args=(i, row, 4, 8)) for i, row in enumerate(data)]
# result_objects is a list of pool.ApplyResult objects
results = [r.get()[1] for r in result_objects]
pool.close()
pool.join()
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]

6.2 使用Pool.starmap_async()进行并行化

你已经见识了apply_async()的使用。你是否可以想象一下或者写一个 starmap_async and map_async的对应版本呢?  实现代码如下:
# Parallelizing with Pool.starmap_async()
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []
results = pool.starmap_async(howmany_within_range2, [(i, row, 4, 8) for i, row in enumerate(data)]).get()

# With map, use `howmany_within_range_rowonly` instead
# results = pool.map_async(howmany_within_range_rowonly, [row for row in data]).get()
pool.close()
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
7. 如何对Pandas DataFrame进行并行处理?

到目前为止,您已经了解了如何通过使函数在列表上工作来进行函数并行化。
但是,在处理数据分析或机器学习项目时,您可能希望对Pandas Dataframe 进行并行化,Pandas Dataframe是除了numpy数组之外,最常用的存储表格数据对象。

在对DataFrame进行并行化时,您可以把要被并行化的函数作为输入参数:
.DataFrame的一行
.DataFrame的一列
.整个DataFrame

前两个可以使用multiprocessing本身就可以完成。 但是对于最后一个,即对整个dataframe进行并行化,我们将使用pathos包,pathos包内部使用了dill进行序列化。

首先,让我们创建一个简单的dataframe,看看如何进行逐行和逐列进行并行化。 在用户定义的函数种使用了类似pd.apply()的写法,但这是并行处理。
import numpy as np
import pandas as pd
import multiprocessing as mp
df = pd.DataFrame(np.random.randint(3, 10, size=[5, 2]))
print(df.head())
#>    0  1
#> 0  8  5
#> 1  5  3
#> 2  3  4
#> 3  4  4
#> 4  7  9
现在已经有了dataframe。之后使用hypotenuse对每一行进行处理,每次同时运行4个进程。

为了做到这一点,在下面的代码中,可以看到我们使用了df.itertuples(name=False)。设定name=False, 就可以把dataframe中的每一行作为一个简单的元组送入hypotenuse函数
# Row wise Operation
def hypotenuse(row):
    return round(row[1]**2 + row[2]**2, 2)**0.5

with mp.Pool(4) as pool:
    result = pool.imap(hypotenuse, df.itertuples(name=False), chunksize=10)
    output = [round(x, 2) for x in result]

print(output)
#> [9.43, 5.83, 5.0, 5.66, 11.4]
上面就是对dataframe每一行进行并行化的例子。我们来试试对每一列进行并行化。这里,我使用了 df.iteritems()将一列数据作为一个系列传递给sum_of_squares 函数。
# Column wise Operation
def sum_of_squares(column):
    return sum([i**2 for i in column[1]])
with mp.Pool(2) as pool:
    result = pool.imap(sum_of_squares, df.iteritems(), chunksize=10)
    output = [x for x in result]
print(output) 
#> [163, 147]
接下来是第三部分——完成一个能接收Pandas Dataframe、NumPy数组的并行化函数。Pathos遵循multiprocessing的风格:Pool > Map > Close > Join > Clear。请查看pathos docs文档以获取更多信息。
import numpy as np
import pandas as pd
import multiprocessing as mp
from pathos.multiprocessing import ProcessingPool as Pool
df = pd.DataFrame(np.random.randint(3, 10, size=[500, 2]))
def func(df):
    return df.shape
cores=mp.cpu_count()
df_split = np.array_split(df, cores, axis=0)
# create the multiprocessing pool
pool = Pool(cores)
# process the DataFrame by mapping function to each df across the pool
df_out = np.vstack(pool.map(func, df_split))
# close down the pool and join
pool.close()
pool.join()
pool.clear()
8.  练习

问题1: 使用 Pool.apply() 获取list_a和list_b每一行相同的元素
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
参考答案:
import multiprocessing as mp
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
def get_commons(list_1, list_2):
    return list(set(list_1).intersection(list_2))
pool = mp.Pool(mp.cpu_count())
results = [pool.apply(get_commons, args=(l1, l2)) for l1, l2 in zip(list_a, list_b)]
pool.close()    
print(results[:10])

问题2: 使用 Pool.map() 并行运行下面的 python代码
Python代码名称: ‘script1.py’, ‘script2.py’, ‘script3.py’
参考答案:
import os                                                                       
import multiprocessing as mp
processes = ('script1.py', 'script2.py', 'script3.py')                      
def run_python(process):                                                             
    os.system('python {}'.format(process))                                      
pool = mp.Pool(processes=3)                                                        
pool.map(run_python, processes) 

问题3: 将一个二维列表中的每一行归一化到0到1之间
list_a = [[2, 3, 4, 5], [6, 9, 10, 12], [11, 12, 13, 14], [21, 24, 25, 26]]
参考答案:
import multiprocessing as mp
list_a = [[2, 3, 4, 5], [6, 9, 10, 12], [11, 12, 13, 14], [21, 24, 25, 26]]
def normalize(mylist):
    mini = min(mylist)
    maxi = max(mylist)
    return [(i - mini)/(maxi-mini) for i in mylist]
pool = mp.Pool(mp.cpu_count())
results = [pool.apply(normalize, args=(l1, )) for l1 in list_a]
pool.close()    
print(results[:10])

9. 总结

在这篇文章中,我们看到了使用multiprocessing模块实现并行处理的整个过程和各种方法。 哪怕是在具有更多处理器数量的大型计算机上工作,上述过程也几乎相同,您可以通过并行处理获得真正的速度优势。
用户评论