Python crawlers for VIC Register, Funerals Australia and NFDA; n8n workflows for scheduled discovery and enrichment; SQLite schema and seeded dev database (1,463 providers); end-to-end process documentation in n8n/PROCESS.md.
426 lines
14 KiB
Python
426 lines
14 KiB
Python
"""Deduplication and merge engine.

Processes source_records → funeral_brand + location + package entries.
Handles cross-source matching and field-level merging.

Matching hierarchy (strongest to weakest):
  1. source_key match — same record from the same source (skipped)
  2. ABN match — same business entity
  3. Exact normalized name + postcode match — likely the same business
  4. Fuzzy name match (≥85% similarity) + same state — probable match,
     flag for review

Merge priority (higher = preferred):
  vic_register > funerals_australia > nfda > gathered_here

Never overwrite verified provider data.
"""
|
|
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
from difflib import SequenceMatcher
|
|
|
|
from base import get_db, generate_slug, normalize_state
|
|
|
|
# Authority ranking of each crawler source, used to resolve merge conflicts.
# When two sources both supply a value for the same field, the one with the
# larger number is preferred (ties keep the value already stored), and
# process_all walks the sources from the largest number to the smallest.
SOURCE_PRIORITY = dict(
    vic_register=40,
    funerals_australia=30,
    nfda=20,
    gathered_here=10,
)
|
|
|
|
|
|
def normalize_name(name: str) -> str:
|
|
"""Normalize a business name for comparison."""
|
|
name = name.strip().upper()
|
|
# Remove common suffixes
|
|
for suffix in [" PTY LTD", " PTY. LTD.", " P/L", " LIMITED",
|
|
" PROPRIETARY LIMITED", " INC", " LLC",
|
|
" FUNERAL DIRECTORS", " FUNERAL SERVICES",
|
|
" FUNERALS", " FUNERAL HOME"]:
|
|
name = name.removesuffix(suffix)
|
|
# Remove punctuation
|
|
name = re.sub(r"[''`\".,&()-]", " ", name)
|
|
name = re.sub(r"\s+", " ", name).strip()
|
|
return name
|
|
|
|
|
|
def fuzzy_match(name1: str, name2: str) -> float:
    """Return the similarity ratio of two business names (0.0 to 1.0).

    Both names are passed through normalize_name before comparison with
    difflib.SequenceMatcher.

    If either name normalizes to an empty string the result is 0.0.
    SequenceMatcher reports 1.0 for two empty strings, which would
    otherwise make two unusable names look like a perfect match.
    """
    n1 = normalize_name(name1)
    n2 = normalize_name(name2)
    if not n1 or not n2:
        return 0.0
    return SequenceMatcher(None, n1, n2).ratio()
