"""Deduplication and merge engine. Processes source_records → funeral_brand + location + package entries. Handles cross-source matching and field-level merging. Matching hierarchy (strongest to weakest): 1. source_key match — same record from same source (skip/update) 2. ABN match — same business entity 3. Name + Postcode exact match — likely same business 4. Fuzzy name match (>85%) + same state — probable match, flag for review Merge priority (higher = preferred): vic_register > funerals_australia > nfda > gathered_here Never overwrite verified provider data. """ import json import re import sqlite3 from difflib import SequenceMatcher from base import get_db, generate_slug, normalize_state # Source priority for merge conflicts (higher number = more authoritative) SOURCE_PRIORITY = { "vic_register": 40, "funerals_australia": 30, "nfda": 20, "gathered_here": 10, } def normalize_name(name: str) -> str: """Normalize a business name for comparison.""" name = name.strip().upper() # Remove common suffixes for suffix in [" PTY LTD", " PTY. LTD.", " P/L", " LIMITED", " PROPRIETARY LIMITED", " INC", " LLC", " FUNERAL DIRECTORS", " FUNERAL SERVICES", " FUNERALS", " FUNERAL HOME"]: name = name.removesuffix(suffix) # Remove punctuation name = re.sub(r"[''`\".,&()-]", " ", name) name = re.sub(r"\s+", " ", name).strip() return name def fuzzy_match(name1: str, name2: str) -> float: """Return similarity ratio between two names (0.0 to 1.0).""" n1 = normalize_name(name1) n2 = normalize_name(name2) return SequenceMatcher(None, n1, n2).ratio() def find_existing_brand(db: sqlite3.Connection, record: dict) -> tuple[int | None, str]: """Find a matching funeral_brand for a source record. Returns (brand_id, match_type) or (None, 'new'). """ biz = record.get("business", {}) locs = record.get("locations", []) name = biz.get("name", "") abn = biz.get("abn") source = record.get("source", "") source_id = record.get("sourceId", "") source_key = f"{source}:{source_id}" postcode = None state = None if locs: postcode = locs[0].get("postcode") state = locs[0].get("state") # 1. Source key match (exact same record from same source) row = db.execute( "SELECT id FROM funeral_brand WHERE source_key = ?", (source_key,) ).fetchone() if row: return row["id"], "source_key" # 2. ABN match if abn: row = db.execute( "SELECT id FROM funeral_brand WHERE abn = ?", (abn,) ).fetchone() if row: return row["id"], "abn" # 3. Exact name + postcode match if name and postcode: norm = normalize_name(name) # Check all brands — need fuzzy on name rows = db.execute( "SELECT id, title FROM funeral_brand WHERE business_postcode = ?", (postcode,) ).fetchall() for row in rows: if normalize_name(row["title"]) == norm: return row["id"], "name_postcode" # 4. Fuzzy name + same state if name and state: rows = db.execute( "SELECT id, title FROM funeral_brand WHERE business_state = ?", (state,) ).fetchall() for row in rows: score = fuzzy_match(name, row["title"]) if score >= 0.85: return row["id"], "fuzzy" return None, "new" def merge_field(existing: str | None, new_val: str | None, existing_priority: int, new_priority: int) -> str | None: """Merge a single field, preferring non-null and higher-priority.""" if not new_val: return existing if not existing: return new_val # Both have values — prefer higher priority source if new_priority > existing_priority: return new_val return existing def create_brand(db: sqlite3.Connection, record: dict) -> int: """Create a new funeral_brand from a source record.""" biz = record.get("business", {}) locs = record.get("locations", []) source = record.get("source", "") source_id = record.get("sourceId", "") source_key = f"{source}:{source_id}" loc = locs[0] if locs else {} slug = generate_slug(biz.get("name", "unknown")) # Ensure unique slug base_slug = slug counter = 1 while True: existing = db.execute( "SELECT id FROM funeral_brand WHERE code = ?", (slug,) ).fetchone() if not existing: break slug = f"{base_slug}-{counter}" counter += 1 cur = db.execute( """INSERT INTO funeral_brand ( title, description, email, phone, website, abn, code, hidden, verified, source_key, source_url, enrichment_status, business_address, business_suburb, business_state, business_postcode ) VALUES (?, ?, ?, ?, ?, ?, ?, 1, 0, ?, ?, 'pending', ?, ?, ?, ?)""", ( biz.get("name"), biz.get("description"), biz.get("email"), biz.get("phone"), biz.get("website"), biz.get("abn"), slug, source_key, record.get("sourceUrl"), loc.get("address"), loc.get("suburb"), loc.get("state"), loc.get("postcode"), ) ) brand_id = cur.lastrowid # Create locations for loc_data in locs: title_parts = [loc_data.get("suburb", ""), loc_data.get("state", "")] loc_title = ", ".join(p for p in title_parts if p) or biz.get("name", "") db.execute( """INSERT INTO location ( title, address, suburb, state, postcode, lat, lng, brand_id ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( loc_title, loc_data.get("address"), loc_data.get("suburb"), loc_data.get("state"), loc_data.get("postcode"), loc_data.get("lat"), loc_data.get("lng"), brand_id, ) ) # Create packages (from Gathered Here pricing) packages = record.get("packages", []) for pkg in packages: if not pkg.get("price"): continue cur = db.execute( """INSERT INTO package ( title, funeral_type, brand_id, source_url, extraction_confidence ) VALUES (?, ?, ?, ?, ?)""", ( pkg.get("name"), pkg.get("funeralType"), brand_id, record.get("sourceUrl"), 0.8, # Gathered Here pricing is structured, fairly reliable ) ) pkg_id = cur.lastrowid # Create inclusions if available for inc in pkg.get("inclusions", []): db.execute( """INSERT INTO package_inclusion ( price, optional, complimentary, inclusion_type_title, package_id ) VALUES (?, ?, ?, ?, ?)""", ( inc.get("price", 0), 1 if inc.get("optional") else 0, 1 if inc.get("complimentary") else 0, inc.get("item", "Unknown"), pkg_id, ) ) return brand_id def update_brand(db: sqlite3.Connection, brand_id: int, record: dict, match_type: str) -> bool: """Merge new data into an existing brand. Returns True if updated.""" biz = record.get("business", {}) locs = record.get("locations", []) source = record.get("source", "") new_priority = SOURCE_PRIORITY.get(source, 0) # Never overwrite verified providers brand = db.execute( "SELECT * FROM funeral_brand WHERE id = ?", (brand_id,) ).fetchone() if brand["verified"]: return False # Determine existing source priority existing_source = "" if brand["source_key"]: existing_source = brand["source_key"].split(":")[0] existing_priority = SOURCE_PRIORITY.get(existing_source, 0) # Field-level merge — only fill blanks or upgrade from higher priority updates = {} field_map = { "description": biz.get("description"), "email": biz.get("email"), "phone": biz.get("phone"), "website": biz.get("website"), "abn": biz.get("abn"), } for field, new_val in field_map.items(): merged = merge_field(brand[field], new_val, existing_priority, new_priority) if merged != brand[field]: updates[field] = merged # Update location data if we have coords and existing doesn't if locs: loc = locs[0] existing_locs = db.execute( "SELECT * FROM location WHERE brand_id = ?", (brand_id,) ).fetchall() if not existing_locs and loc.get("suburb"): title_parts = [loc.get("suburb", ""), loc.get("state", "")] loc_title = ", ".join(p for p in title_parts if p) db.execute( """INSERT INTO location ( title, address, suburb, state, postcode, lat, lng, brand_id ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( loc_title, loc.get("address"), loc.get("suburb"), loc.get("state"), loc.get("postcode"), loc.get("lat"), loc.get("lng"), brand_id, ) ) elif existing_locs: # Update first location with coords if missing eloc = existing_locs[0] if not eloc["lat"] and loc.get("lat"): db.execute( "UPDATE location SET lat = ?, lng = ? WHERE id = ?", (loc.get("lat"), loc.get("lng"), eloc["id"]) ) # Add packages if we have them and brand doesn't yet packages = record.get("packages", []) if packages: existing_pkgs = db.execute( "SELECT COUNT(*) as n FROM package WHERE brand_id = ?", (brand_id,) ).fetchone()["n"] if existing_pkgs == 0: for pkg in packages: if not pkg.get("price"): continue cur = db.execute( """INSERT INTO package ( title, funeral_type, brand_id, source_url ) VALUES (?, ?, ?, ?)""", (pkg.get("name"), pkg.get("funeralType"), brand_id, record.get("sourceUrl")) ) if updates: set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [brand_id] db.execute( f"UPDATE funeral_brand SET {set_clause}, updated_at = datetime('now') WHERE id = ?", values ) return True return False def process_all(): """Process all source_records through deduplication and create brand entries. Order matters: process higher-priority sources first so their data forms the base record that lower-priority sources merge into. """ db = get_db() # Process in priority order (highest first) sources_ordered = sorted(SOURCE_PRIORITY.keys(), key=lambda s: SOURCE_PRIORITY[s], reverse=True) stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0} print("=" * 60) print("DEDUPLICATION ENGINE") print("=" * 60) for source in sources_ordered: records = db.execute( """SELECT id, normalized_data FROM source_record WHERE source_name = ? AND normalized_data IS NOT NULL""", (source,) ).fetchall() if not records: continue print(f"\n Processing {source}: {len(records)} records") source_stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0} for row in records: record = json.loads(row["normalized_data"]) brand_id, match_type = find_existing_brand(db, record) if match_type == "new": brand_id = create_brand(db, record) source_stats["new"] += 1 elif match_type == "source_key": source_stats["skipped"] += 1 else: # Matched to existing — merge updated = update_brand(db, brand_id, record, match_type) if updated: source_stats["updated"] += 1 else: source_stats["matched"] += 1 # Update source_record with match info db.execute( """UPDATE source_record SET matched_brand_id = ?, match_type = ?, processed_at = datetime('now') WHERE id = ?""", (brand_id, match_type, row["id"]) ) db.commit() print(f" New: {source_stats['new']}, Updated: {source_stats['updated']}, " f"Matched: {source_stats['matched']}, Skipped: {source_stats['skipped']}") for k, v in source_stats.items(): stats[k] += v # Final summary total_brands = db.execute("SELECT COUNT(*) as n FROM funeral_brand").fetchone()["n"] total_locations = db.execute("SELECT COUNT(*) as n FROM location").fetchone()["n"] total_packages = db.execute("SELECT COUNT(*) as n FROM package").fetchone()["n"] print(f"\n{'=' * 60}") print(f"DEDUP RESULTS") print(f"{'=' * 60}") print(f" New brands created: {stats['new']}") print(f" Existing updated: {stats['updated']}") print(f" Matched (no change): {stats['matched']}") print(f" Skipped (source_key): {stats['skipped']}") print(f"\n Total brands in DB: {total_brands}") print(f" Total locations in DB: {total_locations}") print(f" Total packages in DB: {total_packages}") # Show match type breakdown print(f"\n Match type breakdown:") rows = db.execute( """SELECT match_type, COUNT(*) as n FROM source_record WHERE processed_at IS NOT NULL GROUP BY match_type ORDER BY n DESC""" ).fetchall() for row in rows: print(f" {row['match_type']:15s} {row['n']:5d}") db.close() if __name__ == "__main__": process_all()