"""Website enrichment module.
For each provider with a website but no packages yet, crawls their site
to find pricing/packages pages and extracts structured data.
Two extraction modes:
1. Direct HTML parsing (for sites with clear pricing structure)
2. AI extraction via API call (for complex/varied layouts)
This module handles the crawling and page discovery.
AI extraction is delegated to the N8N workflow (Claude Haiku node).
"""
import json
import re
import time
import urllib.parse
import urllib.error
from pathlib import Path
from base import fetch_url, get_db, CRAWL_DELAY
# Candidate paths probed (in this order) beneath a provider's base URL when
# looking for a pricing/packages page. The first path whose response looks
# like pricing content wins, so the most specific guesses are listed first.
PRICING_PATHS = [
    # Explicit price pages
    "/pricing", "/prices", "/our-prices",
    # Package listings
    "/packages", "/funeral-packages",
    # Generic service pages
    "/services", "/our-services",
    "/funeral-costs", "/funeral-services", "/service-options",
    # Price lists and disclosure pages
    "/price-list", "/transparency", "/funeral-pricing", "/costs",
    # Service-type pages
    "/cremation", "/cremation-packages", "/burial",
    # Arrangement / planning flows
    "/plan-a-funeral", "/arrange",
]
# Lower-case substrings matched against a link's visible text and its href;
# a hit on either marks the link as a candidate pricing page. Stems such as
# "pric" and "packag" cover several word forms (price/pricing, package/packages).
# NOTE(review): "funeral" and "service" look broad enough to match most links
# on a funeral provider's site -- confirm that is intended.
PRICING_KEYWORDS = [
    "pric",
    "cost",
    "packag",
    "service",
    "plan",
    "cremation",
    "burial",
    "funeral",
    "transparency",
    "disclosure",
]
# Matches <a ... href="...">text</a>, capturing the href and the inner markup.
# Accepts single- or double-quoted href values in any attribute position.
_ANCHOR_RE = re.compile(
    r"""<a\s+(?:[^>]*?\s)?href\s*=\s*["']([^"']*)["'][^>]*>(.*?)</a>""",
    re.IGNORECASE | re.DOTALL,
)

# href prefixes that can never lead to a crawlable page.
_NON_PAGE_PREFIXES = ("#", "mailto:", "tel:", "javascript:")


def _site_host(url: str) -> str:
    """Return the lower-cased host[:port] of *url* without a leading ``www.``."""
    return urllib.parse.urlparse(url).netloc.lower().removeprefix("www.")


def find_pricing_page(base_url: str, homepage_html: str) -> str | None:
    """Try to find the pricing/packages page URL for a provider site.

    Strategy:
    1. Probe each path in ``PRICING_PATHS`` under ``base_url`` and accept the
       first response that is longer than 1000 characters and mentions "$"
       or "price" (guards against soft-404 pages).
    2. Otherwise scan the homepage's ``<a>`` links; follow same-site links
       whose text or href contains one of ``PRICING_KEYWORDS`` and accept
       the first one returning more than 500 characters.

    Args:
        base_url: Provider website URL; a trailing slash is ignored.
        homepage_html: Already-fetched homepage markup, used for strategy 2.

    Returns:
        The URL of the first candidate accepted, or ``None`` if none is found.
    """
    base = base_url.rstrip("/")
    base_host = _site_host(base)

    # Strategy 1: try common paths.
    for path in PRICING_PATHS:
        candidate = base + path
        try:
            page = fetch_url(candidate, timeout=10)
        except Exception:
            # Any fetch failure (404, timeout, ...) just means "not this path".
            page = None
        if page and len(page) > 1000 and ("$" in page or "price" in page.lower()):
            return candidate
        # Throttle after every probe, including failed ones, so a run of
        # 404s does not hit the host back-to-back.
        time.sleep(0.3)

    # Strategy 2: follow pricing-looking links found on the homepage.
    tried: set[str] = set()
    for match in _ANCHOR_RE.finditer(homepage_html):
        href = match.group(1).strip()
        text = re.sub(r"<[^>]+>", "", match.group(2)).lower().strip()
        href_lower = href.lower()

        if not any(kw in text or kw in href_lower for kw in PRICING_KEYWORDS):
            continue
        if not href or href_lower.startswith(_NON_PAGE_PREFIXES):
            continue

        # Resolve relative, root-relative and protocol-relative hrefs alike,
        # and drop any #fragment so the same page is not fetched twice.
        full_url = urllib.parse.urldefrag(
            urllib.parse.urljoin(base + "/", href)
        ).url
        parsed = urllib.parse.urlparse(full_url)
        if parsed.scheme not in ("http", "https"):
            continue

        # Only follow links on the provider's own site. Compare hosts rather
        # than searching for the domain anywhere in the URL, which would let
        # "https://other.example/?ref=provider.com" through.
        host = _site_host(full_url)
        if not base_host or not (host == base_host or host.endswith("." + base_host)):
            continue

        # The homepage itself is not a pricing page; the caller checks it
        # separately. Also skip URLs already attempted.
        if full_url.rstrip("/") == base or full_url in tried:
            continue
        tried.add(full_url)

        try:
            page = fetch_url(full_url, timeout=10)
        except Exception:
            page = None
        if page and len(page) > 500:
            return full_url
        time.sleep(0.3)

    return None
