Sources

matchbox.common.sources

Classes and functions for working with data sources in Matchbox.

Classes:

  • Location

    A location for a data source.

  • RelationalDBLocation

    A location for a relational database.

  • SourceField

    A field in a source that can be indexed in the Matchbox database.

  • SourceConfig

    Configuration of a source that can be, or has been, indexed in the backend.

  • Match

    A match between primary keys in the Matchbox database.

Functions:

  • requires_client

    Decorator that checks if client is set before executing a method.

Attributes:

  • LocationType

    Type for Location class. Currently only supports RelationalDBLocation.

  • LocationTypeStr

    String literal type for Location class. Currently only supports “rdbms”.

LocationType module-attribute

LocationType = Union['RelationalDBLocation']

Type for Location class. Currently only supports RelationalDBLocation.

LocationTypeStr module-attribute

LocationTypeStr = Union[Literal['rdbms']]

String literal type for Location class. Currently only supports “rdbms”.

Location

Bases: ABC, BaseModel

A location for a data source.

Methods:

  • add_client

    Adds client to the location.

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Validate ET logic against this location’s query language.

  • infer_types

    Extract all data types from the ET logic.

  • execute

    Execute ET logic against location and return batches.

Attributes:

type instance-attribute

name instance-attribute

name: str

client class-attribute instance-attribute

client: Any | None = Field(exclude=True, default=None)

add_client abstractmethod

add_client(client: Any) -> Self

Adds client to the location.

connect abstractmethod

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform abstractmethod

validate_extract_transform(extract_transform: str) -> bool

Validate ET logic against this location’s query language.

Raises:

  • MatchboxSourceExtractTransformError

    If the ET logic is invalid.

infer_types abstractmethod

infer_types(extract_transform: str) -> dict[str, DataTypes]

Extract all data types from the ET logic.

execute abstractmethod

execute(
    extract_transform: str,
    batch_size: int | None = None,
    rename: dict[str, str] | Callable | None = None,
    return_type: ReturnTypeStr = "polars",
    keys: tuple[str, list[str]] | None = None,
) -> Iterator[QueryReturnType]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (tuple[str, list[str]] | None, default: None ) –

    Rule to retrieve only rows with specific keys, given as a (field name, values) tuple. The first element is the field name on which to filter; the second is the list of values to keep. Only source entries whose value in that field is in the list are returned.
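
The two forms of `rename` described above can be illustrated with a minimal sketch. `apply_rename` is a hypothetical helper, not part of Matchbox, and it assumes that a dictionary leaves unmentioned columns unchanged, which the reference does not state.

```python
from __future__ import annotations

from collections.abc import Callable


def apply_rename(
    columns: list[str],
    rename: dict[str, str] | Callable[[str], str] | None = None,
) -> list[str]:
    """Return new column names (hypothetical helper, not a Matchbox function)."""
    if rename is None:
        # No renaming requested: keep the names as they are.
        return list(columns)
    if callable(rename):
        # A callable takes each old name and returns the new name.
        return [rename(col) for col in columns]
    # Assumption: a dictionary renames only the columns it mentions.
    return [rename.get(col, col) for col in columns]


columns = ["id", "company_name"]
renamed_by_dict = apply_rename(columns, {"company_name": "name"})
renamed_by_callable = apply_rename(columns, lambda col: f"crm_{col}")
```

Here `renamed_by_dict` changes one column only, while `renamed_by_callable` applies the same rule to every column.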

Raises:

RelationalDBLocation

Bases: Location

A location for a relational database.

Methods:

  • add_client

    Adds client to the location.

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Validate ET logic against this location’s query language.

  • infer_types

    Extract all data types from the ET logic.

  • execute

    Execute ET logic against location and return batches.

Attributes:

type class-attribute instance-attribute

type: Literal['rdbms'] = 'rdbms'

name instance-attribute

name: str

client class-attribute instance-attribute

client: Engine | None = Field(
    exclude=True,
    default=None,
    description="The client for a relational database is a SQLAlchemy Engine.",
)

add_client

add_client(client: Engine) -> None

Adds client to the location.

connect

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform

validate_extract_transform(extract_transform: str) -> bool

Validate ET logic against this location’s query language.

Raises:

  • MatchboxSourceExtractTransformError

    If the ET logic is invalid.

infer_types

infer_types(extract_transform: str) -> dict[str, DataTypes]

Extract all data types from the ET logic.

execute

execute(
    extract_transform: str,
    batch_size: int | None = None,
    rename: dict[str, str] | Callable | None = None,
    return_type: ReturnTypeStr = "polars",
    keys: tuple[str, list[str]] | None = None,
    schema_overrides: dict[str, DataType] | None = None,
) -> Generator[QueryReturnType, None, None]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (tuple[str, list[str]] | None, default: None ) –

    Rule to retrieve only rows with specific keys, given as a (field name, values) tuple. The first element is the field name on which to filter; the second is the list of values to keep. Only source entries whose value in that field is in the list are returned.
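
As a rough illustration of the `keys` rule, the sketch below reads the `tuple[str, list[str]]` annotation as a (field name, allowed values) pair and filters in-memory rows. `filter_by_keys` is a hypothetical stand-in; how `RelationalDBLocation` actually applies the filter is not described here.

```python
from __future__ import annotations


def filter_by_keys(
    rows: list[dict[str, str]],
    keys: tuple[str, list[str]] | None = None,
) -> list[dict[str, str]]:
    """Keep rows whose value in the named field is one of the given values.

    Hypothetical helper for illustration only, not a Matchbox function.
    """
    if keys is None:
        # No rule supplied: return every row.
        return list(rows)
    field, values = keys
    allowed = set(values)  # set membership keeps the check cheap
    return [row for row in rows if row[field] in allowed]


rows = [
    {"id": "1", "name": "Acme"},
    {"id": "2", "name": "Bolt"},
    {"id": "3", "name": "Cobalt"},
]
selected = filter_by_keys(rows, keys=("id", ["1", "3"]))
```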

Raises:

SourceField

Bases: BaseModel

A field in a source that can be indexed in the Matchbox database.

Attributes:

name class-attribute instance-attribute

name: str = Field(
    description="The name of the field in the source after the extract/transform logic has been applied."
)

type class-attribute instance-attribute

type: DataTypes = Field(
    description="The cached field type. Used to ensure a stable hash."
)

SourceConfig

Bases: BaseModel

Configuration of a source that can be, or has been, indexed in the backend.

SourceConfigs are used to configure source resolutions. Source resolutions are the foundational processes on top of which linking and deduplication models can build new resolutions.

Methods:

  • f

    Qualify one or more field names with the source name.

  • validate_name

    Ensure the name is a valid source resolution name.

  • validate_key_field

    Ensure that the key field is a string and not in the index fields.

  • new

    Create a new SourceConfig for an indexing operation.

  • query

    Applies the extract/transform logic to the source and returns the results.

  • hash_data

    Retrieve and hash a dataset from its warehouse, ready to be inserted.

Attributes:

location class-attribute instance-attribute

location: LocationType = Field(
    discriminator="type",
    description="The location of the source. Used to run the extract/transform logic.",
)

name class-attribute instance-attribute

name: SourceResolutionName = Field(
    description="A unique, human-readable name of the source resolution this object configures."
)

extract_transform class-attribute instance-attribute

extract_transform: str = Field(
    description="Logic to extract and transform data from the source. Language is location dependent."
)

key_field class-attribute instance-attribute

key_field: SourceField = Field(
    description=dedent(
        "\n            The key field. This is the source's key for unique\n            entities, such as a primary key in a relational database.\n\n            Keys must ALWAYS be a string.\n\n            For example, if the source describes companies, it may have used\n            a Companies House number as its key.\n\n            This key is ALWAYS correct. It should be something generated and\n            owned by the source being indexed.\n            \n            For example, your organisation's CRM ID is a key field within the CRM.\n            \n            A CRM ID entered by hand in another dataset shouldn't be used \n            as a key field.\n        "
    )
)

index_fields class-attribute instance-attribute

index_fields: tuple[SourceField, ...] = Field(
    default=None,
    description=dedent(
        "\n            The fields to index in this source, after the extract/transform logic \n            has been applied. \n\n            This is usually set manually, and should map onto the columns that the\n            extract/transform logic returns.\n            "
    ),
)

prefix property

prefix: str

Get the prefix for the source.

qualified_key property

qualified_key: str

Get the qualified key for the source.

qualified_fields property

qualified_fields: list[str]

Get the qualified fields for the source.

f

f(fields: str | Iterable[str]) -> str | list[str]

Qualify one or more field names with the source name.

Parameters:

  • fields
    (str | Iterable[str]) –

    The field name to qualify, or a list of field names.

Returns:

  • str | list[str]

    A single qualified field, or a list of qualified field names.
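
The str-or-iterable behaviour of `f` can be sketched as follows. `qualify` is a hypothetical stand-in, and the underscore separator is an assumption: the reference does not say how the source name and field name are joined.

```python
from __future__ import annotations

from collections.abc import Iterable


def qualify(
    source_name: str,
    fields: str | Iterable[str],
    sep: str = "_",  # assumed separator, not confirmed by the reference
) -> str | list[str]:
    """Prefix one or more field names with the source name (illustrative only)."""
    if isinstance(fields, str):
        # A single field name in, a single qualified name out.
        return f"{source_name}{sep}{fields}"
    # Any other iterable in, a list of qualified names out.
    return [f"{source_name}{sep}{field}" for field in fields]


single = qualify("crm", "name")
many = qualify("crm", ["name", "postcode"])
```

Checking for `str` first matters because a string is itself iterable; without that check a single name would be split into characters.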

validate_name classmethod

validate_name(value: str) -> str

Ensure the name is a valid source resolution name.

Raises:

  • ValueError

    If the name is not a valid source resolution name.

validate_key_field

validate_key_field() -> Self

Ensure that the key field is a string and not in the index fields.

new classmethod

new(
    location: Location,
    name: str,
    extract_transform: str,
    key_field: str,
    index_fields: list[str],
) -> SourceConfig

Create a new SourceConfig for an indexing operation.

query

query(
    qualify_names: bool = False,
    batch_size: int | None = None,
    return_type: ReturnTypeStr = "polars",
    keys: list[str] | None = None,
) -> Generator[QueryReturnType, None, None]

Applies the extract/transform logic to the source and returns the results.

Parameters:

  • qualify_names
    (bool, default: False ) –

    If True, qualify the names of the columns with the source name.

  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when processing data in batches.

  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.

  • keys
    (list[str] | None, default: None ) –

    List of keys to select a subset of all source entries.

Returns:

  • Generator[QueryReturnType, None, None]

    The requested data in the specified format, as an iterator of tables.
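
Because `query` returns a generator of tables, callers consume results batch by batch. The sketch below shows that pattern on plain Python rows. `in_batches` is a hypothetical helper, and yielding everything as one batch when `batch_size` is `None` is an assumption about the behaviour, not something the reference confirms.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import islice


def in_batches(
    rows: Iterable[dict],
    batch_size: int | None = None,
) -> Iterator[list[dict]]:
    """Yield rows in lists of at most batch_size items (illustrative only)."""
    iterator = iter(rows)
    if batch_size is None:
        # Assumption: without a batch size, everything arrives as one batch.
        yield list(iterator)
        return
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    # Pull batch_size rows at a time until the iterator is exhausted.
    while batch := list(islice(iterator, batch_size)):
        yield batch


rows = [{"id": str(i)} for i in range(5)]
batch_lengths = [len(batch) for batch in in_batches(rows, batch_size=2)]
single_batch = list(in_batches(rows))
```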

hash_data

hash_data(batch_size: int | None = None) -> Table

Retrieve and hash a dataset from its warehouse, ready to be inserted.

Hashes the index fields defined in the source based on the extract/transform logic.

Does not hash the key field.

Parameters:

  • batch_size
    (int | None, default: None ) –

    If set, process data in batches internally. Indicates the size of each batch.

Returns:

  • Table

    A PyArrow Table containing source keys and their hashes.
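
The idea of hashing index fields while leaving the key untouched can be sketched with the standard library. This is not Matchbox's implementation: `hash_rows` is hypothetical, SHA-256 and the separator bytes are assumptions, and the result is a list of pairs rather than a PyArrow Table.

```python
from __future__ import annotations

import hashlib


def hash_rows(
    rows: list[dict[str, str]],
    key_field: str,
    index_fields: list[str],
) -> list[tuple[str, bytes]]:
    """Pair each row's key with a digest of its index fields (illustrative only)."""
    hashed = []
    for row in rows:
        digest = hashlib.sha256()  # assumed algorithm
        for field in index_fields:
            # Include the field name and separators so that values cannot
            # run together across field boundaries.
            digest.update(field.encode("utf-8"))
            digest.update(b"\x1f")
            digest.update(str(row[field]).encode("utf-8"))
            digest.update(b"\x1e")
        # The key itself is carried through unhashed.
        hashed.append((row[key_field], digest.digest()))
    return hashed


rows = [
    {"id": "1", "name": "Acme", "postcode": "AB1"},
    {"id": "2", "name": "Acme", "postcode": "AB1"},
    {"id": "3", "name": "Bolt", "postcode": "AB1"},
]
hashed = hash_rows(rows, key_field="id", index_fields=["name", "postcode"])
```

In this sketch, rows 1 and 2 share a hash because their index fields match, even though their keys differ.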

Match

Bases: BaseModel

A match between primary keys in the Matchbox database.

Methods:

  • found_or_none

    Ensure that a match has sources and a cluster if target was found.

Attributes:

cluster instance-attribute

cluster: int | None

source instance-attribute

source_id class-attribute instance-attribute

source_id: set[str] = Field(default_factory=set)

target instance-attribute

target_id class-attribute instance-attribute

target_id: set[str] = Field(default_factory=set)

found_or_none

found_or_none() -> Match

Ensure that a match has sources and a cluster if target was found.

requires_client

requires_client(
    method: Callable[..., T],
) -> Callable[..., T]

Decorator that checks if client is set before executing a method.

A helper method for Location subclasses.

Raises:
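
A decorator of this kind can be sketched as below. This is a minimal illustration, not the Matchbox source: `requires_client_sketch` and `DemoLocation` are hypothetical, and `RuntimeError` stands in for whatever exception the real decorator raises.

```python
from __future__ import annotations

from collections.abc import Callable
from functools import wraps
from typing import Any, TypeVar

T = TypeVar("T")


def requires_client_sketch(method: Callable[..., T]) -> Callable[..., T]:
    """Refuse to run a method until self.client has been set (illustrative only)."""

    @wraps(method)  # keep the wrapped method's name and docstring
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> T:
        if self.client is None:
            # Stand-in exception; the real exception type is not listed here.
            raise RuntimeError("Client not set; call add_client() first.")
        return method(self, *args, **kwargs)

    return wrapper


class DemoLocation:
    """Hypothetical location with the same client attribute pattern."""

    def __init__(self) -> None:
        self.client: Any | None = None

    def add_client(self, client: Any) -> "DemoLocation":
        self.client = client
        return self

    @requires_client_sketch
    def connect(self) -> bool:
        return True
```

Calling `DemoLocation().connect()` raises in this sketch, while `DemoLocation().add_client(engine).connect()` runs the method, where `engine` is any client object.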