"""Website enrichment module.

For each provider with a website but no packages yet, crawls their site to
find pricing/packages pages and extracts structured data.

Two extraction modes:
1. Direct HTML parsing (for sites with clear pricing structure)
2. AI extraction via API call (for complex/varied layouts)

This module handles the crawling and page discovery. AI extraction is
delegated to the N8N workflow (Claude Haiku node).
"""

import json
import re
import time
import urllib.parse
import urllib.error
from pathlib import Path

from base import fetch_url, get_db, CRAWL_DELAY

# Common URL patterns for pricing/packages pages
PRICING_PATHS = [
    "/pricing",
    "/prices",
    "/our-prices",
    "/packages",
    "/funeral-packages",
    "/services",
    "/our-services",
    "/funeral-costs",
    "/funeral-services",
    "/service-options",
    "/price-list",
    "/transparency",
    "/funeral-pricing",
    "/costs",
    "/cremation",
    "/cremation-packages",
    "/burial",
    "/plan-a-funeral",
    "/arrange",
]

# Keywords that suggest a link leads to pricing
PRICING_KEYWORDS = [
    "pric",
    "cost",
    "packag",
    "service",
    "plan",
    "cremation",
    "burial",
    "funeral",
    "transparency",
    "disclosure",
]

# Pause between consecutive probe requests against the same site (seconds).
_PROBE_DELAY = 0.3

# NOTE(review): the anchor, <meta>, script/style, comment and nav/header/footer
# patterns below were damaged in the reviewed source (their HTML-looking parts
# had been stripped out). They are reconstructions - confirm against history.
_ANCHOR_RE = re.compile(
    r'<a\b[^>]*href="([^"]*)"[^>]*>(.*?)</a>', re.IGNORECASE | re.DOTALL
)
_META_TAG_RE = re.compile(r"<meta\b[^>]*>", re.IGNORECASE)
_ATTR_RE = re.compile(r"""([\w:-]+)\s*=\s*(?:"([^"]*)"|'([^']*)')""")
_TAG_RE = re.compile(r"<[^>]+>")
_PRICE_RE = re.compile(r"\$[\d,]+(?:\.\d{2})?")


def _fetch_or_none(url: str, timeout: int = 10) -> str | None:
    """Fetch *url* via ``fetch_url``; return ``None`` on any failure.

    Probing is best-effort: a missing or unreachable candidate page is an
    expected outcome, so every exception is treated as "page not available".
    """
    try:
        return fetch_url(url, timeout=timeout)
    except Exception:
        return None


def _host_key(url: str) -> str:
    """Return the lower-cased host of *url* without a leading ``www.``."""
    host = urllib.parse.urlparse(url).hostname or ""
    return host.removeprefix("www.")


def _page_key(url: str) -> tuple[str, str, str]:
    """Return a (host, path, query) key identifying the page behind *url*."""
    parts = urllib.parse.urlparse(url)
    return (_host_key(url), parts.path.rstrip("/"), parts.query)


def _resolve_internal_link(base: str, href: str) -> str | None:
    """Resolve *href* against *base*; return it only if it is an on-site page.

    Returns ``None`` for empty or fragment-only links, for non-HTTP schemes
    (``mailto:``, ``tel:``, ``javascript:`` ...) and for other hosts. Assumes
    *base* is an absolute http(s) URL; a scheme-less base yields ``None``.
    """
    href = href.strip()
    if not href or href.startswith("#"):
        return None
    # base is treated as a directory, matching the previous concatenation.
    absolute, _fragment = urllib.parse.urldefrag(
        urllib.parse.urljoin(base + "/", href)
    )
    if urllib.parse.urlparse(absolute).scheme.lower() not in ("http", "https"):
        return None
    # Compare hosts exactly; a substring test lets unrelated URLs through.
    if _host_key(absolute) != _host_key(base):
        return None
    return absolute


