Refactor web crawler and image downloader; consolidate functionality into Spider.py, add requirements and setup scripts

This commit is contained in:
whaffman 2025-07-02 12:37:04 +02:00
parent 32a43816f4
commit b739343142
4 changed files with 106 additions and 252 deletions

171
Spider.py
View File

@ -1,70 +1,89 @@
import os
import argparse
import requests import requests
import re import re
from urllib.parse import urlparse from urllib.parse import urlparse, urljoin
import argparse
from PIL import Image from collections import deque
from PIL.ExifTags import TAGS import os
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    # Append after colored text to switch the terminal back to its default style.
    RESET = "\033[0m"
"""
get all links from a given url
"""
def get_all_links(url):
    """
    Fetch a page and return the absolute URLs of the links it contains.

    Only double-quoted ``href`` attributes of ``<a>`` tags are recognized.
    Pure fragment links and ``mailto:``, ``javascript:`` and ``tel:`` links
    are ignored. Relative links are resolved against ``url`` and any
    ``#fragment`` suffix is dropped, so the same document is not reported
    under several URLs.

    Args:
        url (str): The URL to fetch links from.
    Returns:
        set: A set of unique absolute links found on the page; an empty set
        if the request fails.
    """
    skipped_prefixes = ('#', 'mailto:', 'javascript:', 'tel:')
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; Spider2/1.0)'
    }
    try:
        # Without a timeout a single unresponsive host stalls the whole crawl.
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"{Colors.RED}Error fetching {url}: {e}{Colors.RESET}")
        return set()

    hrefs = re.findall(r'<a[^>]+href="([^"]+)"', response.text, re.IGNORECASE)
    links = {
        urljoin(url, href).split('#', 1)[0]
        for href in hrefs
        if not href.startswith(skipped_prefixes)
    }
    print(f"{Colors.GREEN}Found {len(links)} links on {url}{Colors.RESET}")
    return links
def same_domain(url1, url2): def same_domain(url1, url2):
""" """
Return True if url1 and url2 have the same domain name. Check if two URLs belong to the same domain.
Args: Args:
url1 (str): The first URL. url1 (str): The first URL.
url2 (str): The second URL. url2 (str): The second URL.
Returns: Returns:
bool: True if both URLs have the same domain, False otherwise. bool: True if both URLs belong to the same domain, False otherwise.
""" """
domain1 = urlparse(url1).netloc domain1 = urlparse(url1).netloc
domain2 = urlparse(url2).netloc domain2 = urlparse(url2).netloc
return domain1 == domain2 return domain1 == domain2
"""breadth first crawl using get_all_links
def crawl(url, depth, imgs = set()): """
def crawl(url, depth):
""" """
Crawl the given URL and extract links and images. Crawl the given URL and extract links up to a specified depth.
If depth is greater than 0, continue crawling links found on the page.
If depth is 0, only extract images.
Args: Args:
url (str): The URL to crawl. url (str): The URL to start crawling from.
depth (int): The depth of crawling. depth (int): The maximum depth of crawling.
imgs (set): A set to store found image URLs.
Returns: Returns:
set: A set of found image URLs. set: A set of unique links found during the crawl.
""" """
print(f"\033[92mFetching links from {url} at depth {depth}\033[0m") visited = set()
queue = deque([(url, 0)]) # (current_url, current_depth)
while queue:
current_url, current_depth = queue.popleft()
if current_depth > depth or current_url in visited:
continue
if current_depth <= depth:
visited.add(current_url)
if current_depth < depth:
links = get_all_links(current_url)
try: for link in links:
response = requests.get(url) if link not in visited and same_domain(url, link):
response.raise_for_status() queue.append((link, current_depth + 1))
links = re.findall(r'<a[^>]+href="([^"]+)"', response.text) return visited
for link in links:
if link.startswith(('#', 'mailto:', 'javascript:', 'tel:')):
continue
if not link.startswith('http'):
link = requests.compat.urljoin(url, link)
if not same_domain(url, link):
continue
crawl(link, depth - 1, imgs) if depth > 0 else None
for img in extract_image_sources(url):
if img not in imgs:
print(f"\033[94mFound image: {img}\033[0m")
imgs.add(img)
imgs.update(extract_image_sources(url))
return imgs
except requests.RequestException as e:
print(f"Error fetching {url}: {e}")
return []
def extract_image_sources(url): def extract_image_sources(url):
""" """
@ -78,15 +97,24 @@ def extract_image_sources(url):
response = requests.get(url) response = requests.get(url)
response.raise_for_status() # Raise an error for bad responses response.raise_for_status() # Raise an error for bad responses
img_sources = re.findall(r'src="([^"]+(?:jpg|jpeg|gif|png|bmp))"', response.text, re.IGNORECASE) img_sources = re.findall(r'<img[^>]+src="([^"]+(?:jpg|jpeg|gif|png|bmp))(?:\?[^"]*)?"', response.text, re.IGNORECASE)
img_sources = [requests.compat.urljoin(url, src) for src in img_sources] img_sources = [requests.compat.urljoin(url, src) for src in img_sources]
return img_sources return img_sources
except requests.RequestException as e: except requests.RequestException as e:
print(f"Error fetching {url}: {e}") print(f"{Colors.RED}Error fetching {url}: {e}{Colors.RESET}")
return [] return []
def get_filename_from_url(url):
    """
    Build a flat filename from a URL's host and path.

    The scheme, query string and fragment are not used. Only the last 20
    characters of the path are kept, and every ``/`` is replaced with ``_``.
    If the URL has neither host nor path, the URL itself (with ``/`` replaced)
    is returned instead.

    Args:
        url (str): The URL to derive a filename from.
    Returns:
        str: A filename containing no ``/`` characters.
    """
    parsed_url = urlparse(url)
    # Slicing already copes with paths shorter than 20 characters.
    path = parsed_url.path[-20:]
    filename = (parsed_url.netloc + path).replace('/', '_')
    # The fallback must be sanitized too, otherwise it could name a sub-path.
    return filename or url.replace('/', '_')