|
|
|
|
|
|
def find_existing_brand(db: sqlite3.Connection, record: dict, *,
                        fuzzy_threshold: float = 0.85) -> tuple[int | None, str]:
    """Find the funeral_brand that a normalized source record belongs to.

    Tiers are tried strongest-first; the first tier that matches wins:

    1. ``"source_key"`` — a brand whose source_key equals
       ``"<source>:<sourceId>"`` for this record.
    2. ``"abn"`` — a brand with the same ABN.
    3. ``"name_postcode"`` — a brand in the same postcode whose normalized
       title equals the record's normalized name.
    4. ``"fuzzy"`` — among brands in the same state, the one whose title is
       MOST similar to the record's name, if that similarity is at least
       ``fuzzy_threshold``.

    Postcode and state come from the record's first location only.

    Args:
        db: Open connection whose rows support access by column name.
        record: Normalized source record with optional ``business``,
            ``locations``, ``source`` and ``sourceId`` keys.
        fuzzy_threshold: Minimum fuzzy_match ratio accepted by tier 4.

    Returns:
        ``(brand_id, match_type)``, or ``(None, "new")`` when nothing matches.
    """
    biz = record.get("business") or {}
    locs = record.get("locations") or []
    name = biz.get("name") or ""
    abn = biz.get("abn")
    # Built exactly like create_brand builds it, so tier 1 can find it.
    source_key = f"{record.get('source', '')}:{record.get('sourceId', '')}"

    first_loc = locs[0] if locs else {}
    postcode = first_loc.get("postcode")
    state = first_loc.get("state")

    # 1. Exact same record from the same source.
    row = db.execute(
        "SELECT id FROM funeral_brand WHERE source_key = ?", (source_key,)
    ).fetchone()
    if row:
        return row["id"], "source_key"

    # 2. Same business entity (ABN).
    if abn:
        row = db.execute(
            "SELECT id FROM funeral_brand WHERE abn = ?", (abn,)
        ).fetchone()
        if row:
            return row["id"], "abn"

    # Names that normalize to nothing (e.g. pure punctuation) cannot be
    # compared meaningfully, so they skip the name-based tiers.
    norm = normalize_name(name) if name else ""

    # 3. Exact normalized-name match among brands in the same postcode.
    if norm and postcode:
        rows = db.execute(
            "SELECT id, title FROM funeral_brand WHERE business_postcode = ?",
            (postcode,),
        ).fetchall()
        for row in rows:
            # title can be NULL: create_brand stores the business name as given.
            if row["title"] and normalize_name(row["title"]) == norm:
                return row["id"], "name_postcode"

    # 4. Best fuzzy match among brands in the same state. The highest score
    #    wins (ties: first row) so the result does not depend on table order
    #    when several brands clear the threshold.
    if norm and state:
        rows = db.execute(
            "SELECT id, title FROM funeral_brand WHERE business_state = ?",
            (state,),
        ).fetchall()
        best_id, best_score = None, 0.0
        for row in rows:
            if not row["title"]:
                continue
            score = fuzzy_match(name, row["title"])
            if score > best_score:
                best_id, best_score = row["id"], score
        if best_id is not None and best_score >= fuzzy_threshold:
            return best_id, "fuzzy"

    return None, "new"
|
|
|
|
|
|
def merge_field(existing: str | None, new_val: str | None,
|
|
existing_priority: int, new_priority: int) -> str | None:
|
|
"""Merge a single field, preferring non-null and higher-priority."""
|
|
if not new_val:
|
|
return existing
|
|
if not existing:
|
|
return new_val
|
|
# Both have values — prefer higher priority source
|
|
if new_priority > existing_priority:
|
|
return new_val
|
|
return existing
|
|
|
|
|
|
def _unique_brand_slug(db: sqlite3.Connection, name: str) -> str:
    """Return generate_slug(name), suffixed -1, -2, ... until no brand code uses it."""
    base_slug = generate_slug(name)
    slug = base_slug
    counter = 1
    while db.execute(
        "SELECT id FROM funeral_brand WHERE code = ?", (slug,)
    ).fetchone():
        slug = f"{base_slug}-{counter}"
        counter += 1
    return slug


