in

如何将网页爬虫转化为数据API

如何将网页爬虫转化为数据API

实时传送网络爬取的数据可能是一个真正的挑战。尽管要将 Python 网络爬取工具转变为按需提供数据的 API,我们只需要很少的成分。

在本实用教程中,我们将构建一个简单的Yahoo Finance scraper 并将其包装在由FastAPI提供支持的 Web API 中,以按需爬取和提供数千个股票数据详细信息!

为什么选择 FastAPI?

FastAPI是 Python 中的一个异步API 框架,正是出于这个原因,它使其成为网络爬取 API 的理想选择。通过使用异步代码结构,我们可以在非常少的代码中以非常少的开销拥有非常快的程序。在这篇文章中,我们将在不到 100 行代码中完成带有缓存和 webhook 支持的完整雅虎股票数据 API!

设置

在本教程中,作为我们的 API,我们将使用FastAPI作为我们的 API 服务器。为了服务我们的 API,我们将使用uvicorn并且为了更容易理解发生了什么,我们将使用一个丰富的日志包loguru

对于爬取,我们将使用httpx作为我们的 HTTP 客户端,使用 parsel作为我们的 HTML 解析器。

所有这些工具都是 Python 包,可以使用pip控制台命令安装:

$ pip install fastapi uvicorn httpx parsel loguru

准备好工具后,让我们看一下 FastAPI 基础知识。

FastAPI 快速入门

FastAPI 有一些很棒的文档资源,但对于我们的爬虫服务,我们只需要非常基础的东西。

首先,我们将在一个 python 模块中工作main.py

# main.py
from fastapi import FastAPI
from loguru import logger as log

# create API app object
app = FastAPI()

# attach route to our API app
@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    return {
        "stock": symbol,
        "price": "placeholder", 
    }

上面,我们用一个路由定义了数据 API 的第一个细节:/scrape/stock/<symbol>
这条路线有一个参数 – 符号 – 指示应该刮掉哪些股票。例如,Apple 的股票代码是AAPL

我们可以运行我们的 API 并查看我们的占位符结果:

$ uvicorn main:app --reload

现在,如果我们去http://127.0.0.1:8000/scrape/stock/aapl我们应该看到我们的占位符结果为 JSON:

import httpx

print(httpx.get("http://127.0.0.1:8000/scrape/stock/aapl").json())
{
    "stock": "AAPL",
    "price": "placeholder",
}

因此,每次有人连接到这个 API 端点时,我们的scrape_stock函数都会被调用,结果将返回给客户端。现在,让它爬取东西。

爬取雅虎财经

为了爬取 Yahoo Finance,我们将生成指向股票数据页面的 URL,下载 HTML 数据并使用一些巧妙的 XPath 选择器对其进行解析。

例如,让我们看一下 Apple 的股票数据页面yahoo.com/quote/AAPL/ – 我们可以看到 URL 模式是finance.yahoo.com/quote/<symbol>. 因此,只要我们知道公司的证券交易所代码,我们就可以爬取任何股票的数据。

Apple 股票的雅虎财经页面的屏幕截图
我们将爬取价格和摘要详细信息

让我们将爬虫逻辑添加到我们的 API 中:

import asyncio
import httpx
from time import time
from fastapi import FastAPI

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}

    # parse summary data tables:
    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    # parse price:
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    # add meta field for when this was scraped
    parsed["_scraped_on"] = time()
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

# on API start - open up our scraper's http client connections
@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()


# on API close - close our scraper's http client connections
@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

我们已经用一个 scraper 函数更新了我们的 API,并将它附加到我们的 API 端点。现在让我们看看结果:

$ curl http://127.0.0.1:8000/scrape/stock/aapl
{
  "prev_close": "156.90",
  "open": "157.34",
  "bid": "0.00 x 1000",
  "ask": "0.00 x 1000",
  "days_range": "153.67 - 158.74",
  "fifty_two_wk_range": "129.04 - 182.94",
  "td_volume": "101,696,790",
  "average_volume_3month": "75,324,652",
  "market_cap": "2.47T",
  "beta_5y": "1.23",
  "pe_ratio": "25.41",
  "eps_ratio": "6.05",
  "earnings_date": "Oct 26, 2022  -  Oct 31, 2022",
  "dividend_and_yield": "0.92 (0.59%)",
  "ex_dividend_date": "Aug 05, 2022",
  "one_year_target_price": "182.01",
  "price": "153.72",
  "_scraped_on": 1663838493.6148243
}

尽管我们确实有一个明显的问题:API 加载,但我们只用了几行代码就把我们的爬取器变成了一个数据 API。
如果多个客户要求在短时间内爬取同一股票怎么办?我们将把资源浪费在多余的擦除上。
为了解决这个问题,让我们来看看如何添加简单的缓存。

缓存爬虫

缓存 API 内容的方法有很多,从在我们的 Web 服务器中启用缓存到使用专用缓存工具(如 Redis 或 Memchache)。

然而,对于小型 API 项目,我们可以轻松地在 Python

import asyncio
import httpx
from time import time
from fastapi import FastAPI

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}  # NEW: establish global cache storage
CACHE_TIME = 60  # NEW: define how long do we want to keep cache in seconds

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    # NEW: check cache before we commit to scraping
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache

    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}

    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    # NEW: store successful results to cache
    STOCK_CACHE[symbol] = parsed
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()
    # NEW: optionally we can clear expired cache every minute to prevent
    # memory build up. 
    async def clear_expired_cache(period=60.0):
        while True:
            global STOCK_CACHE
            log.debug(f"clearing expired cache")
            STOCK_CACHE = {
                k: v for k, v in STOCK_CACHE.items() if time() - CACHE_TIME < v["_scraped_on"]
            }
            await asyncio.sleep(period)
    clear_cache_task = asyncio.create_task(clear_expired_cache())


