# Source code for ocdsindex.crawler
import os
from collections import defaultdict
from urllib.parse import urljoin
import lxml.html
def true(root, file):
    """
    Default ``allow`` predicate for :class:`Crawler`: crawl every file.

    :param str root: the directory containing the file (unused)
    :param str file: the file's basename (unused)
    :returns: ``True``, always
    :rtype: bool
    """
    return True
class Crawler:
    """
    Crawls a directory for documents to index.
    """

    def __init__(self, directory, base_url, extract, *, allow=true):
        """
        :param str directory: the directory to crawl
        :param str base_url: the remote URL at which the files will be available
        :param extract: a function that accepts a file's remote URL and its root HTML element, and returns the
                        documents to index as a list of dicts
        :param allow: a function that accepts a directory path and a file basename, and returns whether to crawl the
                      file as a boolean
        """
        self.directory = directory
        self.base_url = base_url
        self.extract = extract
        self.allow = allow

    def get_documents_by_language(self):
        """
        Returns the documents to index for each language.

        :returns: a dict in which the key is a language code and the value is the documents to index
        :rtype: dict
        """
        documents = defaultdict(list)
        # The entries are sorted to make it easier to manually test whether output has changed.
        for entry in sorted(os.scandir(self.directory), key=lambda entry: entry.name):
            if not entry.is_dir() or len(entry.name) != 2:  # not an ISO 639-1 language code directory
                continue
            for root, _, files in os.walk(entry.path):
                for file in files:
                    if self.allow(root, file):
                        documents[entry.name].extend(self.get_documents_from_file(os.path.join(root, file)))
        return documents

    def get_documents_from_file(self, path):
        """
        Parses the file's HTML contents, calculates its remote URL, and returns the documents to index from the file.

        :param str path: a file path
        :returns: the documents to index
        :rtype: list
        """
        # Only HTML files contain indexable content; everything else yields no documents.
        if not path.endswith(".html"):
            return []
        # NOTE(review): relies on the platform default text encoding — consider encoding="utf-8"; confirm against
        # how the HTML files are generated.
        with open(path) as f:
            content = f.read()
        # Map the local path to its remote URL, normalizing OS path separators to URL separators.
        url = urljoin(self.base_url, os.path.relpath(path, self.directory).replace(os.sep, "/"))
        # Directory indexes are served at the trailing-slash URL: strip "index.html" (10 chars), keeping the "/".
        if url.endswith("/index.html"):
            url = url[:-10]
        tree = lxml.html.fromstring(content)
        return self.extract(url, tree)