"""
Crawler for Gathered Here funeral director directory.

Source: https://www.gatheredhere.com.au
Method: XML sitemap → fetch individual profile pages → parse HTML
Fields: name, address, coords, phone, email, website, description, pricing, reviews
"""

import re
import time
import json
import xml.etree.ElementTree as ET
from html.parser import HTMLParser
from pathlib import Path

from base import (
    fetch_url,
    get_db,
    start_crawl_log,
    finish_crawl_log,
    store_source_record,
    normalize_phone,
    normalize_state,
    generate_slug,
    to_intermediate,
    CRAWL_DELAY,
)

SOURCE_NAME = "gathered_here"
SITEMAP_URL = "https://www.gatheredhere.com.au/sitemap/sitemap-funerals-listings-0.xml"
BASE_URL = "https://www.gatheredhere.com.au"

# Captures the JSON payload of the <script id="__NEXT_DATA__" ...> element.
_NEXT_DATA_RE = re.compile(
    r'<script[^>]*\bid="__NEXT_DATA__"[^>]*>(.*?)</script>',
    re.DOTALL,
)

# Any HTML tag; used to reduce markup to plain text.
_TAG_RE = re.compile(r'<[^>]+>')

# (JSON key in the page's structured data, key in the fallback record).
_STRUCTURED_ADDRESS_FIELDS = (
    ("streetAddress", "address"),
    ("addressLocality", "suburb"),
    ("addressRegion", "state"),
    ("postalCode", "postcode"),
)

# Listing meta key -> label used in the flat pricing dict.
_PRICE_FIELDS = {
    # With viewing prices
    "cremation_no_service_viewY": "cremation_no_service_with_viewing",
    "cremation_single_viewY": "cremation_single_service_with_viewing",
    "cremation_dual_viewY": "cremation_dual_service_with_viewing",
    "cremation_graveside_viewY": "cremation_graveside_with_viewing",
    "burial_single_viewY": "burial_single_service_with_viewing",
    "burial_dual_viewY": "burial_dual_service_with_viewing",
    "burial_graveside_viewY": "burial_graveside_with_viewing",
    "burial_no_service_viewY": "burial_no_service_with_viewing",
    # Without viewing prices
    "cremation_no_service_viewN": "cremation_no_service",
    "cremation_single_viewN": "cremation_single_service",
    "cremation_dual_viewN": "cremation_dual_service",
    "cremation_graveside_viewN": "cremation_graveside",
    "burial_single_viewN": "burial_single_service",
    "burial_dual_viewN": "burial_dual_service",
    "burial_graveside_viewN": "burial_graveside",
    "burial_no_service_viewN": "burial_no_service",
}

# Pricing label -> funeral type reported on the resulting package.
_PACKAGE_TYPES = [
    ("cremation_no_service", "Cremation Only"),
    ("cremation_single_service", "Service & Cremation"),
    ("cremation_single_service_with_viewing", "Service & Cremation"),
    ("burial_single_service", "Service & Burial"),
    ("burial_graveside", "Graveside Burial"),
]


def fetch_all_listing_urls() -> list[str]:
    """Fetch and parse the sitemap to get all funeral director profile URLs.

    Returns:
        The ``<loc>`` values of the sitemap, in document order, restricted to
        individual profile pages.
    """
    root = ET.fromstring(fetch_url(SITEMAP_URL))
    ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

    urls = []
    for url_elem in root.findall("sm:url", ns):
        loc = url_elem.find("sm:loc", ns)
        if loc is None or not loc.text:
            continue
        url = loc.text.strip()
        # Only include individual profile pages (singular /funeral-director/),
        # not the plural /funeral-directors/ index pages.
        if "/funeral-director/" in url and "/funeral-directors/" not in url:
            urls.append(url)
    return urls


def extract_next_data(html_text: str) -> dict | None:
    """Extract __NEXT_DATA__ JSON from a Next.js page.

    Returns:
        The decoded JSON object, or None when the script element is absent,
        its content is not valid JSON, or the JSON is not an object.
    """
    match = _NEXT_DATA_RE.search(html_text)
    if match is None:
        return None
    try:
        payload = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None


def extract_from_next_data(next_data: dict) -> dict | None:
    """Extract listing data from __NEXT_DATA__ props.

    Looks at ``props.pageProps.singleListing.listing`` first, then at the
    ``listing``, ``post`` and ``data`` keys of ``pageProps``.

    Returns:
        The first non-empty dict found, or None. Non-dict values are skipped
        because callers treat the result as a mutable mapping.
    """
    if not isinstance(next_data, dict):
        return None
    props = next_data.get("props")
    page_props = props.get("pageProps") if isinstance(props, dict) else None
    if not isinstance(page_props, dict):
        return None

    # Structure: singleListing.listing contains the actual data
    single = page_props.get("singleListing")
    if isinstance(single, dict):
        listing = single.get("listing")
        if listing and isinstance(listing, dict):
            return listing

    # Fallback paths
    for key in ("listing", "post", "data"):
        candidate = page_props.get(key)
        if candidate and isinstance(candidate, dict):
            return candidate
    return None