def download_images(imgs, directory): def download_images(imgs, directory):
""" """
@ -105,6 +133,11 @@ def download_images(imgs, directory):
filename = get_filename_from_url(img) filename = get_filename_from_url(img)
filepath = f"{directory}/{filename}" filepath = f"{directory}/{filename}"
base, ext = os.path.splitext(filepath)
counter = 1
while os.path.exists(filepath):
filepath = f"{base}({counter}){ext}"
counter += 1
with open(filepath, 'wb') as file: with open(filepath, 'wb') as file:
file.write(response.content) file.write(response.content)
@ -115,41 +148,29 @@ def download_images(imgs, directory):
return downloaded return downloaded
def get_filename_from_url(url):
    """
    Build a flat filename from a URL's host and path.

    The scheme, query string and fragment are not used, and every ``/`` is
    replaced with ``_``. If the URL has neither host nor path, the URL itself
    (with ``/`` replaced) is returned instead.

    Args:
        url (str): The URL to derive a filename from.
    Returns:
        str: A filename containing no ``/`` characters.
    """
    parsed_url = urlparse(url)
    filename = (parsed_url.netloc + parsed_url.path).replace('/', '_')
    # The fallback must be sanitized too, otherwise it could name a sub-path.
    return filename or url.replace('/', '_')
