Loading...
墨滴

None1995

2021/04/21  阅读:183  主题:雁栖湖

python 并发编程手册

Part1并发编程介绍

1python中的并发编程

  • 第一列为单线程串行,即CPU和IO是串起来执行的。IO的速度会严重制约CPU的运算速度
  • 第二列为多线程并发,即当前线程遇到IO,释放CPU,可以提高CPU的运行效率
  • 第三列为多进程并行,即使用的是多个CPU,然后进行计算
  • 第四列为多机器并行,即使的的是多个机器同时计算同一个任务。

并行支持一下几种:

  • 多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
  • 多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行

线程通信主要有:

  • 使用Lock对资源加锁,防止冲突访问
  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
  • 使用subprocess启动外部程序的进程,并进行输入输出交互

2CPU密集型计算和IO密集型计算

CPU密集型(CPU-bound): CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高。例如:压缩解压缩、加密解密、正则表达式搜索

IO密集型(I/O-bound): IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,CPU占用率仍然较低。例如:文件处理程序、网络爬虫程序、读写数据库程序

3多线程,多进程,多协程的对比

python并发编程有三种方式: 多线程Thread、多进程Process、多协程Coroutine。

4怎样根据任务选择对应技术

如果是CPU密集型计算,使用多进程multiprocessing 如果是IO密集型计算,则考虑多线程或者多线程。如果任务量大,有现成协程库支持,协程实现的复杂度可接受则首选协程,否则选线程。

5全局解释器锁GIL

全局解释器锁(英语:Global Interpreter Lock,缩写GIL)是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。

虽然上图启动了三个线程,但是同一时刻,只有一个线程在执行,首先线程1拿到GIL然后执行,当前需要I/O时,则释放GIL,然后线程2拿到GIL, 执行,就这样,谁拿到GIL,谁就执行,否则,就阻塞。

为什么要有GIL?

为了解决多线程之间数据完整性和状态同步问题。 举例子:Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。开始:线程A和线程B都引用了对象obj,obj.ref_num = 2,线程A和B都想撤销对obj的引用

上图中线程A和线程B都引用了对象obj, 首先,线程A准备销毁obj引用,将计数器减1,刚减完,切换到了线程B, 线程B也要销毁obj引用,将计数器减1,然后判断引用等于0,则将其释放掉。 当切换到线程A继续执行的时候,发现obj的引用已经无法获取计数器的值了。 所以,就报错了。

怎样规避GIL带来的限制?

1、多线程 threading 机制依然是有用的,用于IO密集型计算。因为在 I/O (read,write,send,recv,etc.)期间,线程会释放GIL,实现CPU和IO的并行。因此多线程用于IO密集型计算依然可以大幅提升速度。但是多线程用于CPU密集型计算时,只会更加拖慢速度

2、使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势。为了应对GIL的问题,Python提供了multiprocessing

Part2多线程

6简单流程

  1. 准备一个函数
    def my_func(a, b):
       do_craw(a,b)
  2. 创建一个线程
    import threading
    t = threading.Thread(target=my_func, args=(100200)
  3. 启动线程和等待结束
    t.start()  # 启动线程
    t.join()   # 等待结束

7简单的一个多线程爬虫

"""
@file   : 001-多线程.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import threading
import time
import requests


def craw(url):
    # 这是个爬虫
    r = requests.get(url)
    print(url, r.status_code)


def single_thread():
    # 单线程爬虫
    print('single_thread start')
    for url in urls:
        craw(url)
    print('single_thread end')


def multi_thread():
    # 多线程爬虫
    print("multi_thread begin")
    threads = []
    for url in urls:
        threads.append(
            threading.Thread(target=craw, args=(url,))   # url, 之所以加逗号 是因为这里必须为元组
        )

    # 启动多线程
    for thread in threads:
        thread.start()

    # 等待结束
    for thread in threads:
        thread.join()
    print("multi_thread end")


if __name__ == '__main__':
    # 爬50页的内容
    urls = ['https://www.cnblogs.com/sitehome/p/{}'.format(page) for page in range(150 + 1)]

    # 单线程走起
    start = time.time()
    single_thread()
    end = time.time()
    print("single thread cost:", end - start, "seconds")

    # 多线程走起
    start = time.time()
    multi_thread()
    end = time.time()
    print("multi thread cost:", end - start, "seconds")

8生产者消费者模式进行多线程爬虫

在实现生产者消费者模式之前,了解一下多线程数据通信queue。queue.Queue可以用于多线程之间的,线程安全的数据通信。

# 1. 带入类库
import queue

# 2. 创建Queue
q = queue.Queue()

# 3. 添加元素
q.put(item)

# 4. 获取元素
item = q.get()

# 5. 查询状态
q.qsize()   # 查看当前元素的个数
q.empty()   # 判断是否为空
q.full()    # 判断是否已满

生产者消费者模式爬虫

"""
@file   : 002-生产者消费者实现多线程爬虫.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import queue
import time
import random
import threading
import requests
from bs4 import BeautifulSoup


def craw(url):
    # 爬取网页内容
    r = requests.get(url)
    return r.text


def parse(html):
    # 解析其中的内容
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]   # 那链接和标题拿出来