def _insert_brand_locations(db: sqlite3.Connection, brand_id: int,
                            locs: list[dict], fallback_title: str) -> None:
    """Insert one location row per entry of locs, linked to brand_id."""
    for loc_data in locs:
        # Title is "Suburb, STATE" when available, otherwise the business name.
        title_parts = [loc_data.get("suburb", ""), loc_data.get("state", "")]
        loc_title = ", ".join(p for p in title_parts if p) or fallback_title
        db.execute(
            """INSERT INTO location (
                title, address, suburb, state, postcode, lat, lng, brand_id
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                loc_title,
                loc_data.get("address"),
                loc_data.get("suburb"),
                loc_data.get("state"),
                loc_data.get("postcode"),
                loc_data.get("lat"),
                loc_data.get("lng"),
                brand_id,
            ),
        )


def _insert_brand_packages(db: sqlite3.Connection, brand_id: int,
                           packages: list[dict], source_url: str | None) -> None:
    """Insert the priced packages (and their inclusions) for brand_id."""
    for pkg in packages:
        # Packages without a price are ignored.
        if not pkg.get("price"):
            continue
        cur = db.execute(
            """INSERT INTO package (
                title, funeral_type, brand_id, source_url, extraction_confidence
            ) VALUES (?, ?, ?, ?, ?)""",
            (
                pkg.get("name"),
                pkg.get("funeralType"),
                brand_id,
                source_url,
                0.8,  # Gathered Here pricing is structured, fairly reliable
            ),
        )
        pkg_id = cur.lastrowid
        for inc in pkg.get("inclusions") or []:
            db.execute(
                """INSERT INTO package_inclusion (
                    price, optional, complimentary, inclusion_type_title, package_id
                ) VALUES (?, ?, ?, ?, ?)""",
                (
                    inc.get("price", 0),
                    1 if inc.get("optional") else 0,
                    1 if inc.get("complimentary") else 0,
                    inc.get("item", "Unknown"),
                    pkg_id,
                ),
            )


def create_brand(db: sqlite3.Connection, record: dict) -> int:
    """Create a new funeral_brand, its locations and packages from a source record.

    The brand is inserted with hidden=1, verified=0 and enrichment_status
    'pending'. Its business address comes from the record's first location,
    and its code is a slug of the business name made unique by a numeric
    suffix. Every location in the record becomes a location row. Priced
    packages (with their inclusions) become package rows.

    Args:
        db: Open connection; the caller is responsible for committing.
        record: Normalized source record (``business``, ``locations``,
            ``packages``, ``source``, ``sourceId``, ``sourceUrl``).

    Returns:
        The new funeral_brand id.
    """
    # ``or`` fallbacks also cover keys that are present but null.
    biz = record.get("business") or {}
    locs = record.get("locations") or []
    # Must match the key format find_existing_brand looks up.
    source_key = f"{record.get('source', '')}:{record.get('sourceId', '')}"
    loc = locs[0] if locs else {}

    # Slug from "unknown" when the name is absent, null or empty.
    slug = _unique_brand_slug(db, biz.get("name") or "unknown")

    cur = db.execute(
        """INSERT INTO funeral_brand (
            title, description, email, phone, website, abn, code,
            hidden, verified, source_key, source_url, enrichment_status,
            business_address, business_suburb, business_state, business_postcode
        ) VALUES (?, ?, ?, ?, ?, ?, ?, 1, 0, ?, ?, 'pending', ?, ?, ?, ?)""",
        (
            biz.get("name"),
            biz.get("description"),
            biz.get("email"),
            biz.get("phone"),
            biz.get("website"),
            biz.get("abn"),
            slug,
            source_key,
            record.get("sourceUrl"),
            loc.get("address"),
            loc.get("suburb"),
            loc.get("state"),
            loc.get("postcode"),
        ),
    )
    brand_id = cur.lastrowid

    _insert_brand_locations(db, brand_id, locs, biz.get("name") or "")
    _insert_brand_packages(db, brand_id, record.get("packages") or [],
                           record.get("sourceUrl"))
    return brand_id
|
|
|
|
|
|
def update_brand(db: sqlite3.Connection, brand_id: int,
                 record: dict, match_type: str) -> bool:
    """Merge a source record into an existing funeral_brand.

    Verified brands (and unknown ids) are never modified. For other brands:

    * description/email/phone/website/abn are merged field by field with
      merge_field. The record source's SOURCE_PRIORITY is compared against
      that of the source which created the brand (the prefix of its
      source_key).
    * If the brand has no location rows, the record's first location is
      added (only when it has a suburb). Otherwise the brand's first
      location receives the record's coordinates if it has no latitude yet.
    * If the brand has no packages yet, the record's priced packages are
      added together with their inclusions.

    Args:
        db: Open connection; the caller is responsible for committing.
        brand_id: id of the funeral_brand to merge into.
        record: Normalized source record.
        match_type: How the brand was matched. Not used here; kept for
            callers.

    Returns:
        True if any row was inserted or updated. False if the brand is
        verified or missing, or the record contributed nothing new.
    """
    biz = record.get("business") or {}
    locs = record.get("locations") or []
    new_priority = SOURCE_PRIORITY.get(record.get("source", ""), 0)

    brand = db.execute(
        "SELECT * FROM funeral_brand WHERE id = ?", (brand_id,)
    ).fetchone()
    # Never overwrite verified providers; a stale/unknown id is a no-op.
    if brand is None or brand["verified"]:
        return False

    # source_key is "<source>:<sourceId>" of the record that created the brand.
    existing_source = (brand["source_key"] or "").split(":")[0]
    existing_priority = SOURCE_PRIORITY.get(existing_source, 0)

    changed = False

    # Field-level merge — only fill blanks or upgrade from higher priority.
    updates = {}
    for field in ("description", "email", "phone", "website", "abn"):
        merged = merge_field(brand[field], biz.get(field),
                             existing_priority, new_priority)
        if merged != brand[field]:
            updates[field] = merged

    if locs:
        loc = locs[0]
        existing_locs = db.execute(
            "SELECT * FROM location WHERE brand_id = ?", (brand_id,)
        ).fetchall()
        if existing_locs:
            # Backfill coordinates on the first location if it has none.
            eloc = existing_locs[0]
            if not eloc["lat"] and loc.get("lat"):
                db.execute(
                    "UPDATE location SET lat = ?, lng = ? WHERE id = ?",
                    (loc.get("lat"), loc.get("lng"), eloc["id"]),
                )
                changed = True
        elif loc.get("suburb"):
            loc_title = ", ".join(
                p for p in (loc.get("suburb", ""), loc.get("state", "")) if p
            )
            db.execute(
                """INSERT INTO location (
                    title, address, suburb, state, postcode, lat, lng, brand_id
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    loc_title, loc.get("address"), loc.get("suburb"),
                    loc.get("state"), loc.get("postcode"),
                    loc.get("lat"), loc.get("lng"), brand_id,
                ),
            )
            changed = True

    # Packages are only added when the brand has none, so re-running the
    # merge does not duplicate them.
    packages = record.get("packages") or []
    if packages:
        existing_pkgs = db.execute(
            "SELECT COUNT(*) as n FROM package WHERE brand_id = ?", (brand_id,)
        ).fetchone()["n"]
        if existing_pkgs == 0:
            for pkg in packages:
                if not pkg.get("price"):
                    continue
                # Same columns/confidence as create_brand, inclusions included.
                cur = db.execute(
                    """INSERT INTO package (
                        title, funeral_type, brand_id, source_url, extraction_confidence
                    ) VALUES (?, ?, ?, ?, ?)""",
                    (pkg.get("name"), pkg.get("funeralType"), brand_id,
                     record.get("sourceUrl"), 0.8),
                )
                pkg_id = cur.lastrowid
                for inc in pkg.get("inclusions") or []:
                    db.execute(
                        """INSERT INTO package_inclusion (
                            price, optional, complimentary, inclusion_type_title, package_id
                        ) VALUES (?, ?, ?, ?, ?)""",
                        (
                            inc.get("price", 0),
                            1 if inc.get("optional") else 0,
                            1 if inc.get("complimentary") else 0,
                            inc.get("item", "Unknown"),
                            pkg_id,
                        ),
                    )
                changed = True

    if updates:
        # Column names come from the fixed tuple above, never from input.
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        db.execute(
            f"UPDATE funeral_brand SET {set_clause}, updated_at = datetime('now') WHERE id = ?",
            [*updates.values(), brand_id],
        )
        changed = True

    return changed