def _locate_pricing_page(
    base_url: str, homepage_html: str
) -> tuple[str, str] | None:
    """Find the pricing page and return ``(url, html)``, or ``None``.

    Strategy:
    1. Try the common paths in ``PRICING_PATHS``.
    2. Follow homepage links whose text or href contains a pricing keyword.

    Returning the HTML lets callers avoid fetching the page a second time.
    """
    base = base_url.rstrip("/")
    # URL -> fetched HTML (None when the fetch failed); avoids re-requesting.
    pages: dict[str, str | None] = {}

    # Strategy 1: try common paths.
    for path in PRICING_PATHS:
        candidate = base + path
        page = pages[candidate] = _fetch_or_none(candidate)
        if page is None:
            continue
        # Guard against soft-404 pages: require some pricing-looking content.
        if len(page) > 1000 and ("$" in page or "price" in page.lower()):
            return candidate, page
        time.sleep(_PROBE_DELAY)

    # Strategy 2: parse homepage links.
    home_key = _page_key(base)
    for match in _ANCHOR_RE.finditer(homepage_html):
        href = match.group(1)
        text = _TAG_RE.sub("", match.group(2)).lower().strip()
        href_lower = href.lower()
        if not any(kw in text or kw in href_lower for kw in PRICING_KEYWORDS):
            continue

        candidate = _resolve_internal_link(base, href)
        # A link back to the homepage is not a separate pricing page; the
        # caller checks the homepage itself when nothing else is found.
        if candidate is None or _page_key(candidate) == home_key:
            continue

        fetched_now = candidate not in pages
        if fetched_now:
            pages[candidate] = _fetch_or_none(candidate)
        page = pages[candidate]
        if page is None:
            continue
        if len(page) > 500:
            return candidate, page
        if fetched_now:
            time.sleep(_PROBE_DELAY)

    return None


def find_pricing_page(base_url: str, homepage_html: str) -> str | None:
    """Try to find the pricing/packages page URL.

    Strategy:
    1. Try common URL patterns
    2. Parse homepage links for pricing-related keywords

    Args:
        base_url: Site root URL (a trailing slash is ignored).
        homepage_html: HTML of the site's homepage, used for link discovery.

    Returns:
        The URL of the first page that looks like a pricing page, or ``None``.
    """
    located = _locate_pricing_page(base_url, homepage_html)
    return located[0] if located else None


def _meta_content(html: str, attr: str, value: str) -> str | None:
    """Return the ``content`` of the first ``<meta>`` tag with attr == value."""
    for tag in _META_TAG_RE.findall(html):
        attrs: dict[str, str] = {}
        for found in _ATTR_RE.finditer(tag):
            quoted = found.group(2) if found.group(2) is not None else found.group(3)
            attrs.setdefault(found.group(1).lower(), quoted)
        if attrs.get(attr, "").lower() == value and attrs.get("content"):
            return attrs["content"]
    return None


def extract_description(html: str) -> str | None:
    """Extract a business description from homepage HTML.

    Tries the standard meta description first, then the Open Graph one.
    Descriptions of 20 characters or fewer are rejected.

    NOTE(review): the original body was damaged in the reviewed source; only
    the ``> 20`` threshold and the meta-then-OG order were still visible.
    """
    for attr, value in (("name", "description"), ("property", "og:description")):
        content = _meta_content(html, attr, value)
        if content:
            desc = content.strip()
            if len(desc) > 20:
                return desc
    return None


def extract_contact_info(html: str) -> dict:
    """Extract contact details from HTML.

    Returns:
        A dict that may contain ``phone`` (first ``tel:`` link), ``email``
        (first ``mailto:`` link, without query string) and ``address`` (first
        JSON-LD ``streetAddress`` value). Keys are absent when not found.
    """
    info = {}

    # Phone
    phone_match = re.search(r'href="tel:([^"]+)"', html)
    if phone_match:
        info["phone"] = phone_match.group(1).strip()

    # Email
    email_match = re.search(r'href="mailto:([^"?]+)"', html)
    if email_match:
        info["email"] = email_match.group(1).strip()

    # Address from JSON-LD
    addr_match = re.search(r'"streetAddress"\s*:\s*"([^"]*)"', html)
    if addr_match:
        info["address"] = addr_match.group(1)

    return info


def check_has_pricing(html: str) -> bool:
    """Quick check whether a page contains pricing information.

    Returns ``True`` when at least one dollar amount of 100 or more appears;
    smaller amounts are ignored as unlikely to be funeral pricing.
    """
    for token in _PRICE_RE.findall(html):
        digits = token.replace("$", "").replace(",", "").strip()
        if not digits:
            continue
        try:
            amount = float(digits)
        except ValueError:
            continue
        if amount >= 100:
            return True
    return False