def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    '''
    生产者
    :param url_queue: url的队列  生产者从中拿出链接  去爬虫
    :param html_queue:  生产者将爬取的内容放到这里
    :return:
    '''

    while True:
        url = url_queue.get()
        html = craw(url)
        html_queue.put(html)
        print('线程名: ', threading.current_thread().name,
              "url_queue.size=", url_queue.qsize())   # 获取url队列中还有多少待爬取的
        time.sleep(random.randint(12))


def do_parse(html_queue: queue.Queue, fout):
    '''
    消费者
    :param html_queue: 生产者生产出的内容
    :param fout: 消费者将内容解析出来  存到fout中
    :return:
    '''

    while True:
        html = html_queue.get()
        results = parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        print('线程名: ', threading.current_thread().name,
              "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(12))


if __name__ == '__main__':
    # 待爬取的网页链接
    urls = [
        "https://www.cnblogs.com/sitehome/p/{}".format(page) for page in range(150 + 1)
    ]

    url_queue = queue.Queue()
    html_queue = queue.Queue()

    # 将url放进队列中
    for url in urls:
        url_queue.put(url)

    # 启动三个线程去做生产者
    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue),
                             name="craw{}".format(idx))
        t.start()

    fout = open("data.txt""w")
    # 启动两个线程去做消费者
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout),
                             name="parse{}".format(idx))
        t.start()

9线程安全概念

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。

上图展示的是一个取钱的过程,每次取钱,先进行if判断,然后再减去金额。线程1执行到if判断完,就被切换到线程2了。 此时,线程2也进入到了if中又被切换到线程1,线程1继续执行下去,减去金额,取到了钱。切换到线程2,也减去金额,取到了钱,显然就有问题了。 银行亏了600块。

Lock用于解决线程安全问题:

用法一: try-finally模式

import threading

lock = threading.Lock()

lock.acquire()   # 获取锁  其他线程就进不到下面的try中了
try:
    # do something
finally:
    lock.release()   # 释放锁,其他线程就可以通过前面的acquire获取到锁了。

用法二: with模式(更常用)

import threading

lock = threading.Lock()

with lock:
    # do something

10线程锁使用实例

"""
@file   : 003-多线程锁机制.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import threading
import time


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    with lock:
        if account.balance >= amount:
            # time.sleep(0.1)   # 如果不加锁,这里休息0.1秒,每次都会出问题,因为这里会引起线程阻塞,一定会切换
            print(threading.current_thread().name, "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name, "余额", account.balance)
        else:
            print(threading.current_thread().name,
                  "取钱失败,余额不足")


if __name__ == "__main__":
    account = Account(1000)    # 金额
    
    lock = threading.Lock()   # 实例化线程锁
    
    # 启动两个线程  分别去800块
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    tb = threading.Thread(name="tb", target=draw, args=(account, 800))

    ta.start()
    tb.start()

11线程池概念介绍

上图左侧是展示的是一个线程的生命周期,首先,新建线程,然后准备就绪,等cpu调用,如果被调用,则开始运行,如果被切换,则又返回就绪状态,如果是因为io或者sleep,则进入阻塞状态,阻塞结束则又回到就绪状态,反反复复,直到执行完。之所以要采用线程池,右上角以说明原因。

线程池的好处:

  1. 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;
  2. 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
  3. 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

12线程池的使用方法

用法一: map函数,很简单。注意map的结果和入参是顺序对应的。

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:
    results = pool.map(craw, urls)
    for result in results:
        print(result)

用法二: futures模式,更强大。注意如果用as_completed顺序是不定的。

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    futures = [ pool.submit(craw, url) for url in urls ]

    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())

使用线程池实现多线程爬虫

"""
@file   : 004-线程池的使用.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import concurrent.futures
import requests
from bs4 import BeautifulSoup


def craw(url):
    # 爬取网页内容
    r = requests.get(url)
    return r.text


def parse(html):
    # 解析其中的内容
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]   # 那链接和标题拿出来


if __name__ == '__main__':
    # 待爬取的网页链接
    urls = [
        "https://www.cnblogs.com/sitehome/p/{}".format(page) for page in range(150 + 1)
    ]
        
    # craw
    with concurrent.futures.ThreadPoolExecutor() as pool:
        htmls = pool.map(craw, urls)
        htmls = list(zip(urls, htmls))
        for url, html in htmls:
            print(url, len(html))
    print("craw over")
    
    # parse
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = {}
        for url, html in htmls:
            future = pool.submit(parse, html)
            futures[future] = url
    
        # for future, url in futures.items():
        #     print(url, future.result())
    
        for future in concurrent.futures.as_completed(futures):
            url = futures[future]
            print(url, future.result())

使用线程池在flask-web服务中加速

import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)
pool = ThreadPoolExecutor()