|
|
|
|
|
|
def _dedup_source(db: sqlite3.Connection, source: str) -> dict[str, int] | None:
    """Match/merge every normalized record of one source, then commit.

    Returns per-outcome counts, or None if the source has no records.
    """
    records = db.execute(
        """SELECT id, normalized_data FROM source_record
        WHERE source_name = ? AND normalized_data IS NOT NULL""",
        (source,),
    ).fetchall()
    if not records:
        return None

    print(f"\n Processing {source}: {len(records)} records")
    source_stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0}

    for row in records:
        record = json.loads(row["normalized_data"])
        brand_id, match_type = find_existing_brand(db, record)

        if match_type == "new":
            brand_id = create_brand(db, record)
            source_stats["new"] += 1
        elif match_type == "source_key":
            # This exact source record already produced the brand.
            source_stats["skipped"] += 1
        elif update_brand(db, brand_id, record, match_type):
            source_stats["updated"] += 1
        else:
            source_stats["matched"] += 1

        # Record which brand this source record resolved to, and how.
        db.execute(
            """UPDATE source_record
            SET matched_brand_id = ?, match_type = ?, processed_at = datetime('now')
            WHERE id = ?""",
            (brand_id, match_type, row["id"]),
        )

    db.commit()
    print(f" New: {source_stats['new']}, Updated: {source_stats['updated']}, "
          f"Matched: {source_stats['matched']}, Skipped: {source_stats['skipped']}")
    return source_stats


