Files
Provider-Crawl/crawlers/dedup.py
Richie cc91427789 Initial commit: funeral provider discovery pipeline
Python crawlers for VIC Register, Funerals Australia, NFDA
n8n workflows for scheduled discovery and enrichment
SQLite schema and seeded dev database (1,463 providers)
End-to-end process documentation in n8n/PROCESS.md
2026-04-24 10:27:08 +10:00

426 lines
14 KiB
Python

"""Deduplication and merge engine.
Processes source_records → funeral_brand + location + package entries.
Handles cross-source matching and field-level merging.
Matching hierarchy (strongest to weakest):
1. source_key match — same record from same source (skip/update)
2. ABN match — same business entity
3. Name + Postcode exact match — likely same business
4. Fuzzy name match (>= 85% similarity) + same state — probable match, flag for review
Merge priority (higher = preferred):
vic_register > funerals_australia > nfda > gathered_here
Never overwrite verified provider data.
"""
import json
import re
import sqlite3
from difflib import SequenceMatcher
from base import get_db, generate_slug, normalize_state
# Authority ranking used when two sources disagree on a field:
# the source with the larger number wins the conflict.
SOURCE_PRIORITY = dict(
    vic_register=40,
    funerals_australia=30,
    nfda=20,
    gathered_here=10,
)
def normalize_name(name: str) -> str:
"""Normalize a business name for comparison."""
name = name.strip().upper()
# Remove common suffixes
for suffix in [" PTY LTD", " PTY. LTD.", " P/L", " LIMITED",
" PROPRIETARY LIMITED", " INC", " LLC",
" FUNERAL DIRECTORS", " FUNERAL SERVICES",
" FUNERALS", " FUNERAL HOME"]:
name = name.removesuffix(suffix)
# Remove punctuation
name = re.sub(r"[''`\".,&()-]", " ", name)
name = re.sub(r"\s+", " ", name).strip()
return name
def fuzzy_match(name1: str, name2: str) -> float:
    """Return the similarity ratio between two business names (0.0 to 1.0).

    Both names are passed through ``normalize_name`` and compared with
    ``difflib.SequenceMatcher``.

    A missing name, or one that normalizes to nothing, scores 0.0.
    Without this guard two empty strings compare as identical (ratio 1.0),
    which would make unrelated unnamed records look like a perfect match.
    """
    if not name1 or not name2:
        return 0.0
    n1 = normalize_name(name1)
    n2 = normalize_name(name2)
    if not n1 or not n2:
        return 0.0
    return SequenceMatcher(None, n1, n2).ratio()
def find_existing_brand(db: sqlite3.Connection, record: dict) -> tuple[int | None, str]:
"""Find a matching funeral_brand for a source record.
Returns (brand_id, match_type) or (None, 'new').
"""
biz = record.get("business", {})
locs = record.get("locations", [])
name = biz.get("name", "")
abn = biz.get("abn")
source = record.get("source", "")
source_id = record.get("sourceId", "")
source_key = f"{source}:{source_id}"
postcode = None
state = None
if locs:
postcode = locs[0].get("postcode")
state = locs[0].get("state")
# 1. Source key match (exact same record from same source)
row = db.execute(
"SELECT id FROM funeral_brand WHERE source_key = ?",
(source_key,)
).fetchone()
if row:
return row["id"], "source_key"
# 2. ABN match
if abn:
row = db.execute(
"SELECT id FROM funeral_brand WHERE abn = ?",
(abn,)
).fetchone()
if row:
return row["id"], "abn"
# 3. Exact name + postcode match
if name and postcode:
norm = normalize_name(name)
# Check all brands — need fuzzy on name
rows = db.execute(
"SELECT id, title FROM funeral_brand WHERE business_postcode = ?",
(postcode,)
).fetchall()
for row in rows:
if normalize_name(row["title"]) == norm:
return row["id"], "name_postcode"
# 4. Fuzzy name + same state
if name and state:
rows = db.execute(
"SELECT id, title FROM funeral_brand WHERE business_state = ?",
(state,)
).fetchall()
for row in rows:
score = fuzzy_match(name, row["title"])
if score >= 0.85:
return row["id"], "fuzzy"
return None, "new"
def merge_field(existing: str | None, new_val: str | None,
existing_priority: int, new_priority: int) -> str | None:
"""Merge a single field, preferring non-null and higher-priority."""
if not new_val:
return existing
if not existing:
return new_val
# Both have values — prefer higher priority source
if new_priority > existing_priority:
return new_val
return existing
def _unique_slug(db: sqlite3.Connection, name: str) -> str:
    """Return a slug for *name* that no funeral_brand row uses as its code."""
    base_slug = generate_slug(name)
    slug = base_slug
    counter = 1
    # Append -1, -2, ... until the slug is free.
    while db.execute(
        "SELECT id FROM funeral_brand WHERE code = ?", (slug,)
    ).fetchone() is not None:
        slug = f"{base_slug}-{counter}"
        counter += 1
    return slug


