Sources

matchbox.client.sources

Interface to locations where source data is stored.

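Classes:

  • Location

    A location for a data source.

  • RelationalDBLocation

    A location for a relational database.

  • Source

    Client-side wrapper for source configs.

Functions:

  • requires_client

    Decorator that checks if client is set before executing a method.

  • location_type_to_class

    Map location type string to the corresponding class.
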
Location

Location(name: str, client: Any)

Bases: ABC

A location for a data source.

Methods:

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Validate ET logic against this location’s query language.

  • infer_types

    Extract all data types from the ET logic.

  • execute

    Execute ET logic against location and return batches.

  • from_config

    Initialise location from a location config and an appropriate client.

Attributes:

config instance-attribute

config = LocationConfig(type=location_type, name=name)

client instance-attribute

client = client

location_type abstractmethod property

location_type: LocationType

Output location type string.

connect abstractmethod

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform abstractmethod

validate_extract_transform(extract_transform: str) -> bool

Validate ET logic against this location’s query language.

Raises:

infer_types abstractmethod

infer_types(extract_transform: str) -> dict[str, DataTypes]

Extract all data types from the ET logic.

execute abstractmethod

execute(extract_transform: str, batch_size: int | None = None, rename: dict[str, str] | Callable | None = None, return_type: QueryReturnType = POLARS, keys: tuple[str, list[str]] | None = None) -> Iterator[QueryReturnClass]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (QueryReturnType, default: POLARS ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (tuple[str, list[str]] | None, default: None ) –

    Rule to retrieve only rows with specific keys. The first element of the tuple is the name of the field on which to filter; the second is the list of accepted values. Only source entries whose value in that field appears in the list are returned.
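
The two forms of rename described above can be sketched in plain Python. This is only an illustration of the semantics: `rename_columns` is a hypothetical helper, not part of matchbox, and keeping unmapped names unchanged in the dictionary case is an assumption.

```python
from typing import Callable, Optional, Union

Rename = Optional[Union[dict[str, str], Callable[[str], str]]]


def rename_columns(columns: list[str], rename: Rename = None) -> list[str]:
    """Return new column names; `rename_columns` is a hypothetical helper."""
    if rename is None:
        return list(columns)
    if callable(rename):
        # A callable receives each old name and returns the new one.
        return [rename(col) for col in columns]
    # A dictionary maps old names to new ones.
    # Assumption: names missing from the dictionary are left unchanged.
    return [rename.get(col, col) for col in columns]


columns = ["id", "company_name"]
by_dict = rename_columns(columns, {"company_name": "name"})
by_callable = rename_columns(columns, str.upper)
```

Here `by_dict` renames only the mapped column, while `by_callable` applies the function to every column name.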

Raises:

from_config

from_config(config: LocationConfig, client: Any) -> Self

Initialise location from a location config and an appropriate client.
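
To make the shape of the abstract interface concrete, here is a minimal, self-contained sketch. It does not import matchbox: `SketchLocation` is a simplified stand-in that mirrors only some of the method names above, and `InMemoryLocation` is a hypothetical subclass whose "client" is a plain dictionary.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import Any, Optional


class SketchLocation(ABC):
    """Simplified stand-in for the Location interface (not matchbox's class)."""

    def __init__(self, name: str, client: Any):
        self.name = name
        self.client = client

    @abstractmethod
    def connect(self) -> bool:
        """Return True if the location can be reached."""

    @abstractmethod
    def validate_extract_transform(self, extract_transform: str) -> bool:
        """Return True if the ET logic is acceptable for this location."""

    @abstractmethod
    def execute(
        self, extract_transform: str, batch_size: Optional[int] = None
    ) -> Iterator[list[dict]]:
        """Yield batches of rows produced by the ET logic."""


class InMemoryLocation(SketchLocation):
    """Hypothetical location whose client is a dict of table name -> rows."""

    def connect(self) -> bool:
        return isinstance(self.client, dict)

    def validate_extract_transform(self, extract_transform: str) -> bool:
        # In this toy example the "query language" is just a table name.
        return extract_transform in self.client

    def execute(self, extract_transform, batch_size=None):
        rows = self.client[extract_transform]
        # Without a batch size, return everything as one batch.
        size = batch_size or len(rows) or 1
        for start in range(0, len(rows), size):
            yield rows[start : start + size]


location = InMemoryLocation("memory", {"people": [{"id": 1}, {"id": 2}, {"id": 3}]})
batches = list(location.execute("people", batch_size=2))
```

Because every method on the base class is abstract, instantiating `SketchLocation` directly raises `TypeError`; only a subclass that implements all of them can be created.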

RelationalDBLocation

RelationalDBLocation(name: str, client: Any)

Bases: Location

A location for a relational database.

Methods:

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Check that the SQL statement only contains a single data-extracting command.

  • infer_types

    Extract all data types from the ET logic.

  • execute

    Execute ET logic against location and return batches.

  • from_config

    Initialise location from a location config and an appropriate client.

Attributes:

client instance-attribute

client: Engine

location_type class-attribute instance-attribute

location_type: LocationType = RDBMS

Output location type string.

config instance-attribute

config = LocationConfig(type=location_type, name=name)

connect

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform

validate_extract_transform(extract_transform: str) -> bool

Check that the SQL statement only contains a single data-extracting command.

We are NOT attempting a full sanitisation of the SQL statement.

Validation is done purely to stop accidental mistakes, not malicious actors.
Users should only run indexing using SourceConfigs they trust and have read,
using least-privilege credentials.
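
As a rough illustration of what "a single data-extracting command" means, the sketch below performs a deliberately naive check. `is_single_select` is a hypothetical helper and is not how matchbox validates SQL; it does not parse the statement, which is consistent with the caveat above that this kind of check guards against accidents rather than attacks.

```python
def is_single_select(sql: str) -> bool:
    """Naive check: exactly one statement, and it starts with SELECT or WITH.

    Hypothetical helper for illustration. It does not parse SQL, so a
    semicolon inside a string literal would be misread as a separator.
    """
    statements = [part.strip() for part in sql.split(";") if part.strip()]
    if len(statements) != 1:
        return False
    first_word = statements[0].split(None, 1)[0].upper()
    return first_word in {"SELECT", "WITH"}


ok = is_single_select("SELECT id, name FROM companies;")
two_statements = is_single_select("SELECT 1; DROP TABLE companies;")
not_a_query = is_single_select("DELETE FROM companies")
```

Only the first call returns True; the other two are rejected because they contain more than one statement or do not extract data.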

Parameters:

  • extract_transform
    (str) –

    The SQL statement to validate.

Returns:

  • bool

    True if the SQL statement is valid.

Raises:

infer_types

infer_types(extract_transform: str) -> dict[str, DataTypes]

Extract all data types from the ET logic.

execute

execute(extract_transform: str, batch_size: int | None = None, rename: dict[str, str] | Callable | None = None, return_type: QueryReturnType = POLARS, keys: tuple[str, list[str]] | None = None, schema_overrides: dict[str, DataType] | None = None) -> Generator[QueryReturnClass, None, None]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (QueryReturnType, default: POLARS ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (tuple[str, list[str]] | None, default: None ) –

    Rule to retrieve only rows with specific keys. The first element of the tuple is the name of the field on which to filter; the second is the list of accepted values. Only source entries whose value in that field appears in the list are returned.
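
The interaction between keys and batch_size can be sketched with plain Python rows. `filter_and_batch` is a hypothetical helper working on in-memory dictionaries, not the library's implementation; treating a missing batch size as "one batch with everything" is an assumption of this sketch.

```python
from collections.abc import Iterator
from typing import Optional


def filter_and_batch(
    rows: list[dict],
    keys: Optional[tuple[str, list[str]]] = None,
    batch_size: Optional[int] = None,
) -> Iterator[list[dict]]:
    """Hypothetical helper: filter rows by `keys`, then yield them in batches."""
    if keys is not None:
        field, allowed = keys
        allowed_set = set(allowed)
        # Keep only rows whose value in `field` is one of the allowed values.
        rows = [row for row in rows if row[field] in allowed_set]
    if batch_size is None:
        # Assumption for this sketch: no batch size means a single batch.
        yield rows
        return
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


rows = [{"id": str(i), "name": f"company {i}"} for i in range(5)]
batches = list(filter_and_batch(rows, keys=("id", ["0", "2", "4"]), batch_size=2))
```

With these inputs, three rows survive the filter and are returned as one batch of two followed by one batch of one.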

Raises:

from_config

from_config(config: LocationConfig, client: Any) -> Self

Initialise location from a location config and an appropriate client.

Source

Client-side wrapper for source configs.

Parameters:

  • dag

    (DAG) –

    DAG containing the source.

  • location

    (Location) –

    The location where the source data is stored.

  • name

    (str) –

    The name of the source.

  • description

    (str | None, default: None ) –

    An optional description of the source.

  • extract_transform

    (str) –

    The extract/transform logic to apply to the source data.

  • key_field

    (str | SourceField) –

    The name of the field to use as the key, or a SourceField instance defining the key field. This is the unique identifier we’ll use to refer to matched data in the source.

  • index_fields

    (list[str] | list[SourceField]) –

    The names of the fields to use as index fields, or a list of SourceField instances defining the index fields. These are the fields you plan to match on.

  • infer_types

    (bool, default: False ) –

    Whether to infer data types for the fields from the source. If False, you must provide SourceField instances for key_field and index_fields.

Methods:

  • to_resolution

    Convert to Resolution for API calls.

  • from_resolution

    Reconstruct from Resolution.

  • fetch

    Applies the extract/transform logic to the source and returns the results.

  • run

    Hash a dataset from its warehouse, ready to be inserted, and cache hashes.

  • qualify_field

    Qualify field names with the source name.

  • f

    Qualify one or more field names with the source name.

  • sync

    Send the source config and hashes to the server.

  • query

    Generate a query for this source.

Attributes:

last_run instance-attribute

last_run: datetime | None = None

location instance-attribute

location = location

dag instance-attribute

dag = dag

name instance-attribute

name = name

description instance-attribute

description = description

extract_transform instance-attribute

extract_transform = extract_transform

key_field instance-attribute

key_field = SourceField(name=key_field, type=STRING)

index_fields instance-attribute

index_fields = tuple((remote_fields[field]) for field in index_fields)

config property

config: SourceConfig

Generate SourceConfig from Source.

dependencies property

dependencies: list[ResolutionPath]

Returns all resolution paths this source needs.

Provided to match the interface of Model objects.

prefix property

prefix: str

Get the prefix for the source.

qualified_key property

qualified_key: str

Get the qualified key for the source.

qualified_index_fields property

qualified_index_fields: list[str]

Get the qualified index fields for the source.

to_resolution

to_resolution() -> Resolution

Convert to Resolution for API calls.

from_resolution classmethod

from_resolution(resolution: Resolution, resolution_name: str, dag: DAG, location: Location) -> Source

Reconstruct from Resolution.

fetch

fetch(qualify_names: bool = False, batch_size: int | None = None, return_type: QueryReturnType = POLARS, keys: list[str] | None = None) -> Generator[QueryReturnClass, None, None]

Applies the extract/transform logic to the source and returns the results.

Parameters:

  • qualify_names
    (bool, default: False ) –

    If True, qualify the names of the columns with the source name.

  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when fetching data in batches.

  • return_type
    (QueryReturnType, default: POLARS ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (list[str] | None, default: None ) –

    List of keys to select a subset of all source entries.

Returns:

  • Generator[QueryReturnClass, None, None]

    The requested data in the specified format, as an iterator of tables.

run

run(batch_size: int | None = None, full_rerun: bool = False) -> Table

Hash a dataset from its warehouse, ready to be inserted, and cache hashes.

Hashes the index fields defined in the source based on the extract/transform logic.

Does not hash the key field.

Parameters:

  • batch_size
    (int | None, default: None ) –

    If set, process data in batches internally. Indicates the size of each batch.

  • full_rerun
    (bool, default: False ) –

    Whether to force a re-run even if the hashes are cached.

Returns:

  • Table

    A PyArrow Table containing source keys and their hashes.
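
The idea of hashing the index fields while leaving the key untouched can be sketched as follows. `hash_rows` is a hypothetical helper; the use of SHA-256 and the way values are serialised are assumptions made for illustration, since this page does not specify the hashing scheme.

```python
import hashlib


def hash_rows(rows: list[dict], key_field: str, index_fields: list[str]) -> list[dict]:
    """Hypothetical helper: hash each row's index fields, keep the key as-is."""
    hashed = []
    for row in rows:
        digest = hashlib.sha256()  # assumed hash function
        for field in index_fields:
            # Feed each index field's name and value into the digest.
            digest.update(f"{field}={row[field]!r};".encode("utf-8"))
        # The key field is carried through unhashed.
        hashed.append({"key": row[key_field], "hash": digest.digest()})
    return hashed


rows = [
    {"id": "a1", "name": "Acme", "postcode": "AB1 2CD"},
    {"id": "b2", "name": "Acme", "postcode": "AB1 2CD"},
]
result = hash_rows(rows, key_field="id", index_fields=["name", "postcode"])
```

The two rows have different keys but identical index fields, so they end up with the same hash, which is what lets several keys point at the same hashed content.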

qualify_field

qualify_field(field: str) -> str

Qualify field names with the source name.

Parameters:

  • field
    (str) –

    The field name to qualify.

Returns:

  • str

    A single qualified field.

f

f(fields: str | Iterable[str]) -> str | list[str]

Qualify one or more field names with the source name.

Parameters:

  • fields
    (str | Iterable[str]) –

    The field name to qualify, or a list of field names.

Returns:

  • str | list[str]

    A single qualified field, or a list of qualified field names.
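
The "one name or many" behaviour can be sketched like this. `qualify` is a hypothetical stand-in for `f`, and the prefix format used here (`companies_`) is an assumption; the real value comes from the source's prefix property.

```python
from collections.abc import Iterable
from typing import Union


def qualify(prefix: str, fields: Union[str, Iterable[str]]) -> Union[str, list[str]]:
    """Hypothetical stand-in for `f`: prepend a prefix to one or many names."""
    if isinstance(fields, str):
        # Check for str first: a string is itself an iterable of characters.
        return f"{prefix}{fields}"
    return [f"{prefix}{field}" for field in fields]


prefix = "companies_"  # assumed format; the real prefix comes from Source.prefix
single = qualify(prefix, "name")
many = qualify(prefix, ["name", "postcode"])
```

A single string in gives a single string out, while any other iterable of names gives a list.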

sync

sync() -> None

Send the source config and hashes to the server.

query

query(**kwargs) -> Query

Generate a query for this source.

requires_client

requires_client(method: Callable[..., T]) -> Callable[..., T]

Decorator that checks if client is set before executing a method.

A helper method for Location subclasses.
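
A decorator of this kind might look like the sketch below. `requires_client_sketch` and `DemoLocation` are hypothetical names, and raising `RuntimeError` is an assumption; this page does not state which exception the real decorator raises.

```python
from functools import wraps
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def requires_client_sketch(method: Callable[..., T]) -> Callable[..., T]:
    """Stand-in decorator: refuse to run a method when `self.client` is None."""

    @wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> T:
        if self.client is None:
            # RuntimeError is an assumption; the real exception type may differ.
            raise RuntimeError(f"{method.__name__} requires a client to be set")
        return method(self, *args, **kwargs)

    return wrapper


class DemoLocation:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self, client: Any = None):
        self.client = client

    @requires_client_sketch
    def connect(self) -> bool:
        return True
```

Calling `DemoLocation(client=object()).connect()` runs normally, while `DemoLocation().connect()` fails before the method body executes.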

Raises:

location_type_to_class

location_type_to_class(location_type: LocationType) -> type[Location]

Map location type string to the corresponding class.
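
One common way to implement such a mapping is a small registry keyed by an enum. The sketch below uses stand-in names throughout: `SketchLocationType`, its string value `"rdbms"`, and `SketchRelationalDBLocation` are assumptions for illustration, not matchbox's definitions.

```python
from enum import Enum


class SketchLocationType(str, Enum):
    """Stand-in enum; the string value is an assumption."""

    RDBMS = "rdbms"


class SketchRelationalDBLocation:
    """Placeholder for a location class."""


# Registry from location type to the class that handles it.
_REGISTRY: dict[SketchLocationType, type] = {
    SketchLocationType.RDBMS: SketchRelationalDBLocation,
}


def type_to_class(location_type: SketchLocationType) -> type:
    """Look up the class registered for a location type."""
    try:
        return _REGISTRY[location_type]
    except KeyError:
        raise ValueError(f"Unknown location type: {location_type!r}") from None


cls = type_to_class(SketchLocationType("rdbms"))
```

Because the enum also subclasses `str`, a value read from a serialised config can be converted with `SketchLocationType("rdbms")` before the lookup.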