def extract_from_html(html_text: str, url: str) -> dict:
    """Extract listing data from page HTML using regex patterns as fallback.

    Returns:
        A dict that always contains ``url`` and, when the corresponding
        pattern matches, ``title``, ``phone``, ``email``, ``website``,
        ``address``, ``suburb``, ``state``, ``postcode``, ``lat`` and ``lng``.
    """
    data = {"url": url}

    # Title
    # NOTE(review): assumes the business name is the page's first <h1> — confirm.
    title_match = re.search(r'<h1[^>]*>(.*?)</h1>', html_text, re.DOTALL)
    if title_match:
        data["title"] = _TAG_RE.sub('', title_match.group(1)).strip()

    # Phone
    phone_match = re.search(r'href="tel:([^"]+)"', html_text)
    if phone_match:
        data["phone"] = phone_match.group(1).strip()

    # Email
    email_match = re.search(r'href="mailto:([^"]+)"', html_text)
    if email_match:
        data["email"] = email_match.group(1).strip()

    # Website
    website_match = re.search(
        r'<a[^>]*class="[^"]*website[^"]*"[^>]*href="([^"]+)"', html_text
    )
    if website_match:
        data["website"] = website_match.group(1).strip()

    # Address from structured data
    for json_key, field in _STRUCTURED_ADDRESS_FIELDS:
        field_match = re.search(rf'"{json_key}"\s*:\s*"([^"]*)"', html_text)
        if field_match:
            data[field] = field_match.group(1)

    # Coordinates
    for json_key, field in (("latitude", "lat"), ("longitude", "lng")):
        coord_match = re.search(rf'"{json_key}"\s*:\s*"?(-?[\d.]+)"?', html_text)
        if not coord_match:
            continue
        try:
            data[field] = float(coord_match.group(1))
        except ValueError:
            # The character class also admits strings such as "1.2.3";
            # leave the coordinate out rather than abort the fallback.
            continue

    return data


def extract_pricing(listing_data: dict) -> dict:
    """Extract pricing from listing meta fields.

    Returns:
        Mapping of pricing label to price as a float. Empty when the listing
        has no ``meta`` dict or none of the known price fields parse.
    """
    meta = listing_data.get("meta")
    if not isinstance(meta, dict) or not meta:
        return {}

    pricing = {}
    for meta_key, label in _PRICE_FIELDS.items():
        raw = meta.get(meta_key)
        if not raw:
            continue
        # Parse price string like "$2,299" to float
        cleaned = re.sub(r'[^\d.]', '', str(raw))
        if not cleaned:
            continue
        try:
            pricing[label] = float(cleaned)
        except ValueError:
            # Unparseable price (e.g. several dots): skip this field only.
            continue
    return pricing


def pricing_to_packages(pricing: dict) -> list[dict]:
    """Convert flat pricing dict to package format.

    Only the pricing labels listed in ``_PACKAGE_TYPES`` produce a package;
    other labels are ignored.
    """
    return [
        {
            "name": price_key.replace("_", " ").title(),
            "funeralType": funeral_type,
            "price": pricing[price_key],
            "inclusions": [],  # Not available from Gathered Here listing pages
        }
        for price_key, funeral_type in _PACKAGE_TYPES
        if price_key in pricing
    ]


