diff --git a/.gitignore b/.gitignore index d2bfd51..e1f738e 100644 --- a/.gitignore +++ b/.gitignore @@ -214,4 +214,8 @@ config/config.yaml # input and output files need to be explicitly added input -output \ No newline at end of file +output + +# (Debug) Output file types +*.txt +*.html diff --git a/README.md b/README.md index 4849ccf..70c9510 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ More info on statistical scraping [here](https://github.com/SNStatComp/SSIG) # Getting started - Install all required packages using > pip install -r requirements.txt + > playwright install + > playwright install-deps - Activate the environment - run the following command to install modules in src as packages for proper import > pip install -e . diff --git a/config/config_template.yaml b/config/config_template.yaml index c53d1fe..b2ed76c 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -12,15 +12,17 @@ requests: timeout_read: 7 # In seconds max_retries: 3 input: - input_dir: ../input + input_dir: input input_files: + skip_domains: skipdomains.txt urls: urls.txt - keywords: keywords.txt + netloc_keywords: keywords.txt + path_keywords: keywords.txt url_max: 100 url_offset: 0 input_variables: output: - output_dir: ../output + output_dir: output batchsize: 100 logs: logs crawl: diff --git a/src/analysis/__init__.py b/output/logs/.keep similarity index 100% rename from src/analysis/__init__.py rename to output/logs/.keep diff --git a/requirements.txt b/requirements.txt index 83eb19f..4817468 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,163 +1,57 @@ -aiobotocore==2.24.0 -aiohappyeyeballs==2.6.1 -aiohttp==3.12.15 -aioitertools==0.12.0 -aiosignal==1.4.0 -alembic==1.16.4 -annotated-types==0.7.0 antlr4-python3-runtime==4.9.3 -anyio==4.10.0 -asttokens==3.0.0 -attrs==25.3.0 -beautifulsoup4==4.13.4 -blinker==1.9.0 -botocore==1.39.11 -cachetools==5.5.2 -certifi==2025.8.3 -chardet==5.2.0 -charset-normalizer==3.4.3 -click==8.2.1 -cloudpickle==3.1.1 -comm==0.2.3 -contourpy==1.3.3 -cssselect==1.3.0 -cycler==0.12.1 -dask==2025.7.0 -databricks-sdk==0.62.0 -debugpy==1.8.16 -decorator==5.2.1 -docker==7.1.0 -duckdb==1.3.2 -executing==2.2.0 -fastapi==0.116.1 -fastjsonschema==2.21.1 -Flask==3.1.1 -fonttools==4.59.0 -frozendict==2.4.7 -frozenlist==1.7.0 -fsspec==2025.7.0 -GDAL==3.8.4 -geopandas==1.1.1 -gitdb==4.0.12 -GitPython==3.1.45 -google-auth==2.40.3 -graphene==3.4.3 -graphql-core==3.2.6 -graphql-relay==3.2.0 -greenlet==3.2.4 -gunicorn==23.0.0 -h11==0.16.0 -idna==3.10 -importlib_metadata==8.7.0 -ipykernel==6.30.1 -ipython==9.4.0 -ipython_pygments_lexers==1.1.1 -itsdangerous==2.2.0 -jedi==0.19.2 -Jinja2==3.1.6 -jmespath==1.0.1 -joblib==1.5.1 -jsonschema==4.25.0 -jsonschema-specifications==2025.4.1 -jupyter_client==8.6.3 -jupyter_core==5.8.1 -jusText==3.0.2 -kiwisolver==1.4.9 -langdetect==1.0.9 -locket==1.0.0 -lxml==6.0.2 -lxml_html_clean==0.4.3 -Mako==1.3.10 -markdown-it-py==4.0.0 -MarkupSafe==3.0.2 -matplotlib==3.10.5 -matplotlib-inline==0.1.7 -mdurl==0.1.2 -mlflow==3.2.0 -mlflow-skinny==3.2.0 -mlflow-tracing==3.2.0 -multidict==6.6.3 -narwhals==2.0.1 -nbclient==0.10.2 -nbformat==5.10.4 -nest-asyncio==1.6.0 -nltk==3.9.1 -numpy==2.3.2 -nvidia-nccl-cu12==2.27.7 -omegaconf==2.3.0 -opentelemetry-api==1.36.0 -opentelemetry-sdk==1.36.0 -opentelemetry-semantic-conventions==0.57b0 -packaging==25.0 -pandas==2.3.1 -parso==0.8.4 -partd==1.4.2 -patsy==1.0.1 -pexpect==4.9.0 -pillow==11.3.0 -platformdirs==4.3.8 -playwright==1.58.0 -plotly==6.2.0 -polars==1.32.2 -prompt_toolkit==3.0.51 -propcache==0.3.2 -protobuf==6.31.1 -psutil==7.0.0 -ptyprocess==0.7.0 -pure_eval==0.2.3 -pyarrow==21.0.0 -pyasn1==0.6.1 -pyasn1_modules==0.4.2 -pydantic==2.11.7 -pydantic_core==2.33.2 -pyee==13.0.0 -Pygments==2.19.2 -pyogrio==0.11.1 -pyparsing==3.2.3 -pyproj==3.7.1 +attrs==26.1.0 +automat==25.4.16 +beautifulsoup4==4.15.0 +bs4==0.0.2 +build==1.5.0 +certifi==2026.6.17 +cffi==2.0.0 +charset-normalizer==3.4.7 +click==8.4.2 +constantly==23.10.4 +cryptography==49.0.0 +cssselect==1.4.0 +defusedxml==0.7.1 +filelock==3.29.4 +greenlet==3.5.3 +hyperlink==21.0.0 +idna==3.18 +incremental==24.11.0 +itemadapter==0.13.1 +itemloaders==1.4.0 +jmespath==1.1.0 +lxml==6.1.1 +numpy==2.5.0 +omegaconf==2.3.1 +packaging==26.2 +pandas==3.0.3 +parsel==1.11.0 +pip==26.1.2 +pip-tools==7.5.3 +playwright==1.61.0 +protego==0.6.2 +pyarrow==24.0.0 +pycparser==3.0 +pydispatcher==2.0.7 +pyee==13.0.1 +pyopenssl==26.3.0 +pyproject-hooks==1.2.0 python-dateutil==2.9.0.post0 -pytz==2025.2 -PyYAML==6.0.2 -pyzmq==27.0.1 -readability-lxml==0.8.4.1 -referencing==0.36.2 -regex==2025.7.34 -requests==2.32.4 -rich==14.2.0 -rpds-py==0.27.0 -rsa==4.9.1 -s3fs==2025.7.0 -scikit-learn==1.7.1 -scipy==1.16.1 -seaborn==0.13.2 -setuptools==80.9.0 -shapely==2.1.1 -sitemap==20191121 +pyyaml==6.0.3 +queuelib==1.9.0 +requests==2.34.2 +requests-file==3.0.1 +scrapy==2.16.0 +service-identity==26.1.0 +setuptools==82.0.1 six==1.17.0 -smmap==5.0.2 -sniffio==1.3.1 -soupsieve==2.7 -SQLAlchemy==2.0.42 -sqlparse==0.5.3 -stack-data==0.6.3 -starlette==0.47.2 -statsmodels==0.14.5 -threadpoolctl==3.6.0 -toolz==1.0.0 -tornado==6.5.2 -tqdm==4.67.1 -traitlets==5.14.3 -typing-inspection==0.4.1 -typing_extensions==4.14.1 -tzdata==2025.2 -ultimate-sitemap-parser==1.6.0 -urllib3==2.5.0 -uv==0.8.8 -uvicorn==0.35.0 -wcwidth==0.2.13 -Werkzeug==3.1.3 -wheel==0.45.1 -wrapt==1.17.2 -xgboost==3.0.3 -yarl==1.20.1 -zipp==3.23.0 +soupsieve==2.8.4 +tldextract==5.3.1 +twisted==26.4.0 +typing-extensions==4.15.0 +ultimate-sitemap-parser==1.8.1 +urllib3==2.7.0 +validators==0.35.0 +w3lib==2.4.1 +wheel==0.47.0 +zope-interface==8.5 diff --git a/src/analysis/analyze_results.py b/src/analysis/analyze_results.py deleted file mode 100644 index fb52cb1..0000000 --- a/src/analysis/analyze_results.py +++ /dev/null @@ -1,149 +0,0 @@ -import os -import logging -from typing import Set -import re - -import pandas as pd - -from util import setup - -CONFIG = setup("../config/config.yaml") - - -def is_valid_string(s): - if not isinstance(s, str): - return False - if len(s) == 0: - return False - strange_chars = re.findall(r'[\x00-\x08\x0B\x0E-\x1F\x7F]', str(s)) - return not (len(strange_chars) / len(s)) > 0.1 - - -class ParquetReader(object): - def __init__( - self, - dir_parquets: str, - filter_valid_content: bool = True): - """ - Reader finds (all) parquet files in given folder and yields each as a pd.DataFrame - """ - self._dir_parquets = dir_parquets - logging.info(f"ParquetReader will search for parquet files in: {dir_parquets}.") - self._filter_valid_content = filter_valid_content - logging.info(f"ParquetReader will filter content for valid strings: {filter_valid_content}.") - - def __iter__(self): - cnt = 0 - for root, dirs, files in os.walk(self._dir_parquets): - for file in files: - if file.endswith('.parquet'): - cnt += 1 - file_path = os.path.join(root, file) - logging.debug(f"Yielding parquet file: {file_path}.") - df = pd.read_parquet(file_path) - if not self._filter_valid_content: - yield df - else: - yield df[df['content'].apply(is_valid_string)] - logging.info(f"ParquetReader iterated through {cnt} parquet files in total.") - - -def get_baseurls(df: pd.DataFrame) -> Set: - return set(list(df['base_url'].drop_duplicates())) - - -class LogReader(object): - def __init__( - self, - dir_logs: str): - """ - """ - self._dir_logs = dir_logs - logging.info(f"LogReader will search for log files in: {dir_logs}.") - - def __iter__(self): - cnt = 0 - for root, dirs, files in os.walk(self._dir_logs): - for file in files: - if file.endswith('.log'): - cnt += 1 - file_path = os.path.join(root, file) - logging.debug(f"Yielding log file: {file_path}.") - with open(file_path, 'r', newline='\n') as filelog: - for line in filelog: - yield line - logging.info(f"LogReader iterated through {cnt} log files in total.") - - -if __name__ == "__main__": - - logging.basicConfig(level=logging.INFO) - - # Input URLs - with open(f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.urls}", 'r', encoding='utf-8') as file_in: - urls = [line.rstrip() for line in file_in] - - # Results from log file - dir_logs = f"{CONFIG.output.output_dir}/{CONFIG.output.logs}" - lr = LogReader(dir_logs=dir_logs) - urls_tried = set() - visits = {} - base_url_current = '' - - for logline in lr: - if "Trying to crawl base url: " in logline: - base_url_current = logline.split("Trying to crawl base url: ")[1].split('\n')[0] - if base_url_current not in urls_tried: - urls_tried.add(base_url_current) - visits[base_url_current] = 0 - else: - logging.error(f"This should not have happend. Twice url: {base_url_current}") - - if " visits out of maximum " in logline: - if len(base_url_current) == 0: - logging.error("This should not have happend. Found visits before new url declaration") - visits[base_url_current] = int(logline.split(" visits out of maximum ")[1].split('.')[0]) - base_url_current = '' - - for k in visits: - if k not in urls_tried: - print(k) - - logging.info(f"Processed urls: {len(urls_tried)}, of total given: {len(urls)}.") - visits_none = {k for k, v in visits.items() if v == 0} - logging.info(f"Websites without visits: {len(visits_none)}.") - visits = {k: v for k, v in visits.items() if v > 0} - logging.info(f"Websites with visits: {len(visits)}.") - - # Results in tables - dir_parquets = CONFIG.output.output_dir - pr = ParquetReader(dir_parquets=dir_parquets) - - # Analysis - urls_withcontent = set() - count_content = 0 - dfs = [] - for df in pr: - logging.debug(df.head(2)) - urls_withcontent = urls_withcontent.union(get_baseurls(df=df)) - count_content += df.shape[0] - dfs.append(df) - - logging.info(f"Total number of base-urls tried: {len(urls)}.") - logging.info(f"Total number of base-urls with scraped content: {len(urls_withcontent)}.") - logging.info(f"Total number of pages downloaded: {count_content}.") - - total = pd.concat(dfs, ignore_index=True) - logging.debug(f"Total number of base-urls with scraped content: {len(get_baseurls(df=total))}.") - logging.debug(f"Total number of pages downloaded: {total.shape[0]}.") - - gr = total.groupby(by='base_url', as_index=False)['url'].count() - gr = gr.rename(columns={'url': 'pages', 'base_url': 'count'}) - gr = gr.groupby(by='pages', as_index=False).count() - counts_stats = {row['pages']: row['count'] for row in gr.to_dict(orient='records')} - counts_out = pd.DataFrame([{'pages': k, 'counts': v} for k, v in counts_stats.items()]) - # counts_out.to_csv('scraped_pages_count.csv', index=False) - - logging.info(f"Downloaded single page for {counts_stats[1]} base-urls.") - logging.info(f"Maximum number op pages downloaded for a given base-url: {max(counts_stats.keys())}.") - \ No newline at end of file diff --git a/src/crawl/HesitantCrawler.py b/src/crawl/HesitantCrawler.py deleted file mode 100644 index 74a9de4..0000000 --- a/src/crawl/HesitantCrawler.py +++ /dev/null @@ -1,314 +0,0 @@ -from typing import List -import time -import logging -import re -import validators - -import numpy as np -from urllib.parse import urlparse, urljoin -from bs4 import BeautifulSoup - -from .base import BaseCrawler, CrawlResult -from fetch import HTMLFetcher -from util import setup - -CONFIG = setup("config/config.yaml") - - -class HesitantCrawler(BaseCrawler): - def __init__( - self, - fetcher: HTMLFetcher, - target_keywords: List[str], - add_sitemapurls: bool = False, - max_depth: int = 1): - """ - Depth-limited Search Targeted Crawler - Crawler class for obtaining urls from start_url. - Crawler will look for urls on start_url and append them to list. It will then - look for urls in the next item on the list and append urls to the end of the list. - Once the max_crawl_visits is visited, crawl stops. - - This crawler must be used as a focused crawl where target_keywords are given. - Only URLs found that meet the target keywords are stored in the results. - - Crawler will hesitate: - It will not stop at path that led to no targeted results - as long as it hasn't gone too deep yet. - The hesitancy reflects the idea to crawl on non-relevant pages because they might lead to relevant pages. - - If sitemap is also to be checked for urls, set add_sitemapurls = True - If only starting_page and sitemaps should be used exclusively, set max_crawl_visits = 0 in config file - - :param add_sitemapurls: True if urls from sitemap are added to crawl - :param target_keywords: List of targeting keywords in regex format - :param max_depth: How many steps further do we look beyond non-targeted results, defaults to 1 - """ - logging.info(f"Initializing HesitantCrawler with max_depth={max_depth}") - self.max_depth = max_depth - if max_depth < 0: - logging.debug("Only urls from starting_url (and possibly sitemap if used) can be found, since max_depth<0") - - super(HesitantCrawler, self).__init__(fetcher=fetcher) - - # crawl delay will be overwritten if robots from given domain provides a value # TODO: make sure - self.crawl_delay = 2 - logging.debug(f"Defaul crawl delay is set to {self.crawl_delay}") - - self.max_duration = CONFIG.crawl.max_duration - logging.debug(f"Max duration of crawl set to {self.max_duration} seconds") - - self.max_crawl_visits = CONFIG.crawl.max_visits - logging.debug(f"Max page visits of crawl set to {self.max_crawl_visits}") - - # Targets - self.target_keywords = target_keywords - logging.info(f"The targeted crawl will look for given keywords: {', '.join(self.target_keywords)}") - - # Excluded URLs which contain: - self._unsupported = ( - ".ics", ".mng", ".pct", ".bmp", ".gif", ".jpg", ".jpeg", ".png", ".pst", ".psp", ".tif", ".tiff", ".drw", ".dxf", ".eps", - ".woff2", ".svg", ".mp3", ".wma", ".ogg", ".wav", ".ra", ".aac", ".mid", ".aiff", ".3gp", ".asf", ".asx", ".avi", ".mp4", - ".woff", ".mpg", ".qt", ".rm", ".swf", ".wmv", ".m4a", ".css", ".pdf", ".doc", ".docx", ".exe", ".bin", ".rss", ".zip", - ".rar", ".msu", ".flv", ".dmg", ".xls", ".xlsx", ".ico", ".mng?download=true", ".pct?download=true", ".bmp?download=true", - ".gif?download=true", ".jpg?download=true", ".jpeg?download=true", ".png?download=true", ".pst?download=true", - ".psp?download=true", ".tif?download=true", ".tiff?download=true", ".ai?download=true", ".drw?download=true", - ".dxf?download=true", ".eps?download=true", ".ps?download=true", ".svg?download=true", ".mp3?download=true", - ".wma?download=true", ".ogg?download=true", ".wav?download=true", ".ra?download=true", ".aac?download=true", - ".mid?download=true", ".au?download=true", ".aiff?download=true", ".3gp?download=true", ".asf?download=true", - ".asx?download=true", ".avi?download=true", ".mov?download=true", ".mp4?download=true", ".mpg?download=true", - ".qt?download=true", ".rm?download=true", ".swf?download=true", ".wmv?download=true", ".m4a?download=true", - ".css?download=true", ".pdf?download=true", ".doc?download=true", ".exe?download=true", ".bin?download=true", - ".rss?download=true", ".zip?download=true", ".rar?download=true", ".msu?download=true", ".flv?download=true", - ".dmg?download=true") - logging.debug(f"URLs will be excluded if they contain any in path:{', '.join(self._unsupported)}") - - self.add_sitemapurls = add_sitemapurls - logging.info(f"Will we check URLs from sitemap? Answer: {add_sitemapurls}") - - def skip_this_url(self, url: str) -> bool: - """Function to see if we have already visited url""" - - # prevent duplicate crawl from trailing forward slash in URL - url = url.rstrip('/') if url.endswith('/') else url - - # Do not revisit pages - if url in self._visited: - logging.debug(f"Skip {url}, because we have visited it before") - return True # skip - return False - - def find_urls(self, url: str, html: str) -> str: - """ - Generator that yields a URLs to check for target condition - """ - - soup = BeautifulSoup(html, "html.parser") - - # Extract links - will later be checked if they are internal - for link in soup.find_all("a", href=True): - href = link["href"] - absolute_url = urljoin(url, href) # TODO: find out if necessary - absolute_url = absolute_url.rstrip('/') if absolute_url.endswith('/') else absolute_url - # parsed = urlparse(absolute_url) - - # if parsed.netloc == self.domain and absolute_url not in self._istargeted: - if absolute_url not in self._istargeted: - logging.debug(f"Found a URL to check: {absolute_url}") - yield absolute_url - - def find_target(self, parsed: str) -> str: - """Check if the parsed URL matches the target keywords in subdomain or path""" - - subdomain = parsed.netloc - logging.debug(f"Current URL subdomain is identified as: {subdomain}") - path = parsed.path - logging.debug(f"Current URL path is identified as: {path}") - - # Check for keywords in subdomain - for keyword in self.target_keywords: - first_keyword_hit = re.search(keyword, subdomain) - if first_keyword_hit is not None: - logging.debug(f"Target is met in the subdomain: {subdomain}") - logging.debug(f"Target is met with the following hit: {first_keyword_hit.group(0)}") - return first_keyword_hit.group(0) - - # Check for keywords in path - for keyword in self.target_keywords: - first_keyword_hit = re.search(keyword, path) - if first_keyword_hit is not None: - logging.debug(f"Target is met in the path: {path}") - logging.debug(f"Target is met with the following hit: {first_keyword_hit.group(0)}") - return first_keyword_hit.group(0) - - logging.debug("Target has not been met, no hit") - return '' - - def process_url(self, url: str, parent_url: str, from_sitemap: bool = False): - """check url for target and then add to results and queue""" - - if not validators.url(url): - logging.debug(f"Invalid url: {url}") - return - - if url in self._istargeted: - return - - if any(ext in url for ext in self._unsupported): - self._istargeted[url] = { - 'parent': parent_url, - 'depth': np.inf, - 'is_deadend': True} - logging.debug("Unsupported url, setting depth to infinite and deadend=True, will not be added to queue") - return - - # parse the url - parsed = urlparse(url) - domain = parsed.netloc - - # dead ends for queue - is_deadend = False - if from_sitemap: - is_deadend = True # won't be added to queue if from sitemap tree - - if domain != self.start_domain: - if domain != self._istargeted[parent_url]['domain']: - if self._istargeted[parent_url]['domain'] != self.start_domain: - # In this case we have jumped to a third domain, not allowed at all! - logging.debug("Deviated from domain twice, url is not allowed") - return - else: - # In this case we jumped the first time, allowed and we still like to crawl that site actually - pass - # TODO: check if following should be done? What if on a job board the link goes to vacancies of a different company? - # else: - # # We are still on the domain after the first jump, allowed to be targeted but no more crawl - # is_deadend = True - logging.debug(f"Result of check if the URL is a dead end: {is_deadend}") - - # determine if it is targeted - first_keyword_hit = self.find_target(parsed=parsed) - is_targeted = True if len(first_keyword_hit) > 0 else False - logging.debug(f"Result of check if the URL is targeted: {is_targeted}") - - # keep track of how far wway we've walked from targeted site - depth = 0 if is_targeted else self._istargeted[parent_url]['depth'] + 1 - logging.debug(f"Depth = steps away from a targeted URL: {depth}") - self._istargeted[url] = { - 'domain': parsed.netloc, - 'parent': parent_url, - 'depth': depth, - 'is_deadend': is_deadend} - - # Add to results if targeted - if is_targeted: - logging.info(f"Found a targeted URL: {url}") - logging.debug("Adding the URL to our list with results") - self._results.append(CrawlResult(url=url, source="NoCrawler", targeted=True, first_keyword_hit=first_keyword_hit)) - - # May anyways be added to queue of URLs to visit for more URLS - if (depth <= self.max_depth) and (not is_deadend) and (not from_sitemap): - logging.debug(f"Adding the URL to queue vor visiting with depth={depth} at max_depth={self.max_depth}") - self._queue.append(url) - - def order_queue(self): - """Reorder elements in queue by ascending depth of URL, so that targeted URLs are visited first""" - - if len(self._queue) > 0: - self._queue = sorted(self._queue, key=lambda x: self._istargeted.get(x, {'depth': np.inf})['depth']) - - def crawl(self): - """ - Main crawling function - Results can be otbained by calling get_results() - """ - - if len(self.start_url) == '': - logging.error("No start URL provided for crawler, use reset_with_starturl() to reset crawler") - return {} - logging.info(f"Starting crawl of {self.start_url}..") - - # domain - domain = urlparse(self.start_url).netloc - - # The queue will be updated with found urls and then worked through - # until a maximum number of visits or duration is reached - self._queue = [self.start_url] - start_time = time.time() - duration = 0 - - # for reference, put start_url and domain in dictionary - self._istargeted[self.start_url] = {'depth': 0, 'domain': domain, 'is_deadend': False} - self._istargeted[domain] = {'depth': 0, 'domain': domain, 'is_deadend': False} - - while self._queue and len(self._visited) < self.max_crawl_visits and duration < self.max_duration: - - # Take an element from the queue - visiting_url = self._queue.pop(0) # will start with base url, then whatever will have been added next - - # Check if we already visited URL - logging.debug(f"Check if {visiting_url} can be skipped") - if self.skip_this_url(url=visiting_url): - continue - - # Fetch from visting URL, will check robots if it is allowed (as part of Fetcher class) - try: - visiting_html, schema_indicator = self._fetcher.fetch(url=visiting_url) - self._visited[visiting_url] = visiting_html # even if nothing found, keep track of what we have tried - if len(visiting_html) == 0: # Nothing returned - continue - - for found_url in self.find_urls(url=visiting_url, html=visiting_html): - self.process_url(url=found_url, parent_url=visiting_url) - except Exception: - continue - - # At the end, measure how long we've been busy so far - duration = time.time() - start_time - - # Respect crawl delay - logging.debug("Waiting for delay to pass") - time.sleep(self.crawl_delay) - logging.debug("Delay has passed") - - # order queue by depth, ascending - so that targeted URLs are crawled before the ones further removed - self.order_queue() - - # Crawl stopped - logging.debug(f"Crawl stopped after {np.around(duration, 0)} seconds, with max duration {self.max_duration} seconds") - logging.debug(f"Crawl stopped after {len(self._visited)} page visits, with max {self.max_crawl_visits}") - logging.debug(f"Crawl stopped with {len(self._queue)} urls still in the queue") - - logging.info(f"Crawling from {self.start_url} involved checking {len(self._istargeted)} URLs for meeting the target") - logging.info(f"Crawling from {self.start_url} resulted in {len(self.get_results())} results") - logging.debug(f"Crawling from {self.start_url} results: {self.get_results()}") - - if self.add_sitemapurls: - self.extendcrawl_fromsitemaps(domain=domain) - - def extendcrawl_fromsitemaps(self, domain: str): - sitemap_urls = self._fetcher.robotsfetcher.get_sitemap_urls(domain=domain) - if sitemap_urls: - logging.info(f"Sitemaps of {self.start_url} linked to {len(sitemap_urls)} URLs to check for meeting the target") - for found_url in sitemap_urls: - self.process_url(url=found_url, parent_url=domain, from_sitemap=True) - logging.info(f"Sitemaps of {self.start_url} increased the number of results to {len(self.get_results())}") - logging.info(f"No sitemap URLs found for {self.start_url}") - - -if __name__ == "__main__": - - logging.basicConfig(level=logging.INFO) - - target_keywords = ["vacature"] - fetcher = HTMLFetcher() - - crawler = HesitantCrawler( - fetcher=fetcher, - target_keywords=target_keywords, - max_depth=-1, - add_sitemapurls=True - ) - - # Crawl can start as soon as start url provided - crawler.reset_with_starturl(start_url="https://cbs.nl") - crawler.crawl() diff --git a/src/crawl/__init__.py b/src/crawl/__init__.py deleted file mode 100644 index 77999db..0000000 --- a/src/crawl/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .base import ICrawler, NoCrawler, BaseCrawler, CrawlResult -from .HesitantCrawler import HesitantCrawler \ No newline at end of file diff --git a/src/crawl/base.py b/src/crawl/base.py deleted file mode 100644 index 36b51ca..0000000 --- a/src/crawl/base.py +++ /dev/null @@ -1,102 +0,0 @@ -from abc import ABC, abstractmethod -from typing import NamedTuple, List -import logging -from urllib.parse import urlparse - -from fetch import IFetcher - - -class CrawlResult(NamedTuple): - url: str - source: str - targeted: bool = None - first_keyword_hit: str = None - - -class ICrawler(ABC): - """ - interface for all crawlers - """ - def __init__(self, fetcher: IFetcher): - logging.info(f"Initializing crawler with fetcher of type: {type(fetcher)}") - self._fetcher = fetcher - - @abstractmethod - def reset_with_starturl(start_url: str): - """Reset crawler and set url from which to start the crawl""" - raise NotImplementedError() - - @abstractmethod - def get_results() -> List[CrawlResult]: - """Return list of crawled URLs""" - return NotImplementedError() - - @abstractmethod - def crawl(): - """Crawl candidate URLs""" - raise NotImplementedError() - - -class BaseCrawler(ICrawler): - """ - Base functionality of all Crawlers - """ - def __init__(self, fetcher: IFetcher): - super(BaseCrawler, self).__init__(fetcher=fetcher) - self.start_url = "" - self.start_domain = "" - self.crawl_delay = 2 - - def reset_results(self): - logging.debug("Crawler is (re)set with empty results") - self._results = [] # for output - self._queue = [] # for next visits - self._visited = dict() # to keep track of visited pages - self._istargeted = dict() # will keep track of urls and if they met targeting conditions - - def reset_with_starturl(self, start_url: str): - """Reset crawler and set url from which to start the crawl""" - self.reset_results() - - logging.debug(f"Crawler start url given as: {start_url}") - if not start_url.startswith('https://') and not start_url.startswith('http://'): - logging.debug("Start URL lacks required http or https prefix") - start_url = f"https://{start_url}" - logging.info(f"Prefix 'https://' added to start URL: {start_url}") - self.start_url = start_url - - self.start_domain = urlparse(start_url).netloc - - def get_results(self) -> List[CrawlResult]: - """Return list of crawled URLs""" - return self._results - - def crawl(): - """Crawl candidate URLs""" - raise NotImplementedError() - - -class NoCrawler(BaseCrawler): - """ - Do nothing Crawler for testing, just put start_url in results - """ - def __init__(self): - from fetch import NoFetcher - logging.info("Initializing NoCrawler which will not crawl") - logging.debug("Since Crawler won't go looking for urls, the NoFetcher is loaded as a dummy") - super(NoCrawler, self).__init__(fetcher=NoFetcher()) - - def crawl(self): - logging.info("Just adding start-url to the results") - result = CrawlResult(url=self.start_url, source="NoCrawler") - self._results.append(result) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - - crawler = NoCrawler() - crawler.reset_with_starturl(start_url="https://books.toscrape.com") - crawler.crawl() - for r in crawler.get_results(): - print(r) diff --git a/src/fetch/HTML.py b/src/fetch/HTML.py deleted file mode 100644 index 597cc15..0000000 --- a/src/fetch/HTML.py +++ /dev/null @@ -1,196 +0,0 @@ -import requests -from typing import Dict, Optional -import time -import random -import urllib -from urllib.parse import urlparse -import logging - -import extruct -from w3lib.html import get_base_url - -from util import setup -from .base import IFetcher - -CONFIG = setup("config/config.yaml") - - -class HTMLFetcher(IFetcher): - """ - Standard Fetcher - Fetches the HTML content of the given URL with retries and error handling. - Uses a robots fetcher - Returns a dictionary with the URL as key and the HTML content as value. - """ - def __init__( - self, - user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", - headers: Optional[Dict] = None): - logging.info("Initializing HTMLFetcher") - super(HTMLFetcher, self).__init__(user_agent=user_agent) - self.user_agent = user_agent - logging.debug(f"User agent given as: {user_agent}") - - self.timeout = ( - CONFIG.requests.timeout_connect, - CONFIG.requests.timeout_read) - logging.debug(f"Timeout for connection is {CONFIG.requests.timeout_connect} seconds, for reading {CONFIG.requests.timeout_read} seconds") - - self.max_retries = CONFIG.requests.max_retries - logging.debug(f"Maximum retries set to {CONFIG.requests.max_retries}") - - self.headers = headers or { - "User-Agent": self.user_agent, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", - "Accept-Language": "en-US,en;q=0.9,nl-NL;q=0.8,nl;q=0.7", - "Accept-Encoding": "identity", - "Connection": "keep-alive" - } - - headers_str = ', '.join([f"{k}: {v}" for k, v in self.headers.items()]) - logging.debug(f"Request headers set to {headers_str}") - - # Domain will have to be identified for any given url to fetch, then the corresponding robots file will be checked - # this is handled by RobotsFetcher - from .Robots import RobotsFetcher - self.robotsfetcher = RobotsFetcher(user_agent=user_agent) - self._robots_bydomain = self.robotsfetcher.get_results() - - def resetResults(self): - self.results = {} - return - - def is_allowed(self, url: str) -> bool: - """Will check if robots of given domain allows fetching""" - - # Identify given domain to check corresponding robots file - domain = urlparse(url).netloc # obtain domain from url - logging.debug(f"The domain is identified as {domain}") - - if not self._robots_bydomain.get(domain, False): - self.robotsfetcher.fetch(domain=domain) - self._robots_bydomain[domain].read() - logging.debug(f"A new robots file has been read for domain {domain}") - - # check if allowed - if self._robots_bydomain[domain].can_fetch(useragent=self.user_agent, url=url): - return True - else: - return False - - def fetch(self, url: str) -> str: - """ - Fetches the HTML content of the given URL with retries and error handling. - Returns a dictionary with the URL as key and the HTML content as value. - """ - logging.info(f"Trying to fetch the next URL: {url}") - - # check if allowed - logging.debug("Checking if url is allowed") - if not self.is_allowed(url=url): - logging.debug(f"Given url skipped because it is not allowed: {url}") - return {} - - return self._fetch_with_retries(url) - - def _fetch_with_retries(self, url: str, retries: int = 0): - """ - Internal method that performs the request with retry logic. - """ - try: - response = requests.get(url, headers=self.headers, timeout=self.timeout) - - # Check for HTTP errors - if response.status_code != 200: - logging.warning(f"Exited with response status: {response.status_code}") - return {} - - # Check if content is HTML - if "text/html" not in response.headers.get("Content-Type", ""): - logging.info(f"Non-HTML content received for URL: {url}") - return {} - - # Success - result = response.text - - # Base URL is needed to resolve relative URLs in the metadata - base_url = get_base_url(result, response.url) - # Extract JSON-LD - schema_indicator = False - try: - data = extruct.extract(result, base_url=base_url) - - # Filter out empty formats - found_schema = {k: v for k, v in data.items() if v} - - if found_schema: - for format_type in found_schema: - if format_type == "json-ld": - for el in found_schema["json-ld"]: - if "@context" in el.keys(): - if "@type" in el.keys(): - if el["@type"] == CONFIG.crawl.schema.keyword: - logging.info(f"Element {CONFIG.crawl.schema.keyword} found using schema.org for base url: {base_url}") - print(f"{CONFIG.crawl.schema.keyword} found using schema.org for base url: {base_url}") - schema_indicator = True - if "@graph" in el.keys(): - for graphel in el["@graph"]: - if graphel["@type"] == CONFIG.crawl.schema.keyword: - logging.info(f"Graph element {CONFIG.crawl.schema.keyword} found using schema.org for base url: {base_url}") - print(f"Graph {CONFIG.crawl.schema.keyword} found using schema.org for base url: {base_url}") - schema_indicator = True - except Exception: - pass - - if CONFIG.crawl.schema.keyword in result: - logging.debug(f"Found schema keyword: {CONFIG.crawl.schema.keyword} in HTML for url: {url}, possible schema org?") - - self.results[url] = (result, schema_indicator) - return result, schema_indicator - - except requests.exceptions.RequestException as e: - # Handle exceptions - logging.info(f"Request failed for {url}. Error: {e}") - - if retries < self.max_retries: - wait_time = random.uniform(1, 5) # Random delay between 1 and 5 seconds - logging.info(f"Retrying in {wait_time:.2f} seconds...") - time.sleep(wait_time) - return self._fetch_with_retries(url, retries + 1) - - # Max retries reached - return {} - - except urllib.error.URLError as e: - logging.info(f"Request failed with exception: {e}") - return {} - - def get_results(self) -> Dict[str, str]: - """ - Returns the dictionary of fetched URLs and their HTML content. - """ - return self.results - - -if __name__ == "__main__": - - logging.basicConfig(level=logging.DEBUG) - - user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - fetcher = HTMLFetcher( - user_agent=user_agent - ) - - urls = [ - "https://example.com", - "https://books.toscrape.com", - "https://werkenbijhetcbs.nl/vacature-overzicht-express#/?page=1", - "https://books.toscrape.com/catalogue/category/books/travel_2/index.html" - ] - - for url in urls: - fetcher.fetch(url) - - for url, html in fetcher.get_results().items(): - print(f"\nURL: {url}") - print(f"...{html[:100]}...\n\n") diff --git a/src/fetch/PlaywrightText.py b/src/fetch/PlaywrightText.py new file mode 100644 index 0000000..c5c87e9 --- /dev/null +++ b/src/fetch/PlaywrightText.py @@ -0,0 +1,125 @@ +import asyncio +import random +import logging +from typing import Tuple, Union + +from playwright.async_api import async_playwright, Error as PlaywrightError +from .Robots import RobotsFetcher + + +class PlaywrightTextFetcher: + """ + Playwright Text Fetcher + Uses Playwright to load a page until DOM content is loaded, + waits for a random delay, and then extracts the inner text of the body. + """ + def __init__( + self, + user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + max_retries=3, + wait_time=5): + logging.debug("Initializing PlaywrightTextFetcher") + self.user_agent = user_agent + + self.max_retries = max_retries + self.wait_time = wait_time + + # Domain will have to be identified for any given url to fetch, then the corresponding robots file will be checked + self.robotsfetcher = RobotsFetcher(user_agent=user_agent) + self._robots_bydomain = self.robotsfetcher.get_results() + + logging.info("Launching new Chromium browser instance...") + self.setup_playwright = False + + async def setup_playwright_browser(self): + self._playwright = await async_playwright().start() + self._browser = await self._playwright.chromium.launch(headless=True) + self.setup_playwright = True + + async def close(self): + """Properly shuts down the browser and playwright.""" + if self._browser: + await self._browser.close() + self._browser = None + if self._playwright: + await self._playwright.stop() + self._playwright = None + logging.info("Playwright browser closed.") + + async def fetch(self, url: str) -> Union[Tuple[str, bool], dict]: + """ + Asynchronous fetch using Playwright. + Returns a tuple (text_content, schema_indicator) or an empty dict on failure. + """ + logging.info(f"Trying to fetch the next URL with Playwright: {url}") + if not self.setup_playwright: + await self.setup_playwright_browser() + + return await self._fetch_with_retries(url) + + async def _fetch_with_retries(self, url: str, retries: int = 0): + # Create a new context (incognito-like) for every request for isolation + context = await self._browser.new_context(user_agent=self.user_agent) + page = await context.new_page() + + try: + logging.debug(f"Navigating to {url}...") + # Use a timeout to prevent a single slow page from hanging the whole worker + await page.goto(url, wait_until="domcontentloaded", timeout=30000) + + await asyncio.sleep(self.wait_time) + + text_content = await self._extract_clean_text(page) + return text_content + + except (PlaywrightError, Exception) as e: + logging.error(f"Playwright request failed for {url}. Error: {e}") + + if retries < self.max_retries: + wait_time = random.uniform(1, 5) + logging.info(f"Retrying in {wait_time:.2f} seconds...") + await asyncio.sleep(wait_time) + # We don't need to pass context here, the next retry will create its own + return await self._fetch_with_retries(url, retries + 1) + + return "" + finally: + # ALWAYS close the context and page to free up memory, + # even if the request fails or succeeds. + await page.close() + await context.close() + + async def _extract_clean_text(self, page) -> str: + # ... (rest of your existing _extract_clean_text code remains the same) + noise_selectors = ["nav", "footer", "header", "aside"] + for selector in noise_selectors: + try: + await page.evaluate(f'document.querySelectorAll("{selector}").forEach(el => el.remove())') + except Exception: + pass + + text = await page.inner_text("body") + lines = [line.strip() for line in text.splitlines() if line.strip()] + return "\n".join(lines) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + async def main(): + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + fetcher = PlaywrightTextFetcher(user_agent=user_agent) + + urls = [ + "https://example.com", + "https://books.toscrape.com" + ] + + for url in urls: + await fetcher.fetch(url) + + for url, content in fetcher.get_results().items(): + print(f"\nURL: {url}") + print(f"...{content[:100]}...\n\n") + + asyncio.run(main()) \ No newline at end of file diff --git a/src/fetch/Robots.py b/src/fetch/Robots.py index 24d18c5..bebb3af 100644 --- a/src/fetch/Robots.py +++ b/src/fetch/Robots.py @@ -3,7 +3,7 @@ from urllib.robotparser import RobotFileParser from usp.tree import sitemap_tree_for_homepage -from util import setup +from src.util import setup from .base import IFetcher CONFIG = setup("config/config.yaml") diff --git a/src/fetch/__init__.py b/src/fetch/__init__.py index 06bfbfa..c3656b7 100644 --- a/src/fetch/__init__.py +++ b/src/fetch/__init__.py @@ -1,3 +1,3 @@ -from fetch.base import IFetcher, NoFetcher -from fetch.Robots import RobotsFetcher -from fetch.HTML import HTMLFetcher \ No newline at end of file +from .base import IFetcher, NoFetcher +from .Robots import RobotsFetcher +from .PlaywrightText import PlaywrightTextFetcher diff --git a/src/fetch/base.py b/src/fetch/base.py index 8202671..fa68037 100644 --- a/src/fetch/base.py +++ b/src/fetch/base.py @@ -62,4 +62,4 @@ def get_results(self) -> Dict[str, str]: for url, html in fetcher.get_results().items(): print(f"\nURL: {url}") - print(f"...{html[:100]}...\n\n") + print(f"...{html[:100]}...\n\n") \ No newline at end of file diff --git a/src/main.py b/src/main.py index e7a9b4f..b0c77dc 100644 --- a/src/main.py +++ b/src/main.py @@ -1,48 +1,237 @@ -import os -from omegaconf import OmegaConf -from util import setup import logging -from datetime import datetime +import multiprocessing +import os +import re +import sys import time -from scrape import build_webfocusedscraper +import numpy as np +import pandas as pd + +from datetime import datetime +from scrapy.crawler import CrawlerProcess +from src.scrape import HesitantSpider +from src.util import setup, normalize_url CONFIG = setup("config/config.yaml") -def main(): - """ - Crawl given urls to fetch relevant content from HTML - """ +# Check if string is valid and contains no strange characters +def is_valid_string(s): + if not isinstance(s, str): + return False + if len(s) == 0: + return False + strange_chars = re.findall(r'[\x00-\x08\x0B\x0E-\x1F\x7F]', str(s)) + return not (len(strange_chars) / len(s)) > 0.1 + + +# Concatenates all .parquet files in a dir (and its subdirs) +def read_parquet_dir(parquet_dir): + for root, dirs, files in os.walk(parquet_dir): + for file in files: + if file.endswith('.parquet'): + file_path = os.path.join(root, file) + df = pd.read_parquet(file_path) + yield df[df['content'].apply(is_valid_string)] + + +# Spawn spider crawler process +def spawn_spider_process(urls, netloc_keywords, path_keywords, skip_domains, process_id, log_level, logfile, output_file, schema_keywords): + print(f"Args: urls: {urls}, netloc keywords: {netloc_keywords}, path keywords: {path_keywords}, skip domains: {skip_domains}, log level: {log_level}, log file: {logfile}, output file: {output_file}, process_id: {process_id}") + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + # Create scrapy CrawlerProcess + process = CrawlerProcess( + settings={ + "ROBOTSTXT_OBEY": True, + "DOWNLOADER_MIDDLEWARES": { + "src.scrape.ScrapyCrawlMiddleware.TextTypeFilterMiddleware": 543 # High priority + }, + "LOG_FILE": logfile, + "LOG_LEVEL": log_level, + "DOWNLOAD_CONTENT_TYPES": ["text/html", "application/xhtml+xml"], + "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor", + } + ) + + # Configure logging + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + root_logger.handlers = [] + + # Define a filter to inject process_id into every LogRecord + class ProcessIdFilter(logging.Filter): + def filter(self, record): + record.process_id = process_id + return True + + fileHandler = logging.FileHandler(logfile) + fileHandler.setLevel(log_level) + + # Add the filter to the handler + fileHandler.addFilter(ProcessIdFilter()) + + + # This format mimics Scrapy's default look + formatter = logging.Formatter('%(asctime)s %(levelname)s: %(name)s: worker_id: %(process_id)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + fileHandler.setFormatter(formatter) + root_logger.addHandler(fileHandler) + + # Explicitly set levels for Scrapy and other noisy loggers + logging.getLogger('scrapy').setLevel(log_level) + logging.getLogger('twisted').setLevel(log_level) + root_logger.setLevel(log_level) + + # Remove console output + for logger_name in ['scrapy', 'twisted', 'sqlalchemy.engine']: + logger = logging.getLogger(logger_name) + logger.setLevel(log_level) + # Remove any handlers that might be printing to console + for handler in logger.handlers[:]: + logger.removeHandler(handler) + # Prevent logs from propagating up to the root logger's console handlers + logger.propagate = True + # Silence the twisted engine too + logging.getLogger('twisted').handlers = [] - user_agent = CONFIG.requests.useragent - scraper = build_webfocusedscraper(user_agent=user_agent) - scraper.scrape() + # Create crawler from process + spiderCrawler = process.create_crawler(HesitantSpider) + + # Crawl and configure spider + process.crawl( + spiderCrawler, + start_urls=urls, + max_depth=2, + target_netloc_keywords=netloc_keywords, + target_path_keywords=path_keywords, + skip_domains=skip_domains, + output_file=output_file, + allowed_top_level_domains=[".com", ".nl", ".ai", ".de", ".be", ".fr", ".eu", ".io", ".org"], + skip_paths=[ + "shop", "cart", "clients", "testimonials", "search", + "query", "calendar", "events", "archive", "news", + "blog", "media", "articles", "profile", "legal", + "tos", "products", "winkel", "winkelwagen", "archief", + "nieuws", "artikelen", "artikel", "producten", "faq", "policies", + "downloads", "portfolio" + ], + allowed_languages=["nl", "en", "en-uk", "en-gb", "nl-nl", "en-nl", "nl-en"], + allowed_countries=["nl"], + schema_keywords=schema_keywords, + timeout=3600 * 48 # 2 days + ) + + # If worker gets 0 urls, pass (shouldn't happen) + if len(urls) == 0: + return [] + + try: + print(f"Starting crawling process (PID: {process_id}, OSPID: {os.getpid()}) for {urls}!") + process.start() + except Exception as e: + print(f"Something went from starting process! Error {e}") + + if spiderCrawler.spider is not None: + print(f"Returning results of length for PID {process_id} ({len(urls)} URLs: {urls}): {len(spiderCrawler.spider.results)} ({len(spiderCrawler.spider.visited)} visited)") + return spiderCrawler.spider.results if __name__ == "__main__": - LOG_FILE = f"{CONFIG.output.output_dir}/{CONFIG.output.logs}" - if not os.path.exists(LOG_FILE): - os.makedirs(LOG_FILE) - LOG_FILE = f"{LOG_FILE}/{datetime.fromtimestamp(time.time()).strftime('%Y%m%d_%H%M%S')}_offset{CONFIG.input.url_offset}_testing-refactor.log" - logFormatter = logging.Formatter("%(levelname)s %(asctime)s %(processName)s %(message)s") - fileHandler = logging.FileHandler("{0}".format(LOG_FILE)) - fileHandler.setFormatter(logFormatter) - rootLogger = logging.getLogger() - rootLogger.addHandler(fileHandler) - rootLogger.setLevel(logging.INFO) + # Set logging level and create file + # All workers write to same log + logging_level = logging.INFO + + dir_log = f"{CONFIG.output.output_dir}/{CONFIG.output.logs}" + if not os.path.exists(dir_log): + os.makedirs(dir_log) + logfile = f"{dir_log}/log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log" + logging.basicConfig( + filename=logfile, + level=logging_level, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + logging.info("Log file created.") + + # Input URLs + file_urls = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.urls}" + logging.info(f"Reading list of base-urls from file: {file_urls}") + with open(file_urls, 'r', encoding='utf-8') as file_in: + urls = [line.rstrip() for line in file_in] + + # Normalize URLs + urls = [*map(normalize_url, urls)] + + # Keywords + file_keywords = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.netloc_keywords}" + logging.info(f"Reading list of keywords from file: {file_keywords}") + with open(file_keywords, 'r', encoding='utf-8') as file_in: + target_netloc_keywords = [line.rstrip() for line in file_in] + + file_keywords = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.path_keywords}" + logging.info(f"Reading list of keywords from file: {file_keywords}") + with open(file_keywords, 'r', encoding='utf-8') as file_in: + target_path_keywords = [line.rstrip() for line in file_in] + + # Skip domains + file_skip_domains = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.skip_domains}" + logging.info(f"Reading list of skip_domains from file: {file_skip_domains}") + with open(file_skip_domains, 'r', encoding='utf-8') as file_in: + skip_domains = [line.rstrip() for line in file_in] + + # Set amount of parallel workers and prepare chunk-wisem parallel execution + max_workers = 16 + num_workers = min([len(urls), max_workers]) + logging.info(f"Will use {num_workers} workers!") + batch_size = len(urls) // num_workers if len(urls) > num_workers else 1 + url_chunks = np.array_split(urls, num_workers) + + chunked_args = [] + + # Make output dir for specific run + time_part = datetime.now().strftime("%Y%m%d_%H%M%S") + if not os.path.exists(f"{CONFIG.output.output_dir}/{time_part}"): + os.makedirs(f"{CONFIG.output.output_dir}/{time_part}") + + for i in range(0, num_workers): + chunked_args.append( + ( + url_chunks[i], + target_netloc_keywords, + target_path_keywords, + skip_domains, + i, + logging_level, + logfile, + f"{CONFIG.output.output_dir}/{time_part}/worker_{i}.parquet", # Different output files per worker + [CONFIG.crawl.schema.keyword] + ) + ) + + print("# Workers:", num_workers) + + start_time = time.perf_counter() + + with multiprocessing.Pool(processes=num_workers) as pool: + pool.starmap(spawn_spider_process, chunked_args) - logging.info("Config:") - logging.info(OmegaConf.to_yaml(CONFIG)) + end_time = time.perf_counter() - main() + # Results in tables + dir_parquets = f"{CONFIG.output.output_dir}/{time_part}/" + parquet_dfs = read_parquet_dir(dir_parquets) - logging.info("Exiting with no error") + # Analysis + dfs = [] + for df in parquet_dfs: + dfs.append(df) - # # Read the output files by using the following syntax: - # CONFIG = setup("../config/config.yaml") - # df = pd.read_parquet(f"{CONFIG.output.output_dir}/20260304_080625", engine="pyarrow") - # print(df.head()) + if len(dfs) > 0: + results = pd.concat(dfs, ignore_index=True) + print("#Results:", len(results)) + print("Runtime: ", end_time - start_time) diff --git a/src/parse/HTML.py b/src/parse/HTML.py index a352f54..3237415 100644 --- a/src/parse/HTML.py +++ b/src/parse/HTML.py @@ -44,7 +44,7 @@ def parse(self, html: str) -> str: for tag in soup(self._disregard): tag.decompose() text = soup.get_text(separator="\n", strip=True) - logging.debug(f"First 100 characters of text extracted: {text[0:100]}") + #logging.debug(f"First 100 characters of text extracted: {text[0:100]}") return text except Exception as e: # Handle exceptions diff --git a/src/parse/Schema.py b/src/parse/Schema.py index e69de29..23394e2 100644 --- a/src/parse/Schema.py +++ b/src/parse/Schema.py @@ -0,0 +1,61 @@ +import logging +import json +from abc import ABC, abstractmethod +from typing import List + +from scrapy.http import Response + + +class ISchemaParser(ABC): + """ + Interface class for Schema parser + """ + + @abstractmethod + def parse(self, titles: List[str]) -> str: + raise NotImplementedError("Do not call abstract base class.") + + +class SchemaParser(ISchemaParser): + """ + Parser for detecting specified schema entities within response + """ + def __init__(self, schema_keywords: List[str]): + self.schema_keywords = schema_keywords + logging.info(f"Initializing SchemaParser to detect entities of any of types: {self.schema_keywords}") + + def parse(self, response: Response) -> List[str]: + """ + returns the types that were found of the allowed type + """ + results = [] + jsonlds = response.xpath("//script[@type='application/ld+json']/text()").getall() + if jsonlds: + for jsonld in jsonlds: + try: + data = json.loads(jsonld) + if "@type" in data.keys() and data["@type"] in self.schema_keywords: + logging.debug(f"Found schema entity {data["@type"]} that is within schema keywords: {self.schema_keywords}") + results.append(data["@type"]) + except json.JSONDecodeError: + pass + return results + + +if __name__ == "__main__": + from scrapy.http import TextResponse + + html = """ + + + + + + """ + response = TextResponse(url='http://example.com', body=html.encode('utf-8')) + + parser = SchemaParser(schema_keywords=['Article']) + for found_type in parser.parse(response=response): + print(found_type) diff --git a/src/parse/__init__.py b/src/parse/__init__.py index f6b4b97..8fae1c5 100644 --- a/src/parse/__init__.py +++ b/src/parse/__init__.py @@ -1 +1,2 @@ -from parse.HTML import IHTMLParser, HTMLBodyParser, EmptystringParser \ No newline at end of file +from .HTML import IHTMLParser, HTMLBodyParser, EmptystringParser +from .Schema import SchemaParser \ No newline at end of file diff --git a/src/scrape/HesitantSpider.py b/src/scrape/HesitantSpider.py new file mode 100644 index 0000000..c54f24c --- /dev/null +++ b/src/scrape/HesitantSpider.py @@ -0,0 +1,503 @@ +import json +import re +import scrapy +import time +import validators +import logging +from datetime import datetime + +import pandas as pd + +from scrapy.exceptions import CloseSpider +from typing import List +from urllib.parse import urljoin, urlparse + +from src.parse import HTMLBodyParser, SchemaParser +from src.fetch import PlaywrightTextFetcher +from src.scrape.ScrapyResult import ScrapyResult +from src.util import normalize_url + + +class HesitantSpider(scrapy.Spider): + name = "hesitant-spider" + + # Define custom settings as a class attribute + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "AUTOTHROTTLE_ENABLED": True, # Auto throttle to maximize speed without risking blocks + "AUTOTHROTTLE_START_DELAY": 5.0, # Start slow to "warm up" + "AUTOTHROTTLE_MAX_DELAY": 10.0, # Never wait more than 10s + "AUTOTHROTTLE_TARGET_CONCURRENCY": 1.0, # Aim for 1 request per worker at a time + "CONCURRENT_REQUESTS": 4,# Allow more concurrent requests within the single process + "DOWNLOAD_DELAY": 0, # Let Autothrottle handle the delay + "DOWNLOAD_TIMEOUT": 5, # CRITICAL: Fail fast (5s) if the site is dead + "RETRY_TIMES": 1, + } + + def __init__( + self, + start_urls: List[str], # List of starting (base) urls + target_netloc_keywords: List[str] = [], # List of keywords to determine targeting of URL netlocs + target_path_keywords: List[str] = [], # list of keywords to determine targeting of URL paths + max_depth: int = 2, # Maximum crawling depth with hesitancy + skip_domains: List[str] = [], # List of domains to skip + skip_paths: List[str] = [], # List of in-website paths to skip + allowed_top_level_domains: List[str] = [".com"], # List of allowed top level domains + batch_size: int = 100, # Output batch size + output_file: str = "output.parquet", # Output file name + max_jumps: int = 1, # Maximum site-to-site jumps + timeout: int = 3600, # max time in seconds + allowed_languages: List[str] = ["en", "en-us", "en-gb", "en-uk"], # Allowed languages within url paths + allowed_countries: List[str] = ["en", "us", "gb", "eu"], # Allowed countries within url paths + schema_keywords: List[str] = [], # Schema.org keywords to look for + sitemaps_tocheck: List[str] = ['sitemap.xml'], # path extensions that often lead to sitemaps to check for URL's + *args, **kwargs + ): + super(HesitantSpider, self).__init__(*args, **kwargs) + + # Set and log attributes + self.start_urls = start_urls + self.logger.debug(f"Init start_urls: {self.start_urls}") + self.max_depth = max_depth + self.logger.debug(f"Init max depth: {self.max_depth}") + self.skip_domains = skip_domains + self.logger.debug(f"Init skip domains: {self.skip_domains}") + self.skip_paths = skip_paths + self.logger.debug(f"Init skip domains: {self.skip_paths}") + self.allowed_top_level_domains = allowed_top_level_domains + self.logger.debug(f"Init allowed_top_level_domains: {self.allowed_top_level_domains}") + self.target_netloc_keywords = target_netloc_keywords + self.logger.debug(f"Init target netloc keywords: {self.target_netloc_keywords}") + self.target_path_keywords = target_path_keywords + self.logger.debug(f"Init target paths keywords: {self.target_path_keywords}") + self.batch_size = batch_size + self.logger.debug(f"Init batch_size: {self.batch_size}") + self.allowed_languages = allowed_languages + self.logger.debug(f"Init allowed languages: {self.allowed_languages}") + self.allowed_countries = allowed_countries + self.logger.debug(f"Init allowed countries: {self.allowed_countries}") + self.max_jumps = max_jumps + self.logger.debug(f"Init max_jumps: {self.max_jumps}") + self.output_file = output_file + self.logger.debug(f"Init output file: {self.output_file}") + self.sitemaps_tocheck = sitemaps_tocheck + self.logger.debug(f"Check urls found on (potential) sitemaps: {self.sitemaps_tocheck}") + + # Start batch counter + self.batch_counter = 0 + + # Set timeout + self.timeout = timeout + + # Set parser and unsupported endpoints + self._htmlparser = HTMLBodyParser() + self._fetcher = PlaywrightTextFetcher() + self._unsupported = ( + ".ics", ".mng", ".pct", ".bmp", ".gif", ".jpg", ".jpeg", ".png", ".pst", ".psp", ".tif", ".tiff", ".drw", ".dxf", ".eps", + ".woff2", ".svg", ".mp3", ".wma", ".ogg", ".wav", ".ra", ".aac", ".mid", ".aiff", ".3gp", ".asf", ".asx", ".avi", ".mp4", + ".woff", ".mpg", ".qt", ".rm", ".swf", ".wmv", ".m4a", ".css", ".pdf", ".doc", ".docx", ".exe", ".bin", ".rss", ".zip", + ".rar", ".msu", ".flv", ".dmg", ".xls", ".xlsx", ".ico", ".mng?download=true", ".pct?download=true", ".bmp?download=true", + ".gif?download=true", ".jpg?download=true", ".jpeg?download=true", ".png?download=true", ".pst?download=true", + ".psp?download=true", ".tif?download=true", ".tiff?download=true", ".ai?download=true", ".drw?download=true", + ".dxf?download=true", ".eps?download=true", ".ps?download=true", ".svg?download=true", ".mp3?download=true", + ".wma?download=true", ".ogg?download=true", ".wav?download=true", ".ra?download=true", ".aac?download=true", + ".mid?download=true", ".au?download=true", ".aiff?download=true", ".3gp?download=true", ".asf?download=true", + ".asx?download=true", ".avi?download=true", ".mov?download=true", ".mp4?download=true", ".mpg?download=true", + ".qt?download=true", ".rm?download=true", ".swf?download=true", ".wmv?download=true", ".m4a?download=true", + ".css?download=true", ".pdf?download=true", ".doc?download=true", ".exe?download=true", ".bin?download=true", + ".rss?download=true", ".zip?download=true", ".rar?download=true", ".msu?download=true", ".flv?download=true", + ".dmg?download=true") + self.logger.debug(f"URLs will be excluded if they contain any in path:{', '.join(self._unsupported)}") + + # Set schema parser + self._schemaparser = SchemaParser(schema_keywords=schema_keywords) + self.logger.debug(f"Init schemaparser with keywords: {schema_keywords}") + + # Init batch, results, visited + self.batch = [] + self.results = [] + self.visited = set() + self.sitemaps_crawled = set() + + if max_depth < 0: + self.logger.debug("Only urls from starting_url can be found, max_depth < 0") + + # Asynchronous function that starts the crawl + async def start(self): + self.start_time = time.time() + # For each start url, start crawling + for start_url in self.start_urls: + yield scrapy.Request( + url=start_url, + callback=self.parse, + errback=self.handle_error, + meta={ + "base_url": start_url, + "current_start": start_url, + "steps_from_target": 0, + "depth": 0, + "jumps": 0 + } + ) + # next, if desired, check the sitemapurls to augment existing results + parsed_url = urlparse(start_url) + self.sitemaps_crawled.add(parsed_url.netloc) + for sitemap in self.sitemaps_tocheck: + url = f"{parsed_url.scheme}://{parsed_url.netloc}/{sitemap}" + yield scrapy.Request( + url=url, + callback=self.parse_sitemap, + errback=self.handle_error, + meta={ + "base_url": start_url, + "current_start": start_url, + "steps_from_target": 0, + "depth": 0, + "jumps": 0 + } + ) + + # Save current batch to disk + def save_batch(self): + if len(self.batch) == 0: + self.logger.debug("Tried to save batch without any results..") + return + + df = pd.DataFrame({ + "base_url": [res.base_url for res in self.batch], + "url": [res.url for res in self.batch], + "timestamp": [res.timestamp for res in self.batch], + "first_keyword_hit": [res.first_keyword_hit for res in self.batch], + "content": [res.content for res in self.batch], + "crawl_depth": [res.crawl_depth for res in self.batch], + "schema_indicator": [res.schema_indicator for res in self.batch], + }) + + df.to_parquet( + self.output_file.replace(".parquet", f"_{self.batch_counter}.parquet") + ) + + self.batch_counter += 1 + + # Add batch to total results + self.results += self.batch + + # Empty batch + self.batch = [] + self.logger.debug(f"Saved batch to parquet, total results: {len(self.results)}") + + # Determine whether or not URL is a target + def url_is_target(self, url: str) -> bool: + parsed_url = urlparse(url) + # Check netloc + url_netloc = parsed_url.netloc + for keyword in self.target_netloc_keywords: + first_keyword_hit = re.search(keyword, url_netloc) + if first_keyword_hit is not None: + self.logger.debug(f"For {url} keyword hit: {first_keyword_hit.group(0)}") + return True, keyword + + # Check path + url_path = parsed_url.path + for keyword in self.target_path_keywords: + first_keyword_hit = re.search(keyword, url_path) + if first_keyword_hit is not None: + self.logger.debug(f"For {url} keyword hit: {first_keyword_hit.group(0)}") + return True, keyword + + return False, None + + # Determine whether or not to skip URL + def skip_this_url(self, url: str) -> bool: + """Function to see if we skip url""" + + if url in self.visited: + return True + + # Only visit valid urls + if not validators.url(url): + return True + + # Only visit pages with supported extensions + if any(ext in url for ext in self._unsupported): + self.logger.debug(f"Skip {url}, because extension is unsupported") + return True + + # Only visit pages on allowed top-level domains + parsed_url = urlparse(url) + url_netloc = parsed_url.netloc.lower() + + if not any([url_netloc.endswith(toplevel_domain) for toplevel_domain in self.allowed_top_level_domains]): + self.logger.debug(f"Skip {url} with netloc {url_netloc}, because top-level domain is not in allowed list") + return True + + # prevent duplicate crawl from trailing forward slash in URL + url = url.rstrip('/') if url.endswith('/') else url + + # prevent duplicate crawl from '#' such as '#content', '#main', etc. + url = url.rstrip("#") if "#" in url else url + + # Skip domains on skip-list + if any([skip_domain in url for skip_domain in self.skip_domains]): + self.logger.debug(f"Skip {url}, because domain is in skip-list") + return True # skip + + # Skip if first path is a country code but not within allowed + paths = urlparse(url).path.split("/") + if len(paths) >= 2: + if len(paths[1]) == 2 and paths[1] not in self.allowed_countries: + self.logger.debug(f"Skip {url} because path /{paths[1]}/ indicates country-page not in allowed countries: {self.allowed_countries}") + return True + + # skip pre-defined paths + for skip_path in self.skip_paths: + if any([path == skip_path for path in paths]): + self.logger.debug(f"Skip {url} because path {urlparse(url).path} contains skip-path: {skip_path}") + return True + + # Skip pages in unsupported languages + query_params = parsed_url.query.split("&") + if len(self.allowed_languages) > 0: + for query_param in query_params: + if "lang=" in query_param: + lang = query_param.split("lang=")[1] + if lang not in self.allowed_languages: + self.logger.debug(f"Skip {url} due to language parameter 'lang={lang}' not in allowed list: {self.allowed_languages}") + return True + elif "language=" in query_param: + language = query_param.split("language=")[1] + if language not in self.allowed_languages: + self.logger.debug(f"Skip {url} due to language parameter 'language={language}' not in allowed list: {self.allowed_languages}") + return True + + return False + + # Process request response + async def parse(self, response): + # Check if we passed timeout + if time.time() - self.start_time > self.timeout: + print(f"Hit timeout {self.timeout} seconds for spider with start urls: {self.start_urls}!") + self.logger.debug(f"Hit timeout {self.timeout} seconds for spider with start urls: {self.start_urls}!") + raise CloseSpider('bandwidth_exceeded') + current_depth = response.meta.get("depth", 0) + steps_from_target = response.meta.get("steps_from_target", 0) + + # Check if url is target + url_is_targeted, first_keyword_hit = self.url_is_target(response.url) + + # If url is not target and exceeds hesitancy depth, return + if not url_is_targeted and steps_from_target >= self.max_depth: + return + + # Determine whether we need to add a jump + jumps = response.meta.get("jumps", 0) + + parsed_url = urlparse(response.url) + current_netloc = parsed_url.netloc.lower().rsplit(".", 1)[0] + meta_netloc = urlparse(response.meta.get("current_start")).netloc.lower().rsplit(".", 1)[0] + if current_netloc != meta_netloc and response.meta.get("redirect_urls") is None: + self.logger.debug(f"Adding jump from {jumps} to {jumps + 1} going with base url: {meta_netloc} to {current_netloc}") + jumps += 1 + + # If we exceed jumps, return + if jumps > self.max_jumps: + self.logger.debug(f"Ending crawl path due to exceeding jumps ({jumps}/{self.max_jumps}) for {response.url}, base url: {response.meta.get("base_url")}") + return + + # Process response if above skip-conditions not met + self.logger.debug(f"Parsing url: {response.url}, targeted: {url_is_targeted}, depth: {current_depth}, steps from target: {steps_from_target}, jumps: {jumps}") + self.visited.add(response.url) + + # Add sitemap discovery + if parsed_url.netloc.lower() not in self.sitemaps_crawled: + self.sitemaps_crawled.add(parsed_url.netloc.lower()) + self.logger.debug(f"New domain detected: {parsed_url.netloc.lower()}. Checking for sitemaps...") + + for sitemap_path in self.sitemaps_tocheck: + # Construct the sitemap URL + sitemap_url = urljoin(f"{parsed_url.scheme}://{parsed_url.netloc}/", sitemap_path) + + # YIELD the request so Scrapy handles it + yield scrapy.Request( + url=sitemap_url, + callback=self.parse_sitemap, + errback=self.handle_error, + meta={ + "base_url": response.meta.get("base_url"), + "current_start": f"{parsed_url.scheme}://{parsed_url.netloc}", + "depth": current_depth, + "steps_from_target": steps_from_target, + "jumps": jumps + } + ) + + # Extract and follow links + for link in response.css("a::attr(href)").getall(): + url = urljoin(response.url, link) + + # Only continue with valid crawl paths + if self.skip_this_url(url): + continue + + yield scrapy.Request( + url=url, + callback=self.parse, + errback=self.handle_error, + meta={ + "base_url": response.meta.get("base_url"), + "current_start": f"{parsed_url.scheme}://{parsed_url.netloc}", + "depth": current_depth + 1, + "steps_from_target": steps_from_target + 1, + "jumps": jumps + }, + dont_filter=False # Skip duplicates + ) + + # Process the current page + if url_is_targeted: + self.logger.debug(f"Found targeted url: {response.url} from base url {response.meta.get("base_url")}") + # Determine schema.org indicator + schema_indicator = True if self._schemaparser.parse(response=response) else False + + # Add result to batch + result = ScrapyResult( + base_url=str(response.meta.get("base_url")), + url=response.url, + status=response.status, + first_keyword_hit=first_keyword_hit, + content=await self._fetcher.fetch(response.url), + crawl_depth=current_depth, + schema_indicator=schema_indicator, + timestamp=datetime.now().strftime("%Y-%m-%d-%H:%M:%S") + ) + + self.batch.append(result) + + # Save batch if exceeding batch size + if len(self.batch) >= self.batch_size: + self.save_batch() + + # Reset current depth because we found target at current page + steps_from_target = 0 + + def parse_sitemap(self, response): + # Extract all URLs from the sitemap, accounting for namespace + ns = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} + + # Look for both (standard) and (index) + urls = response.xpath('//ns:url/ns:loc/text() | //ns:sitemap/ns:loc/text()', namespaces=ns).getall() + + for url in urls: + url = normalize_url(url) + # Only continue with valid crawl paths + if self.skip_this_url(url): + continue + parsed_url = urlparse(url) + + # Check if the discovered URL is itself a sitemap (to allow recursive discovery) + # If it ends in .xml, we should probably call parse_sitemap again + if url.endswith('.xml'): + yield scrapy.Request( + url=url, + callback=self.parse_sitemap, + errback=self.handle_error, + meta={ + "base_url": response.meta.get("base_url"), + "current_start": f"{parsed_url.scheme}://{parsed_url.netloc}", + "depth": response.meta.get("depth", 0) + 1 + } + ) + else: + # Otherwise, it's a regular page + yield scrapy.Request( + url=url, + callback=self.parse, + errback=self.handle_error, + meta={ + "base_url": response.meta.get("base_url"), + "current_start": f"{parsed_url.scheme}://{parsed_url.netloc}", + "depth": response.meta.get("depth", 0) + 1 + } + ) + + def handle_error(self, failure): + # TODO pass some specific errors to info? + self.logger.debug(f"Error encountered: {failure}") + + # Called when the spider closes cleanly + async def closed(self, reason): + self.save_batch() + await self._fetcher.close() + self.logger.info(f"Spider closed because of: {reason}. Total collected pages: {len(self.results)}") + print(f"Spider closed because of: {reason}. Total collected pages: {len(self.results)}") + + +if __name__ == "__main__": + import os + from datetime import datetime + from scrapy.crawler import CrawlerProcess + from src.util import setup + + CONFIG = setup("config/config.yaml") + + logging_level = logging.DEBUG + + dir_log = f"{CONFIG.output.output_dir}/{CONFIG.output.logs}" + if not os.path.exists(dir_log): + os.makedirs(dir_log) + logfile = f"{dir_log}/log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log" + + # Create scrapy CrawlerProcess + process = CrawlerProcess( + settings={ + "ROBOTSTXT_OBEY": True, + "LOG_FILE": logfile, + "DOWNLOADER_MIDDLEWARES": { + "src.scrape.ScrapyCrawlMiddleware.TextTypeFilterMiddleware": 543 # High priority + }, + "DOWNLOAD_CONTENT_TYPES": ["text/html", "application/xhtml+xml"] # TODO can be removed? + } + ) + + # Create crawler from process + spiderCrawler = process.create_crawler(HesitantSpider) + + # Crawl and configure spider + urls = ['https://books.toscrape.com/'] + target_keywords = ["philosophy"] + sitemaps_tocheck = ["sitemap.xml"] + allowed_top_level_domains = [".com", ".nl"] + max_depth = 1 + + # urls = ['https://werkenbijhetcbs.nl/'] + # target_keywords = ["enqueteur"] + # sitemaps_tocheck = ["sitemap.xml"] + # allowed_top_level_domains = [".com", ".nl"] + # max_depth = 2 + + # Skip domains + file_skip_domains = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.skip_domains}" + logging.info(f"Reading list of skip_domains from file: {file_skip_domains}") + with open(file_skip_domains, 'r', encoding='utf-8') as file_in: + skip_domains = [line.rstrip() for line in file_in] + + # output + time_part = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = f"{CONFIG.output.output_dir}/{time_part}_output.parquet" + + process.crawl( + spiderCrawler, + start_urls=urls, + target_netloc_keywords=target_keywords, + target_path_keywords=target_keywords, + max_depth=max_depth, + skip_domains=skip_domains, + allowed_top_level_domains=allowed_top_level_domains, + output_file=output_file, + sitemaps_tocheck=sitemaps_tocheck + ) + + try: + process.start() + except Exception as e: + print(f"Something went from starting process! Error {e}") diff --git a/src/scrape/ScrapyCrawlMiddleware.py b/src/scrape/ScrapyCrawlMiddleware.py new file mode 100644 index 0000000..7ce42ea --- /dev/null +++ b/src/scrape/ScrapyCrawlMiddleware.py @@ -0,0 +1,27 @@ +import logging +from scrapy.exceptions import IgnoreRequest + +exceptions = [ + ".txt", + ".xml", + ".rss" +] + + +class TextTypeFilterMiddleware: + """ + Drops any response that isn't HTML or XHTML. + """ + def process_response(self, request, response, spider): + if any([response.url.endswith(exception) for exception in exceptions]): + logging.debug(f"Making exception bypass for url: {response.url}") + return response + content_type = response.headers.get('Content-Type', b'').decode('utf-8').lower() + + # Only allow HTML-based content + if 'text/html' not in content_type and 'application/xhtml+xml' not in content_type and 'application/xml' not in content_type: + logging.info(f"\t\tTextTypeFilterMiddleware: Skipping non-text content: {response.url} ({content_type})") + # Returning None tells Scrapy to drop this response entirely + raise IgnoreRequest("Not Text type response, ignore request") + + return response diff --git a/src/scrape/ScrapyResult.py b/src/scrape/ScrapyResult.py new file mode 100644 index 0000000..8fb2cb1 --- /dev/null +++ b/src/scrape/ScrapyResult.py @@ -0,0 +1,21 @@ +from typing import NamedTuple + + +class ScrapyResult(NamedTuple): + base_url: str + url: str + first_keyword_hit: str + status: str + content: str + crawl_depth: int = 0 + schema_indicator: bool = False + timestamp: str = 0 + + def __eq__(self, other): + if not isinstance(other, ScrapyResult): + return False + + return self.base_url == other.base_url and self.content == other.content + + def __hash__(self): + return hash((self.base_url, self.content)) diff --git a/src/scrape/__init__.py b/src/scrape/__init__.py index ded790d..a0255a5 100644 --- a/src/scrape/__init__.py +++ b/src/scrape/__init__.py @@ -1,39 +1,3 @@ -import logging -from scrape.base import IScraper, Scraper -from util import setup - -CONFIG = setup("config/config.yaml") - - -def build_webfocusedscraper(user_agent: str) -> IScraper: - """ - Build Scraper class with standard settings - """ - from crawl import HesitantCrawler - from fetch import HTMLFetcher - from parse import HTMLBodyParser - - with open(f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.keywords}", 'r', encoding='utf-8') as file_in: - target_keywords = [line.rstrip() for line in file_in] - - fetcher = HTMLFetcher(user_agent=user_agent) - crawler = HesitantCrawler( - fetcher=fetcher, - target_keywords=target_keywords, - add_sitemapurls=CONFIG.crawl.use_sitemap, - max_depth=CONFIG.crawl.max_depth) - htmlparser = HTMLBodyParser() - - return Scraper( - crawler=crawler, - fetcher=fetcher, - htmlparser=htmlparser) - - -if __name__ == "__main__": - - logging.basicConfig(level=logging.DEBUG) - - user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - scraper = build_webfocusedscraper(user_agent=user_agent) - scraper.scrape() +from .HesitantSpider import HesitantSpider +from .ScrapyCrawlMiddleware import TextTypeFilterMiddleware +from .ScrapyResult import ScrapyResult \ No newline at end of file diff --git a/src/scrape/base.py b/src/scrape/base.py deleted file mode 100644 index c313300..0000000 --- a/src/scrape/base.py +++ /dev/null @@ -1,162 +0,0 @@ -import logging -import os -from abc import ABC, abstractmethod -import pandas as pd -import numpy as np -from typing import List -from datetime import datetime -import time -import random - -from util import setup -from fetch import IFetcher -from crawl import ICrawler -from parse import IHTMLParser - -CONFIG = setup("config/config.yaml") - - -class IScraper(ABC): - """ - Interface for all Scrapers - """ - def __init__(self, crawler: ICrawler, fetcher: IFetcher, htmlparser: IHTMLParser): - - # Set crawler and fetcher and parser - self._crawler = crawler - self._fetcher = fetcher - self._htmlparser = htmlparser - - @abstractmethod - def save_batch(self, batch: List, batch_id: int): - raise NotImplementedError() - - @abstractmethod - def scrape(self): - raise NotImplementedError() - - -class Scraper(IScraper): - """ - Interface for all Scrapers - """ - def __init__(self, crawler: ICrawler, fetcher: IFetcher, htmlparser: IHTMLParser): - super(Scraper, self).__init__(crawler=crawler, fetcher=fetcher, htmlparser=htmlparser) - - # All scrapers take base-url input from file - file_urls = f"{CONFIG.input.input_dir}/{CONFIG.input.input_files.urls}" - logging.info(f"Reading list of base-urls from file: {file_urls}") - logging.info(f"Offset is set to {CONFIG.input.url_offset} and maximum number of base-urls is {CONFIG.input.url_max}") - with open(file_urls, 'r', encoding='utf-8') as file_in: - self._base_urls = [line.rstrip() for line in file_in] - self._base_urls = self._base_urls[CONFIG.input.url_offset:CONFIG.input.url_offset + CONFIG.input.url_max] - random.shuffle(self._base_urls) - logging.debug(f"Read list with {len(self._base_urls)} base-urls from file: {file_urls}") - logging.debug(f"Scraper will start with entry {CONFIG.input.url_offset + 1} in the file") - - # create output folder with current datetime and possible url offset - self._dir_out = f"{CONFIG.output.output_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}_offset{CONFIG.input.url_offset}" - logging.info(f"Creating output folder: {self._dir_out}") - os.makedirs(self._dir_out, exist_ok=True) - logging.debug("Created output folder") - # TODO instead consider a given folder name and crash-robust resuming of batch iteration - - def save_batch(self, batch: List, batch_id: int): - df = pd.DataFrame(batch) - - # add partition column - df["batch"] = batch_id - - df.to_parquet( - self._dir_out, - engine="pyarrow", - partition_cols=["batch"], - index=False, - compression="snappy" - ) - - def scrape(self): - - # saving data in batches - time_start = time.time() - buffer = [] - batch_id = 0 - - for cnt, base_url in enumerate(self._base_urls): - logging.info(f"Now starting scrape #{cnt + 1} of {len(self._base_urls)} base-urls") - - logging.info(f"Trying to crawl base url: {base_url}") - # Crawl can start as soon as start url provided - self._crawler.reset_with_starturl(start_url=base_url) - self._crawler.crawl() - - # track content by base_url to prevent duplicates - seen_content = set() - - delay = self._crawler.crawl_delay # might be different depending on curren domain - - # After crawl, collect results and parse content of targeted sites - # Some urls will already have their html fetched before during crawl, don't redo this then - for crawlresult in self._crawler.get_results(): - schema_indicator = False - html = self._crawler._visited.get(crawlresult.url, False) - if not html: - logging.debug(f"Downloading html from yet unvisited url {crawlresult.url}") - try: - html, schema_indicator = self._fetcher.fetch(crawlresult.url) - except Exception: - html = "" - # Respect crawl delay if crawler dose that - - logging.debug("Waiting for delay to pass") - time.sleep(delay) - logging.debug("Delay has passed") - - content = self._htmlparser.parse(html=html) - if content is not None and len(content) > 0: - if content in seen_content: # No dupliactes - logging.debug(f"Content from {crawlresult.url} is a duplicate, not added to output") - continue - - seen_content.add(content) - buffer.append({ - "base_url": base_url, - "url": crawlresult.url, - "first_keyword_hit": crawlresult.first_keyword_hit, - "content": content, - "schema_indicator": schema_indicator - }) - else: - logging.debug(f"After parsing no output for url {crawlresult.url}") - - if len(buffer) >= CONFIG.output.batchsize: - self.save_batch(batch=buffer, batch_id=batch_id) - logging.info(f"Saved batch number {batch_id} with {len(buffer)} records") - buffer = [] - batch_id += 1 - - # Remaining rows at the end - if buffer: - self.save_batch(batch=buffer, batch_id=batch_id) - logging.info(f"Saved final batch number {batch_id} with {len(buffer)} records") - - time_duration = (time.time() - time_start) / 60 - logging.info(f"Finished. Running scrape took {int(np.around(time_duration, 0))} minutes.") - - -if __name__ == "__main__": - from crawl import NoCrawler - from fetch import NoFetcher - from parse import EmptystringParser - - logging.basicConfig(level=logging.DEBUG) - - fetcher = NoFetcher() - crawler = NoCrawler() - htmlparser = EmptystringParser() - - scraper = Scraper( - crawler=crawler, - fetcher=fetcher, - htmlparser=htmlparser) - scraper.scrape() diff --git a/src/util/__init__.py b/src/util/__init__.py index 46cd90e..bb2a8cc 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -1 +1,2 @@ -from .setup import setup \ No newline at end of file +from .setup import setup +from .urls import normalize_url \ No newline at end of file diff --git a/src/util/setup.py b/src/util/setup.py index c51f470..a81ae93 100644 --- a/src/util/setup.py +++ b/src/util/setup.py @@ -4,8 +4,4 @@ def setup(config_path): # Read config object config = OmegaConf.load(config_path) - - # Set OS environment variables or other stuff... - # ... - return config diff --git a/src/util/urls.py b/src/util/urls.py new file mode 100644 index 0000000..226d1e7 --- /dev/null +++ b/src/util/urls.py @@ -0,0 +1,39 @@ +from urllib.parse import urlparse, urlunparse +import re + + +def normalize_url(url: str): + """ + Normalize URL to make sure crawler can handle it without issue + :param url: url to normalize + """ + + # Handle case where there is no scheme at all + if not re.match(r'^[a-zA-Z]+://', url): + url = 'https://' + url + + parsed = urlparse(url) + + # 2. Force HTTPS + scheme = 'https' + + # 3. Handle the domain (netloc) + netloc = parsed.netloc.lower() + + # Remove existing 'www.' to re-add cleanly + if netloc.startswith('www.'): + netloc = netloc[4:] + + netloc = 'www.' + netloc + + # Reconstruct URL + new_url = urlunparse(( + scheme, + netloc, + parsed.path, + parsed.params, + parsed.query, + parsed.fragment + )) + + return new_url