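In this tutorial, we'll take a look at how to create a search engine for any website by web scraping its data, parsing and indexing the sections of interest, and wrapping it all up with an intuitive GUI.
We'll use the lunr.js JavaScript search engine to display our search index, and Python for data scraping and index generation.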
What is Lunrjs?
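Lunr.js is a small full-text search library for use in the browser. It indexes JSON documents and provides a simple search interface for retrieving the documents that best match a text query.
Since lunr uses JSON documents for its index and runs in the browser from a single JSON file, we can easily integrate it with web scraping!
We'll scrape HTML data from our source, parse it into a JSON structure and feed that into the Lunrjs front-end, creating our very own search engine!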
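To make the idea of a full-text index over JSON documents more concrete, here is a minimal sketch of an inverted index in plain Python. It is not how lunr is implemented (lunr also does stemming, stop-word filtering and relevance scoring); the `tokenize`, `build_inverted_index` and `search` helpers and the sample documents are hypothetical names made up for this illustration.

```python
import re
from collections import defaultdict
from typing import Dict, List, Set


def tokenize(text: str) -> List[str]:
    """Lowercase the text and split it into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def build_inverted_index(docs: List[dict], ref: str, fields: List[str]) -> Dict[str, Set[str]]:
    """Map every token to the set of document refs that contain it."""
    index: Dict[str, Set[str]] = defaultdict(set)
    for doc in docs:
        for field in fields:
            for token in tokenize(doc.get(field, "")):
                index[token].add(doc[ref])
    return index


def search(index: Dict[str, Set[str]], query: str) -> Set[str]:
    """Return the refs of documents that contain every token of the query."""
    tokens = tokenize(query)
    if not tokens:
        return set()
    results = set(index.get(tokens[0], set()))
    for token in tokens[1:]:
        results &= index.get(token, set())
    return results


# Hypothetical documents shaped like the ones we will generate later
docs = [
    {"location": "/docs#intro", "title": "Introduction", "text": "Getting started with the API"},
    {"location": "/docs#proxy", "title": "Proxy", "text": "Using proxies with the API"},
]
index = build_inverted_index(docs, ref="location", fields=["title", "text"])
print(search(index, "proxies api"))
```

The `ref` and `fields` arguments mirror the way a lunr index is configured: one field identifies the document, the others are searched.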
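To collect our search engine data, we first have to write a scraper that retrieves the data to index. As the example in this article, we'll use the ScrapFly API documentation. We'll also be using Python with a few community packages:
- httpx package for HTTP connections.
- parsel package for parsing the HTML data for the values we want to consider in our index.
- lunr package for building our lunr index.
- loguru [optional] package for simple, pretty logs so we can follow along more easily.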
$ pip install httpx parsel lunr loguru
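To collect the index data, we'll use a web scraping technique called crawling. Crawling is essentially a web scraping loop in which our program keeps collecting documents, finding more URLs to scrape and repeating the process until nothing new is found.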
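Stripped of all networking, the crawl loop described here can be sketched over an in-memory link graph. This is only an illustration under assumptions: the `SITE` dictionary and the `crawl_graph` function are hypothetical stand-ins for real pages and real HTTP requests.

```python
from typing import Dict, List, Set

# Hypothetical site: each page maps to the links found on it
SITE: Dict[str, List[str]] = {
    "/docs": ["/docs/a", "/docs/b"],
    "/docs/a": ["/docs", "/docs/c"],
    "/docs/b": ["/docs/a"],
    "/docs/c": [],
}


def crawl_graph(start: str, site: Dict[str, List[str]], max_depth: int = 10) -> List[str]:
    """Visit pages level by level until no unseen links remain."""
    seen: Set[str] = {start}
    to_crawl: List[str] = [start]
    visited: List[str] = []
    depth = 0
    # the depth limit guards against accidental endless crawls
    while to_crawl and depth <= max_depth:
        found: Set[str] = set()
        for page in to_crawl:
            visited.append(page)
            found.update(site.get(page, []))
        # keep only the links that were never queued before
        to_crawl = sorted(found - seen)
        seen |= found
        depth += 1
    return visited


print(crawl_graph("/docs", SITE))
```

Tracking every URL that was ever queued in `seen` is what stops the loop: once a round discovers nothing new, the queue is empty and the crawl ends.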
In Python, we can implement this with our httpx and parsel tools:
import asyncio
from typing import List

import httpx
from loguru import logger as log
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        # make the url absolute and drop the #fragment part
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses
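In the example above, we gave our crawler crawling rules and a starting point. The asynchronous crawler then keeps scraping the URLs it discovers, one batch at a time, until it finds nothing new or reaches max_depth.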
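The link filtering done while discovering URLs (make each link absolute, drop the `#fragment`, skip other hosts) can also be sketched with the standard library alone. This is an illustrative equivalent, not the code used above: `normalize_links` is a hypothetical helper, and the example.com URLs are placeholders.

```python
from typing import Iterable, Set
from urllib.parse import urldefrag, urljoin, urlsplit


def normalize_links(page_url: str, hrefs: Iterable[str]) -> Set[str]:
    """Return absolute, fragment-free links that stay on the page's host."""
    found: Set[str] = set()
    page_host = urlsplit(page_url).hostname
    for href in hrefs:
        # resolve relative links against the page url, then drop "#..."
        absolute, _fragment = urldefrag(urljoin(page_url, href))
        if urlsplit(absolute).hostname != page_host:
            continue  # link points to a different website
        found.add(absolute)
    return found


links = normalize_links(
    "https://example.com/docs/",
    ["intro#setup", "/docs/api", "https://other.example.org/page", "intro"],
)
print(sorted(links))
```

Dropping fragments matters for crawling because `intro` and `intro#setup` are the same document, and returning a set keeps such duplicates out of the crawl queue.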
Let's run it against our example target, ScrapFly:
# Example use:
async def run():
    limits = httpx.Limits(max_connections=3)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(limits=limits, headers=headers) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs",
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )


if __name__ == "__main__":
    asyncio.run(run())
We can see that this crawler quickly collected 23 pages for us:
2022-05-26 | INFO | __main__:crawl:33 - scraping: 1 urls
2022-05-26 | INFO | __main__:crawl:33 - scraping: 22 urls
2022-05-26 | INFO | __main__:crawl:43 - found 23 responses
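Since we use HTML values in our index, we get the advantage of rich text in our results, such as highlights, links and images. For this, however, we have to clean up our HTML data to prevent unnecessary values from polluting our index.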
from httpx import Response
from parsel import Selector
from urllib.parse import urljoin


def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    # remove whole nodes we don't want in the index
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    # "id" is kept so that sections can later be linked to by their #id
    allowed_attributes = ["src", "href", "width", "height", "id"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all links to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel
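Here we have our cleanup function, which removes unwanted nodes (figures and carousels), strips unnecessary HTML node attributes and makes all links absolute. For a quality search engine, it's important to clean up our data to prevent false positives.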
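The attribute allowlist idea can be tried out without any third-party packages. The sketch below is an assumption-laden stand-in: it uses `xml.etree.ElementTree`, so it only accepts well-formed markup (real pages need an HTML parser such as parsel), and `clean_tree`, `ALLOWED_ATTRIBUTES` and the sample snippet are hypothetical. The `id` attribute is kept in this allowlist so that `#fragment` links keep working.

```python
import xml.etree.ElementTree as ET
from urllib.parse import urljoin

# Assumed allowlist for this sketch; everything else is stripped
ALLOWED_ATTRIBUTES = {"src", "href", "width", "height", "id"}


def clean_tree(markup: str, base_url: str) -> str:
    """Strip attributes outside the allowlist and make links absolute."""
    root = ET.fromstring(markup)
    for el in root.iter():
        for name in list(el.attrib):
            if name not in ALLOWED_ATTRIBUTES:
                del el.attrib[name]
        # resolve relative links and image sources against the page url
        for name in ("href", "src"):
            if el.attrib.get(name):
                el.attrib[name] = urljoin(base_url, el.attrib[name])
    return ET.tostring(root, encoding="unicode")


snippet = (
    '<div class="card" id="x">'
    '<a class="btn" href="/docs/api">API</a>'
    '<img src="logo.png" width="10" style="margin:0"/>'
    "</div>"
)
cleaned = clean_tree(snippet, "https://example.com/docs/")
print(cleaned)
```

Styling attributes such as `class` and `style` carry words that have nothing to do with the page content, which is why they are removed before indexing.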
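With the HTML collected, we can start parsing this data for our search index. We'll index each page section separately, so that a search result can point straight at the relevant part of a page.
For example, when we visit https://scrapfly.io/docs/project#introduction our browser automatically scrolls to the Introduction heading. This is controlled by the id attribute of the HTML node.
To split the HTML by sections, we can use a simple parsing algorithm in Python: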
import re
from typing import List

from httpx import Response
from loguru import logger as log


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                # the first node of a section is its heading
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                # the remaining nodes are kept as HTML
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents
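The parsing code above splits our HTML tree by any heading element (such as h1, h2 and so on). Each section is then turned into an index document made up of a title, built from the page title and the text of the section's heading node, the HTML body of the section and a location URL that points at the section.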
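The splitting step on its own can be sketched without any HTML library by treating a page as a flat list of `(tag, text)` pairs. This is a simplified illustration, not the exact logic above: the `Node` alias, `split_by_headings` and the sample page are hypothetical, and every heading starts a new section here.

```python
import re
from typing import List, Tuple

Node = Tuple[str, str]  # (tag name, text content)


def split_by_headings(nodes: List[Node]) -> List[List[Node]]:
    """Group nodes into sections, starting a new one at every heading."""
    sections: List[List[Node]] = []
    current: List[Node] = []
    for node in nodes:
        tag, _text = node
        if re.fullmatch(r"h[1-6]", tag) and current:
            # a heading closes the section collected so far
            sections.append(current)
            current = [node]
        else:
            current.append(node)
    if current:
        sections.append(current)
    return sections


# Hypothetical page content
page = [
    ("h1", "Project"),
    ("p", "Projects group your scrapers."),
    ("h2", "Introduction"),
    ("p", "How to get started."),
    ("p", "More details."),
]
documents = [
    {"title": section[0][1], "text": " ".join(text for _tag, text in section[1:])}
    for section in split_by_headings(page)
]
print(documents)
```

Each resulting section begins with its heading, so the first node supplies the title and the rest supplies the body, the same shape the index documents use.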
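We have our scraper and parser ready, so it's time to build our index. Our index will consist of JSON documents, which are the article sections we extracted earlier: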
[
  {
    "title": "title of the section",
    "text": "html value of the section",
  },
  ...
]
To build the index, we'll use the lunr Python package:
import json
from typing import List

from loguru import logger as log
from lunr import lunr


def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    # store the serialized index next to the documents and config
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)
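This function takes a list of documents and generates a lunr index. Let's give it a shot! We now have all of our components in place:
- A crawler that collects HTML documents.
- A parser that parses each HTML document by sections.
- An index builder that turns the section documents into one lunrjs JSON index.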
import asyncio
import json
import re
from typing import List
from urllib.parse import urljoin

import httpx
from httpx import Response
from loguru import logger as log
from lunr import lunr
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        # make the url absolute and drop the #fragment part
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses


def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    # remove whole nodes we don't want in the index
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    # "id" is kept so that sections can later be linked to by their #id
    allowed_attributes = ["src", "href", "width", "height", "id"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all links to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                # the first node of a section is its heading
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                # the remaining nodes are kept as HTML
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents


def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    # store the serialized index next to the documents and config
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)


async def run():
    """
    example run function:
    establishes http session, crawls html documents,
    turns them into index documents and compiles lunr index
    """
    limits = httpx.Limits(max_connections=3)
    timeout = httpx.Timeout(20.0)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(
        limits=limits, headers=headers, timeout=timeout
    ) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs",
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )
        documents = parse(responses)
        with open("search_index.json", "w") as f:
            f.write(build_index(documents))


if __name__ == "__main__":
    asyncio.run(run())
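Before handing the generated file to a front-end, it can be worth checking that it has the expected shape. The following is a small sketch under assumptions: `check_index_payload` is a hypothetical helper, the expected keys are taken from the `build_index` function above, and the sample payload stands in for the real file contents.

```python
import json
from typing import List


def check_index_payload(raw: str) -> List[str]:
    """Return a list of problems found in a serialized index payload."""
    problems: List[str] = []
    payload = json.loads(raw)
    for key in ("docs", "config", "index"):
        if key not in payload:
            problems.append(f"missing top-level key: {key}")
    for i, doc in enumerate(payload.get("docs", [])):
        for field in ("title", "text", "location"):
            if not doc.get(field):
                problems.append(f"document {i} has empty or missing field: {field}")
    return problems


# Hypothetical payload shaped like the output of build_index()
sample = json.dumps({
    "config": {"lang": ["en"], "min_search_length": 1},
    "docs": [
        {
            "title": "Docs | Intro",
            "text": "<p>Hello</p>",
            "location": "https://example.com/docs#intro",
        }
    ],
    "index": {},
})
print(check_index_payload(sample))
```

To check the real output, the contents of search_index.json can be read and passed to the same function. Sections that consist of a heading only will be reported as having an empty text field, which is a useful hint that they add little to the index.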
For our search front-end, we've put together a simple viewer for the purposes of this tutorial. It can be found on our GitHub:
# enter the project directory (where search_index.json was generated)
$ cd docs-search
# clone our front-end
$ git clone https://github.com/Granitosaurus/simple-lunrjs-display
$ cd simple-lunrjs-display
# replace search_index.json with the one we generated
$ cp ../search_index.json .
# start a http server to see our search engine live!
$ python -m http.server --bind
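In this tutorial, we used Python and the lunrjs framework to create a search engine from web-scraped data. We started by writing a crawler for our source that scrapes all of its HTML data. Then we looked at index creation: we parsed the HTML documents into sections and fed those sections into our lunrjs index generator to prebuild our index.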