def _print_dedup_summary(db: sqlite3.Connection, stats: dict[str, int]) -> None:
    """Print run totals, table row counts and the match_type breakdown."""
    total_brands = db.execute("SELECT COUNT(*) as n FROM funeral_brand").fetchone()["n"]
    total_locations = db.execute("SELECT COUNT(*) as n FROM location").fetchone()["n"]
    total_packages = db.execute("SELECT COUNT(*) as n FROM package").fetchone()["n"]

    print(f"\n{'=' * 60}")
    print("DEDUP RESULTS")
    print("=" * 60)
    print(f" New brands created: {stats['new']}")
    print(f" Existing updated: {stats['updated']}")
    print(f" Matched (no change): {stats['matched']}")
    print(f" Skipped (source_key): {stats['skipped']}")
    print(f"\n Total brands in DB: {total_brands}")
    print(f" Total locations in DB: {total_locations}")
    print(f" Total packages in DB: {total_packages}")

    print("\n Match type breakdown:")
    rows = db.execute(
        """SELECT match_type, COUNT(*) as n
        FROM source_record WHERE processed_at IS NOT NULL
        GROUP BY match_type ORDER BY n DESC"""
    ).fetchall()
    for row in rows:
        print(f" {row['match_type']:15s} {row['n']:5d}")


def process_all():
    """Process all source_records through deduplication and create brand entries.

    Order matters: process higher-priority sources first so their data
    forms the base record that lower-priority sources merge into. Only
    sources listed in SOURCE_PRIORITY are processed. Each source is
    committed once all of its records are handled.

    The connection is always closed, even if processing fails.
    """
    db = get_db()
    try:
        # Highest priority first.
        sources_ordered = sorted(SOURCE_PRIORITY, key=SOURCE_PRIORITY.get, reverse=True)
        stats = {"new": 0, "updated": 0, "skipped": 0, "matched": 0}

        print("=" * 60)
        print("DEDUPLICATION ENGINE")
        print("=" * 60)

        for source in sources_ordered:
            source_stats = _dedup_source(db, source)
            if source_stats is None:
                continue
            for key, count in source_stats.items():
                stats[key] += count

        _print_dedup_summary(db, stats)
    finally:
        # Assuming get_db returns a sqlite3.Connection, close() does not
        # commit, so a failure mid-source drops only that source's pending
        # work; sources committed earlier are kept.
        db.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script rather than imported: run the full dedup pass.
    process_all()
|