Loading...
墨滴

yuanzhoulvpi

2021/08/18  阅读:29  主题:橙心

python的协程和任务介绍

python的协程和任务介绍

背景

之前我写过多线程、多进程

  1. https://blog.csdn.net/yuanzhoulvpi/article/details/105315671?spm=1001.2014.3001.5501
  2. https://blog.csdn.net/yuanzhoulvpi/article/details/119678521?spm=1001.2014.3001.5501

难道现在不够用么,怎么还要写协程呢?

这样吧,我们按照需求来:

  1. 之前我们因为涉及到大量的计算机本地的I/O处理,于是我们使用线程;
  2. 之前我们因为涉及到大量的计算机本地的CPU高负荷的计算,于是为了突破python的单个cpu使用问题,我们使用进程;
  3. 这一次因为很多cpu计算和I/O处理都是在远程的服务器中(比如我们经常遇到的调用云端数据库、我们对某个网站爬虫),我们为了使用对云端的服务器保持并发的话,那我们就可以不使用线程和进程来完成我的并发操作,因为多线程和多进程是很浪费我们本地计算机的cpu和内存资源,使用多协程就可以使得我们再不浪费本地计算机资源的情况下,还能做到并发操作。这样的操作难道不香么。

具体案例

我自己有个数据看板,数据看板上的数据,需要不断的刷新,大概是每隔一小时刷新一次,怎么刷新呢,大概是有30个sql文件,这个30个文件是需要被执行,按道理来说,我有两种方案:

  1. 第一种就是使用for循环,一个一个的执行sql,但是这样太慢了,如果遇到有的sql运行卡住了,那后面的sql文件可能就执行不了了,可能30个sql文件一个小时都还没刷新完,下一次的刷新任务又启动了,这样肯定不可取。
  2. 第二种方法就是使用多进程,对30多个sql进行多进程处理,也就是30个多个sql文件同时运行,emmmmm,如果使用这种方法,我感觉我的数据库账号可能会被吊销,还有可能把数据库搞崩溃,这个责任我可担不起。
  3. 第三种也就是我现在使用的方法,就是使用多进程处理,但是我设置30个sql在每次执行的时候,同时最高有4个sql在执行,这样我防止单个sql被卡住了,不会影响后面的sql执行;同时服务器的压力也不高;

方法3确实很不错,目前已经稳定运行2个多月了,目前没有出现任何问题;但是我后来在知道协程的时候,才发现方法3依然可以被优化。被优化的地方主要是在于当计算机本地的多进程在进行切换的时候,开销很大,造成本地的计算机性能浪费。

为什么协程就是开销小呢,因为协程在python的实现中,实际上还是在一个线程中运行,只是使用了很巧妙的方法来实现。既然协程的异步请求这么有优势,为什么不使用它呢,接下来开始介绍

协程入门

part1_单个协程

下面的代码就是一个最简单的协程代码,先打印出第一个'hello',然后延迟2秒以后,在打印出'world'。

import asyncio
import datetime

# 小demo
# part 1
async def main():
    """
    part 1 简单介绍一下异步
    :return:
    """

    print('hello')
    await asyncio.sleep(2)
    print('world')

if __name__ == '__main__':
    asyncio.run(main())
    # asyncio.run(main2())
    # asyncio.run(main3())
    # asyncio.run(main4())
    # asyncio.run(main5())
    # asyncio.run(main6())
    # asyncio.run(main7())

part2_多个协程运行

需要注意的是协程函数都是使用async def 定义的,另外使用异步函数的时候,需要在异步函数前面加上await,下面这段代码首先是:先定义一个异步函数say_after,然后再使用这个函数4次。下面的代码运行时间是10秒。

