实时传送网络爬取的数据可能是一个真正的挑战。尽管要将 Python 网络爬取工具转变为按需提供数据的 API,我们只需要很少的成分。
在本实用教程中,我们将构建一个简单的Yahoo Finance scraper 并将其包装在由FastAPI提供支持的 Web API 中,以按需爬取和提供数千个股票数据详细信息!
为什么选择 FastAPI?
FastAPI是 Python 中的一个异步API 框架,正是出于这个原因,它使其成为网络爬取 API 的理想选择。通过使用异步代码结构,我们可以在非常少的代码中以非常少的开销拥有非常快的程序。在这篇文章中,我们将在不到 100 行代码中完成带有缓存和 webhook 支持的完整雅虎股票数据 API!
设置
在本教程中,作为我们的 API,我们将使用FastAPI作为我们的 API 服务器。为了服务我们的 API,我们将使用uvicorn并且为了更容易理解发生了什么,我们将使用一个丰富的日志包loguru。
对于爬取,我们将使用httpx作为我们的 HTTP 客户端,使用 parsel作为我们的 HTML 解析器。
所有这些工具都是 Python 包,可以使用pip
控制台命令安装:
$ pip install fastapi uvicorn httpx parsel loguru
准备好工具后,让我们看一下 FastAPI 基础知识。
FastAPI 快速入门
FastAPI 有一些很棒的文档资源,但对于我们的爬虫服务,我们只需要非常基础的东西。
首先,我们将在一个 python 模块中工作main.py
:
# main.py from fastapi import FastAPI from loguru import logger as log # create API app object app = FastAPI() # attach route to our API app @app.get("/scrape/stock/{symbol}") async def scrape_stock(symbol: str): return { "stock": symbol, "price": "placeholder", }
上面,我们用一个路由定义了数据 API 的第一个细节:/scrape/stock/<symbol>
。
这条路线有一个参数 – 符号 – 指示应该刮掉哪些股票。例如,Apple 的股票代码是AAPL
。
我们可以运行我们的 API 并查看我们的占位符结果:
$ uvicorn main:app --reload
现在,如果我们去http://127.0.0.1:8000/scrape/stock/aapl
我们应该看到我们的占位符结果为 JSON:
import httpx print(httpx.get("http://127.0.0.1:8000/scrape/stock/aapl").json()) { "stock": "AAPL", "price": "placeholder", }
因此,每次有人连接到这个 API 端点时,我们的scrape_stock
函数都会被调用,结果将返回给客户端。现在,让它爬取东西。
爬取雅虎财经
为了爬取 Yahoo Finance,我们将生成指向股票数据页面的 URL,下载 HTML 数据并使用一些巧妙的 XPath 选择器对其进行解析。
例如,让我们看一下 Apple 的股票数据页面yahoo.com/quote/AAPL/ – 我们可以看到 URL 模式是finance.yahoo.com/quote/<symbol>
. 因此,只要我们知道公司的证券交易所代码,我们就可以爬取任何股票的数据。
让我们将爬虫逻辑添加到我们的 API 中:
import asyncio import httpx from time import time from fastapi import FastAPI app = FastAPI() stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0)) async def scrape_yahoo_finance(symbol): """scrapes stock data from yahoo finance""" log.info(f"{symbol}: scraping data") response = await stock_client.get( f"https://finance.yahoo.com/quote/{symbol}?p={symbol}" ) sel = Selector(response.text) parsed = {} # parse summary data tables: rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]') for row in rows: label = row.xpath("@data-test").get().split("-value")[0].lower() value = " ".join(row.xpath(".//text()").getall()) parsed[label] = value # parse price: parsed["price"] = sel.css( f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)' ).get() # add meta field for when this was scraped parsed["_scraped_on"] = time() return parsed @app.get("/scrape/stock/{symbol}") async def scrape_stock(symbol: str): symbol = symbol.upper() return await scrape_yahoo_finance(symbol) # on API start - open up our scraper's http client connections @app.on_event("startup") async def app_startup(): await stock_client.__aenter__() # on API close - close our scraper's http client connections @app.on_event("shutdown") async def app_shutdown(): await stock_client.__aexit__()
我们已经用一个 scraper 函数更新了我们的 API,并将它附加到我们的 API 端点。现在让我们看看结果:
$ curl http://127.0.0.1:8000/scrape/stock/aapl { "prev_close": "156.90", "open": "157.34", "bid": "0.00 x 1000", "ask": "0.00 x 1000", "days_range": "153.67 - 158.74", "fifty_two_wk_range": "129.04 - 182.94", "td_volume": "101,696,790", "average_volume_3month": "75,324,652", "market_cap": "2.47T", "beta_5y": "1.23", "pe_ratio": "25.41", "eps_ratio": "6.05", "earnings_date": "Oct 26, 2022 - Oct 31, 2022", "dividend_and_yield": "0.92 (0.59%)", "ex_dividend_date": "Aug 05, 2022", "one_year_target_price": "182.01", "price": "153.72", "_scraped_on": 1663838493.6148243 }
尽管我们确实有一个明显的问题:API 加载,但我们只用了几行代码就把我们的爬取器变成了一个数据 API。
如果多个客户要求在短时间内爬取同一股票怎么办?我们将把资源浪费在多余的擦除上。
为了解决这个问题,让我们来看看如何添加简单的缓存。
缓存爬虫
缓存 API 内容的方法有很多,从在我们的 Web 服务器中启用缓存到使用专用缓存工具(如 Redis 或 Memchache)。
然而,对于小型 API 项目,我们可以轻松地在 Python
import asyncio import httpx from time import time from fastapi import FastAPI app = FastAPI() stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0)) STOCK_CACHE = {} # NEW: establish global cache storage CACHE_TIME = 60 # NEW: define how long do we want to keep cache in seconds async def scrape_yahoo_finance(symbol): """scrapes stock data from yahoo finance""" # NEW: check cache before we commit to scraping cache = STOCK_CACHE.get(symbol) if cache and time() - CACHE_TIME < cache["_scraped_on"]: log.debug(f"{symbol}: returning cached item") return cache log.info(f"{symbol}: scraping data") response = await stock_client.get( f"https://finance.yahoo.com/quote/{symbol}?p={symbol}" ) sel = Selector(response.text) parsed = {} rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]') for row in rows: label = row.xpath("@data-test").get().split("-value")[0].lower() value = " ".join(row.xpath(".//text()").getall()) parsed[label] = value parsed["price"] = sel.css( f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)' ).get() parsed["_scraped_on"] = time() # NEW: store successful results to cache STOCK_CACHE[symbol] = parsed return parsed @app.get("/scrape/stock/{symbol}") async def scrape_stock(symbol: str): symbol = symbol.upper() return await scrape_yahoo_finance(symbol) @app.on_event("startup") async def app_startup(): await stock_client.__aenter__() # NEW: optionally we can clear expired cache every minute to prevent # memory build up. async def clear_expired_cache(period=60.0): while True: global STOCK_CACHE log.debug(f"clearing expired cache") STOCK_CACHE = { k: v for k, v in STOCK_CACHE.items() if time() - CACHE_TIME < v["_scraped_on"] } await asyncio.sleep(period) clear_cache_task = asyncio.create_task(clear_expired_cache()) @app.on_event("shutdown") async def app_shutdown(): await stock_client.__aexit__()
上面,我们更新了我们的爬取功能,在爬取实时目标之前首先检查全局缓存。为此,我们使用一个简单的 Python 字典来按符号存储爬取的数据。
Python 字典非常快速和高效,因此我们可以在 API 的内存中缓存数千甚至数百万个结果。
我们还添加了一个重复的异步任务clear_expired_cache()
,每分钟删除过期的缓存值。我们通过使用asyncio.create_task()
将任何异步函数(协程)转换为后台任务对象的函数来做到这一点。
现在,如果我们的 API 多次收到对同一股票数据的请求,它只会爬取一次数据并将缓存提供给其他所有人:
import asyncio import httpx from time import time async def many_concurrent_api_calls(n): _start = time() async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client: _start_one = time() await client.get("http://127.0.0.1:8000/scrape/stock/aapl") print(f"completed first API scrape in {time() - _start_one:.2f} seconds") results = await asyncio.gather(*[ client.get("http://127.0.0.1:8000/scrape/stock/aapl") for i in range(n) ]) print(f"completed {n-1} API scrapes in {time() - _start:.2f} seconds") # run 1000 API calls asyncio.run(many_concurrent_api_calls(1_000)) # will print: # completed first API scrape in 1.23 seconds # completed 999 API scrapes in 2.59 seconds
缓存技术会因应用程序而异,但对于像网络爬取这样耗时耗力的任务,即使是像这样的简单设计,我们也能看到巨大的好处。
有关 FastAPI 中更高级的缓存,请参阅fastapi-cache扩展。
用于长数据爬取的 Webhook
一些爬取任务可能需要很多秒甚至几分钟才能完成,这会超时或阻止连接客户端。处理此问题的一种方法是提供 webhook 支持。
换句话说,webhooks 本质上承诺 API 将完成任务并在稍后回调结果。我们可以通过替换路由来扩展我们的 API 以包含此功能scrape_stock
:
... async def with_webhook(cor, webhook, retries=3): """execute corotine and send it to a webhook""" result = await cor async with httpx.AsyncClient( headers={"User-Agent": "scraper webhook"}, timeout=httpx.Timeout(timeout=15.0), ) as client: for i in range(retries): try: response = await client.post(webhook, json=result) return except Exception as e: log.exception(f"Failed to send a webhook {i}/{retries}") await asyncio.sleep(5) # wait between retries log.error(f"Failed to reach webhook in {retries} retries") @app.get("/scrape/stock/{symbol}") async def scrape_stock(symbol: str, webhook: Optional[str] = None): symbol = symbol.upper() scrape_cor = scrape_yahoo_finance(symbol) if webhook: # run scrape coroutine in the background task = asyncio.create_task(with_webhook(scrape_cor, webhook)) return {"success": True, "webhook": webhook} else: return await scrape_cor
在上面的更新中,如果?webhook
向我们的 API 提供了参数,它会立即返回{"success": True, "webhook": webook}
并安排一个后台任务,该任务会爬取结果并将它们发布到客户端的 webhook 地址。
为了测试这一点,我们可以使用 webhook 测试服务,例如webhook.site:
from urllib.parse import urlencode import httpx url = "http://127.0.0.1:8000/scrape/stock/aapl" + urlencode({ "webhook": "https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c" # change this to yours }) response = httpx.get(url) print(response.json()) # {'success': True, 'webhook': 'https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c'}
运行它时,我们会看到 API 立即返回,如果我们检查我们的 webhooks.site 会话,我们会看到那里传送的数据:
Webhooks 是一种很好的方式,不仅可以处理长时间的爬取任务,还可以确保我们的 API 在未来扩展,因为我们可以在需要时简单地启动更多进程来处理 webhook 任务。
最终爬虫代码
为了总结这篇文章,让我们看一下我们的 scraper API 的完整代码:
import asyncio from time import time from typing import Optional import httpx from fastapi import FastAPI from loguru import logger as log from parsel import Selector app = FastAPI() stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0)) STOCK_CACHE = {} CACHE_TIME = 10 async def scrape_yahoo_finance(symbol): """scrapes stock data from yahoo finance""" cache = STOCK_CACHE.get(symbol) if cache and time() - CACHE_TIME < cache["_scraped_on"]: log.debug(f"{symbol}: returning cached item") return cache log.info(f"{symbol}: scraping data") response = await stock_client.get( f"https://finance.yahoo.com/quote/{symbol}?p={symbol}" ) sel = Selector(response.text) parsed = {} for row in sel.xpath( '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]' ): label = row.xpath("@data-test").get().split("-value")[0].lower() value = " ".join(row.xpath(".//text()").getall()) parsed[label] = value parsed["price"] = sel.css( f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)' ).get() parsed["_scraped_on"] = time() STOCK_CACHE[symbol] = parsed return parsed async def with_webhook(cor, webhook, retries=3): result = await cor async with httpx.AsyncClient( headers={"User-Agent": "scraper webhook"}, timeout=httpx.Timeout(timeout=15.0), ) as client: for i in range(retries): try: response = await client.post(webhook, json=result) return except Exception as e: log.exception(f"Failed to send a webhook {i}/{retries}") await asyncio.sleep(5) # wait between retries @app.get("/scrape/stock/{symbol}") async def scrape_stock(symbol: str, webhook: Optional[str] = None): symbol = symbol.upper() scrape_cor = scrape_yahoo_finance(symbol) if webhook: # run scrape coroutine in the background task = asyncio.create_task(with_webhook(scrape_cor, webhook)) return {"success": True, "webhook": webhook} else: return await scrape_cor # we can clear cache every once in a while to prevent memory build up # @app.on_event("startup") # @repeat_every(seconds=5.0) # 1 hour async def clear_expired_cache(period=5.0): while True: global STOCK_CACHE _initial_len = len(STOCK_CACHE) log.debug(f"clearing expired cache, current len {_initial_len}") STOCK_CACHE = { k: v for k, v in STOCK_CACHE.items() if v["_scraped_on"] == "scraping" or time() - CACHE_TIME < v["_scraped_on"] } log.debug(f"cleared {_initial_len - len(STOCK_CACHE)}") await asyncio.sleep(period) @app.on_event("startup") async def app_startup(): await stock_client.__aenter__() clear_cache_task = asyncio.create_task(clear_expired_cache()) @app.on_event("shutdown") async def app_shutdown(): await stock_client.__aexit__()
概括
在本实用教程中,我们快速深入了解了实时爬取数据的数据 API。我们已经介绍了如何设置 FastAPI,编写了一个简单的 Yahoo Finance 股票数据爬取器,然后将所有内容打包到一个具有缓存和 webhook 支持的内聚数据 API 中。
通过利用 Python 中现有的、强大的 Web API 生态系统,我们仅用几行实际代码就做到了这一点!