@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

上面,我们更新了我们的爬取功能,在爬取实时目标之前首先检查全局缓存。为此,我们使用一个简单的 Python 字典来按符号存储爬取的数据。

Python 字典非常快速和高效,因此我们可以在 API 的内存中缓存数千甚至数百万个结果。
我们还添加了一个重复的异步任务clear_expired_cache(),每分钟删除过期的缓存值。我们通过使用asyncio.create_task()将任何异步函数(协程)转换为后台任务对象的函数来做到这一点。

现在,如果我们的 API 多次收到对同一股票数据的请求,它只会爬取一次数据并将缓存提供给其他所有人:

import asyncio
import httpx
from time import time


async def many_concurrent_api_calls(n):
    _start = time()
    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
        _start_one = time()
        await client.get("http://127.0.0.1:8000/scrape/stock/aapl")
        print(f"completed first API scrape in {time() - _start_one:.2f} seconds")
        results = await asyncio.gather(*[
            client.get("http://127.0.0.1:8000/scrape/stock/aapl")
            for i in range(n)
        ])
    print(f"completed {n-1} API scrapes in {time() - _start:.2f} seconds")

# run 1000 API calls
asyncio.run(many_concurrent_api_calls(1_000))
# will print:
# completed first API scrape in 1.23 seconds
# completed 999 API scrapes in 2.59 seconds

缓存技术会因应用程序而异,但对于像网络爬取这样耗时耗力​​的任务,即使是像这样的简单设计,我们也能看到巨大的好处。

有关 FastAPI 中更高级的缓存,请参阅fastapi-cache扩展。

用于长数据爬取的 Webhook

一些爬取任务可能需要很多秒甚至几分钟才能完成,这会超时或阻止连接客户端。处理此问题的一种方法是提供 webhook 支持。

换句话说,webhooks 本质上承诺 API 将完成任务并在稍后回调结果。我们可以通过替换路由来扩展我们的 API 以包含此功能scrape_stock

...

async def with_webhook(cor, webhook, retries=3):
    """execute corotine and send it to a webhook"""
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i}/{retries}")
            await asyncio.sleep(5)  # wait between retries
        log.error(f"Failed to reach webhook in {retries} retries")



@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor

在上面的更新中,如果?webhook向我们的 API 提供了参数,它会立即返回{"success": True, "webhook": webook}并安排一个后台任务,该任务会爬取结果并将它们发布到客户端的 webhook 地址。

为了测试这一点,我们可以使用 webhook 测试服务,例如webhook.site:

from urllib.parse import urlencode
import httpx

url = "http://127.0.0.1:8000/scrape/stock/aapl" + urlencode({
    "webhook": "https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c"  # change this to yours
})
response = httpx.get(url)
print(response.json())
# {'success': True, 'webhook': 'https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c'}

运行它时,我们会看到 API 立即返回,如果我们检查我们的 webhooks.site 会话,我们会看到那里传送的数据:

发送 webhook 后 webhook.site 的屏幕截图
我们可以看到带有爬取数据的传入 webhook

Webhooks 是一种很好的方式,不仅可以处理长时间的爬取任务,还可以确保我们的 API 在未来扩展,因为我们可以在需要时简单地启动更多进程来处理 webhook 任务。

最终爬虫代码

为了总结这篇文章,让我们看一下我们的 scraper API 的完整代码:

import asyncio
from time import time
from typing import Optional

import httpx
from fastapi import FastAPI
from loguru import logger as log
from parsel import Selector

app = FastAPI()

stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}
CACHE_TIME = 10


async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache

    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}
    for row in sel.xpath(
        '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]'
    ):
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    STOCK_CACHE[symbol] = parsed
    return parsed


async def with_webhook(cor, webhook, retries=3):
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i}/{retries}")
            await asyncio.sleep(5)  # wait between retries


@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor


# we can clear cache every once in a while to prevent memory build up
# @app.on_event("startup")
# @repeat_every(seconds=5.0)  # 1 hour
async def clear_expired_cache(period=5.0):
    while True:
        global STOCK_CACHE
        _initial_len = len(STOCK_CACHE)
        log.debug(f"clearing expired cache, current len {_initial_len}")
        STOCK_CACHE = {
            k: v
            for k, v in STOCK_CACHE.items()
            if v["_scraped_on"] == "scraping" or time() - CACHE_TIME < v["_scraped_on"]
        }
        log.debug(f"cleared {_initial_len - len(STOCK_CACHE)}")
        await asyncio.sleep(period)


@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()
    clear_cache_task = asyncio.create_task(clear_expired_cache())


@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

概括

在本实用教程中,我们快速深入了解了实时爬取数据的数据 API。我们已经介绍了如何设置 FastAPI,编写了一个简单的 Yahoo Finance 股票数据爬取器,然后将所有内容打包到一个具有缓存和 webhook 支持的内聚数据 API 中。

通过利用 Python 中现有的、强大的 Web API 生态系统,我们仅用几行实际代码就做到了这一点!

Written by 河小马

河小马是一位杰出的数字营销行业领袖,广告中国论坛的重要成员,其专业技能涵盖了PPC广告、域名停放、网站开发、联盟营销以及跨境电商咨询等多个领域。作为一位资深程序开发者,他不仅具备强大的技术能力,而且在出海网络营销方面拥有超过13年的经验。