def _insert_location(db: sqlite3.Connection, brand_id: int,
                     loc_data: dict, fallback_title: str | None) -> None:
    """Insert one location row for *brand_id*, titled "Suburb, STATE"."""
    title_parts = [loc_data.get("suburb", ""), loc_data.get("state", "")]
    # Fall back to the business name when neither suburb nor state is known.
    loc_title = ", ".join(p for p in title_parts if p) or fallback_title
    db.execute(
        """INSERT INTO location (
            title, address, suburb, state, postcode, lat, lng, brand_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            loc_title,
            loc_data.get("address"),
            loc_data.get("suburb"),
            loc_data.get("state"),
            loc_data.get("postcode"),
            loc_data.get("lat"),
            loc_data.get("lng"),
            brand_id,
        ),
    )


def _insert_package(db: sqlite3.Connection, brand_id: int,
                    pkg: dict, source_url: str | None) -> None:
    """Insert one package row and its inclusion rows for *brand_id*."""
    cur = db.execute(
        """INSERT INTO package (
            title, funeral_type, brand_id, source_url, extraction_confidence
        ) VALUES (?, ?, ?, ?, ?)""",
        (
            pkg.get("name"),
            pkg.get("funeralType"),
            brand_id,
            source_url,
            0.8,  # Gathered Here pricing is structured, fairly reliable
        ),
    )
    pkg_id = cur.lastrowid
    for inc in pkg.get("inclusions") or []:
        db.execute(
            """INSERT INTO package_inclusion (
                price, optional, complimentary, inclusion_type_title, package_id
            ) VALUES (?, ?, ?, ?, ?)""",
            (
                inc.get("price", 0),
                1 if inc.get("optional") else 0,
                1 if inc.get("complimentary") else 0,
                inc.get("item", "Unknown"),
                pkg_id,
            ),
        )


def create_brand(db: sqlite3.Connection, record: dict) -> int:
    """Create a new funeral_brand, its locations and packages from a record.

    The brand is inserted hidden (``hidden = 1``), unverified
    (``verified = 0``) and with ``enrichment_status = 'pending'``. Its
    business address fields come from the record's first location. One
    location row is inserted per entry in ``locations``; one package row
    (plus inclusions) is inserted per entry in ``packages`` that has a
    truthy ``price``.

    Nothing is committed here; the caller owns the transaction.

    Args:
        db: Open database connection.
        record: Normalized source record. ``business``, ``locations`` and
            ``packages`` may be absent or null.

    Returns:
        The id of the newly inserted funeral_brand row.
    """
    # ``or`` guards cover keys that are present but null in the JSON payload.
    biz = record.get("business") or {}
    locs = record.get("locations") or []
    source = record.get("source", "")
    source_id = record.get("sourceId", "")
    source_key = f"{source}:{source_id}"
    loc = (locs[0] or {}) if locs else {}

    # A null name must still produce a slug, hence ``or`` rather than a
    # .get() default (which only applies when the key is absent).
    slug = _unique_slug(db, biz.get("name") or "unknown")

    cur = db.execute(
        """INSERT INTO funeral_brand (
            title, description, email, phone, website, abn, code,
            hidden, verified, source_key, source_url, enrichment_status,
            business_address, business_suburb, business_state, business_postcode
        ) VALUES (?, ?, ?, ?, ?, ?, ?, 1, 0, ?, ?, 'pending', ?, ?, ?, ?)""",
        (
            biz.get("name"),
            biz.get("description"),
            biz.get("email"),
            biz.get("phone"),
            biz.get("website"),
            biz.get("abn"),
            slug,
            source_key,
            record.get("sourceUrl"),
            loc.get("address"),
            loc.get("suburb"),
            loc.get("state"),
            loc.get("postcode"),
        ),
    )
    brand_id = cur.lastrowid

    # Create locations
    for loc_data in locs:
        _insert_location(db, brand_id, loc_data or {}, biz.get("name", ""))

    # Create packages (from Gathered Here pricing). Unpriced packages are
    # skipped. NOTE(review): the price itself is not written to the package
    # row here — confirm whether that is intended.
    for pkg in record.get("packages") or []:
        if not pkg.get("price"):
            continue
        _insert_package(db, brand_id, pkg, record.get("sourceUrl"))

    return brand_id
def update_brand(db: sqlite3.Connection, brand_id: int,
                 record: dict, match_type: str) -> bool:
    """Merge new data into an existing brand.

    Verified brands are never touched. For other brands:

    * description, email, phone, website and abn are merged field by field
      with ``merge_field``, comparing the incoming source's priority with
      the priority of the source named in the brand's ``source_key``;
    * if the brand has no location and the record's first location has a
      suburb, that location is inserted; otherwise missing coordinates on
      the brand's first location are filled in;
    * if the brand has no packages, the record's priced packages (and
      their inclusions) are added.

    Nothing is committed here; the caller owns the transaction.

    Args:
        db: Connection whose rows support access by column name.
        brand_id: Id of the matched funeral_brand row.
        record: Normalized source record; ``business``, ``locations`` and
            ``packages`` may be absent or null.
        match_type: How the match was found. Currently unused.

    Returns:
        True if anything was written (brand fields, location or packages);
        False if the brand is missing, verified, or nothing changed.
    """
    # ``or`` guards cover keys that are present but null in the JSON payload.
    biz = record.get("business") or {}
    locs = record.get("locations") or []

    brand = db.execute(
        "SELECT * FROM funeral_brand WHERE id = ?", (brand_id,)
    ).fetchone()
    # Never overwrite verified providers; a missing row has nothing to merge.
    if brand is None or brand["verified"]:
        return False

    new_priority = SOURCE_PRIORITY.get(record.get("source", ""), 0)

    # The source that created the brand is the prefix of its source_key.
    existing_source = ""
    if brand["source_key"]:
        existing_source = brand["source_key"].split(":")[0]
    existing_priority = SOURCE_PRIORITY.get(existing_source, 0)

    changed = False

    # Field-level merge — only fill blanks or upgrade from higher priority
    updates = {}
    for field in ("description", "email", "phone", "website", "abn"):
        merged = merge_field(brand[field], biz.get(field),
                             existing_priority, new_priority)
        if merged != brand[field]:
            updates[field] = merged

    # Add a first location, or fill in missing coordinates on the existing one
    if locs:
        loc = locs[0] or {}
        existing_locs = db.execute(
            "SELECT * FROM location WHERE brand_id = ?", (brand_id,)
        ).fetchall()
        if not existing_locs and loc.get("suburb"):
            title_parts = [loc.get("suburb", ""), loc.get("state", "")]
            loc_title = ", ".join(p for p in title_parts if p)
            db.execute(
                """INSERT INTO location (
                    title, address, suburb, state, postcode, lat, lng, brand_id
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    loc_title, loc.get("address"), loc.get("suburb"),
                    loc.get("state"), loc.get("postcode"),
                    loc.get("lat"), loc.get("lng"), brand_id,
                ),
            )
            changed = True
        elif existing_locs:
            eloc = existing_locs[0]
            if not eloc["lat"] and loc.get("lat"):
                db.execute(
                    "UPDATE location SET lat = ?, lng = ? WHERE id = ?",
                    (loc.get("lat"), loc.get("lng"), eloc["id"]),
                )
                changed = True

    # Add packages if we have them and the brand doesn't yet. Rows mirror
    # what create_brand writes: same confidence value and inclusion rows.
    packages = record.get("packages") or []
    if packages:
        existing_pkgs = db.execute(
            "SELECT COUNT(*) as n FROM package WHERE brand_id = ?", (brand_id,)
        ).fetchone()["n"]
        if existing_pkgs == 0:
            for pkg in packages:
                if not pkg.get("price"):
                    continue
                cur = db.execute(
                    """INSERT INTO package (
                        title, funeral_type, brand_id, source_url, extraction_confidence
                    ) VALUES (?, ?, ?, ?, ?)""",
                    (
                        pkg.get("name"), pkg.get("funeralType"),
                        brand_id, record.get("sourceUrl"),
                        0.8,
                    ),
                )
                pkg_id = cur.lastrowid
                for inc in pkg.get("inclusions") or []:
                    db.execute(
                        """INSERT INTO package_inclusion (
                            price, optional, complimentary, inclusion_type_title, package_id
                        ) VALUES (?, ?, ?, ?, ?)""",
                        (
                            inc.get("price", 0),
                            1 if inc.get("optional") else 0,
                            1 if inc.get("complimentary") else 0,
                            inc.get("item", "Unknown"),
                            pkg_id,
                        ),
                    )
                changed = True

    if updates:
        # Column names come from the fixed tuple above, never from input,
        # so building the SET clause by interpolation is safe.
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        values = list(updates.values()) + [brand_id]
        db.execute(
            f"UPDATE funeral_brand SET {set_clause}, updated_at = datetime('now') WHERE id = ?",
            values,
        )
        changed = True

    return changed
