DAGs

matchbox.client.dags

Objects to define a DAG which indexes, deduplicates and links data.

Classes:

  • DAGNodeExecutionStatus

    Enumeration of node execution statuses.

  • DAG

    Self-sufficient pipeline of indexing, deduping and linking steps.

Attributes:

DAGExecutionStatus module-attribute

DAGExecutionStatus: TypeAlias = dict[str, DAGNodeExecutionStatus]

DAGNodeExecutionStatus

Bases: StrEnum

Enumeration of node execution statuses.

Attributes:

SKIPPED class-attribute instance-attribute

SKIPPED = 'skipped'

DONE class-attribute instance-attribute

DONE = 'done'

DOING class-attribute instance-attribute

DOING = 'doing'
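The enumeration and its alias can be reproduced in plain Python to show how a status mapping is built. This is a standalone sketch, not an import from `matchbox.client.dags`. The node names in the status mapping are hypothetical.

```python
from enum import Enum

try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # minimal stand-in for older interpreters
    class StrEnum(str, Enum):
        def __str__(self) -> str:
            return self.value


class DAGNodeExecutionStatus(StrEnum):
    """Sketch mirroring the documented members."""

    SKIPPED = "skipped"
    DONE = "done"
    DOING = "doing"


# Mirrors: DAGExecutionStatus: TypeAlias = dict[str, DAGNodeExecutionStatus]
DAGExecutionStatus = dict[str, DAGNodeExecutionStatus]

# Hypothetical status for a two-node DAG: a finished source and a
# deduper that is still running.
status: DAGExecutionStatus = {
    "companies_house": DAGNodeExecutionStatus.DONE,
    "dedupe_companies": DAGNodeExecutionStatus.DOING,
}

# StrEnum members compare equal to their string values.
print(status["companies_house"] == "done")  # True
```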

DAG

DAG(name: str)

Self-sufficient pipeline of indexing, deduping and linking steps.

Methods:

  • list_all

    List available DAG names on the server.

  • source

    Create Source and add it to the DAG.

  • model

    Create Model and add it to the DAG.

  • add_resolution

    Convert a resolution from the server to a Source or Model and add to DAG.

  • get_source

    Get a source by name from the DAG.

  • get_model

    Get a model by name from the DAG.

  • query

    Create Query object.

  • draw

    Create a string representation of the DAG as a tree structure.

  • new_run

    Start a new run.

  • set_client

    Assign a client to all sources at once.

  • load_default

    Attach to default run in this collection, loading all DAG nodes.

  • load_pending

    Attach to the pending run in this collection, loading all DAG nodes.

  • run_and_sync

    Run entire DAG and send results to server.

  • set_default

    Set the current run as the default for the collection.

  • lookup_key

    Matches IDs against the selected backend.

  • resolve

    Returns ResolvedMatches, optionally filtered by source or location.

Attributes:

name instance-attribute

nodes instance-attribute

nodes: dict[ResolutionName, Source | Model] = {}

graph instance-attribute

run property writable

run: RunID

Return the run ID if available; otherwise, raise an error.

final_steps property

final_steps: list[Source | Model]

Returns all apex nodes in the DAG.

Returns:

  • list[Source | Model]

    The apex nodes of the DAG.

final_step property

final_step: Source | Model

Returns the single apex (root) node in the DAG.

Returns:

  • Source | Model

    The only final step in the DAG.

Raises:

  • ValueError

    If the DAG does not have exactly one final step.
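The relationship between `final_steps` and `final_step` can be sketched by treating each node's inputs as edges: an apex is a node that no other node consumes. The sketch assumes a hypothetical mapping from each node name to the names of its input nodes; the real `graph` attribute may be shaped differently.

```python
def final_steps(inputs: dict[str, list[str]]) -> list[str]:
    """Hypothetical helper: every node that is not an input to another node."""
    consumed = {parent for parents in inputs.values() for parent in parents}
    return [node for node in inputs if node not in consumed]


def final_step(inputs: dict[str, list[str]]) -> str:
    """Hypothetical helper: the single apex, raising ValueError otherwise."""
    apexes = final_steps(inputs)
    if len(apexes) != 1:
        raise ValueError(f"Expected exactly one final step, found {len(apexes)}")
    return apexes[0]


# Hypothetical DAG: two sources, one deduper each, and a linker on top.
dag_inputs = {
    "source_a": [],
    "source_b": [],
    "dedupe_a": ["source_a"],
    "dedupe_b": ["source_b"],
    "link_ab": ["dedupe_a", "dedupe_b"],
}
print(final_step(dag_inputs))  # link_ab
```

With two unconnected sources, `final_steps` returns both names and `final_step` raises `ValueError`, matching the documented Raises entry.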

list_all classmethod

list_all() -> list[CollectionName]

List available DAG names on the server.

source

source(*args: Any, **kwargs: Any) -> Source

Create Source and add it to the DAG.

model

model(*args: Any, **kwargs: Any) -> Model

Create Model and add it to the DAG.

add_resolution

add_resolution(name: ResolutionName, resolution: Resolution) -> None

Convert a resolution from the server to a Source or Model and add to DAG.

get_source

get_source(name: ResolutionName) -> Source

Get a source by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the source to retrieve.

Returns:

  • Source

    The Source object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Source.

get_model

get_model(name: ResolutionName) -> Model

Get a model by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the model to retrieve.

Returns:

  • Model

    The Model object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Model.
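Both getters combine a lookup with a type check. A minimal sketch of that pattern uses hypothetical `Source` and `Model` dataclasses in place of the real classes and a hypothetical `get_node` helper; it shows how a single `isinstance` test covers both documented failure cases (a missing name and a node of the wrong kind).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Source:  # hypothetical stand-in for matchbox's Source
    name: str


@dataclass
class Model:  # hypothetical stand-in for matchbox's Model
    name: str


def get_node(nodes: dict[str, Source | Model], name: str, kind: type) -> Source | Model:
    """Hypothetical helper: fetch a node by name and check its type."""
    node = nodes.get(name)
    # A missing name yields None, which also fails the isinstance check.
    if not isinstance(node, kind):
        raise ValueError(f"{name!r} is not a {kind.__name__} in this DAG")
    return node


nodes = {"companies_house": Source("companies_house"), "dedupe_ch": Model("dedupe_ch")}
print(get_node(nodes, "dedupe_ch", Model))  # Model(name='dedupe_ch')
```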

query

query(*args: Any, **kwargs: Any) -> Query

Create Query object.

draw

draw(status: DAGExecutionStatus | None = None) -> str

Create a string representation of the DAG as a tree structure.

If status is provided, each node is shown with its execution status, using these indicators:

  • ✅ Done
  • 🔄 Working
  • ⏸️ Awaiting
  • ⏭️ Skipped

Parameters:

  • status
    (DAGExecutionStatus | None, default: None ) –

    Object describing the status of each node.

Returns:

  • str

    String representation of the DAG with status indicators.
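The indicator scheme can be sketched as a small recursive tree renderer. Assumptions: the apex is drawn at the top with its inputs as children, nodes absent from `status` are shown as awaiting (the enum has no awaiting member), and the DAG is supplied as a hypothetical mapping from node name to input names. `draw_tree` is a hypothetical helper, not the library's implementation.

```python
from __future__ import annotations

from enum import Enum


class Status(str, Enum):  # simplified stand-in for DAGNodeExecutionStatus
    SKIPPED = "skipped"
    DONE = "done"
    DOING = "doing"


ICONS = {Status.DONE: "✅", Status.DOING: "🔄", Status.SKIPPED: "⏭️"}
AWAITING = "⏸️"  # assumption: used for nodes with no entry in `status`


def draw_tree(
    inputs: dict[str, list[str]], root: str, status: dict[str, Status] | None = None
) -> str:
    """Hypothetical helper: render the DAG from `root` down to its sources."""
    lines: list[str] = []

    def walk(node: str, prefix: str, is_last: bool, is_root: bool) -> None:
        icon = ""
        if status is not None:
            icon = ICONS.get(status.get(node), AWAITING) + " "
        if is_root:
            lines.append(f"{icon}{node}")
            child_prefix = ""
        else:
            connector = "└── " if is_last else "├── "
            lines.append(f"{prefix}{connector}{icon}{node}")
            child_prefix = prefix + ("    " if is_last else "│   ")
        children = inputs.get(node, [])
        for i, child in enumerate(children):
            walk(child, child_prefix, i == len(children) - 1, False)

    walk(root, "", True, True)
    return "\n".join(lines)


dag_inputs = {
    "link_ab": ["dedupe_a", "source_b"],
    "dedupe_a": ["source_a"],
    "source_a": [],
    "source_b": [],
}
print(draw_tree(dag_inputs, "link_ab", {"source_a": Status.DONE, "dedupe_a": Status.DOING}))
# ⏸️ link_ab
# ├── 🔄 dedupe_a
# │   └── ✅ source_a
# └── ⏸️ source_b
```

Because it walks inputs recursively, this sketch prints a shared input once under each node that consumes it.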

new_run

new_run() -> Self

Start a new run.

set_client

set_client(client: Any) -> Self

Assign a client to all sources at once.
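Because `set_client` returns `Self`, it can be chained with other calls. A sketch of the idea with hypothetical, heavily simplified stand-in classes: only `Source` nodes receive the client, and the method returns the DAG so that calls compose.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Source:  # hypothetical stand-in: sources hold a data client
    name: str
    client: Any = None


@dataclass
class Model:  # hypothetical stand-in: models have no client
    name: str


class DAG:  # hypothetical, heavily simplified stand-in for the real DAG
    def __init__(self) -> None:
        self.nodes: dict[str, Source | Model] = {}

    def set_client(self, client: Any) -> DAG:
        # Give every Source node the same client in a single pass.
        for node in self.nodes.values():
            if isinstance(node, Source):
                node.client = client
        return self  # returning self lets calls be chained


dag = DAG()
dag.nodes = {
    "companies_house": Source("companies_house"),
    "dedupe_ch": Model("dedupe_ch"),
}
engine = object()  # placeholder for a real database client
print(dag.set_client(engine) is dag)  # True
print(dag.nodes["companies_house"].client is engine)  # True
```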

load_default

load_default() -> Self

Attach to default run in this collection, loading all DAG nodes.

load_pending

load_pending() -> Self

Attach to the pending run in this collection, loading all DAG nodes.

Pending is defined as the last non-default run.
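Under this definition, picking the pending run amounts to scanning runs from newest to oldest and skipping the default. The sketch assumes run IDs are integers listed in creation order; `pending_run` is a hypothetical helper, not part of the API.

```python
from __future__ import annotations


def pending_run(run_ids: list[int], default_run: int | None) -> int | None:
    """Hypothetical helper: the most recent run that is not the default."""
    # Walk from newest to oldest and return the first non-default run.
    for run_id in reversed(run_ids):
        if run_id != default_run:
            return run_id
    return None  # only the default run exists, or there are no runs


print(pending_run([1, 2, 3], default_run=2))  # 3
print(pending_run([1, 2, 3], default_run=3))  # 2
print(pending_run([1], default_run=1))  # None
```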

run_and_sync

run_and_sync(start: str | None = None, finish: str | None = None) -> None

Run entire DAG and send results to server.
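Running a whole DAG implies executing each step only after its inputs. A minimal sketch of that ordering uses the standard library's `graphlib.TopologicalSorter` and records statuses in the same shape as `DAGExecutionStatus`. The `run_step` and `sync_step` callables are hypothetical placeholders for computing and uploading a node's results. The `start` and `finish` arguments are not covered, because their semantics are not described here.

```python
from collections.abc import Callable
from enum import Enum
from graphlib import TopologicalSorter


class Status(str, Enum):  # simplified stand-in for DAGNodeExecutionStatus
    SKIPPED = "skipped"
    DONE = "done"
    DOING = "doing"


def run_in_order(
    inputs: dict[str, list[str]],
    run_step: Callable[[str], None],  # hypothetical: compute a node's results
    sync_step: Callable[[str], None],  # hypothetical: send results to a server
) -> dict[str, Status]:
    """Run every node after its inputs, tracking each node's status."""
    status: dict[str, Status] = {}
    # static_order() yields each node only after all of its inputs.
    for node in TopologicalSorter(inputs).static_order():
        status[node] = Status.DOING
        run_step(node)
        sync_step(node)
        status[node] = Status.DONE
    return status


calls: list[str] = []
result = run_in_order(
    {"link_ab": ["dedupe_a", "source_b"], "dedupe_a": ["source_a"],
     "source_a": [], "source_b": []},
    run_step=lambda node: calls.append(f"run {node}"),
    sync_step=lambda node: calls.append(f"sync {node}"),
)
print(calls.index("run source_a") < calls.index("run dedupe_a") < calls.index("run link_ab"))  # True
```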

set_default

set_default() -> None

Set the current run as the default for the collection.

Makes it immutable, then moves the default pointer to it.
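The two-step order can be sketched with hypothetical `Run` and `Collection` records. Freezing the run before moving the pointer presumably ensures the default never refers to a run that can still change; the classes here are illustrative only.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Run:  # hypothetical stand-in for a run record
    run_id: int
    mutable: bool = True


@dataclass
class Collection:  # hypothetical stand-in for a collection with a default pointer
    runs: dict[int, Run] = field(default_factory=dict)
    default_run: int | None = None

    def set_default(self, run_id: int) -> None:
        run = self.runs[run_id]
        run.mutable = False  # step 1: make the run immutable
        self.default_run = run_id  # step 2: move the default pointer to it


collection = Collection(runs={1: Run(1, mutable=False), 2: Run(2)}, default_run=1)
collection.set_default(2)
print(collection.default_run, collection.runs[2].mutable)  # 2 False
```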

lookup_key

lookup_key(from_source: str, to_sources: list[str], key: str, threshold: int | None = None) -> dict[str, list[str]]

Matches IDs against the selected backend.

Parameters:

  • from_source
    (str) –

    Name of the source the provided key belongs to.

  • to_sources
    (list[str]) –

    Names of the sources to find keys in.

  • key
    (str) –

    The value to match from the source, usually a primary key.

  • threshold
    (int | None, default: None ) –

    The threshold to use for creating clusters. If None, uses each resolution’s default threshold. If an integer, uses that threshold for the specified resolution, and the resolution’s cached thresholds for its ancestors.

Returns:

  • dict[str, list[str]]

    Dictionary mapping source names to list of keys within that source.

Examples:

dag.lookup_key(
    from_source="companies_house",
    to_sources=[
        "datahub_companies",
        "hmrc_exporters",
    ],
    key="8534735",
)
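The `threshold` rule can be expressed as a small function. This sketch assumes hypothetical `defaults` and `cached` mappings from resolution name to threshold and a precomputed list of ancestors; how matchbox stores these values is not described here, and `effective_thresholds` is not part of the API.

```python
from __future__ import annotations


def effective_thresholds(
    resolution: str,
    ancestors: list[str],
    defaults: dict[str, int],  # hypothetical: each resolution's default threshold
    cached: dict[str, int],  # hypothetical: each resolution's cached threshold
    threshold: int | None = None,
) -> dict[str, int]:
    """Hypothetical helper: pick the threshold used for each resolution."""
    if threshold is None:
        # No override: every resolution in the lineage uses its default.
        return {name: defaults[name] for name in [resolution, *ancestors]}
    # Override applies only to the target; ancestors keep cached values.
    chosen = {name: cached[name] for name in ancestors}
    chosen[resolution] = threshold
    return chosen


# Hypothetical lineage: a linker built on top of a deduper.
defaults = {"link_ab": 80, "dedupe_a": 90}
cached = {"dedupe_a": 95}
print(effective_thresholds("link_ab", ["dedupe_a"], defaults, cached))
# {'link_ab': 80, 'dedupe_a': 90}
print(effective_thresholds("link_ab", ["dedupe_a"], defaults, cached, threshold=70))
# {'dedupe_a': 95, 'link_ab': 70}
```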

resolve

resolve(node: ResolutionName | None = None, source_filter: list[str] | None = None, location_names: list[str] | None = None) -> ResolvedMatches

Returns ResolvedMatches, optionally filtered by source or location.

Parameters:

  • node
    (ResolutionName | None, default: None ) –

    Name of the source or model to resolve within the DAG. If not provided, resolves the DAG’s apex node.

  • source_filter
    (list[str] | None, default: None ) –

    An optional list of source resolution names to filter by.

  • location_names
    (list[str] | None, default: None ) –

    An optional list of location names to filter by.