import asyncio
import datetime
# part 2
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main2():
    """
    part 2
    创建一个异步程序,需要运行say_after函数4次,怎么运行
    整个过程会运行10秒
    :return:
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')

    await say_after(delay=1, what='hello')
    await say_after(delay=2, what='world')
    await say_after(delay=3, what='foo')
    await say_after(delay=4, what='bar')

    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')
    
if __name__ == '__main__':
    # asyncio.run(main())
    asyncio.run(main2())
    # asyncio.run(main3())
    # asyncio.run(main4())
    # asyncio.run(main5())
    # asyncio.run(main6())
    # asyncio.run(main7())

part3_多个协程优雅运行

在part2中,我们为了使用say_after函数,写了4行代码,而且使用花了10s去运行,那么如果让4个协程同时运行呢?方法就是:

  1. 使用asyncio的create_task函数对我们的目标函数包装一下,创建一个task;
  2. 将4个task创建好之后,再使用await来等待4个线程完成。

下面这个代码就是一个很好的案例,整个代码运行时间是只有4秒,因为最长的协程时间就是4秒。

import asyncio
import datetime

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


# part 3
# 因为part 2运行上面4个say_after函数需要10s,我们想要减少运行时间,方法1,
# 这个方法只是需要4s即可运行完4个say_after函数

async def main3():
    """
    # 因为part 2运行上面4个say_after函数需要10s,我们想要减少运行时间,方法1
    :return:
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')

    task1 = asyncio.create_task(say_after(delay=1, what='hello'))
    task2 = asyncio.create_task(say_after(delay=2, what='world'))
    task3 = asyncio.create_task(say_after(delay=3, what='foo'))
    task4 = asyncio.create_task(say_after(delay=4, what='bar'))

    await task1
    await task2
    await task3
    await task4
    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')
    
if __name__ == '__main__':
    # asyncio.run(main())
    # asyncio.run(main2())
    asyncio.run(main3())
    # asyncio.run(main4())
    # asyncio.run(main5())
    # asyncio.run(main6())
    # asyncio.run(main7())

part4_多个协程优雅运行

虽然part3让4个协程同时运行,减少了程序时间,但是还是不够优雅,因为我们有4个任务,竟然要写4行代码,那我们有100个任务要写100行代码?我记得好像有个东西叫列表推导式,如果我们使用列表推导式来做,那岂不是很简单?

下面这个代码里面,我使用了列表推导式来实现多个任务运行计算,说人话就是我吧asyncio的create_task放到列表里面了,然后再使用await解析这列表里面的每一个task,是不是很优雅。

import asyncio
import datetime

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


# part 4
# part3 虽然将part2的时间从10s减少到了4s,可是写法实在是不优雅,我不喜欢
# 那么多task1,task2,task3,task4,那我要是有很多task,我要写那么多代码,实在是太难看,
# 那么这个能不能使用列表推导式呢?可以的
async def main4():
    """
    part 4比part 3就是好在了可以使用列表推导式,代码看着更加优雅
    :return:
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')

    task = [asyncio.create_task(say_after(delay=delay, what=what)) for delay, what in
            zip([1234], ['hello''world''foo''bar'])]

    [await task_ for task_ in task]
    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')
    
if __name__ == '__main__':
    # asyncio.run(main())
    # asyncio.run(main2())
    # asyncio.run(main3())
    asyncio.run(main4())
    # asyncio.run(main5())
    # asyncio.run(main6())
    # asyncio.run(main7())

part5_多个协程优雅运行

上面的part4已经很优雅了,将8行代码转换成了2行,主要是靠列表推导式的功劳,那么我们可不可使用更加简单的代码?

也是可以的,我们将列表推导式和create_task换成了asyncio的gather函数,这样一行代码就完成了,代码如下:

import asyncio
import datetime

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


# part 5
# part 4 虽然使用列表,这样可以使用列表推导式,但是依然不够优雅:
# 因为我们每次都要创建一个task:create_task部分 来对say_after包装一下
# 那我们直接将这个crate_task拿到这个列表推导式外面应该怎么写
# 于是part 5来了
async def main5():
    """
    将create_task拿到列表外面(变成了gather)
    并且舍去列表推导式展开部分
    :return:
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')

    await asyncio.gather(*[say_after(delay=delay, what=what) for delay, what in
                           zip([1234], ['hello''world''foo''bar'])])

    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')
    
if __name__ == '__main__':
    # asyncio.run(main())
    # asyncio.run(main2())
    # asyncio.run(main3())
    # asyncio.run(main4())
    asyncio.run(main5())
    # asyncio.run(main6())
    # asyncio.run(main7())

part6_面向现实的协程和任务

上面几个part_1,2,3,4,5都是让我们可以更加优雅的解决问题,但是现实中可不是这样的,如何解决单个协程运行超时问题?如果超时了,整体的代码不能出现错误。