def extract_description(html: str) -> str | None:
"""Extract a business description from homepage HTML."""
# Try meta description first
meta_match = re.search(
r' 20:
return desc.strip()
# Try OG description
og_match = re.search(
r' 20:
return og_match.group(1).strip()
return None
def extract_contact_info(html: str) -> dict[str, str]:
    """Extract contact details from HTML.

    Args:
        html: Raw page markup.

    Returns:
        A dict containing any of the keys ``"phone"`` (first ``tel:`` link),
        ``"email"`` (first ``mailto:`` link, query string dropped) and
        ``"address"`` (first ``"streetAddress"`` JSON value, as found in
        JSON-LD). Keys are omitted when nothing non-empty is found.
    """
    info: dict[str, str] = {}

    # Phone: tel: URIs are percent-encoded ("%20" for spaces), so decode them.
    phone_match = re.search(r"""href\s*=\s*["']tel:([^"']+)""", html, re.IGNORECASE)
    if phone_match:
        phone = urllib.parse.unquote(phone_match.group(1)).strip()
        if phone:
            info["phone"] = phone

    # Email: stop at "?" so "?subject=..." parameters are not captured.
    email_match = re.search(r"""href\s*=\s*["']mailto:([^"'?]+)""", html, re.IGNORECASE)
    if email_match:
        email = urllib.parse.unquote(email_match.group(1)).strip()
        if email:
            info["email"] = email

    # Address from JSON-LD. The value is a JSON string, so allow escaped
    # characters inside it and decode escapes such as \u0026 or \/.
    addr_match = re.search(r'"streetAddress"\s*:\s*"((?:[^"\\]|\\.)*)"', html)
    if addr_match:
        raw = addr_match.group(1)
        try:
            address = json.loads(f'"{raw}"')
        except ValueError:
            # Not valid JSON string content (e.g. raw control characters);
            # fall back to the text as written.
            address = raw
        address = address.strip()
        if address:
            info["address"] = address

    return info
def check_has_pricing(html: str, *, min_amount: float = 100) -> bool:
    """Quick check whether a page contains pricing information.

    Scans for dollar amounts such as ``$2,495`` or ``$2,495.00`` and reports
    whether at least one is large enough to plausibly be a funeral price.

    Args:
        html: Page markup or text to scan.
        min_amount: Smallest dollar value counted as significant; smaller
            amounts are ignored as unlikely to be funeral pricing.

    Returns:
        ``True`` if any amount of at least ``min_amount`` is present.
    """
    for token in re.findall(r"\$[\d,]+(?:\.\d{2})?", html):
        digits = token[1:].replace(",", "")
        if not digits:
            # The pattern also matches a bare "$," -- nothing to parse.
            continue
        try:
            amount = float(digits)
        except ValueError:
            continue
        if amount >= min_amount:
            # One significant price is enough; no need to collect the rest.
            return True
    return False
def prepare_for_ai_extraction(html: str, *, max_chars: int = 8000) -> str:
    """Clean HTML for AI extraction -- remove noise, keep content.

    Drops ``<script>``/``<style>`` blocks, HTML comments and
    ``<nav>``/``<header>``/``<footer>`` sections, strips the remaining tags,
    decodes HTML entities and collapses whitespace.

    Args:
        html: Raw page markup.
        max_chars: Maximum length of the cleaned text. Longer text is cut to
            this many characters and suffixed with ``"..."``.

    Returns:
        Plain text suitable for passing to the extraction model.
    """
    # Local import: the ``html`` parameter shadows the stdlib module name.
    from html import unescape

    def drop_elements(markup: str, tags: tuple[str, ...]) -> str:
        # Replace with a space so text on either side is not glued together.
        for tag in tags:
            markup = re.sub(
                rf"<{tag}\b[^>]*>.*?</{tag}\s*>", " ", markup,
                flags=re.DOTALL | re.IGNORECASE,
            )
        return markup

    # Remove script and style blocks first: their bodies are not page text.
    cleaned = drop_elements(html, ("script", "style"))
    # Remove HTML comments.
    cleaned = re.sub(r"<!--.*?-->", " ", cleaned, flags=re.DOTALL)
    # Remove site chrome that rarely contains pricing.
    cleaned = drop_elements(cleaned, ("nav", "header", "footer"))

    # Strip remaining tags but keep text.
    text = re.sub(r"<[^>]+>", " ", cleaned)
    # Decode entities only after tag stripping, so "&lt;b&gt;" stays literal
    # text rather than being removed as a tag.
    text = unescape(text)
    # Collapse whitespace (this also normalises non-breaking spaces).
    text = re.sub(r"\s+", " ", text).strip()

    if len(text) > max_chars:
        text = text[:max_chars] + "..."
    return text
def enrich_provider(provider_id: int, website: str, db) -> dict:
    """Crawl a provider's website and extract enrichment data.

    Args:
        provider_id: Brand id of the provider (not used by the crawl itself).
        website: Provider homepage URL.
        db: Database handle (not used by the crawl itself).

    Returns:
        A dict with the keys ``homepage_fetched``, ``description``,
        ``contact_info``, ``pricing_page_url``, ``has_pricing``,
        ``pricing_page_text`` and ``pdf_links``. ``error`` is added when the
        homepage cannot be fetched, and ``pricing_error`` when a discovered
        pricing page cannot be re-fetched.
    """
    result = {
        "homepage_fetched": False,
        "description": None,
        "contact_info": {},
        "pricing_page_url": None,
        "has_pricing": False,
        "pricing_page_text": None,  # cleaned text for AI extraction
        "pdf_links": [],
    }

    # Step 1: Fetch homepage. Nothing else is possible without it.
    try:
        homepage = fetch_url(website, timeout=15)
    except Exception as e:
        result["error"] = str(e)[:200]
        return result
    result["homepage_fetched"] = True

    # Step 2: Extract description and contact info.
    result["description"] = extract_description(homepage)
    result["contact_info"] = extract_contact_info(homepage)

    # Step 3: Find pricing page.
    time.sleep(CRAWL_DELAY)
    pricing_url = find_pricing_page(website, homepage)

    if not pricing_url:
        # No dedicated page: the homepage itself may list prices.
        if check_has_pricing(homepage):
            result["has_pricing"] = True
            result["pricing_page_url"] = website
            result["pricing_page_text"] = prepare_for_ai_extraction(homepage)
        return result

    result["pricing_page_url"] = pricing_url
    try:
        pricing_html = fetch_url(pricing_url, timeout=15)
    except Exception as e:
        # Best effort: keep the homepage data, but record why pricing is
        # missing instead of silently discarding the failure.
        result["pricing_error"] = str(e)[:200]
        return result

    result["has_pricing"] = check_has_pricing(pricing_html)
    result["pricing_page_text"] = prepare_for_ai_extraction(pricing_html)

    # Collect linked PDFs (price lists are often published as PDFs). Resolve
    # each href against the pricing page URL so page-relative and
    # protocol-relative links come out right, and skip duplicates.
    for pdf_href in re.findall(
        r"""href\s*=\s*["']([^"']*\.pdf[^"']*)["']""", pricing_html, re.IGNORECASE
    ):
        pdf_url = urllib.parse.urljoin(pricing_url, pdf_href.strip())
        if pdf_url not in result["pdf_links"]:
            result["pdf_links"].append(pdf_url)

    return result
def run(limit: int | None = None, state_filter: str | None = None):
    """Enrich all providers that have a website but no packages.

    Selects unverified brands (``verified = 0``) with a website and no
    ``package`` rows, crawls each with ``enrich_provider``, fills in empty
    description/email/phone columns, sets ``enrichment_status`` and stores
    the cleaned pricing text in ``source_record`` for later AI extraction.

    Args:
        limit: Maximum number of providers to process. ``None`` or 0 means
            no limit.
        state_filter: If given, only providers whose ``business_state``
            equals this value are processed.
    """
    db = get_db()
    try:
        query = """
        SELECT fb.id, fb.title, fb.website, fb.business_state
        FROM funeral_brand fb
        LEFT JOIN package p ON p.brand_id = fb.id
        WHERE fb.website IS NOT NULL
        AND fb.verified = 0
        AND p.id IS NULL
        """
        params: list = []
        if state_filter:
            query += " AND fb.business_state = ?"
            params.append(state_filter)
        query += " ORDER BY fb.id"
        if limit:
            # Bind the limit rather than formatting it into the SQL text.
            query += " LIMIT ?"
            params.append(int(limit))

        providers = db.execute(query, params).fetchall()
        print(f"Providers to enrich: {len(providers)}")

        enriched = 0
        pricing_found = 0
        failed = 0

        for i, prov in enumerate(providers):
            if (i + 1) % 5 == 0 or i == 0:
                print(f" [{i+1}/{len(providers)}] {prov['title']}")

            brand_id = prov["id"]
            result = enrich_provider(brand_id, prov["website"], db)
            crawled = result["homepage_fetched"]

            if not crawled:
                failed += 1
                db.execute(
                    """UPDATE funeral_brand
                    SET enrichment_status = 'failed', updated_at = datetime('now')
                    WHERE id = ?""",
                    (brand_id,),
                )
            else:
                enriched += 1
                if result["has_pricing"]:
                    pricing_found += 1

                # Status is 'partial' whether or not pricing was found: the
                # homepage has been enriched, but packages are still missing.
                updates = {"enrichment_status": "partial"}

                # Only fill columns that are currently empty; never overwrite
                # existing brand data with crawled values.
                brand = db.execute(
                    "SELECT description, email, phone FROM funeral_brand WHERE id = ?",
                    (brand_id,),
                ).fetchone()
                contact = result["contact_info"]
                if result["description"] and not brand["description"]:
                    updates["description"] = result["description"]
                if contact.get("email") and not brand["email"]:
                    updates["email"] = contact["email"]
                if contact.get("phone") and not brand["phone"]:
                    updates["phone"] = contact["phone"]

                # Column names come from the fixed keys above, never from
                # crawled data, so building the SET clause is safe.
                set_parts = [f"{k} = ?" for k in updates]
                values = list(updates.values()) + [brand_id]
                db.execute(
                    f"UPDATE funeral_brand SET {', '.join(set_parts)}, "
                    f"updated_at = datetime('now') WHERE id = ?",
                    values,
                )

                # Store pricing page text for later AI extraction.
                if result["pricing_page_text"]:
                    db.execute(
                        """INSERT OR REPLACE INTO source_record
                        (source_name, source_id, source_url, raw_data,
                        matched_brand_id, match_type)
                        VALUES ('website_crawl', ?, ?, ?, ?, 'enrichment')""",
                        (
                            f"brand_{brand_id}",
                            result["pricing_page_url"],
                            json.dumps({
                                "pricing_text": result["pricing_page_text"],
                                "pdf_links": result["pdf_links"],
                                "has_pricing": result["has_pricing"],
                            }),
                            brand_id,
                        ),
                    )

            # Commit in batches of ten providers, failed ones included.
            if (i + 1) % 10 == 0:
                db.commit()
            # Throttle only after a site was actually crawled.
            if crawled:
                time.sleep(CRAWL_DELAY)

        db.commit()
        print(f"\nDone: {enriched} enriched, {pricing_found} with pricing, {failed} failed")
    finally:
        # Release the connection even if a crawl or query raised.
        db.close()
if __name__ == "__main__":
    # Usage: [--state=STATE] [--limit=N | N]
    # A bare integer argument is accepted as shorthand for --limit=N.
    import sys

    limit = None
    state = None
    for arg in sys.argv[1:]:
        if arg.startswith("--state="):
            # Split once so a value containing "=" is kept whole.
            state = arg.split("=", 1)[1]
        elif arg.startswith("--limit="):
            raw_limit = arg.split("=", 1)[1]
            try:
                limit = int(raw_limit)
            except ValueError:
                sys.exit(f"Invalid --limit value: {raw_limit!r} (expected an integer)")
        else:
            try:
                limit = int(arg)
            except ValueError:
                # Still ignored, as before, but no longer silently.
                print(f"Ignoring unrecognised argument: {arg!r}", file=sys.stderr)
    run(limit=limit, state_filter=state)