in

基于进程、线程和异步网页爬取的速度比较

网页爬取的速度比较

加速网络爬取可能是一项艰巨的任务,因此在本教程中,我们将了解网络爬取中最大的速度瓶颈。

在本文中,我们将重点关注 Python,但相同的想法和概念可以应用于几乎任何其他编程语言或网络爬取工具包。

我们将介绍什么是 CPU 绑定和 IO 绑定任务,我们如何使用进程、线程和 asyncio 对它们进行优化,以将我们的爬虫速度提高数十到数百倍。那么,让我们开始吧!

障碍

在网络爬取中,我们主要有两种类型的性能瓶颈:IO-bound 和 CPU-bound。

例如,我们的 IO(输入/输出)任务可以是任何执行外部连接的任务——无论是 HTTP 请求还是将爬取的数据存储到数据库。两者都是网络爬取程序的主要部分:

# HTTP requests are IO-bound
import requests
from time import time
_start = time()
# requests will block here untill web server responds:
response = requests.get("https://www.jingzhengli.com/TODO")
print(f"requests finished in: {time()-_start:.2f}")

我们还会遇到 CPU 任务,例如解析爬取的 HTML、加载 JSON 数据、自然语言解析等。

# To parse html our CPU needs to do quite a bit of calculations:
from parsel import Selector
selector = Selector(html)
article = "\n".join(selector.css(".post-section p").getall())

# even more so if we're using complex parsing tech such as nature language processing 
from nltk.tokenize import word_tokenize
word_tokens = word_tokenize(article)

在网络爬取中,我们遇到了这两种性能挑战。然而,IO 块在整体性能影响中所占比例要高得多,因为爬取程序会采取大量 IO 阻塞操作。

Python 中的缩放选项

在我们开始加速之前,让我们看一下 Python 中可用的扩展技术:

  • 进程是程序运行的独占内存空间。每个进程都有自己的 python 解释器和个人内存。
  • 每个进程都可以有多个子进程,但进程之间的通信很困难,这使得进程之间的数据共享变得复杂。
  • 每个进程都可以线程,它们可以轻松地在它们之间共享数据,但不能像进程那样并行运行。

总而言之,我们可以有多个进程来处理受 CPU 限制的任务,因为它们可以在每个处理器内核上并行计算。所以,如果我们的机器有 12 个内核,我们可以并行运行 12 个计算进程。

每个进程也可以有不能并行运行但可以轮流在它们之间共享数据的线程,这是一种管理 IO 绑定任务的方法,因为一个线程可以接管而另一个线程正在等待 IO 完成

那么当涉及到网络爬取时:用于解析的多处理和用于连接的多线程?
不完全是。Python 中的线程有点复杂和昂贵。我们可以轻松启动几个线程,但如果我们建立数千个连接,我们的大部分计算资源将被所有线程处理开销占用。
所以,取而代之的是,一种新技术在网络爬取中更受青睐:单线程异步程序!

所以总结一下所有这些:CPU 性能的多处理和 IO 性能的 asyncio
让我们在实际的网络爬取中看看这两个!

异步请求

IO block 是我们的程序必须等待外部系统响应的时候。例如,建立 HTTP 连接——我们的程序发送请求并等待 Web 服务器向其发送响应。这可能需要几秒钟的等待。

为什么我们的程序不能在等待的时候做点别的事情?这正是它的asyncio作用!
异步程序使用一个管理器(称为事件循环),它可以暂停某些功能并同时让其他功能接管。换句话说,当一个 IO 阻塞操作(如请求)正在等待时,事件循环会让其他操作接管。

所以,如果我们发出 50 个 HTTP 请求,每个请求花费 1 秒,在一个同步程序中我们最终将花费 50 多秒:

import requests
from time import time

_start = time()
for i in range(50):
    request.get("http://httpbin.org/delay/1")
print(f"finished in: {time() - _start:.2f} seconds")
# finished in: 52.21 seconds

这些花费的大部分时间是我们的程序在等待服务器响应我们。让我们使用异步 HTTP 客户端来摆脱这种等待:

import httpx
import asyncio
from time import time

_start = time()

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [client.get("http://httpbin.org/delay/1") for i in range(50)]
        for response_future in asyncio.as_completed(tasks):
            response = await response_future
    print(f"finished in: {time() - _start:.2f} seconds")

asyncio.run(main())
# finished in: 3.55 seconds

现在,我们所有的请求都可以一起等待,这给了我们巨大的速度提升!然而,这意味着我们必须明确地告诉我们的程序什么时候可以捆绑这个等待。我们通过asyncio.gatherasyncio.as_completed功能助手来做到这一点:

import httpx
import asyncio
from time import time


async def with_gathering():
    _start = time()
    async with httpx.AsyncClient() as client:
        tasks = [client.get("http://httpbin.org/delay/1") for i in range(50)]
        for response_future in asyncio.as_completed(tasks):
            response = await response_future
    print(f"with gathering finished in: {time() - _start:.2f} seconds")

async def without_gathering():
    _start = time()
    async with httpx.AsyncClient() as client:
        for i in range(50):
            response = await client.get("http://httpbin.org/dealy/1")
    print(f"without gathering finished in: {time() - _start:.2f} seconds")
        

asyncio.run(with_gathering())
# with gathering finished in: 3.55 seconds
asyncio.run(without_gathering())
# without gathering finished in: 52.78 seconds

我们可以看到,如果没有它,我们将回到与之前同步程序相同的速度。因此,设计异步程序有点困难,因为我们必须明确说明何时可以同时运行任务。

与同步代码混合

asyncio 的唯一缺点是我们需要我们的库为其提供明确的支持。asyncio因此,旧的社区包在不更新的情况下无法利用速度。

然而,asyncio 自带了一个绝妙的工具asyncio.to_thread()功能,可以将任何同步函数变成异步函数!

to_thread()通过将同步代码推迟到由 asyncio 管理的新线程来实现。因此,我们可以轻松地将慢同步代码集成到我们的异步程序中。

让我们看一个假想的例子,我们有两个爬虫函数:我们自己编写的异步爬虫和一个使用同步代码的社区爬虫:

from time import time
import requests
import httpx
import asyncio


def community_movie_scraper(movie_name):
    """community movie scraper is synchronous and slow"""
    response = requests.get("http://httpbin.org/delay/1")
    ...
    return {"title": movie_name}


async def our_movie_scraper(client, movie_name):
    """our movie scraper is asynchronous and fast"""
    response = await client.get("http://httpbin.org/delay/1")
    ...
    return {"title": movie_name}


async def scrape():
    """scrape movies using both our scraper and community scraper"""
    movies = ["badguys", "witch", "interstellar", "prey", "luck", "up"]
    _start = time()
    async with httpx.AsyncClient() as client:
        async_tasks = [our_movie_scraper(client, f"async: {movie}") for movie in movies]
        sync_tasks = [asyncio.to_thread(community_movie_scraper, f"sync: {movie}") for movie in movies]
        for result in asyncio.as_completed(async_tasks + sync_tasks):
            print(await result)
    print(f"completed in {time() - _start:.2f}")


if __name__ == "__main__":
    asyncio.run(scrape())

运行输出

{'title': 'sync: badguys'}
{'title': 'sync: interstellar'}
{'title': 'async: up'}
{'title': 'async: interstellar'}
{'title': 'async: badguys'}
{'title': 'sync: witch'}
{'title': 'async: luck'}
{'title': 'sync: up'}
{'title': 'sync: luck'}
{'title': 'sync: prey'}
{'title': 'async: witch'}
{'title': 'async: prey'}
completed use_threads=True in 2.24
{'title': 'sync: badguys'}
{'title': 'sync: witch'}
{'title': 'sync: interstellar'}
{'title': 'sync: prey'}
{'title': 'sync: luck'}
{'title': 'sync: up'}
{'title': 'async: badguys'}
{'title': 'async: interstellar'}
{'title': 'async: prey'}
{'title': 'async: up'}
{'title': 'async: witch'}
{'title': 'async: luck'}
completed use_threads=False in 13.24

在上面的示例中,我们有两个电影爬取功能:我们的超快异步功能和一个慢速社区功能。
为了加快我们的整体程序,我们只需将同步的、慢速的函数推迟到线程!

正如我们之前介绍的那样,python 线程不能并行运行,尽管它们可以像 asyncio 协程一样暂停和轮流运行。这意味着,我们可以很容易地混合和匹配异步代码和线程!
在我们的示例中,我们创建了 6 个异步协程和 6 个异步线程,使我们能够轻松地将快速异步代码与慢速同步代码结合起来,并以异步速度运行它们。


使用asyncio我们可以快速扩展 IO 阻塞,例如网络爬取中的 HTTP 请求。然而,网络爬取的另一个重要部分是数据解析本身。那么,接下来让我们看看如何使用多处理来扩展它。

多进程解析

使用 asyncio 我们可以快速获取数据,但是当现代处理器有几十个 CPU 内核时,我们的 Python 程序仍将使用单个 CPU 内核来解析它。

