DAGs

matchbox.client.dags

Objects to define a DAG which indexes, deduplicates and links data.

Classes:

  • Step

    Abstract base class defining what a step needs to support.

  • StepInput

    Input to a DAG step, generated by a previous node in the DAG.

  • IndexStep

    Index step.

  • ModelStep

    Base class for model steps.

  • DedupeStep

    Deduplication step.

  • LinkStep

    Linking step.

  • DAG

    Self-sufficient pipeline of indexing, deduping and linking steps.

Step

Bases: BaseModel, ABC

Abstract base class defining what a step needs to support.

Methods:

  • run

    Run the step.

Attributes:

name instance-attribute

name: str

sources class-attribute instance-attribute

sources: set[str] = Field(default_factory=set)

last_run class-attribute instance-attribute

last_run: datetime | None = Field(default=None)

inputs abstractmethod property

inputs: list[StepInput]

Return all inputs to this step.

run abstractmethod

run() -> None

Run the step.
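
As a rough illustration of the contract above, the sketch below mirrors Step using only the standard library. It uses a plain ABC rather than pydantic's BaseModel, and SketchStep and EchoStep are hypothetical names invented for this example; neither is part of matchbox.

```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional


class SketchStep(ABC):
    """Stand-in for Step: a name, a set of sources and a last-run timestamp."""

    def __init__(self, name: str, sources: Optional[set] = None) -> None:
        self.name = name
        self.sources = sources if sources is not None else set()
        self.last_run: Optional[datetime] = None

    @property
    @abstractmethod
    def inputs(self) -> list:
        """Return all inputs to this step."""

    @abstractmethod
    def run(self) -> None:
        """Run the step."""


class EchoStep(SketchStep):
    """Hypothetical concrete step with no inputs."""

    @property
    def inputs(self) -> list:
        return []

    def run(self) -> None:
        # Recording the run time here is an assumption of this sketch.
        self.last_run = datetime.now()


step = EchoStep(name="echo")
step.run()
```

Because both inputs and run are abstract, instantiating SketchStep directly raises TypeError; only subclasses that implement both can be created.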

StepInput

Bases: BaseModel

Input to a DAG step, generated by a previous node in the DAG.

Methods:

  • validate_all_input

    Verify select statement is valid given previous node.

Attributes:

prev_node instance-attribute

prev_node: Step

select instance-attribute

select: dict[Source, list[str]]

cleaners class-attribute instance-attribute

cleaners: dict[str, dict[str, Any]] = {}

batch_size class-attribute instance-attribute

batch_size: int | None = None

threshold class-attribute instance-attribute

threshold: float | None = None

name property

name: str

Resolution name for node generating this input for the next step.

validate_all_input

validate_all_input() -> StepInput

Verify select statement is valid given previous node.

IndexStep

Bases: Step

Index step.

Methods:

  • source_to_atrtibutes

    Convert source to name and sources attributes.

  • run

    Run indexing step.

Attributes:

source instance-attribute

source: Source

batch_size class-attribute instance-attribute

batch_size: int | None = Field(default=None)

inputs property

inputs: list[StepInput]

Return all inputs to this step.

name instance-attribute

name: str

sources class-attribute instance-attribute

sources: set[str] = Field(default_factory=set)

last_run class-attribute instance-attribute

last_run: datetime | None = Field(default=None)

source_to_atrtibutes

source_to_atrtibutes(
    data: dict[str, Any],
) -> dict[str, Any]

Convert source to name and sources attributes.

run

run() -> None

Run indexing step.

ModelStep

Bases: Step

Base class for model steps.

Methods:

  • init_sources

    Add sources inherited from all inputs.

  • query

    Retrieve data for declared step input.

  • run

    Run the step.

Attributes:

description instance-attribute

description: str

left instance-attribute

left: StepInput

settings instance-attribute

settings: dict[str, Any]

truth instance-attribute

truth: float

name instance-attribute

name: str

sources class-attribute instance-attribute

sources: set[str] = Field(default_factory=set)

last_run class-attribute instance-attribute

last_run: datetime | None = Field(default=None)

inputs abstractmethod property

inputs: list[StepInput]

Return all inputs to this step.

init_sources

init_sources() -> ModelStep

Add sources inherited from all inputs.
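
One plausible reading of "inherited from all inputs" is a set union over the sources of each upstream node. The sketch below shows that idea only; Node and inherit_sources are hypothetical names, and the real method may work differently.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a step; only the fields this sketch needs."""

    name: str
    sources: set = field(default_factory=set)


def inherit_sources(step: Node, prev_nodes: list) -> Node:
    """Union the sources of every upstream node into the step, then return it."""
    for prev in prev_nodes:
        step.sources |= prev.sources
    return step


companies = Node("companies", {"companies"})
customers = Node("customers", {"customers"})
linker = inherit_sources(Node("link_companies_customers"), [companies, customers])
```

Returning the step itself matches the documented signature, which returns a ModelStep rather than None.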

query

query(step_input: StepInput) -> DataFrame

Retrieve data for declared step input.

Parameters:

  • step_input
    (StepInput) –

    Declared input to this DAG step.

Returns:

  • DataFrame

    Pandas dataframe with retrieved results.

run abstractmethod

run() -> None

Run the step.

DedupeStep

Bases: ModelStep

Deduplication step.

Methods:

  • run

    Run full deduping pipeline and store results.

  • init_sources

    Add sources inherited from all inputs.

  • query

    Retrieve data for declared step input.

Attributes:

model_class instance-attribute

model_class: type[Deduper]

inputs property

inputs: list[StepInput]

Return all inputs to this step.

name instance-attribute

name: str

sources class-attribute instance-attribute

sources: set[str] = Field(default_factory=set)

last_run class-attribute instance-attribute

last_run: datetime | None = Field(default=None)

description instance-attribute

description: str

left instance-attribute

left: StepInput

settings instance-attribute

settings: dict[str, Any]

truth instance-attribute

truth: float

run

run() -> None

Run full deduping pipeline and store results.

init_sources

init_sources() -> ModelStep

Add sources inherited from all inputs.

query

query(step_input: StepInput) -> DataFrame

Retrieve data for declared step input.

Parameters:

  • step_input
    (StepInput) –

    Declared input to this DAG step.

Returns:

  • DataFrame

    Pandas dataframe with retrieved results.

LinkStep

Bases: ModelStep

Linking step.

Methods:

  • run

    Run whole linking step.

  • init_sources

    Add sources inherited from all inputs.

  • query

    Retrieve data for declared step input.

Attributes:

model_class instance-attribute

model_class: type[Linker]

right instance-attribute

right: StepInput

inputs property

inputs: list[StepInput]

Return all StepInputs to this step.

name instance-attribute

name: str

sources class-attribute instance-attribute

sources: set[str] = Field(default_factory=set)

last_run class-attribute instance-attribute

last_run: datetime | None = Field(default=None)

description instance-attribute

description: str

left instance-attribute

left: StepInput

settings instance-attribute

settings: dict[str, Any]

truth instance-attribute

truth: float

run

run() -> None

Run whole linking step.

init_sources

init_sources() -> ModelStep

Add sources inherited from all inputs.

query

query(step_input: StepInput) -> DataFrame

Retrieve data for declared step input.

Parameters:

  • step_input
    (StepInput) –

    Declared input to this DAG step.

Returns:

  • DataFrame

    Pandas dataframe with retrieved results.

DAG

DAG()

Self-sufficient pipeline of indexing, deduping and linking steps.

Methods:

  • add_sources

    Add sources to DAG.

  • add_steps

    Add dedupers and linkers to DAG, and register sources available to steps.

  • prepare

    Determine order of execution of steps.

  • draw

    Create a string representation of the DAG as a tree structure.

  • run

    Run entire DAG.

Attributes:

nodes instance-attribute

nodes: dict[str, Step] = {}

graph instance-attribute

graph: dict[str, list[str]] = {}

sequence instance-attribute

sequence: list[str] = []

add_sources

add_sources(
    *sources: Source, batch_size: int | None = None
) -> tuple[IndexStep]

Add sources to DAG.

Parameters:

  • sources
    (Source, default: () ) –

    All sources to add.

  • batch_size
    (int | None, default: None ) –

    Batch size for indexing.

add_steps

add_steps(*steps: Step) -> None

Add dedupers and linkers to DAG, and register sources available to steps.

Parameters:

  • steps
    (Step, default: () ) –

    Dedupe and link steps.

prepare

prepare() -> None

Determine order of execution of steps.
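
Determining an execution order for a dependency graph is a topological sort. The sketch below uses the standard library's graphlib to show the idea. It assumes that each key in graph maps to the names of the nodes it depends on; the node names are made up, and whether matchbox stores edges in this direction or uses graphlib at all is not stated in this reference.

```python
from graphlib import TopologicalSorter

# Assumed shape: each node name maps to the names of the nodes it depends on.
graph = {
    "source_a": [],
    "source_b": [],
    "dedupe_a": ["source_a"],
    "dedupe_b": ["source_b"],
    "link_ab": ["dedupe_a", "dedupe_b"],
}

# static_order() yields every node after all of its dependencies.
sequence = list(TopologicalSorter(graph).static_order())
```

Any order that places each source before its deduper, and both dedupers before the linker, is valid; graphlib raises CycleError if the graph contains a cycle.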

draw

draw(
    start_time: datetime | None = None,
    doing: str | None = None,
    skipped: list[str] | None = None,
) -> str

Create a string representation of the DAG as a tree structure.

If start_time is provided, each node is shown with a status indicator derived from its last run time. The status indicators are:

  • ✅ Done
  • 🔄 Working
  • ⏸️ Awaiting
  • ⏭️ Skipped

Parameters:

  • start_time
    (datetime | None, default: None ) –

    Start time of the DAG run. Used to calculate node status.

  • doing
    (str | None, default: None ) –

    Name of the node currently being processed (if any).

  • skipped
    (list[str] | None, default: None ) –

    List of node names that were skipped.

Returns:

  • str

    String representation of the DAG with status indicators.
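
The status logic can be pictured as a small decision function per node. The sketch below is an assumption about how the three parameters might combine: the precedence (skipped, then working, then done, then awaiting) and the rule "done means last run at or after start_time" are guesses, and node_status is a hypothetical helper, not a matchbox function.

```python
from datetime import datetime, timedelta
from typing import Optional

DONE, WORKING, AWAITING, SKIPPED = "✅", "🔄", "⏸️", "⏭️"


def node_status(
    name: str,
    last_run: Optional[datetime],
    start_time: datetime,
    doing: Optional[str] = None,
    skipped: Optional[list] = None,
) -> str:
    """Pick a status indicator for one node; the precedence is assumed."""
    if name in (skipped or []):
        return SKIPPED
    if name == doing:
        return WORKING
    if last_run is not None and last_run >= start_time:
        return DONE
    return AWAITING


start = datetime(2024, 1, 1, 12, 0)
statuses = {
    "source_a": node_status("source_a", start + timedelta(minutes=1), start),
    "dedupe_a": node_status("dedupe_a", None, start, doing="dedupe_a"),
    "link_ab": node_status("link_ab", None, start),
    "source_b": node_status("source_b", None, start, skipped=["source_b"]),
}
```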

run

run(start: str | None = None)

Run entire DAG.

Parameters:

  • start
    (str | None, default: None ) –

    Name of the step to start from, if not from the beginning.
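
Starting from a named step can be thought of as slicing the prepared sequence. The sketch below assumes that every node before start is skipped and the rest run in order; run_sequence and the node names are hypothetical, and the actual behaviour of DAG.run may differ.

```python
from typing import Callable, Optional


def run_sequence(
    sequence: list,
    run_node: Callable[[str], None],
    start: Optional[str] = None,
) -> list:
    """Run nodes in order from `start` onwards and return the skipped names."""
    # list.index raises ValueError if `start` is not in the sequence.
    first = sequence.index(start) if start is not None else 0
    skipped = sequence[:first]
    for name in sequence[first:]:
        run_node(name)
    return skipped


executed = []
skipped = run_sequence(
    ["source_a", "dedupe_a", "link_ab"], executed.append, start="dedupe_a"
)
```

With start left as None nothing is skipped and every node runs.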