def process_all():
    """Process all source_records through deduplication and create brand entries.

    Order matters: higher-priority sources are processed first so their data
    forms the base record that lower-priority sources merge into.

    For every source_record with normalized data, the record is matched
    against existing brands, then either a new brand is created, the record
    is skipped (same source_key), or it is merged into the matched brand.
    The source_record row is stamped with the outcome. Work is committed
    once per source, and a summary is printed at the end.

    The connection is always closed, even if a record raises.
    """
    db = get_db()
    try:
        # Process in priority order (highest first)
        sources_ordered = sorted(SOURCE_PRIORITY, key=SOURCE_PRIORITY.get,
                                 reverse=True)
        stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0}

        print("=" * 60)
        print("DEDUPLICATION ENGINE")
        print("=" * 60)

        for source in sources_ordered:
            records = db.execute(
                """SELECT id, normalized_data FROM source_record
                WHERE source_name = ? AND normalized_data IS NOT NULL""",
                (source,)
            ).fetchall()
            if not records:
                continue

            print(f"\n Processing {source}: {len(records)} records")
            source_stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0}

            for row in records:
                record = json.loads(row["normalized_data"])
                brand_id, match_type = find_existing_brand(db, record)

                if match_type == "new":
                    brand_id = create_brand(db, record)
                    source_stats["new"] += 1
                elif match_type == "source_key":
                    # Same record from the same source was already imported.
                    source_stats["skipped"] += 1
                else:
                    # Matched to existing — merge
                    if update_brand(db, brand_id, record, match_type):
                        source_stats["updated"] += 1
                    else:
                        source_stats["matched"] += 1

                # Update source_record with match info
                db.execute(
                    """UPDATE source_record
                    SET matched_brand_id = ?, match_type = ?, processed_at = datetime('now')
                    WHERE id = ?""",
                    (brand_id, match_type, row["id"])
                )

            # One commit per source keeps each source's import atomic.
            db.commit()

            print(f" New: {source_stats['new']}, Updated: {source_stats['updated']}, "
                  f"Matched: {source_stats['matched']}, Skipped: {source_stats['skipped']}")
            for k, v in source_stats.items():
                stats[k] += v

        # Final summary
        total_brands = db.execute("SELECT COUNT(*) as n FROM funeral_brand").fetchone()["n"]
        total_locations = db.execute("SELECT COUNT(*) as n FROM location").fetchone()["n"]
        total_packages = db.execute("SELECT COUNT(*) as n FROM package").fetchone()["n"]

        print(f"\n{'=' * 60}")
        print("DEDUP RESULTS")
        print(f"{'=' * 60}")
        print(f" New brands created: {stats['new']}")
        print(f" Existing updated: {stats['updated']}")
        print(f" Matched (no change): {stats['matched']}")
        print(f" Skipped (source_key): {stats['skipped']}")
        print(f"\n Total brands in DB: {total_brands}")
        print(f" Total locations in DB: {total_locations}")
        print(f" Total packages in DB: {total_packages}")

        # Show match type breakdown
        print("\n Match type breakdown:")
        rows = db.execute(
            """SELECT match_type, COUNT(*) as n
            FROM source_record WHERE processed_at IS NOT NULL
            GROUP BY match_type ORDER BY n DESC"""
        ).fetchall()
        for row in rows:
            print(f" {row['match_type']:15s} {row['n']:5d}")
    finally:
        db.close()
# Run the full deduplication pass when this module is executed as a script.
if __name__ == "__main__":
    process_all()