def main(): def main():
parser = argparse.ArgumentParser(description="Spider Web Crawler") parser = argparse.ArgumentParser(description="Spider Web Crawler")
parser.add_argument("url", type=str, help="The URL to start crawling from") parser.add_argument("url", type=str, help="The URL to start crawling from")
parser.add_argument("-r", help="Spider the URL") parser.add_argument("-r", action="store_true", default=False, help="Spider the URL")
parser.add_argument("-l", type=int, default=5, help="Depth of crawling") parser.add_argument("-l", type=int, default=5, help="Depth of crawling")
parser.add_argument("-p", default="./data", help="Output file to save") parser.add_argument("-p", metavar="OUTPUT_PATH", default="./data", help="Output file to save")
args = parser.parse_args() args = parser.parse_args()
print(args)
imgs = crawl(args.url, args.l)
print(f"\033[92mFound {len(imgs)} images\033[0m") visited_links = crawl(args.url, args.l)
if args.p: print(f"{Colors.YELLOW}Found {len(visited_links)} unique links on same domain:{Colors.RESET}")
if not os.path.exists(args.p):
os.makedirs(args.p)
downloaded = download_images(imgs, args.p)
print(f"\033[92mDownloaded {len(downloaded)} images to {args.p}\033[0m")
# for img in downloaded:
# exif_data = get_exif_data(img)
# if exif_data:
# print(f"\033[93mExif data for {img}:\033[0m")
# for tag, value in exif_data.items():
# print(f" {tag}: {value}")
# return 0 imgs = set()
for link in visited_links:
print(f"{Colors.BLUE}Extracting images from {link}{Colors.RESET}")
imgs.update(extract_image_sources(link))
print(f"{Colors.BLUE}Found {len(imgs)} images:{Colors.RESET}")
if not os.path.exists(args.p):
os.makedirs(args.p)
downloaded = download_images(imgs, args.p)
print(f"{Colors.GREEN}Downloaded {len(downloaded)} images to {args.p}{Colors.RESET}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,177 +0,0 @@
import requests
import re
from urllib.parse import urlparse, urljoin
import argparse
from collections import deque
import os
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    # Append after colored text to switch the terminal back to its default style.
    RESET = "\033[0m"
"""
get all links from a given url
"""
def get_all_links(url):
    """
    Fetch a page and return the absolute URLs of the links it contains.

    Only double-quoted ``href`` attributes of ``<a>`` tags are recognized.
    Pure fragment links and ``mailto:``, ``javascript:`` and ``tel:`` links
    are ignored. Relative links are resolved against ``url`` and any
    ``#fragment`` suffix is dropped, so the same document is not reported
    under several URLs.

    Args:
        url (str): The URL to fetch links from.
    Returns:
        set: A set of unique absolute links found on the page; an empty set
        if the request fails.
    """
    skipped_prefixes = ('#', 'mailto:', 'javascript:', 'tel:')
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; Spider2/1.0)'
    }
    try:
        # Without a timeout a single unresponsive host stalls the whole crawl.
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"{Colors.RED}Error fetching {url}: {e}{Colors.RESET}")
        return set()

    hrefs = re.findall(r'<a[^>]+href="([^"]+)"', response.text, re.IGNORECASE)
    links = {
        urljoin(url, href).split('#', 1)[0]
        for href in hrefs
        if not href.startswith(skipped_prefixes)
    }
    print(f"{Colors.GREEN}Found {len(links)} links on {url}{Colors.RESET}")
    return links
def same_domain(url1, url2):
    """
    Check if two URLs belong to the same domain.

    The network-location part of each URL (host plus any explicit port or
    credentials) is compared without regard to letter case, because host
    names are case-insensitive. The scheme and path are not considered.

    Args:
        url1 (str): The first URL.
        url2 (str): The second URL.
    Returns:
        bool: True if both URLs belong to the same domain, False otherwise.
    """
    domain1 = urlparse(url1).netloc.lower()
    domain2 = urlparse(url2).netloc.lower()
    return domain1 == domain2
"""breadth first crawl using get_all_links
"""
def crawl(url, depth):
    """
    Crawl breadth-first from a URL, following same-domain links up to a depth.

    The start URL is at depth 0. Pages at the maximum depth are recorded but
    not fetched for further links. Links are compared against the start URL
    with ``same_domain``; links to other domains are ignored.

    Args:
        url (str): The URL to start crawling from.
        depth (int): The maximum depth of crawling. A negative value yields
            an empty result.
    Returns:
        set: A set of unique links found during the crawl, including the
        start URL.
    """
    if depth < 0:
        return set()

    # URLs are marked when they are queued, so each one enters the queue at
    # most once instead of once per page that links to it.
    visited = {url}
    queue = deque([(url, 0)])  # (current_url, current_depth)
    while queue:
        current_url, current_depth = queue.popleft()
        if current_depth >= depth:
            # Deepest level: keep the URL but do not expand it.
            continue
        for link in get_all_links(current_url):
            if link not in visited and same_domain(url, link):
                visited.add(link)
                queue.append((link, current_depth + 1))
    return visited
def extract_image_sources(url):
    """
    Fetch a page and return the absolute URLs of the images it references.

    Only double-quoted ``src`` attributes of ``<img>`` tags whose value ends
    in jpg, jpeg, gif, png or bmp (optionally followed by a query string) are
    recognized. A trailing query string is matched but not included in the
    returned URL. Relative sources are resolved against ``url``.

    Args:
        url (str): The URL to fetch images from.
    Returns:
        list: A list of image URLs found on the page, possibly containing
        duplicates; an empty list if the request fails.
    """
    # Same User-Agent as the link crawler, so a page that could be crawled
    # is not rejected when it is fetched again for its images.
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; Spider2/1.0)'
    }
    try:
        # Without a timeout a single unresponsive host stalls the whole run.
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise an error for bad responses
    except requests.RequestException as e:
        print(f"{Colors.RED}Error fetching {url}: {e}{Colors.RESET}")
        return []

    img_sources = re.findall(
        r'<img[^>]+src="([^"]+(?:jpg|jpeg|gif|png|bmp))(?:\?[^"]*)?"',
        response.text,
        re.IGNORECASE,
    )
    return [urljoin(url, src) for src in img_sources]