def prepare_for_ai_extraction(html: str) -> str:
    """Clean HTML for AI extraction - remove noise, keep content.

    Drops script/style blocks, HTML comments and nav/header/footer elements,
    strips the remaining tags, collapses whitespace and truncates the result
    to 8000 characters (plus a trailing ``...``).
    """
    flags = re.DOTALL | re.IGNORECASE

    # Remove script and style tags together with their contents
    cleaned = re.sub(r"<script\b[^>]*>.*?</script>", "", html, flags=flags)
    cleaned = re.sub(r"<style\b[^>]*>.*?</style>", "", cleaned, flags=flags)

    # Remove HTML comments
    cleaned = re.sub(r"<!--.*?-->", "", cleaned, flags=re.DOTALL)

    # Remove nav, header, footer elements
    for tag in ["nav", "header", "footer"]:
        cleaned = re.sub(rf"<{tag}\b[^>]*>.*?</{tag}>", "", cleaned, flags=flags)

    # Strip remaining tags but keep text
    text = re.sub(r"<[^>]+>", " ", cleaned)

    # Collapse whitespace
    text = re.sub(r"\s+", " ", text).strip()

    # Truncate to ~8000 chars (fits well within Haiku context)
    if len(text) > 8000:
        text = text[:8000] + "..."

    return text


def _extract_pdf_links(page_html: str, page_url: str) -> list[str]:
    """Return absolute, de-duplicated PDF links found in *page_html*."""
    hrefs = re.findall(r'href="([^"]*\.pdf[^"]*)"', page_html, re.IGNORECASE)
    # Resolve against the page the link was found on, as a browser would.
    absolute = (urllib.parse.urljoin(page_url, href) for href in hrefs)
    return list(dict.fromkeys(absolute))


def enrich_provider(provider_id: int, website: str, db) -> dict:
    """Crawl a provider's website and extract enrichment data.

    Args:
        provider_id: Provider row id (currently unused; kept for callers).
        website: Homepage URL of the provider.
        db: Database handle (currently unused; kept for callers).

    Returns:
        A dict with what was found: ``homepage_fetched``, ``description``,
        ``contact_info``, ``pricing_page_url``, ``has_pricing``,
        ``pricing_page_text`` and ``pdf_links``. When the homepage cannot be
        fetched, ``error`` holds the truncated exception message.
    """
    result = {
        "homepage_fetched": False,
        "description": None,
        "contact_info": {},
        "pricing_page_url": None,
        "has_pricing": False,
        "pricing_page_text": None,  # cleaned text for AI extraction
        "pdf_links": [],
    }

    # Step 1: Fetch homepage
    try:
        homepage = fetch_url(website, timeout=15)
    except Exception as e:
        result["error"] = str(e)[:200]
        return result
    result["homepage_fetched"] = True

    # Step 2: Extract description and contact info
    result["description"] = extract_description(homepage)
    result["contact_info"] = extract_contact_info(homepage)

    # Step 3: Find pricing page
    time.sleep(CRAWL_DELAY)
    located = _locate_pricing_page(website, homepage)

    if located:
        # Reuse the HTML fetched during discovery rather than requesting the
        # same page again.
        pricing_url, pricing_html = located
        result["pricing_page_url"] = pricing_url
        result["has_pricing"] = check_has_pricing(pricing_html)
        result["pricing_page_text"] = prepare_for_ai_extraction(pricing_html)
        result["pdf_links"] = _extract_pdf_links(pricing_html, pricing_url)
    elif check_has_pricing(homepage):
        # Check homepage itself for pricing
        result["has_pricing"] = True
        result["pricing_page_url"] = website
        result["pricing_page_text"] = prepare_for_ai_extraction(homepage)

    return result


