Building Matchbox DAGs¶
Entity resolution usually needs more than one matching step. Matchbox uses a directed acyclic graph (DAG) to describe that workflow.
In Matchbox, a complete pipeline uses three kinds of node:
- Source steps index data from warehouses or files.
- Model steps score pairs of records that may refer to the same entity.
- Resolver steps turn one or more model outputs into entity clusters.
This guide walks through the full flow: define sources, build models, add resolvers, run the DAG, and publish a default run.
Understanding Matchbox DAGs¶
Sources provide raw records. Models emit scored edges. Resolvers consume model outputs and materialise the clusters that Matchbox queries. A DAG can alternate between scoring and clustering layers several times.
graph TD
SA["Source A"] --> MA["Deduper A"]
MA --> RA["Resolver A"]
RA --> LAB["Linker AB"]
SB["Source B"] --> LAB
LAB --> RF["Final resolver"]
All objects are lazy in a Matchbox DAG. You must explicitly run them after you define them.
# Queries are lazy
query_data = query.data()
# Steps are lazy
model_scores = dedupe_companies_house.run()
resolver_clusters = companies_resolver.run()
# DAGs are lazy
dag = DAG("companies").load_default()
dag.run_and_sync()
1. Defining a DAG¶
Create a DAG and call new_run() to start an editable run.
The DAG owns every source, model, and resolver you define afterwards. A run is the stored snapshot of that DAG inside the collection. The new_run() method creates the run that you populate and sync.
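A minimal sketch of this step, assuming the dag object used throughout this guide (the import path and the "companies" collection name are illustrative; check your version of the client):

```python
from matchbox.client.dags import DAG  # import path is an assumption

# The DAG owns every source, model, and resolver defined against it;
# new_run() opens the editable run that you populate and sync
dag = DAG("companies")
run = dag.new_run()
```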
2. Defining sources¶
Add each dataset with source(). See
Source for the full argument list.
from sqlalchemy import create_engine
from matchbox.client import RelationalDBLocation
# Configure a warehouse location
engine = create_engine("postgresql://user:password@host:port/database")
warehouse = RelationalDBLocation(name="warehouse").set_client(engine)
# Add sources
companies_house = dag.source(
name="companies_house",
location=warehouse,
extract_transform="""
select
id,
company_number::text as company_number,
company_name,
upper(postcode) as postcode
from companieshouse.companies
""",
infer_types=True,
index_fields=["company_name", "company_number", "postcode"],
key_field="id",
)
exporters = dag.source(
name="hmrc_exporters",
location=warehouse,
extract_transform="""
select
id,
company_name,
upper(postcode) as postcode
from hmrc.trade__exporters
""",
infer_types=True,
index_fields=["company_name", "postcode"],
key_field="id",
)
Each source needs:
- A location, such as RelationalDBLocation.
- An extract_transform statement that produces the source data.
- A list of index_fields that Matchbox is allowed to match on.
- A key_field that uniquely identifies each record in the source output.
3. Preparing query data¶
To prepare model inputs, call query()
on a source or query() on a
resolver. See Query for the full argument
list.
Cleaning query data¶
The cleaning dictionary controls which columns flow into the model.
- The dictionary key becomes the output column name.
- The dictionary value is a DuckDB SQL expression.
- Only the cleaned columns you declare are passed through, plus id, leaf_id, and key columns.
Field qualification¶
Without cleaning, Matchbox qualifies source fields with the source name.
companies_house.query().data()
# Columns: id, companies_house_key, companies_house_company_name, ...
With cleaning, the result columns use the aliases you define.
companies_house.query(
cleaning={
"name": f"lower({companies_house.f('company_name')})",
"number": companies_house.f("company_number"),
}
).data()
# Columns: id, companies_house_key, name, number
Use source.f() inside cleaning expressions when you need a qualified reference.
Alias fields for better model settings
Want to avoid writing source.f() in your model_settings?
Use the cleaning dictionary to alias the field, even if you don’t clean it.
# Without aliasing — source.f() required in model_settings
companies_house
.query()
.deduper(
...
model_settings={
"unique_fields": [companies_house.f("company_name")]
}
)
# With aliasing — use the simple string name instead
companies_house
.query(
cleaning={"name": companies_house.f("company_name")}
)
.deduper(
...
model_settings={"unique_fields": ["name"]}
)
4. Creating models¶
To create a model, call deduper() or
linker() on a query. See
Model for the full argument list.
Choosing model methodologies¶
Matchbox includes deterministic, weighted, and learned linkers, as well as dedupers.
- NaiveDeduper groups records by identical cleaned fields.
- DeterministicLinker links records with explicit DuckDB comparison rules.
- WeightedDeterministicLinker combines several deterministic checks into a weighted score.
- SplinkLinker emits learned scores using Splink.
See the models API for full settings.
Creating source-level dedupers¶
To create a deduper, call query() on a
source and then deduper().
from matchbox.client.models.dedupers.naive import NaiveDeduper
dedupe_companies_house = companies_house.query(
cleaning={
"company_name": f"lower({companies_house.f('company_name')})",
"company_number": companies_house.f("company_number"),
}
).deduper(
name="dedupe_companies_house",
description="Deduplicate Companies House companies",
model_class=NaiveDeduper,
model_settings={
"unique_fields": ["company_name", "company_number"],
},
)
dedupe_exporters = exporters.query(
cleaning={
"company_name": f"lower({exporters.f('company_name')})",
"postcode": exporters.f("postcode"),
}
).deduper(
name="dedupe_exporters",
description="Deduplicate exporter records",
model_class=NaiveDeduper,
model_settings={
"unique_fields": ["company_name", "postcode"],
},
)
5. Creating resolvers and a linker¶
To create a resolver, call
resolver() on a model and, if
needed, pass extra model inputs that should contribute to the same clustering
policy. See Resolver for the full
argument list.
Resolvers can sit between model layers. Call
query() on a resolver when you
want the next model layer to work from a resolved entity view.
from matchbox.client.models.linkers import DeterministicLinker
from matchbox.client.resolvers import Components, ComponentsSettings
resolve_companies_house = dedupe_companies_house.resolver(
name="resolve_companies_house",
description="Resolve Companies House duplicates",
resolver_class=Components,
resolver_settings=ComponentsSettings(
thresholds={dedupe_companies_house.name: 1.0}
),
)
resolve_exporters = dedupe_exporters.resolver(
name="resolve_exporters",
description="Resolve exporter duplicates",
resolver_class=Components,
# omitting resolver_settings defaults to {};
# the Components methodology then defaults to thresholds of 0.0
)
link_resolved = resolve_companies_house.query().linker(
resolve_exporters.query(),
name="link_resolved_companies",
description="Link resolved company views",
model_class=DeterministicLinker,
model_settings={
"left_id": "id",
"right_id": "id",
"comparisons": [
"l.company_name = r.company_name and l.postcode = r.postcode"
],
},
)
companies_resolver = link_resolved.resolver(
dedupe_companies_house,
dedupe_exporters,
name="companies_resolver",
description="Resolve company entities across both sources",
resolver_class=Components,
resolver_settings=ComponentsSettings(
thresholds={
dedupe_companies_house.name: 1.0,
dedupe_exporters.name: 1.0,
link_resolved.name: 0.8,
}
),
)
This pattern separates scoring from clustering.
- Models focus on generating candidate matches and scores.
- Resolvers decide which model edges are strong enough to merge and how several model outputs work together.
- A DAG can include several resolvers over the same model graph, each representing a different clustering policy.
6. Running and publishing the DAG¶
Run every step in execution order with run_and_sync(). Publish the run with set_default() when other users and services should query it.
set_default() marks the run as the published version that load_default() retrieves. It requires all steps in the DAG to be reachable from a single final resolver. Setting a run as default deletes the previous default.
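Assuming the dag built in the previous sections, running and publishing is two calls (a sketch only; it needs a live Matchbox server and warehouse):

```python
# Execute every step in dependency order and sync the results to the server
dag.run_and_sync()

# Mark this run as the published version that load_default() retrieves
dag.set_default()
```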
Visualising execution¶
Use dag.draw() to inspect the pipeline, or dag.draw(mode="list") to see the
execution order.
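The execution order that dag.draw(mode="list") prints is a topological sort of the step graph. As a rough illustration in plain Python (not Matchbox code), the standard-library graphlib module can compute such an order for the example pipeline built in this guide:

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on, mirroring the example DAG
dependencies = {
    "dedupe_companies_house": {"companies_house"},
    "dedupe_exporters": {"hmrc_exporters"},
    "resolve_companies_house": {"dedupe_companies_house"},
    "resolve_exporters": {"dedupe_exporters"},
    "link_resolved_companies": {"resolve_companies_house", "resolve_exporters"},
    "companies_resolver": {"link_resolved_companies"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # sources come first, the final resolver last
```

Every source appears before the models that read it, and the final resolver comes last, because it transitively depends on every other step.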
7. Querying resolved entities¶
Use get_matches() to fetch
ResolverMatches for the default
resolver, or for a named resolver if you pass one explicitly.
You can also call query() on a
resolver and then data() if you want
the tabular entity view behind that resolver.
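For example, with the companies_resolver defined earlier (a sketch; whether get_matches() hangs off the DAG object is an assumption here):

```python
# Matches from the default resolver; pass a resolver explicitly to target another
matches = dag.get_matches()

# Tabular entity view behind a specific resolver
entities = companies_resolver.query().data()
```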
8. Re-running a published DAG¶
You can load a stored DAG from the server with
load_default() or
load_pending(), attach a warehouse
client with set_client(), and then
call new_run() to reuse the same
structure for another run.
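Putting those calls together (a sketch; reattaching the engine mirrors the set_client() call from step 2, and how the stored location is addressed may differ in your version):

```python
# Load the published structure and start a fresh editable run on top of it
dag = DAG("companies").load_default()
warehouse.set_client(engine)  # reattach the warehouse client from step 2
dag.new_run()
dag.run_and_sync()
```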