DAGs
matchbox.client.dags
¶
Objects to define a DAG which indexes, deduplicates and links data.
Classes:
- Step – Abstract base class defining what a step needs to support.
- StepInput – Input to a DAG step, generated by a previous node in the DAG.
- IndexStep – Index step.
- ModelStep – Base class for model steps.
- DedupeStep – Deduplication step.
- LinkStep – Linking step.
- DAG – Self-sufficient pipeline of indexing, deduping and linking steps.
Step
Abstract base class defining what a step needs to support.
StepInput
Bases: BaseModel
Input to a DAG step, generated by a previous node in the DAG.
Methods:
- validate_all_input – Verify that the select statement is valid given the previous node.
Attributes:
- prev_node (Step)
- select (dict[SourceConfig, list[str]])
- cleaners (dict[str, dict[str, Any]])
- batch_size (int | None)
- threshold (float | None)
- name (str) – Resolution name of the node generating this input for the next step.
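The page does not describe how validate_all_input performs its check. The following is a minimal sketch, assuming the check amounts to confirming that every source named in `select` is covered by `prev_node`, and that `name` is taken from the node producing the input. `ToyStep` and `ToyStepInput` are hypothetical stand-ins (keyed by source name instead of `SourceConfig`), not matchbox classes.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStep:
    """Hypothetical stand-in for a DAG step: a name and the sources it covers."""

    name: str
    sources: set[str] = field(default_factory=set)


@dataclass
class ToyStepInput:
    """Hypothetical stand-in for StepInput; `select` is keyed by source name."""

    prev_node: ToyStep
    select: dict[str, list[str]]

    @property
    def name(self) -> str:
        # Assumption: the input takes the resolution name of the node producing it.
        return self.prev_node.name

    def validate_all_input(self) -> None:
        # Assumption: every selected source must be covered by the previous node.
        unknown = set(self.select) - self.prev_node.sources
        if unknown:
            raise ValueError(
                f"Cannot select {sorted(unknown)} from {self.prev_node.name!r}"
            )


dedupe_foo = ToyStep(name="dedupe_foo", sources={"foo"})
# Selecting fields from a source the previous node covers passes silently.
ToyStepInput(prev_node=dedupe_foo, select={"foo": ["name"]}).validate_all_input()
```

Selecting from a source the previous node does not cover, such as `{"bar": ["id"]}` here, raises a `ValueError` in this sketch.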
IndexStep
Bases: Step
Index step.
Methods:
- source_to_atrtibutes – Convert the source config into the step's name and sources attributes.
- run – Run the indexing step.
Attributes:
- source_config (SourceConfig)
- batch_size (int | None)
- inputs (list[StepInput]) – Return all inputs to this step.
- name (ResolutionName)
- sources (set[str])
- last_run (datetime | None)
source_to_atrtibutes
Convert the source config into the step's name and sources attributes.
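As a rough illustration, assuming an index step simply takes its name from its source and covers exactly that one source, the conversion could look like this. `ToySourceConfig` and `index_step_attributes` are hypothetical names used only for this sketch, not part of matchbox.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySourceConfig:
    """Hypothetical stand-in for SourceConfig; assumes it carries a `name`."""

    name: str


def index_step_attributes(source_config: ToySourceConfig) -> tuple[str, set[str]]:
    """Return the (name, sources) pair an index step would take from its source."""
    # Assumption: the index step reuses the source's name and covers only that source.
    return source_config.name, {source_config.name}


name, sources = index_step_attributes(ToySourceConfig(name="companies_house"))
```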
ModelStep
Bases: Step
Base class for model steps.
Methods:
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for the declared step input.
- run – Run the step.
Attributes:
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
- name (ResolutionName)
- sources (set[str])
- last_run (datetime | None)
- inputs (list[StepInput]) – Return all inputs to this step.
query
query(step_input: StepInput) -> DataFrame
Retrieve data for the declared step input.
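A model step's sources can be derived from its inputs. The sketch below assumes `init_sources` takes the union of the sources covered by each input's `prev_node`; the `Toy*` classes are hypothetical stand-ins that keep only the fields this needs.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical stand-in for any DAG step."""

    name: str
    sources: set[str] = field(default_factory=set)


@dataclass
class ToyInput:
    """Hypothetical stand-in for StepInput, keeping only the upstream node."""

    prev_node: ToyNode


@dataclass
class ToyModelStep(ToyNode):
    """Hypothetical model step that records which inputs it consumes."""

    inputs: list[ToyInput] = field(default_factory=list)

    def init_sources(self) -> None:
        # Assumption: a model step covers every source reachable through its inputs.
        for step_input in self.inputs:
            self.sources |= step_input.prev_node.sources


foo = ToyNode(name="foo", sources={"foo"})
bar = ToyNode(name="bar", sources={"bar"})
link = ToyModelStep(name="link_foo_bar", inputs=[ToyInput(foo), ToyInput(bar)])
link.init_sources()
```

The in-place union only grows the model step's own set, so the upstream nodes keep their original sources.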
DedupeStep
Bases: ModelStep
Deduplication step.
Methods:
- run – Run the full deduping pipeline and store the results.
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for the declared step input.
Attributes:
- model_class (type[Deduper])
- inputs (list[StepInput]) – Return all inputs to this step.
- name (ResolutionName)
- sources (set[str])
- last_run (datetime | None)
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
query
query(step_input: StepInput) -> DataFrame
Retrieve data for the declared step input.
LinkStep
Bases: ModelStep
Linking step.
Methods:
- run – Run the whole linking step.
- init_sources – Add sources inherited from all inputs.
- query – Retrieve data for the declared step input.
Attributes:
- model_class (type[Linker])
- right (StepInput)
- inputs (list[StepInput]) – Return all StepInputs to this step.
- name (ResolutionName)
- sources (set[str])
- last_run (datetime | None)
- description (str)
- left (StepInput)
- settings (dict[str, Any])
- truth (float)
query
query(step_input: StepInput) -> DataFrame
Retrieve data for the declared step input.
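DedupeStep and LinkStep differ mainly in how many inputs they read: a deduper works on `left` alone, while a linker also has `right`. A minimal sketch of how the `inputs` property could reflect that, assuming left is listed before right. The classes are hypothetical stand-ins, and plain strings replace the real StepInput objects.

```python
from dataclasses import dataclass


@dataclass
class ToyModelStep:
    """Hypothetical stand-in for ModelStep: every model step has a left input."""

    left: str

    @property
    def inputs(self) -> list[str]:
        return [self.left]


@dataclass
class ToyDedupeStep(ToyModelStep):
    """Hypothetical deduper: reads only the left input."""


@dataclass
class ToyLinkStep(ToyModelStep):
    """Hypothetical linker: adds a right-hand input."""

    right: str

    @property
    def inputs(self) -> list[str]:
        # Assumption: the left input is listed before the right one.
        return [self.left, self.right]
```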
DAG
Self-sufficient pipeline of indexing, deduping and linking steps.
Methods:
- add_sources – Add sources to the DAG.
- add_steps – Add dedupers and linkers to the DAG, and register the sources available to steps.
- prepare – Determine the order of execution of steps.
- draw – Create a string representation of the DAG as a tree structure.
- run – Run the entire DAG.
Attributes:
- nodes (dict[ResolutionName, Step])
- graph (dict[ResolutionName, list[ResolutionName]])
- sequence (list[ResolutionName])
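The prepare method determines the order in which steps execute. One natural way to do that is a topological sort of `graph`, so every step runs after the steps feeding it. The sketch below assumes `graph` maps each node's name to the names of its inputs and uses the standard library's `graphlib` (Python 3.9+); `prepare_sequence` is a hypothetical helper, not the matchbox implementation.

```python
from graphlib import TopologicalSorter


def prepare_sequence(graph: dict[str, list[str]]) -> list[str]:
    """Order node names so each node comes after all of its inputs."""
    # TopologicalSorter treats each mapped list as the node's predecessors;
    # static_order() raises graphlib.CycleError if the graph has a cycle.
    return list(TopologicalSorter(graph).static_order())


graph = {
    "foo": [],
    "bar": [],
    "dedupe_foo": ["foo"],
    "link_foo_bar": ["dedupe_foo", "bar"],
}
sequence = prepare_sequence(graph)
```

Sorting by dependencies rather than insertion order means, in this sketch, that steps can be registered in any order.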
add_sources
add_sources(
*source_configs: SourceConfig,
batch_size: int | None = None,
) -> tuple[IndexStep]
Add sources to the DAG.
Parameters:
- source_configs (SourceConfig, default: ()) – All sources to add.
- batch_size (int | None, default: None) – Batch size for indexing.
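A minimal sketch of what add_sources might do, assuming each source becomes one index step that inherits the shared `batch_size` and is registered in `nodes` and `graph` with no inputs. `ToyIndexStep` and `ToyDAG` are hypothetical, and sources are identified by name rather than by `SourceConfig`.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyIndexStep:
    """Hypothetical stand-in for IndexStep."""

    name: str
    batch_size: Optional[int] = None


@dataclass
class ToyDAG:
    """Hypothetical stand-in holding the DAG's `nodes` and `graph` attributes."""

    nodes: dict[str, ToyIndexStep] = field(default_factory=dict)
    graph: dict[str, list[str]] = field(default_factory=dict)

    def add_sources(
        self, *source_names: str, batch_size: Optional[int] = None
    ) -> tuple[ToyIndexStep, ...]:
        # One index step per source, all sharing the same batch size.
        steps = tuple(ToyIndexStep(name, batch_size) for name in source_names)
        for step in steps:
            self.nodes[step.name] = step
            # Index steps read straight from a source, so they have no inputs.
            self.graph[step.name] = []
        return steps


dag = ToyDAG()
i_foo, i_bar = dag.add_sources("foo", "bar", batch_size=10_000)
```

Returning a tuple lets callers unpack the index steps and use each one as a `prev_node` for later steps.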
draw
draw(
start_time: datetime | None = None,
doing: ResolutionName | None = None,
skipped: list[ResolutionName] | None = None,
) -> str
Create a string representation of the DAG as a tree structure.
If start_time is provided, it will show the status of each node based on the last run time. The status indicators are:
- ✅ Done
- 🔄 Working
- ⏸️ Awaiting
- ⏭️ Skipped
Parameters:
- start_time (datetime | None, default: None) – Start time of the DAG run. Used to calculate node status.
- doing (ResolutionName | None, default: None) – Name of the node currently being processed (if any).
- skipped (list[ResolutionName] | None, default: None) – List of node names that were skipped.
Returns:
- str – String representation of the DAG with status indicators.
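The tree and status indicators described for draw could be produced along these lines. This sketch assumes `graph` maps each node to the names of its inputs, that the root is the one node no other node consumes, that "working" and "skipped" take precedence, and that "done" means the node's last run is no earlier than `start_time`. `node_status` and `draw_tree` are hypothetical helpers, not matchbox functions.

```python
from datetime import datetime
from typing import Callable, Optional

DONE, WORKING, AWAITING, SKIPPED = "✅", "🔄", "⏸️", "⏭️"


def node_status(
    name: str,
    last_run: Optional[datetime],
    start_time: datetime,
    doing: Optional[str] = None,
    skipped: Optional[list[str]] = None,
) -> str:
    """Pick a status indicator for one node (hypothetical helper)."""
    if name == doing:
        return WORKING
    if skipped and name in skipped:
        return SKIPPED
    # Assumption: a node is done if it last ran after this DAG run started.
    if last_run is not None and last_run >= start_time:
        return DONE
    return AWAITING


def draw_tree(
    graph: dict[str, list[str]], status: Callable[[str], str] = lambda name: ""
) -> str:
    """Render `graph` (node name -> input names) as a tree rooted at the final step."""
    # Roots are nodes that no other node uses as an input.
    consumed = {dep for deps in graph.values() for dep in deps}
    roots = [name for name in graph if name not in consumed]
    lines: list[str] = []

    def walk(name: str, prefix: str, is_last: bool, is_root: bool) -> None:
        label = f"{status(name)} {name}".strip()
        if is_root:
            lines.append(label)
            child_prefix = ""
        else:
            lines.append(prefix + ("└── " if is_last else "├── ") + label)
            child_prefix = prefix + ("    " if is_last else "│   ")
        children = graph.get(name, [])
        for i, child in enumerate(children):
            walk(child, child_prefix, i == len(children) - 1, False)

    for root in roots:
        walk(root, "", True, True)
    return "\n".join(lines)
```

Passing a `status` callable that wraps `node_status` adds the indicators to each line; without one, only the node names are drawn.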
run
run(
start: ResolutionName | None = None,
finish: ResolutionName | None = None,
)
Run the entire DAG.
Parameters:
- start (ResolutionName | None, default: None) – Name of the step to start from (if not from the beginning).
- finish (ResolutionName | None, default: None) – Name of the step to finish at (if not to the end).
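One plausible reading of `start` and `finish` is that they trim the prepared execution sequence, with both ends included. The sketch below assumes exactly that; `steps_to_run` is a hypothetical helper, not part of matchbox.

```python
from typing import Optional


def steps_to_run(
    sequence: list[str],
    start: Optional[str] = None,
    finish: Optional[str] = None,
) -> list[str]:
    """Return the slice of `sequence` from `start` to `finish`, both inclusive."""
    if not sequence:
        return []
    # list.index raises ValueError if a name is not in the sequence.
    first = sequence.index(start) if start is not None else 0
    last = sequence.index(finish) if finish is not None else len(sequence) - 1
    if first > last:
        raise ValueError(f"{start!r} comes after {finish!r} in the execution order")
    return sequence[first : last + 1]


sequence = ["foo", "bar", "dedupe_foo", "link_foo_bar"]
remaining = steps_to_run(sequence, start="dedupe_foo")
```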