def _store_enrichment(db, brand_id: int, result: dict) -> None:
    """Write the findings in *result* to the brand row and ``source_record``."""
    brand = db.execute(
        "SELECT description, email, phone FROM funeral_brand WHERE id = ?",
        (brand_id,),
    ).fetchone()

    # Only fill in fields that are currently empty; never overwrite data.
    updates = {}
    if result["description"] and not brand["description"]:
        updates["description"] = result["description"]

    contact = result["contact_info"]
    if contact.get("email") and not brand["email"]:
        updates["email"] = contact["email"]
    if contact.get("phone") and not brand["phone"]:
        updates["phone"] = contact["phone"]

    # 'partial' whether or not pricing was found: with pricing the brand still
    # needs AI extraction, without it only the homepage was enriched.
    updates["enrichment_status"] = "partial"

    # Column names come from the fixed keys above; values are bound.
    set_parts = [f"{k} = ?" for k in updates]
    values = list(updates.values()) + [brand_id]
    db.execute(
        f"UPDATE funeral_brand SET {', '.join(set_parts)}, "
        f"updated_at = datetime('now') WHERE id = ?",
        values,
    )

    # Store pricing page text for later AI extraction
    if result["pricing_page_text"]:
        db.execute(
            """INSERT OR REPLACE INTO source_record
               (source_name, source_id, source_url, raw_data,
                matched_brand_id, match_type)
               VALUES ('website_crawl', ?, ?, ?, ?, 'enrichment')""",
            (
                f"brand_{brand_id}",
                result["pricing_page_url"],
                json.dumps({
                    "pricing_text": result["pricing_page_text"],
                    "pdf_links": result["pdf_links"],
                    "has_pricing": result["has_pricing"],
                }),
                brand_id,
            ),
        )


def run(limit: int | None = None, state_filter: str | None = None):
    """Enrich all providers that have a website but no packages.

    Args:
        limit: Maximum number of providers to process (``None``/0 = no limit).
        state_filter: When given, only providers with this ``business_state``.

    Progress is committed every 10 providers and once at the end. The
    database handle is closed even if processing raises.
    """
    db = get_db()
    try:
        query = """
            SELECT fb.id, fb.title, fb.website, fb.business_state
            FROM funeral_brand fb
            LEFT JOIN package p ON p.brand_id = fb.id
            WHERE fb.website IS NOT NULL
              AND fb.verified = 0
              AND p.id IS NULL
        """
        params = []
        if state_filter:
            query += " AND fb.business_state = ?"
            params.append(state_filter)
        query += " ORDER BY fb.id"
        if limit:
            # Bind the limit instead of formatting it into the SQL text.
            query += " LIMIT ?"
            params.append(int(limit))

        providers = db.execute(query, params).fetchall()
        print(f"Providers to enrich: {len(providers)}")

        enriched = 0
        pricing_found = 0
        failed = 0

        for i, prov in enumerate(providers):
            if (i + 1) % 5 == 0 or i == 0:
                print(f" [{i+1}/{len(providers)}] {prov['title']}")

            result = enrich_provider(prov["id"], prov["website"], db)
            fetched = result["homepage_fetched"]

            if fetched:
                enriched += 1
                if result["has_pricing"]:
                    pricing_found += 1
                _store_enrichment(db, prov["id"], result)
            else:
                failed += 1
                db.execute(
                    """UPDATE funeral_brand
                       SET enrichment_status = 'failed',
                           updated_at = datetime('now')
                       WHERE id = ?""",
                    (prov["id"],),
                )

            # Commit on schedule even when this provider failed.
            if (i + 1) % 10 == 0:
                db.commit()

            if fetched:
                time.sleep(CRAWL_DELAY)

        db.commit()
        print(
            f"\nDone: {enriched} enriched, {pricing_found} with pricing, "
            f"{failed} failed"
        )
    finally:
        db.close()


def _parse_cli_args(argv: list[str]) -> tuple[int | None, str | None]:
    """Parse ``--state=XX``, ``--limit=N`` and a bare integer limit."""
    limit = None
    state = None
    for arg in argv:
        if arg.startswith("--state="):
            # maxsplit=1 keeps any '=' that is part of the value.
            state = arg.split("=", 1)[1]
        elif arg.startswith("--limit="):
            limit = int(arg.split("=", 1)[1])
        else:
            # A bare integer is a limit; anything else is ignored.
            try:
                limit = int(arg)
            except ValueError:
                pass
    return limit, state


if __name__ == "__main__":
    import sys

    cli_limit, cli_state = _parse_cli_args(sys.argv[1:])
    run(limit=cli_limit, state_filter=cli_state)