Loading...
墨滴

yuanzhoulvpi

2021/06/14  阅读:91  主题:默认主题

python的多核利用

python 的多核利用

平时也就使用 python 的处理数据的几个包做数据处理,很少使用 python 的一些其他的功能,之前一直想搞懂 python 自带的多线程/多进程包,反复看了很多次,也不太会用。这次端午节在家又好好研究了一遍,不负有心人,终于可以用起来了:大概知道这个包怎么用,怎么按照我的想法用。我一直看的都是 python 的 multiprocessing 包。这次也是我的个人学习积累。

什么是 multiprocessing

python 的官方文档是这么说的: “”“ multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool ”“”

写多线程、多进程文章太多了,我这里不再描述,大家可以去百度看看,有很多优秀的。

其实在我眼里,我不在乎multiprocessing是多线程或者多进程,只要能充分利用cpu多核性能、提高我的计算效率就行。

使用multiprocessing目的

我们用这个包要实现什么功能

  1. 我想更快的计算;
  2. 我想把计算的结果保存起来;
  3. 我想把计算过程的进度可以反馈给我;
  4. 如果遇到异常,能不能不要终止程序,继续跑;
  5. 如果程序超时,可不可以终止;

上面就是我的目的,和我的需求;如果都达到了,那我就觉得是完成了。

创建一个场景

我们现在有一个非常耗时的函数,是计算一个值的平方:2 => 2 * 2,我们设置一个函数,每次计算都要休息1秒。那么在python中可以是这么写:


def myf(x):
    time.sleep(1)
    return x * x

如果我们不使用pandas、numpy包。我们可能需要这样写:

# 在不使用numpy的时候,进行简单的并行计算

import time
from tqdm import tqdm

