DAGs
matchbox.client.dags
Objects to define a DAG which indexes, deduplicates and links data.
Classes:
- Step – Abstract base class defining what a step needs to support.
- StepInput – Input to a DAG step, generated by a previous node in the DAG.
- IndexStep – Index step.
- ModelStep – Base class for model steps.
- DedupeStep – Deduplication step.
- LinkStep – Linking step.
- DAG – Self-sufficient pipeline of indexing, deduping and linking steps.
Step
Abstract base class defining what a step needs to support.
StepInput
Bases: BaseModel
Input to a DAG step, generated by a previous node in the DAG.
Methods:
- validate_all_input – Verify that the select statement is valid given the previous node.
Attributes:
- prev_node (Step)
- select (dict[Source, list[str]])
- cleaners (dict[str, dict[str, Any]])
- batch_size (int | None)
- threshold (float | None)
- name (str) – Resolution name of the node generating this input for the next step.
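The check performed by validate_all_input can be pictured as follows. This is a minimal sketch, not matchbox's implementation: it assumes the check amounts to confirming that every source named in select is among the sources the previous node can provide. ToySource, ToyStep and validate_select are hypothetical stand-ins for Source, Step and the real validator.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToySource:
    """Hypothetical stand-in for Source; only a name is modelled."""

    name: str


@dataclass
class ToyStep:
    """Hypothetical stand-in for the previous node feeding a StepInput."""

    name: str
    sources: set[str] = field(default_factory=set)


def validate_select(prev_node: ToyStep, select: dict[ToySource, list[str]]) -> None:
    """Raise ValueError if select names a source the previous node cannot provide."""
    unknown = {source.name for source in select} - prev_node.sources
    if unknown:
        raise ValueError(
            f"Cannot select {sorted(unknown)} from {prev_node.name!r}; "
            f"available sources: {sorted(prev_node.sources)}"
        )
```

Selecting fields from a source that the upstream node never saw fails early, before any data is queried.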
IndexStep
Bases: Step
Index step.
Methods:
- source_to_atrtibutes – Convert the source into the name and sources attributes.
- run – Run the indexing step.
Attributes:
- source (Source)
- batch_size (int | None)
- inputs (list[StepInput]) – Return all inputs to this step.
- name (str)
- sources (set[str])
- last_run (datetime | None)
source_to_atrtibutes
Convert the source into the name and sources attributes.
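One way to picture what source_to_atrtibutes does is shown below. This is a sketch under stated assumptions: it assumes the step's name is taken directly from its source and that sources contains only that source. ToySource and ToyIndexStep are hypothetical, and the real derivation in matchbox may differ.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class ToySource:
    """Hypothetical stand-in for Source; only a name is modelled."""

    name: str


@dataclass
class ToyIndexStep:
    """Hypothetical index step whose name and sources are derived from its source."""

    source: ToySource
    batch_size: Optional[int] = None
    name: str = field(init=False)
    sources: set[str] = field(init=False)

    def __post_init__(self) -> None:
        # Assumption: the step is named after its source and indexes only that source.
        self.name = self.source.name
        self.sources = {self.source.name}
```

Deriving both attributes from the source means callers only supply the source (and optionally a batch size).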
ModelStep
Bases: Step
Base class for model steps.
Methods:
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for a declared step input.
- run – Run the step.
Attributes:
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
- name (str)
- sources (set[str])
- last_run (datetime | None)
- inputs (list[StepInput]) – Return all inputs to this step.
query
query(step_input: StepInput) -> DataFrame
Retrieve data for a declared step input.
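init_sources is described as adding the sources inherited from all inputs. Assuming that means taking the union of the sources available to each input's prev_node, it can be sketched as below. ToyNode, ToyInput and inherited_sources are hypothetical names, not part of matchbox.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical upstream step exposing the sources it covers."""

    name: str
    sources: set[str] = field(default_factory=set)


@dataclass
class ToyInput:
    """Hypothetical stand-in for StepInput; only prev_node is modelled."""

    prev_node: ToyNode


def inherited_sources(inputs: list[ToyInput]) -> set[str]:
    """Union of the sources available to every input's previous node."""
    sources: set[str] = set()
    for step_input in inputs:
        sources |= step_input.prev_node.sources
    return sources
```

A link step over two inputs therefore ends up covering every source that either side covers.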
DedupeStep
Bases: ModelStep
Deduplication step.
Methods:
- run – Run the full deduping pipeline and store the results.
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for a declared step input.
Attributes:
- model_class (type[Deduper])
- inputs (list[StepInput]) – Return all inputs to this step.
- name (str)
- sources (set[str])
- last_run (datetime | None)
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
query
query(step_input: StepInput) -> DataFrame
Retrieve data for a declared step input.
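The shape of a dedupe run (query the left input, apply the model, store the results, record last_run) can be sketched with toy parts. Everything here is hypothetical: query and model are plain callables standing in for query(self.left) and model_class configured with settings, "storing results" is reduced to an attribute, and exact_match_deduper is a toy model, not a matchbox Deduper.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional

Record = dict[str, str]
Pair = tuple[int, int]


def exact_match_deduper(records: list[Record]) -> list[Pair]:
    """Toy deduper: pair each record with the first earlier record identical to it."""
    first_seen: dict[tuple, int] = {}
    pairs: list[Pair] = []
    for i, record in enumerate(records):
        key = tuple(sorted(record.items()))
        if key in first_seen:
            pairs.append((first_seen[key], i))
        else:
            first_seen[key] = i
    return pairs


@dataclass
class ToyDedupeStep:
    """Hypothetical skeleton of a dedupe step's run(): query, model, store, timestamp."""

    name: str
    query: Callable[[], list[Record]]  # stands in for querying the `left` StepInput
    model: Callable[[list[Record]], list[Pair]]  # stands in for model_class + settings
    results: list[Pair] = field(default_factory=list)  # stands in for stored results
    last_run: Optional[datetime] = None

    def run(self) -> None:
        data = self.query()
        self.results = self.model(data)
        self.last_run = datetime.now()  # lets draw() mark the node as done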
LinkStep
Bases: ModelStep
Linking step.
Methods:
- run – Run the whole linking step.
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for a declared step input.
Attributes:
- model_class (type[Linker])
- right (StepInput)
- inputs (list[StepInput]) – Return all StepInputs to this step.
- name (str)
- sources (set[str])
- last_run (datetime | None)
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
query
query(step_input: StepInput) -> DataFrame
Retrieve data for a declared step input.
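Unlike a dedupe step, a link step has a right input as well as a left one, so inputs reports both. The sketch below assumes inputs simply returns [left, right]. ToyInput, ToyLinkStep and exact_link are hypothetical, and exact_link is a toy exact-key linker rather than a matchbox Linker.

```python
from dataclasses import dataclass


@dataclass
class ToyInput:
    """Hypothetical stand-in for StepInput; only the resolution name is modelled."""

    name: str


@dataclass
class ToyLinkStep:
    left: ToyInput
    right: ToyInput

    @property
    def inputs(self) -> list[ToyInput]:
        # Assumption: a link step consumes exactly its left and right inputs.
        return [self.left, self.right]


def exact_link(left: list[dict], right: list[dict], key: str) -> list[tuple[int, int]]:
    """Toy linker: pair left/right rows whose `key` field matches exactly."""
    index: dict = {}
    for j, row in enumerate(right):
        index.setdefault(row[key], []).append(j)
    return [(i, j) for i, row in enumerate(left) for j in index.get(row[key], [])]
```

Indexing the right side first keeps the toy linker linear in the number of rows plus matches, instead of comparing every pair.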
DAG
Self-sufficient pipeline of indexing, deduping and linking steps.
Methods:
- add_sources – Add sources to the DAG.
- add_steps – Add dedupers and linkers to the DAG, and register the sources available to steps.
- prepare – Determine the order in which steps are executed.
- draw – Create a string representation of the DAG as a tree structure.
- run – Run the entire DAG.
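Determining an execution order, as prepare does, is a topological sort: each step must run after the steps whose output it reads. The sketch below uses the standard library's graphlib.TopologicalSorter; whether prepare uses this or another topological sort is an assumption, and execution_order is a hypothetical helper. The dependency mapping would, in matchbox terms, come from each step's StepInput.prev_node.

```python
from graphlib import TopologicalSorter


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return step names so every step comes after the steps it reads from.

    `dependencies` maps each step name to the names of its upstream steps.
    A cycle raises graphlib.CycleError, since no valid order exists.
    """
    return list(TopologicalSorter(dependencies).static_order())
```

For example, two index steps feeding two dedupe steps that both feed one link step yield an order with both index steps before their dedupers and the link step last.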
draw
draw(
    start_time: datetime | None = None,
    doing: str | None = None,
    skipped: list[str] | None = None,
) -> str
Create a string representation of the DAG as a tree structure.
If start_time is provided, the status of each node is shown, based on its last run time. The status indicators are:
- ✅ Done
- 🔄 Working
- ⏸️ Awaiting
- ⏭️ Skipped
Parameters:
- start_time (datetime | None, default: None) – Start time of the DAG run. Used to calculate node status.
- doing (str | None, default: None) – Name of the node currently being processed (if any).
- skipped (list[str] | None, default: None) – List of node names that were skipped.
Returns:
- str – String representation of the DAG with status indicators.
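A tree with status indicators like the one draw returns can be produced along these lines. This is a minimal sketch under assumptions: node_status and draw_tree are hypothetical helpers, the precedence (doing, then skipped, then done, else awaiting) is a guess, and "done" is taken to mean a last run at or after start_time. The tree is rooted at the final step, with each node's inputs as its children.

```python
from datetime import datetime
from typing import Optional

DONE, WORKING, AWAITING, SKIPPED = "✅", "🔄", "⏸️", "⏭️"


def node_status(
    name: str,
    last_run: Optional[datetime],
    start_time: datetime,
    doing: Optional[str] = None,
    skipped: Optional[list[str]] = None,
) -> str:
    """Pick a status icon for one node (assumed precedence: doing, skipped, done)."""
    if name == doing:
        return WORKING
    if skipped and name in skipped:
        return SKIPPED
    if last_run is not None and last_run >= start_time:
        return DONE  # the node finished during the current run
    return AWAITING


def draw_tree(
    root: str,
    children: dict[str, list[str]],
    statuses: Optional[dict[str, str]] = None,
) -> str:
    """Render `root` and its inputs as an indented tree, one node per line."""
    statuses = statuses or {}
    lines: list[str] = []

    def label(node: str) -> str:
        icon = statuses.get(node)
        return f"{icon} {node}" if icon else node

    def walk(node: str, prefix: str, is_last: bool) -> None:
        lines.append(prefix + ("└── " if is_last else "├── ") + label(node))
        kids = children.get(node, [])
        for i, kid in enumerate(kids):
            walk(kid, prefix + ("    " if is_last else "│   "), i == len(kids) - 1)

    lines.append(label(root))
    kids = children.get(root, [])
    for i, kid in enumerate(kids):
        walk(kid, "", i == len(kids) - 1)
    return "\n".join(lines)
```

Statuses are computed separately from layout, so the same tree renders with or without icons depending on whether a start time was supplied.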