DAGs

matchbox.client.dags

Objects to define a DAG which indexes, deduplicates and links data.

Classes:

  • DAGNodeExecutionStatus

    Enumeration of node execution statuses.

  • DAG

    Self-sufficient pipeline of indexing, deduping and linking steps.

Attributes:

DAGExecutionStatus module-attribute

DAGExecutionStatus: TypeAlias = dict[str, DAGNodeExecutionStatus]

DAGNodeExecutionStatus

Bases: StrEnum

Enumeration of node execution statuses.

Attributes:

SKIPPED class-attribute instance-attribute

SKIPPED = 'skipped'

DONE class-attribute instance-attribute

DONE = 'done'

DOING class-attribute instance-attribute

DOING = 'doing'

DAG

DAG(name: str)

Self-sufficient pipeline of indexing, deduping and linking steps.

Methods:

  • list_all

    List available DAG names on the server.

  • source

    Create Source and add it to the DAG.

  • model

    Create Model and add it to the DAG.

  • add_resolution

    Convert a resolution from the server to a Source or Model and add to DAG.

  • get_source

    Get a source by name from the DAG.

  • get_model

    Get a model by name from the DAG.

  • query

    Create Query object.

  • draw

    Create a string representation of the DAG as a tree structure.

  • new_run

    Start a new run.

  • set_client

    Assign a client to all sources at once.

  • load_default

    Attach to default run in this collection, loading all DAG nodes.

  • load_pending

    Attach to the pending run in this collection, loading all DAG nodes.

  • run_and_sync

    Run entire DAG and send results to server.

  • set_default

    Set the current run as the default for the collection.

  • lookup_key

    Matches IDs against the selected backend.

  • resolve

    Returns ResolvedMatches, optionally filtering.

Attributes:

name instance-attribute

nodes instance-attribute

nodes: dict[ResolutionName, Source | Model] = {}

graph instance-attribute

run property writable

run: RunID

Returns the run ID if one is available; otherwise raises an error.

final_steps property

final_steps: list[Source | Model]

Returns all apex nodes in the DAG.

Returns:

  • list[Source | Model]

    All apex nodes in the DAG.

final_step property

final_step: Source | Model

Returns the single apex node in the DAG.

Returns:

  • Source | Model

    The single apex node in the DAG.

Raises:

  • ValueError

    If the DAG does not have exactly one final step.
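
The difference between final_steps and final_step can be illustrated with a small stand-alone sketch. It assumes an apex node is one that no other node takes as input, and it represents the DAG as a plain dictionary of node name to input names. Both the representation and the helper names `apex_nodes` and `single_apex` are hypothetical and are not part of the library.

```python
def apex_nodes(dependencies: dict[str, list[str]]) -> list[str]:
    """Return the nodes that no other node uses as an input, in insertion order."""
    used_as_input = {
        parent for parents in dependencies.values() for parent in parents
    }
    return [name for name in dependencies if name not in used_as_input]


def single_apex(dependencies: dict[str, list[str]]) -> str:
    """Return the only apex node, or raise ValueError if there is not exactly one."""
    apexes = apex_nodes(dependencies)
    if len(apexes) != 1:
        raise ValueError(f"Expected exactly one final step, found {len(apexes)}")
    return apexes[0]


# Hypothetical DAG: two sources, each deduplicated, then linked together.
dependencies = {
    "source_a": [],
    "source_b": [],
    "dedupe_a": ["source_a"],
    "dedupe_b": ["source_b"],
    "link_ab": ["dedupe_a", "dedupe_b"],
}

final = single_apex(dependencies)
```

A DAG with two unconnected sources would have two apex nodes, which is the situation in which final_step is documented to raise ValueError.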

list_all classmethod

list_all() -> list[CollectionName]

List available DAG names on the server.

source

source(*args: Any, **kwargs: Any) -> Source

Create Source and add it to the DAG.

model

model(*args: Any, **kwargs: Any) -> Model

Create Model and add it to the DAG.

add_resolution

add_resolution(name: ResolutionName, resolution: Resolution) -> None

Convert a resolution from the server to a Source or Model and add to DAG.

get_source

get_source(name: ResolutionName) -> Source

Get a source by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the source to get.

Returns:

  • Source

    The Source object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Source.

get_model

get_model(name: ResolutionName) -> Model

Get a model by name from the DAG.

Parameters:

  • name
    (ResolutionName) –

    Name of the model to get.

Returns:

  • Model

    The Model object.

Raises:

  • ValueError

    If the name doesn’t exist in the DAG or isn’t a Model.
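
The two failure cases shared by get_source and get_model (unknown name, wrong node type) can be sketched as a typed lookup over a nodes dictionary. The `Source` and `Model` classes below are empty stand-ins and `get_typed` is a hypothetical helper. This shows the documented behaviour only and is not the library's implementation.

```python
class Source:
    """Empty stand-in for the library's Source class."""


class Model:
    """Empty stand-in for the library's Model class."""


def get_typed(nodes: dict, name: str, expected: type):
    """Return nodes[name] if it exists and is an instance of `expected`."""
    if name not in nodes:
        raise ValueError(f"'{name}' does not exist in the DAG")
    node = nodes[name]
    if not isinstance(node, expected):
        raise ValueError(f"'{name}' is not a {expected.__name__}")
    return node


# Hypothetical contents of DAG.nodes.
nodes = {"companies_house": Source(), "dedupe_companies": Model()}

source = get_typed(nodes, "companies_house", Source)
```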

query

query(*args: Any, **kwargs: Any) -> Query

Create Query object.

draw

draw(status: DAGExecutionStatus | None = None) -> str

Create a string representation of the DAG as a tree structure.

If status is provided, it will show the status of each node. The status indicators are:

  • ✅ Done
  • 🔄 Working
  • ⏸️ Awaiting
  • ⏭️ Skipped

Parameters:

  • status
    (DAGExecutionStatus | None, default: None ) –

    Object describing the status of each node.

Returns:

  • str

    String representation of the DAG with status indicators.
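
To make the output format concrete, here is a rough stand-alone sketch of rendering a tree with status indicators. It is not the library's implementation. The dictionary representation, the `draw_tree` helper, the mapping of "doing" to the Working icon, and the treatment of nodes without a status entry as Awaiting are all assumptions.

```python
# Icons from the list above, keyed by the documented enum values.
STATUS_ICONS = {"done": "✅", "doing": "🔄", "skipped": "⏭️"}
AWAITING_ICON = "⏸️"  # assumed for nodes that have no status entry yet


def draw_tree(node, dependencies, status=None, depth=0):
    """Render `node` and its inputs as indented lines, apex first."""
    prefix = ""
    if status is not None:
        prefix = STATUS_ICONS.get(status.get(node), AWAITING_ICON) + " "
    lines = ["    " * depth + prefix + node]
    for parent in dependencies.get(node, []):
        lines.extend(draw_tree(parent, dependencies, status, depth + 1))
    return lines


# Hypothetical DAG: one linker with two source inputs.
dependencies = {"link_ab": ["source_a", "source_b"], "source_a": [], "source_b": []}
status = {"source_a": "done", "source_b": "doing"}

rendered = "\n".join(draw_tree("link_ab", dependencies, status))
```

Calling the helper without a status dictionary produces the same tree with no icons, which matches the documented default of status=None.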

new_run

new_run() -> Self

Start a new run.

set_client

set_client(client: Any) -> Self

Assign a client to all sources at once.

load_default

load_default() -> Self

Attach to default run in this collection, loading all DAG nodes.

load_pending

load_pending() -> Self

Attach to the pending run in this collection, loading all DAG nodes.

Pending is defined as the last non-default run.

run_and_sync

run_and_sync(start: str | None = None, finish: str | None = None, low_memory: bool = False, batch_size: int | None = None, profile: bool = False) -> None

Run entire DAG and send results to server.

Parameters:

  • start
    (str | None, default: None ) –

    Name of first node to run.

  • finish
    (str | None, default: None ) –

    Name of last node to run.

  • low_memory
    (bool, default: False ) –

    Whether to delete data for each node after it is run.

  • batch_size
    (int | None, default: None ) –

    The size used for internal batching. Overrides environment variable if set.

  • profile
    (bool, default: False ) –

    Whether to log memory usage at INFO level.

set_default

set_default() -> None

Set the current run as the default for the collection.

Makes it immutable, then moves the default pointer to it.

lookup_key

lookup_key(from_source: str, to_sources: list[str], key: str, threshold: float | None = None) -> dict[str, list[str]]

Matches IDs against the selected backend.

Parameters:

  • from_source
    (str) –

    Name of the source the provided key belongs to.

  • to_sources
    (list[str]) –

    Names of sources to find keys in.

  • key
    (str) –

    The value to match from the source. Usually a primary key.

  • threshold
    (float | None, default: None ) –

    The threshold to use for creating clusters. If None, uses the resolutions’ default threshold. If a float, uses that threshold for the specified resolution, and the resolution’s cached thresholds for its ancestors.

Returns:

  • dict[str, list[str]]

    Dictionary mapping source names to list of keys within that source.

Examples:

dag.lookup_key(
    from_source="companies_house",
    to_sources=[
        "datahub_companies",
        "hmrc_exporters",
    ],
    key="8534735",
)
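
The return value is a plain dictionary, so it can be post-processed with ordinary Python. The sketch below uses a hand-written stand-in for a result (the keys are illustrative, not real data) and a hypothetical helper `summarise_matches`. This reference does not say whether sources without matches appear with an empty list or are omitted, so the helper handles both cases.

```python
def summarise_matches(result: dict[str, list[str]], to_sources: list[str]) -> dict[str, int]:
    """Count matched keys per requested source; a missing source counts as zero."""
    return {name: len(result.get(name, [])) for name in to_sources}


# Hand-written stand-in for a lookup_key() result; values are illustrative only.
matches = {
    "datahub_companies": ["dh-001", "dh-002"],
    "hmrc_exporters": [],
}

counts = summarise_matches(matches, ["datahub_companies", "hmrc_exporters"])
```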

resolve

resolve(node: ResolutionName | None = None, source_filter: list[str] | None = None, location_names: list[str] | None = None, threshold: float | None = None) -> ResolvedMatches

Returns ResolvedMatches, optionally filtering.

Parameters:

  • node
    (ResolutionName | None, default: None ) –

    Name of source or model to resolve within DAG. If not provided, will look for an apex.

  • source_filter
    (list[str] | None, default: None ) –

    An optional list of source resolution names to filter by.

  • location_names
    (list[str] | None, default: None ) –

    An optional list of location names to filter by.

  • threshold
    (float | None, default: None ) –

    The threshold to use for creating clusters. If None, uses the resolutions’ default threshold. If a float, uses that threshold for the specified resolution, and the resolution’s cached thresholds for its ancestors.