为了通过多个 CPU 内核分发我们的解析,我们可以使用多处理。即使是现代笔记本电脑也有十几个或更多的内核:

import multiprocessing
print(f"This machine has {multiprocessing.cpu_count()} CPU cores")
# This machine has 12 CPU cores

如果我们有 12 个核心,我们可以产生 12 个并发进程来解析我们爬取的内容,这可能会提高 12 倍的速度!

在 python 中利用多处理的最简单方法是通过concurrent.futures.ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time


def fibonacci_of(n):
    """get fibonnaci number"""
    if n in {0, 1}:
        return n
    return fibonacci_of(n - 1) + fibonacci_of(n - 2)


def multi_process(number, times):
    start = time()
    with ProcessPoolExecutor() as executor:
        for result in executor.map(fibonacci_of, [number for i in range(times)]):
            pass
        print("done")
    print(f"multi-process finished in {time() - start:.2f}")


def single_process(number, times):
    start = time()
    for i in range(times):
        fibonacci_of(number)
    print(f"single-process finished in {time() - start:.2f}")


if __name__ == "__main__":
    fib_number = 36  # single calculation of 36 takes around 1-3 seconds
    count = 12
    multi_process(fib_number, count)
    # multi-process finished in 3.1
    single_process(fib_number, count)
    # single-process finished in 32.8

在这里我们可以看到使用 ProcessPoolExecutor 如何将我们的程序加速 10 倍以上。

ProcessPoolExecutor启动最大数量的子进程,它等于可用 CPU 内核的数量。因此,在具有 12 个 cpu 内核的机器上,它将产生 12 个进程,这些进程将平均分配工作负载,从而显着提高性能。

异步+多处理

最后,我们可以结合这两种技术来充分利用 Python 进行网络爬取。我们可以用异步 python 编写我们的爬虫,然后通过多个进程分发它。让我们看一下这个示例爬取器:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

import httpx


async def scrape(urls):
    """this is our async scraper that scrapes"""
    results = []
    async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
        scrape_tasks = [client.get(url) for url in urls]
        for response_f in asyncio.as_completed(scrape_tasks):
            response = await response_f
            # emulate data parsing/calculation
            sleep(0.5)
            ...
            results.append("done")
    return results


def scrape_wrapper(args):
    i, urls = args
    print(f"subprocess {i} started")
    result = asyncio.run(scrape(urls))
    print(f"subprocess {i} ended")
    return result


def multi_process(urls):
    _start = time()

    batches = []
    batch_size = multiprocessing.cpu_count() - 1  # let's keep 1 core for ourselves
    print(f"scraping {len(urls)} urls through {batch_size} processes")
    for i in range(0, len(urls), batch_size):
        batches.append(urls[i : i + batch_size])
    with ProcessPoolExecutor() as executor:
        for result in executor.map(scrape_wrapper, enumerate(batches)):
            print(result)
        print("done")

    print(f"multi-process finished in {time() - _start:.2f}")

def single_process(urls):
    _start = time()
    results = asyncio.run(scrape(urls))
    print(f"single-process finished in {time() - _start:.2f}")



if __name__ == "__main__":
    urls = ["http://httpbin.org/delay/1" for i in range(100)]
    multi_process(urls)
    # multi-process finished in 7.22
    single_process(urls)
    # single-process finished in 51.28

在我们上面的代码示例中,我们有两个爬取运行器:

  • single_process是我们简单的异步爬取运行器,它绕过 IO 阻塞但仍然花费大量时间进行解析。
  • multi_process是我们的异步爬取器分布在多个进程中,它绕过了 IO 阻塞并提高了解析速度。

设计具有异步进程的爬取器乍一看可能令人望而生畏,但只要稍加努力,我们就可以实现令人难以置信的网络爬取速度。

概括

在这个 python 网络爬取教程中,我们了解了爬取速度的基础知识。我们介绍了线程或 asyncio 如何帮助我们提高 IO 块的速度,以及多处理如何帮助我们提高 CPU 块的速度。

通过使用 Python 中的内置工具,我们可以将我们的爬虫速度从几十倍提高到几千倍,而只需要很少的额外资源或开发时间开销。

Written by 河小马

河小马是一位杰出的数字营销行业领袖,广告中国论坛的重要成员,其专业技能涵盖了PPC广告、域名停放、网站开发、联盟营销以及跨境电商咨询等多个领域。作为一位资深程序开发者,他不仅具备强大的技术能力,而且在出海网络营销方面拥有超过13年的经验。