in

如何使用网络爬虫创建任意网站的搜索引擎

如何使用网络爬虫创建任何网站的搜索引擎

在本教程中,我们将了解如何通过网络抓取网站数据、解析和索引感兴趣的部分并使用直观的 GUI 将其全部包装起来,从而为任何网站创建搜索引擎。

我们将使用lunr.js javascript 搜索引擎来显示我们的搜索索引,我们将使用 Python 进行数据抓取和索引生成。

什么是 Lunrjs?

Lunr.js 是一个用于浏览器的小型全文搜索库。它为 JSON 文档编制索引并提供一个简单的搜索界面,用于检索与文本查询最匹配的文档。

由于 lunr 使用 JSON 文档作为它的索引,并且可以从单个 json 文件在浏览器中轻松运行,我们可以轻松地将它与网络抓取集成!

我们将从源中抓取 HTML 数据,将其解析为 JSON 结构并将其输入 Lunrjs 前端:创建我们自己的搜索引擎!

数据爬虫

要收集我们的搜索引擎数据,我们首先必须编写一个抓取器,我们将使用它来检索数据以进行索引。在本文示例中,我们将使用 ScrapFly API。此外,我们将使用带有少量社区包的 Python:

  • HTTP 连接的httpx包
  • parsel包,用于解析我们希望在索引中考虑的值的 HTML 数据。
  • 用于构建我们的 lunr 索引的lunr包
  • loguru [可选] 包,用于简单、漂亮的日志,因此我们可以更轻松地跟进。

我们可以通过pip命令安装这些库:

$ pip install httpx parsel lunr loguru

使用爬行

为了收集索引数据,我们将使用称为爬行的网络抓取技术。抓取本质上是一个网络抓取循环,我们的程序在其中不断收集文档,找到更多要抓取的 url 并重复该过程,直到找不到任何新内容。

在 Python 中,我们可以使用我们的工具来说明这个httpx过程parsel

import asyncio
from typing import List

import httpx
from loguru import logger as log
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses

在上面的示例中,我们为爬虫提供了爬行规则和起点。异步递归抓取器不断抓取 url,直到它找到所有它能找到的东西。

让我们针对我们的示例目标运行它 – ScrapFly:

# Example use:
async def run():
    limits = httpx.Limits(max_connections=3)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(limits=limits, headers=headers) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs", 
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )

if __name__ == "__main__":
    asyncio.run(run())

我们可以看到这个爬虫很快给我们生成了 23 个页面:

2022-05-26 | INFO     | __main__:crawl:33 - scraping: 1 urls
2022-05-26 | INFO     | __main__:crawl:33 - scraping: 22 urls
2022-05-26 | INFO     | __main__:crawl:43 - found 23 responses

收集页面后,我们可以开始解析它们以获取我们将在搜索引擎中使用的数据。

HTML 清理

由于我们在索引中使用 HTML 值,因此我们获得了查看高亮显示、链接和图片等富文本的优势,但是为此我们必须清理我们的 HTML 数据以防止不必要的值污染我们的索引。

from httpx import Response
from parsel import Selector
from urllib.parse import urljoin

def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    allowed_attributes = ["src", "href", "width", "height"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all link to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel

在这里,我们有我们的清理功能,它将删除不必要的 HTML 节点属性。对于高质量的搜索引擎,重要的是要清理我们的数据以防止误报。

索引部分

收集 HTML 后,我们可以开始解析这些数据以获取搜索索引。
对于这一部分,让我们将每个页面拆分为我们将用于生成索引的部分。这将使我们能够为我们的搜索引擎创建更好的索引。

对于部分,让我们按标题拆分页面。

因此,单个页面将产生多个索引目标。#这对我们的网络搜索引擎来说非常有用,因为标题通常表示主题或子主题,而且通常我们可以使用工件语法直接链接到页面的这一部分。
例如,当我们访问https://scrapfly.io/docs/project#introduction时,我们的浏览器会自动滚动到 Introduction 标题。这是由 HTML 节点的id属性控制的。

要按部分拆分 HTML,我们可以在 Python 中使用一个简单的解析算法:

def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents

上面的解析代码通过任何标题元素(如 等)拆分我们的 html 树h1h2此外,让我们将这些部分解析为由标题组成的索引文档,标题是该hX部分的节点文本和 html 正文。

建筑指数

我们已经准备好我们的抓取器和解析器——是时候建立我们的索引了。我们的索引将包含 JSON 文档,这些文档是我们之前提取的文章部分:

[
  {
    "title": "title of the section",
    "text": "html value of the section",
  },
...
]

建立索引的方法很少lunrjs,但最简单的方法是使用lunrpython 包:

import json
import lunr

def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)

此函数接收文档列表并生成 lunr 索引。让我们试一试!

把一切放在一起

我们定义了所有组件:

  • 收集 HTML 文档的爬虫。
  • 按部分解析每个 HTML 文档的解析器。
  • 索引生成器,将部分文档转换为一个 lunrjs JSON 索引。

我们最终的项目代码

import asyncio
import json
import re
from typing import List
from urllib.parse import urljoin

import httpx
from httpx import Response
from loguru import logger as log
from lunr import lunr
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses


def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    allowed_attributes = ["src", "href", "width", "height"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all link to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents


def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)


async def run():
    """
    example run function:
    establishes http session, crawls html documents, 
    turns them into index documents and compiles lunr index
    """
    limits = httpx.Limits(max_connections=3)
    timeout = httpx.Timeout(20.0)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(
        limits=limits, headers=headers, timeout=timeout
    ) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs",
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )
        documents = parse(responses)
        with open("search_index.json", "w") as f:
            f.write(build_index(documents))


if __name__ == "__main__":
    asyncio.run(run())

如果我们运行此代码search_index.json将生成。没有前端对我们来说没有多大用处,所以让我们安装一个吧!

前端资源管理器

对于我们的搜索前端,为了本教程的目的,我们将一个简单的查看器放在一起,它可以在我们的 github 上找到

让我们克隆这个前端并给它我们生成的索引:

# create project directory
$ cd docs-search
# clone our front-end
$ git clone https://github.com/Granitosaurus/simple-lunrjs-display
$ cd simple-lunrjs-display
# replace search_index.json with the one we generated
$ cp ../search_index.json .
# start a http server to see our search engine live!
$ python -m http.server --bind 127.0.0.1

现在,如果我们转到http://127.0.0.1:8000,我们就可以探索我们的搜索了!

概括

在本教程中,我们使用 Python 和lunrjs框架从网络抓取的数据创建了一个搜索引擎。我们已经开始为我们的源编写一个爬虫,它递归地抓取所有 HTML 数据。然后,我们了解了通过将 HTML 文档数据解析为部分来创建索引,我们随后将这些部分输入到我们的 lunrjs 索引生成器中以预构建我们的索引。

使用这样的搜索引擎是呈现网络抓取数据和创建个人知识库的好方法。

Written by 河小马

河小马是一位杰出的数字营销行业领袖,广告中国论坛的重要成员,其专业技能涵盖了PPC广告、域名停放、网站开发、联盟营销以及跨境电商咨询等多个领域。作为一位资深程序开发者,他不仅具备强大的技术能力,而且在出海网络营销方面拥有超过13年的经验。