def to_normalized(listing_data: dict, url: str) -> dict:
    """Convert Gathered Here listing data to intermediate format.

    Values are read from the listing's ``meta`` dict first. The top-level
    keys written by ``extract_from_html`` are used as fallbacks so records
    obtained through the HTML fallback keep their location and contact data.

    Args:
        listing_data: Listing dict from ``crawl_profile``.
        url: Profile URL the listing was fetched from.

    Returns:
        Whatever ``to_intermediate`` returns for this listing.
    """
    raw_meta = listing_data.get("meta")
    meta = raw_meta if isinstance(raw_meta, dict) else {}

    name = str(listing_data.get("title") or listing_data.get("name") or "").strip()
    slug = listing_data.get("slug", "")

    # Extract location
    suburb = meta.get("geolocation_city") or listing_data.get("suburb", "")
    state = normalize_state(
        meta.get("geolocation_state_short") or listing_data.get("state") or ""
    )
    postcode = meta.get("geolocation_postcode") or listing_data.get("postcode", "")
    address = (
        meta.get("geolocation_formatted_address") or listing_data.get("address", "")
    )

    lat = meta.get("geolocation_lat") or listing_data.get("lat")
    lng = meta.get("geolocation_long") or listing_data.get("lng")
    try:
        lat = float(lat) if lat else None
        lng = float(lng) if lng else None
    except (ValueError, TypeError):
        # A bad value on either axis invalidates the pair.
        lat = lng = None

    email = (
        meta.get("email")
        or meta.get("_application")
        or listing_data.get("email")
        or ""
    )
    email = email.strip() if isinstance(email, str) else ""
    phone = normalize_phone(meta.get("phone") or listing_data.get("phone") or "")

    # Try to get description from content or excerpt
    description = listing_data.get("excerpt") or listing_data.get("content") or ""
    if isinstance(description, str):
        description = _TAG_RE.sub('', description).strip()
        if len(description) > 500:
            description = description[:497] + "..."
    else:
        description = ""

    # Website
    website = listing_data.get("website") or meta.get("website") or None

    # Pricing
    packages = pricing_to_packages(extract_pricing(listing_data))

    business = {
        "name": name,
        "abn": None,
        "phone": phone,
        "email": email or None,
        "website": website,
        "description": description or None,
    }

    locations = [{
        "address": address,
        "suburb": suburb,
        "state": state,
        "postcode": postcode,
        "lat": lat,
        "lng": lng,
        "phone": phone,
    }]

    source_id = slug or generate_slug(name)

    return to_intermediate(
        source=SOURCE_NAME,
        source_id=source_id,
        source_url=url,
        business=business,
        locations=locations,
        packages=packages,
    )


def crawl_profile(url: str) -> dict | None:
    """Crawl a single Gathered Here profile page.

    Returns:
        The listing dict with a ``_source`` key set to ``"next_data"`` or
        ``"html_fallback"``, or None when the page could not be fetched.
    """
    try:
        html_text = fetch_url(url)
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller count it.
        print(f" Error fetching {url}: {e}")
        return None

    # Try __NEXT_DATA__ first (structured)
    next_data = extract_next_data(html_text)
    if next_data:
        listing = extract_from_next_data(next_data)
        if listing:
            listing["_source"] = "next_data"
            return listing

    # Fallback to HTML parsing
    data = extract_from_html(html_text, url)
    data["_source"] = "html_fallback"
    return data


def run(limit: int | None = None):
    """Run the full Gathered Here crawl.

    Args:
        limit: If set, only crawl this many profiles (for testing).

    Raises:
        Exception: Any error raised during the crawl is re-raised after the
            crawl log has been marked ``"failed"``.
    """
    db = get_db()
    try:
        log_id = start_crawl_log(db, SOURCE_NAME)
        print(f"[{SOURCE_NAME}] Starting crawl (log_id={log_id})")

        found = 0
        new = 0
        skipped = 0
        errors = 0

        try:
            # Step 1: Get all profile URLs from sitemap
            print(" Fetching sitemap...", end=" ", flush=True)
            urls = fetch_all_listing_urls()
            print(f"{len(urls)} profile URLs found")

            if limit:
                urls = urls[:limit]
                print(f" (limited to {limit} for testing)")

            # Step 2: Crawl each profile
            for i, url in enumerate(urls):
                slug = url.rstrip("/").split("/")[-1]
                if (i + 1) % 50 == 0 or i == 0:
                    print(f" Crawling {i+1}/{len(urls)}: {slug}")

                listing_data = crawl_profile(url)
                found += 1

                if not listing_data:
                    errors += 1
                else:
                    source_id = slug
                    row_id = store_source_record(
                        db, SOURCE_NAME, source_id, url, listing_data, log_id
                    )
                    if row_id:
                        normalized = to_normalized(listing_data, url)
                        db.execute(
                            "UPDATE source_record SET normalized_data = ? WHERE id = ?",
                            (json.dumps(normalized), row_id)
                        )
                        new += 1
                    else:
                        skipped += 1

                # Runs for failed profiles too, so an error on every tenth
                # page cannot skip a commit and failures are still throttled.
                if (i + 1) % 10 == 0:
                    db.commit()  # periodic commit
                time.sleep(CRAWL_DELAY)

            db.commit()
            finish_crawl_log(db, log_id, found, new, 0, skipped)
            print(f"[{SOURCE_NAME}] Done: {found} found, {new} new, "
                  f"{skipped} skipped, {errors} errors")

        except Exception as e:
            finish_crawl_log(db, log_id, found, new, 0, skipped, "failed", str(e))
            raise
    finally:
        # Also covers a failure in start_crawl_log itself.
        db.close()


if __name__ == "__main__":
    import sys

    limit = int(sys.argv[1]) if len(sys.argv) > 1 else None
    run(limit=limit)