DAGs

matchbox.client.dags

Objects to define a DAG which indexes, deduplicates and links data.

Classes:

  • DAG

    Self-sufficient pipeline of indexing, deduping and linking steps.

DAG

DAG(name: str)

Self-sufficient pipeline of indexing, deduping and linking steps.

Methods:

  • source

    Create a Source and add it to the DAG.

  • model

    Create a Model and add it to the DAG.

  • add_resolution

    Convert a resolution to a Source or Model and add it to the DAG.

  • get_source

    Get a source by name from the DAG.

  • get_model

    Get a model by name from the DAG.

  • query

    Create a Query object.

  • draw

    Create a string representation of the DAG as a tree structure.

  • new_run

    Start a new run.

  • load_default

    Attach to the default run in this collection, loading all DAG nodes.

  • run_and_sync

    Run the entire DAG and send the results to the server.

  • set_default

    Set the current run as the default for the collection.

  • lookup_key

    Match IDs against the selected backend.

  • extract_lookup

    Return a mapping from matchbox IDs to source keys, optionally filtered.

Attributes:

name instance-attribute

nodes instance-attribute

nodes: dict[ResolutionName, Source | Model] = {}

graph instance-attribute

run property writable

run: RunID

Return the run ID if one is available; otherwise, raise an error.
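The run property behaves like a guarded, writable accessor. A minimal sketch of that pattern follows; the stand-in class `DAGSketch`, the private `_run` attribute, the use of `int` for the run ID and the choice of `RuntimeError` are all assumptions for illustration, since the exact error type is not documented here.

```python
from __future__ import annotations


class DAGSketch:
    """Hypothetical stand-in showing a guarded, writable run property."""

    def __init__(self) -> None:
        # No run is attached until one is started or loaded.
        self._run: int | None = None

    @property
    def run(self) -> int:
        # Return the run ID if available; otherwise raise (error type assumed).
        if self._run is None:
            raise RuntimeError("No run available: start or load a run first.")
        return self._run

    @run.setter
    def run(self, run_id: int) -> None:
        # The property is writable, so a run ID can be assigned directly.
        self._run = run_id


dag = DAGSketch()
try:
    dag.run
except RuntimeError as err:
    print(err)

dag.run = 42
print(dag.run)  # 42
```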

final_step property

final_step: Source | Model

Returns the root node in the DAG.

Returns:

  • Source | Model

    The root node of the DAG.

Raises:

  • ValueError

    If the DAG does not have a final step.
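One way to locate the root node is to look for the single node that no other node uses as an input. A minimal sketch, assuming the graph is a mapping from each node name to the names of its inputs (a hypothetical representation; the real graph attribute may differ) and that anything other than exactly one such node raises ValueError:

```python
from __future__ import annotations


def find_final_step(graph: dict[str, list[str]]) -> str:
    """Return the unique node that no other node uses as an input."""
    # Every name that appears as some node's input has a dependant.
    used_as_input = {dep for deps in graph.values() for dep in deps}
    roots = [name for name in graph if name not in used_as_input]
    if len(roots) != 1:
        raise ValueError(f"Expected exactly one final step, found {roots}")
    return roots[0]


# Illustrative DAG: two sources, one deduper, and a single final linker.
graph = {
    "companies_house": [],
    "hmrc_exporters": [],
    "dedupe_ch": ["companies_house"],
    "link_ch_hmrc": ["dedupe_ch", "hmrc_exporters"],
}
print(find_final_step(graph))  # link_ch_hmrc
```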

source

source(*args, **kwargs) -> Source

Create a Source and add it to the DAG.

model

model(*args, **kwargs) -> Model

Create a Model and add it to the DAG.

add_resolution

add_resolution(name: ResolutionName, resolution: Resolution, location: Location) -> None

Convert a resolution to a Source or Model and add it to the DAG.

get_source

get_source(name: ResolutionName) -> Source

Get a source by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the source to retrieve.

Returns:

  • Source

    The Source object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Source.

get_model

get_model(name: ResolutionName) -> Model

Get a model by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the model to retrieve.

Returns:

  • Model

    The Model object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Model.
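Both get_source and get_model follow the same lookup-then-type-check pattern. A minimal sketch, using hypothetical stand-in `Source` and `Model` dataclasses and a plain dict in place of the DAG's nodes attribute; the helper name `get_node` is illustrative only:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import TypeVar


@dataclass
class Source:  # hypothetical stand-in
    name: str


@dataclass
class Model:  # hypothetical stand-in
    name: str


T = TypeVar("T", Source, Model)


def get_node(nodes: dict[str, Source | Model], name: str, kind: type[T]) -> T:
    """Look up a node by name and check that it has the expected type."""
    node = nodes.get(name)
    if not isinstance(node, kind):
        # Covers both a missing name (node is None) and a type mismatch.
        raise ValueError(f"{name!r} doesn't exist in the DAG or isn't a {kind.__name__}")
    return node


nodes = {"companies_house": Source("companies_house"), "dedupe_ch": Model("dedupe_ch")}
print(get_node(nodes, "dedupe_ch", Model))  # Model(name='dedupe_ch')
```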

query

query(*args, **kwargs) -> Query

Create a Query object.

draw

draw(start_time: datetime | None = None, doing: str | None = None, skipped: list[str] | None = None) -> str

Create a string representation of the DAG as a tree structure.

If start_time is provided, the output shows the status of each node based on its last run time. The status indicators are:

  • ✅ Done
  • 🔄 Working
  • ⏸️ Awaiting
  • ⏭️ Skipped

Parameters:

  • start_time
    (datetime | None, default: None) –

    Start time of the DAG run. Used to calculate node status.

  • doing
    (str | None, default: None) –

    Name of the node currently being processed (if any).

  • skipped
    (list[str] | None, default: None) –

    List of node names that were skipped.

Returns:

  • str

    String representation of the DAG with status indicators.
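The status logic described above can be sketched as follows. This assumes each node records when it last ran and counts as Done if that time is at or after start_time; the graph representation, the `last_run` mapping, the check order and the indentation style are hypothetical choices for illustration, not the library's implementation.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def draw_tree(
    graph: dict[str, list[str]],
    root: str,
    last_run: dict[str, datetime],
    start_time: datetime | None = None,
    doing: str | None = None,
    skipped: list[str] | None = None,
) -> str:
    """Render a DAG as a tree from the final step down to its inputs."""
    skipped = skipped or []

    def status(name: str) -> str:
        if start_time is None:
            return ""  # No run in progress: show names only.
        if name in skipped:
            return "⏭️ "
        if name == doing:
            return "🔄 "
        ran_at = last_run.get(name)
        if ran_at is not None and ran_at >= start_time:
            return "✅ "
        return "⏸️ "  # Not yet run in this DAG run.

    lines: list[str] = []

    def walk(name: str, depth: int) -> None:
        # Indent each level of inputs by four spaces.
        lines.append("    " * depth + status(name) + name)
        for child in graph[name]:
            walk(child, depth + 1)

    walk(root, 0)
    return "\n".join(lines)


start = datetime(2024, 1, 1, 12, 0)
graph = {"link": ["dedupe", "hmrc"], "dedupe": ["ch"], "ch": [], "hmrc": []}
last_run = {"ch": start + timedelta(minutes=1)}
print(draw_tree(graph, "link", last_run, start_time=start, doing="dedupe", skipped=["hmrc"]))
# ⏸️ link
#     🔄 dedupe
#         ✅ ch
#     ⏭️ hmrc
```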

new_run

new_run() -> Self

Start a new run.

load_default

load_default(location: Location) -> Self

Attach to the default run in this collection, loading all DAG nodes.

Parameters:

  • location
    (Location) –

    The Location object to attach to nodes loaded from the default run. It can be updated per source after instantiation if necessary.

run_and_sync

run_and_sync(full_rerun: bool = False, start: str | None = None, finish: str | None = None)

Run the entire DAG and send the results to the server.
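The start and finish parameters suggest that nodes run in dependency order, optionally restricted to a range of steps. A minimal sketch of that ordering using the standard library's `graphlib.TopologicalSorter`, assuming start and finish are node names and that the run is a contiguous slice of one topological order; both are assumptions, and full_rerun is not modelled here. The helper name `steps_to_run` is hypothetical.

```python
from __future__ import annotations

from graphlib import TopologicalSorter


def steps_to_run(
    graph: dict[str, list[str]],
    start: str | None = None,
    finish: str | None = None,
) -> list[str]:
    """Order nodes so inputs run first, then slice from start to finish."""
    # TopologicalSorter takes a mapping of node -> predecessors (its inputs).
    order = list(TopologicalSorter(graph).static_order())
    first = order.index(start) if start is not None else 0
    last = order.index(finish) + 1 if finish is not None else len(order)
    return order[first:last]


# A linear chain has exactly one valid order, so the output is deterministic.
graph = {"ch": [], "dedupe_ch": ["ch"], "link": ["dedupe_ch"]}
print(steps_to_run(graph))  # ['ch', 'dedupe_ch', 'link']
print(steps_to_run(graph, start="dedupe_ch"))  # ['dedupe_ch', 'link']
```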

set_default

set_default() -> None

Set the current run as the default for the collection.

Makes it immutable, then moves the default pointer to it.
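The order of the two steps is presumably deliberate: freezing the run first means the default pointer should never refer to a run that can still change. A minimal in-memory sketch with hypothetical `Run` and `Collection` classes (in matchbox this operation is carried out via the server, so these classes are illustrative only):

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Run:  # hypothetical stand-in
    run_id: int
    mutable: bool = True


@dataclass
class Collection:  # hypothetical stand-in
    runs: dict[int, Run] = field(default_factory=dict)
    default_run: int | None = None

    def set_default(self, run_id: int) -> None:
        run = self.runs[run_id]
        # Step 1: make the run immutable so its results can no longer change.
        run.mutable = False
        # Step 2: only then move the collection's default pointer to it.
        self.default_run = run_id


collection = Collection(runs={1: Run(1), 2: Run(2)})
collection.set_default(2)
print(collection.default_run, collection.runs[2].mutable)  # 2 False
```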

lookup_key

lookup_key(from_source: str, to_sources: list[str], key: str, threshold: int | None = None) -> dict[str, list[str]]

Match IDs against the selected backend.

Parameters:

  • from_source
    (str) –

    Name of the source that the provided key belongs to.

  • to_sources
    (list[str]) –

    Names of the sources in which to find matching keys.

  • key
    (str) –

    The value to match from the source, usually a primary key.

  • threshold
    (int | None, default: None) –

    The threshold to use for creating clusters. If None, uses each resolution’s default threshold. If an integer, uses that threshold for the specified resolution, and the resolution’s cached thresholds for its ancestors.
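Read literally, the threshold description distinguishes two cases. A minimal sketch of one interpretation, assuming each resolution's default threshold and the thresholds cached on the specified resolution for its ancestors are available as plain dicts; the function name, inputs and values are hypothetical.

```python
from __future__ import annotations


def effective_thresholds(
    resolution: str,
    defaults: dict[str, int],
    ancestor_cache: dict[str, int],
    threshold: int | None = None,
) -> dict[str, int]:
    """Choose the threshold applied to each resolution in the lineage."""
    if threshold is None:
        # Every resolution (ancestors and the specified one) uses its own default.
        return {name: defaults[name] for name in [*ancestor_cache, resolution]}
    # The specified resolution uses the override; its ancestors use the
    # thresholds cached on the specified resolution.
    return {**ancestor_cache, resolution: threshold}


defaults = {"dedupe_ch": 90, "link_ch_hmrc": 70}
ancestor_cache = {"dedupe_ch": 80}  # cached on link_ch_hmrc (illustrative)
print(effective_thresholds("link_ch_hmrc", defaults, ancestor_cache))
# {'dedupe_ch': 90, 'link_ch_hmrc': 70}
print(effective_thresholds("link_ch_hmrc", defaults, ancestor_cache, threshold=85))
# {'dedupe_ch': 80, 'link_ch_hmrc': 85}
```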

Returns:

  • dict[str, list[str]]

    Dictionary mapping source names to list of keys within that source.

Examples:

dag.lookup_key(
    from_source="companies_house",
    to_sources=[
        "datahub_companies",
        "hmrc_exporters",
    ],
    key="8534735",
)

extract_lookup

extract_lookup(source_filter: list[str] | None = None, location_names: list[str] | None = None) -> Table

Return a mapping from matchbox IDs to source keys, optionally filtered.

Parameters:

  • source_filter
    (list[str] | None, default: None) –

    An optional list of source resolution names to filter by.

  • location_names
    (list[str] | None, default: None) –

    An optional list of location names to filter by.
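How the two optional filters combine is not stated above. A minimal sketch, assuming they combine as AND (a row is kept only if it passes every filter supplied) and using plain dict rows with hypothetical `id`, `source`, `key` and `location` columns in place of the returned Table; the helper name `filter_lookup_rows`, the location names and the sample data are illustrative only.

```python
from __future__ import annotations


def filter_lookup_rows(
    rows: list[dict[str, str | int]],
    source_filter: list[str] | None = None,
    location_names: list[str] | None = None,
) -> list[dict[str, str | int]]:
    """Keep rows that match every filter that was supplied."""
    return [
        row
        for row in rows
        # A filter left as None places no restriction on that column.
        if (source_filter is None or row["source"] in source_filter)
        and (location_names is None or row["location"] in location_names)
    ]


rows = [
    {"id": 1, "source": "companies_house", "key": "8534735", "location": "warehouse"},
    {"id": 1, "source": "hmrc_exporters", "key": "A12", "location": "lake"},
    {"id": 2, "source": "companies_house", "key": "0042", "location": "warehouse"},
]
print([row["key"] for row in filter_lookup_rows(rows, source_filter=["companies_house"])])
# ['8534735', '0042']
print(len(filter_lookup_rows(rows, location_names=["lake"])))  # 1
```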