def read_file():
    time.sleep(0.1)
    return "file result"


def read_db():
    time.sleep(0.2)
    return "db result"


def read_api():
    time.sleep(0.3)
    return "api result"


@app.route("/")
def index():
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)

    return json.dumps({
        "result_file": result_file.result(),
        "result_db": result_db.result(),
        "result_api": result_api.result(),
    })


if __name__ == "__main__":
    app.run()

Part3多进程

13多进程multiprocessing知识梳理

上图的上面展示的是一个多线程执行的过程,主要通过并行IO和CPU来提高执行速度,但是对于CPU密集型运算,即上图的下面部分,一直都需CPU计算,则线程的切换耽误时间,导致多线程反而没有多线程速度快。

对比多线程和多进程的实现

14多线程的实现

这里判断100个大数 是否为素数?分别对比了单线程,多线程,多进程的效率。

"""
@file   : 005-多进程的使用.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 12):
        if n % i == 0:
            return False
    return True


def single_thread():
    for number in PRIMES:
        is_prime(number)


def multi_thread():
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


def multi_process():
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)


if __name__ == "__main__":
    PRIMES = [112272535095293] * 100

    start = time.time()
    single_thread()
    end = time.time()
    print("single_thread, cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread, cost:", end - start, "seconds")

    start = time.time()
    multi_process()
    end = time.time()
    print("multi_process, cost:", end - start, "seconds")
    # single_thread, cost: 48.5049991607666 seconds
    # multi_thread, cost: 50.53124475479126 seconds
    # multi_process, cost: 16.009512901306152 seconds
    

15使用多进程在flask-web服务中加速

import flask
from concurrent.futures import ProcessPoolExecutor
import math
import json


app = flask.Flask(__name__)


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 12):
        if n % i == 0:
            return False
    return True


@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
    number_list = [int(x) for x in numbers.split(",")]
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))


if __name__ == "__main__":
    process_pool = ProcessPoolExecutor()
    app.run()

Part4协程

16协程内容的介绍

上图的上面是单线程爬虫 cpu的执行情况,可以发现,经常因为等待IO而影响CPU的执行效率。 上图的下面是协程,协程主要是在单线程内实现的,以爬虫为例,协程先是让cpu爬取第一个url的内容,等待IO的时候,它又让CPU爬取第二个url的内容,当第二个任务等待IO的时候,它又让CPU爬取第三个url的内容,然后第三个任务等待IO, 它又循环回来,执行第一个任务,就这样返回循环。 所以,协程就是大循环。

17python异步IO库介绍: asyncio

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 定义协程
async def myfunc(url):
    await get_url(url)

# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]

# 执行爬虫事件列表
loop.run_until_complete(asyncio.wait(tasks))

注意:

  • 要用在异步IO编程中, 依赖的库必须支持异步IO特性
  • 爬虫引用中:requests 不支持异步, 需要用 aiohttp

18协程爬虫实现

"""
@file   : 008-协程爬虫.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import asyncio
import aiohttp
import time


async def async_craw(url):
    print("craw url: ", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}{len(result)}")


if __name__ == '__main__':
    urls = [
        "https://www.cnblogs.com/sitehome/p/{}".format(page) for page in range(150 + 1)
    ]
    
    loop = asyncio.get_event_loop()   # 获取超级循环
    tasks = [loop.create_task(async_craw(url)) for url in urls]  # 建立任务
    start = time.time()
    loop.run_until_complete(asyncio.wait(tasks))   # 开始执行
    end = time.time()
    print("use time seconds: ", end - start)

19信号量

信号量(英语:Semaphore)又称为信号量、旗语是一个同步对象,用于保持在0至指定最大值之间的一个计数值。

  • 当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;
  • 当线程完成一次对semaphore对象的释放(release)时,计数值加一。
  • 当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态
  • semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.

信号量是用来控制并发度的。

主要有两种实现方式:
方式一:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

方式二:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

20使用信号量控制协程数进行爬虫

"""
@file   : 009-使用信号量控制协程数进行爬虫.py
@author : xiaolu
@email  : luxiaonlp@163.com
@time   : 2021-02-01
"""

import asyncio
import aiohttp
import time


async def async_craw(url):
    async with semaphore:   # 加了这个
        print("craw url: ", url)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                await asyncio.sleep(5)
                print(f"craw url: {url}{len(result)}")


if __name__ == '__main__':
    urls = [
        "https://www.cnblogs.com/sitehome/p/{}".format(page) for page in range(150 + 1)
    ]
    semaphore = asyncio.Semaphore(10)   # 控制并发量

    loop = asyncio.get_event_loop()   # 获取超级循环
    tasks = [loop.create_task(async_craw(url)) for url in urls]  # 建立任务
    start = time.time()
    loop.run_until_complete(asyncio.wait(tasks))   # 开始执行
    end = time.time()
    print("use time seconds: ", end - start)

None1995

2021/04/21  阅读:183  主题:雁栖湖

作者介绍

None1995