在下面,我创建了一个solove_timeout函数,这个函数可以控制单个协程的运行时间,如果等待时间超过3.2秒,那么这个协程将会被杀掉,但是不影响别的协程运行,代码如下:

import asyncio
import datetime

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

# part 6
# 虽然现在part 5已经基本上很好了,足够优雅到可以让我们去解决一些比较
# 麻烦、困难的问题,但是现实生活中可不是这样的,比如说:
# 每一个函数运行时间不能过长,都有时间限制,这种情况怎么办?
# 那么part 6就是来解决这个问题的
# 我们这个了假设每一个任务运行时间不能超过3秒,并且如果一个任务超时了,
# 不能出现错误,应该直接忽略,不影响别的任务运行,那这样应该怎么做

async def solove_timeout(func, timeout=1.0, **kargs):
    try:
        await asyncio.wait_for(func(**kargs), timeout=timeout)
    except asyncio.TimeoutError:
        print("Timeout!")


async def main6():
    """
    part 6
    这里加上时间限制,如果时间超过3.2秒,那么子任务就会被取消,但是不影响别的任务
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')

    await asyncio.gather(*[solove_timeout(say_after, timeout=3.2, delay=delay, what=what) for delay, what in
                           zip([1234], ['hello''world''foo''bar'])])

    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')

    
if __name__ == '__main__':
    # asyncio.run(main())
    # asyncio.run(main2())
    # asyncio.run(main3())
    # asyncio.run(main4())
    # asyncio.run(main5())
    asyncio.run(main6())
    # asyncio.run(main7())

part7_面向现实的协程和任务

part6让我们的协程不会超时运行,代码稍微改一下也就可以容纳我们的代码错误,但是回到刚开始问题,如果我们限制最大的协程数量呢,应该怎么做?

我们使用asyncio.Semaphore就可以限制最大的协程并发数量。用起来也很简单,将代码包装一下就行,下面代码就是限制了协程的最大并发数量,我在代码中设置的是2,也可对不同的协程设置不同的asyncio.Semaphore限制。

import asyncio
import datetime

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
# part 7
# part 6解决了单个子任务运行超时的问题,还有一个问题,如果限制并发的任务数量?
# 比如我有100个任务需要访问服务器(不管是爬虫还是数据库)
# 如果不作限制,这个100个任务可能会在一瞬间去访问服务器,对服务器造成很大的压力,
# 那我怎么做,可以现在并发的数量呢?
# part 7就可以解决这个问题,可以设置每个时刻,最多只有2个协程运行,
# 上一个运行完了,下一个继续补上,最终全部运行完成。

async def run_limit(func, sem, **kargs):
    async with sem:
        await func(**kargs)


async def main7():
    """
    使用asyncio.semaphore来限制最大的协程数量
    这里运行的时间是6s,因为1运行完,3开始运行,2结束的时候是4运行,最终根据最长时间的来计算,也就是6秒
    """

    start_date = datetime.datetime.now()
    print(f'Start at {start_date:%Y-%m-%d %H:%M:%S}')
    sem = asyncio.Semaphore(2)
    await asyncio.gather(*[run_limit(say_after, sem=sem, delay=delay, what=what) for delay, what in
                           zip([1234], ['hello''world''foo''bar'])])

    end_date = datetime.datetime.now()
    print(f'finished at {end_date:%Y-%m-%d %H:%M:%S}')
    print(f'total used time: {(end_date - start_date).seconds} s')
    
if __name__ == '__main__':
    # asyncio.run(main())
    # asyncio.run(main2())
    # asyncio.run(main3())
    # asyncio.run(main4())
    # asyncio.run(main5())
    # asyncio.run(main6())
    asyncio.run(main7())

总结

  1. 上面的代码我全部都放在了我的GitHub里面,连接为:https://github.com/yuanzhoulvpi2017/tiny_python/blob/main/asyncio_task/01_introduction.py

  2. 个人计算机本地的I/O问题用线程解决,计算机本地的CPU计算问题用进程解决,远程服务器链接(爬虫、数据库)之类的可以使用协程解决。

  3. 之前我也不懂asyncio,也都是一点一点看,如果有错误的地方,欢迎大家提出来,我继续学习改进。

yuanzhoulvpi

2021/08/18  阅读:29  主题:橙心

作者介绍

yuanzhoulvpi