def myf(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    listx = range(10)
    listy = []

    for i in tqdm(listx):
        temp_value = myf(x=i)
        listy.append(temp_value)
    print('final y:')
    print(listy)

一个简单的并行计算

我们使用multiprocessing的Pool。Pool对象可以自动的将数据分享给不同进程。啥都不用管,只要传递进入数据就行。 在下面的代码中,我需要计算0到19的平方,我就设置了一个Pool对象,这个对象有4个进程,我把需要的计算的函数传递进去,需要被计算的可迭代的对象传递进去。然后数据就出来了。

⚠️:这里使用的是Pool的map函数

# 遇到一个大的数据,一个比较复杂的计算,使用并行,可以极大的提高计算效率

from multiprocessing import Pool
import time
from tqdm import tqdm

def myf(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    print('-- start run ---')
    value_x = range(20)
    P = Pool(processes=4)
    value_y = P.map(func=myf, iterable=value_x)
    print(value_y)

实际上,计算还是很快的,但是我中间遇到一个情况,就是我不知道这个计算到什么程度了,是计算到50%了,还是计算到80%?这个不清楚。

我想要展示计算进度

为了展示多核计算的进度条,可以这么做:

⚠️:这里使用的是Pool的imap_unordered函数

# 相对于02,这个可以有个变动的进度条,进度条不断的衍生,但是缺点也很明显,缺少进度完成情况。
from multiprocessing import Pool
import time
from tqdm import tqdm


def myf(x):
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    # print('-- start run ---')
    value_x = range(20)
    P = Pool(processes=4)
    value_y = list(tqdm(P.imap_unordered(func=myf, iterable=value_x)))
    print(value_y)

这个虽然可以看到进度条不断的完成,但是没有办法看到完成率。

展示计算完成率

⚠️:这里使用的是Pool的apply_async函数

这里的apply_async需要的就不是一个迭代对象了,和上面的map、imap_unordered就有明显的差距,我们需要把一个迭代对象拿出来,放到apply_async里面,然后再将apply_async放到列表里面,然后在把这个东西从列表里面取出来。通过get调用他。

from multiprocessing import Pool
from tqdm import tqdm
import time


def myf(x):
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    value_x= range(200)
    P = Pool(processes=4)

    # 这里计算很快
    res = [P.apply_async(func=myf, args=(i, )) for i in value_x]

    # 主要是看这里
    result = [i.get(timeout=2for i in tqdm(res)]

    print(result)

当然,我看还有人用apply_async的callback参数,和上面的对比,感觉就有点麻烦,这里展示给各位:


from multiprocessing import Pool
from tqdm import tqdm
import time


def myf(x):
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    value_x= range(200)
    P = Pool(processes=20)

    pbar = tqdm(total=len(value_x))

    # 这里计算很快
    res = [P.apply_async(func=myf, args=(i, ), callback= lambda _: pbar.update(1)) for i in value_x]

    # 主要是看这里
    result = [i.get(timeout=2for i in res]

    print(result)

处理异常

多核计算最经常遇到的就是遇到一个错误,然后就跳出来,这怎么可以忍。就拿最常见的错误来说,函数运行超时怎么解决?

⚠️:这里使用的是Pool的apply_async函数和get来解决

from multiprocessing import Pool, TimeoutError
import time
from tqdm import tqdm


def myf(x):
    if x % 5 == 0:
        time.sleep(20.2)
    else:
        time.sleep(0.3)
    return x * x


def safely_get(value, timeout=2):

    try:
        data = value.get(timeout=timeout)
    except TimeoutError:
        data = 0
    return data


if __name__ == '__main__':
    P = Pool(processes=10)
    value = range(100)
    pbar = tqdm(total=len(value))

# way 2
    res_temp = [P.apply_async(func=myf, args=(i,), callback=lambda _: pbar.update(1)) for i in value]
    # result = [res.get(timeout=3) for res in res_temp]
    result = [safely_get(res, timeout=1for res in res_temp]


# way 1
#     res_temp = [P.apply_async(func=myf, args=(i,)) for i in value]
#     result = [safely_get(res, timeout=1) for res in tqdm(res_temp)]

    time.sleep(1)

    print(result)

我把这个myf函数做了简单的处理,如果是5的倍速,就需要设置休息20.2秒,但是我的容忍度是每个函数运行时间不能超过1秒,所以我写了一饿过safely_get函数,这个函数里面有try,可以破获错误,如果超时,那么整个函数也不跳出,并且把结果返回为0。

多个进程修改同一个数据

来都来了,也把这个问题说一下:

由于python的GIL,导致同一时间,不同的进程不能同时修改同一个数据。但是使用multiprocessing包的Manager的dict,list之类的就可以。

下面这两个代码就是用来将子进程的相关信息保存到一个列表里面。然后保存到pandas里面。

使用Process做的: 代码如下:


# multi sub process write data to one data (python dict)

from multiprocessing import Process, Manager
import os
import time
import pandas as pd
from tqdm import tqdm

def worker(id, save_data):
    time.sleep(1)
    save_data[id] = {
        '子进程': [os.getpid()],
        '父进程': [os.getppid()],
        '进程id': [id]
    }


if __name__ == "__main__":
    finaldata = Manager().dict()
    
    subprocess_list = []

    for i in tqdm(range(200)):
        p = Process(target=worker, args=(i, finaldata))
        subprocess_list.append(p)
        p.start()


    [p.join() for p in tqdm(subprocess_list)]

    finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()])

    print(finaldata)
    

使用Pool做的:

# multi sub process write data to one data (python dict)
# use Pool

from multiprocessing import Pool, Manager
import os
import time
import pandas as pd
from tqdm import tqdm


def worker(id, save_data):
    time.sleep(1)
    save_data[id] = {
        '子进程': [os.getpid()],
        '父进程': [os.getppid()],
        '进程id': [id]
    }


if __name__ == "__main__":
    finaldata = Manager().dict()

    P = Pool(processes=20)

    # reslist = []
    # for i in tqdm(range(200)):
    #     res = P.apply_async(func=worker, args=(i, finaldata))
    #     reslist.append(res)

    reslist = [P.apply_async(func=worker, args=(i, finaldata)) for i in range(200)]
    [res.get(timeout=200for res in tqdm(reslist)]
    finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()])

    print(finaldata)

代码

上面的代码全部都保存到我的GitHub里面了:大家可以按照顺序,运行看看: https://github.com/yuanzhoulvpi2017/tiny_python

参考资料

我刚开始也是很多都不懂,看了很多别人写的东西才看懂一点点,把一些写的好的放在这里:

  1. https://zhuanlan.zhihu.com/p/20953544

  2. https://stackoverflow.com/questions/5666576/show-the-progress-of-a-python-multiprocessing-pool-imap-unordered-call

  3. https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce

  4. https://docs.python.org/zh-cn/3/library/multiprocessing.html

  5. https://stackoverflow.com/questions/54066993/how-to-get-precise-timeout-with-python-multiprocess-apply-async

  6. https://stackoverflow.com/questions/5666576/show-the-progress-of-a-python-multiprocessing-pool-imap-unordered-call

  7. https://stackoverflow.com/questions/38711840/python-multiprocessing-pool-timeout

yuanzhoulvpi

2021/06/14  阅读:91  主题:默认主题

作者介绍

yuanzhoulvpi