def get_filename_from_url(url):
    """
    Build a flat filename from a URL's host and path.

    The scheme, query string and fragment are not used. Only the last 20
    characters of the path are kept, and every ``/`` is replaced with ``_``.
    If the URL has neither host nor path, the URL itself (with ``/`` replaced)
    is returned instead.

    Args:
        url (str): The URL to derive a filename from.
    Returns:
        str: A filename containing no ``/`` characters.
    """
    parsed_url = urlparse(url)
    # Slicing already copes with paths shorter than 20 characters.
    path = parsed_url.path[-20:]
    filename = (parsed_url.netloc + path).replace('/', '_')
    # The fallback must be sanitized too, otherwise it could name a sub-path.
    return filename or url.replace('/', '_')
def download_images(imgs, directory):
    """
    Download images to the specified directory.

    Each file is named with ``get_filename_from_url``. If a file with that
    name already exists, ``(1)``, ``(2)``, ... is inserted before the
    extension until an unused name is found. A failed download or write is
    reported and skipped; the remaining images are still processed.

    Args:
        imgs (set): A set of image URLs to download.
        directory (str): The directory where images will be saved. It is
            expected to exist already.
    Returns:
        set: A set of file paths of downloaded images.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; Spider2/1.0)'
    }
    downloaded = set()
    for img in imgs:
        try:
            # Without a timeout a single unresponsive host stalls the batch.
            response = requests.get(img, headers=headers, timeout=10)
            response.raise_for_status()
            filepath = os.path.join(directory, get_filename_from_url(img))
            base, ext = os.path.splitext(filepath)
            counter = 1
            while os.path.exists(filepath):
                filepath = f"{base}({counter}){ext}"
                counter += 1
            with open(filepath, 'wb') as file:
                file.write(response.content)
            downloaded.add(filepath)
        except (requests.RequestException, OSError) as e:
            # OSError covers write failures (missing directory, bad name,
            # full disk) so one bad file does not abort the others.
            print(f"{Colors.RED}Error downloading {img}: {e}{Colors.RESET}")
    return downloaded
def main():
    """
    Command-line entry point: crawl a site and download the images it uses.

    Crawls from the given URL up to the depth given by ``-l``, extracts image
    URLs from every visited page, and saves the images into the directory
    given by ``-p`` (created if missing).
    """
    parser = argparse.ArgumentParser(description="Spider Web Crawler")
    parser.add_argument("url", type=str, help="The URL to start crawling from")
    # NOTE(review): -r is accepted but never consulted below; the crawl always
    # runs to depth -l. Confirm whether it should gate recursion.
    parser.add_argument("-r", action="store_true", default=False, help="Spider the URL")
    parser.add_argument("-l", type=int, default=5, help="Depth of crawling")
    parser.add_argument("-p", metavar="OUTPUT_PATH", default="./data",
                        help="Output directory for downloaded images")
    args = parser.parse_args()

    visited_links = crawl(args.url, args.l)
    print(f"{Colors.YELLOW}Found {len(visited_links)} unique links on same domain:{Colors.RESET}")

    imgs = set()
    for link in visited_links:
        print(f"{Colors.BLUE}Extracting images from {link}{Colors.RESET}")
        imgs.update(extract_image_sources(link))
    print(f"{Colors.BLUE}Found {len(imgs)} images:{Colors.RESET}")

    # exist_ok avoids the race between checking for the directory and
    # creating it.
    os.makedirs(args.p, exist_ok=True)
    downloaded = download_images(imgs, args.p)
    print(f"{Colors.GREEN}Downloaded {len(downloaded)} images to {args.p}{Colors.RESET}")


if __name__ == "__main__":
    main()

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
certifi==2025.6.15
charset-normalizer==3.4.2
idna==3.10
pillow==11.3.0
requests==2.32.4
urllib3==2.5.0

4
setup.sh Normal file
View File

@ -0,0 +1,4 @@
#!/bin/bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt