Sources

matchbox.common.sources

Classes and functions for working with data sources in Matchbox.

Classes:

  • Location

    A location for a data source.

  • RelationalDBLocation

    A location for a relational database.

  • SourceField

    A field in a source that can be indexed in the Matchbox database.

  • SourceConfig

    Configuration of a source that can be, or has been, indexed in the backend.

  • Match

    A match between primary keys in the Matchbox database.

Functions:

  • requires_credentials

    Decorator that checks if credentials are set before executing a method.

Attributes:

  • LocationType

    Type for Location class. Currently only supports RelationalDBLocation.

  • LocationTypeStr

    String literal type for Location class. Currently only supports “rdbms”.

LocationType module-attribute

LocationType = Union['RelationalDBLocation']

Type for Location class. Currently only supports RelationalDBLocation.

LocationTypeStr module-attribute

LocationTypeStr = Union[Literal['rdbms']]

String literal type for Location class. Currently only supports “rdbms”.

Location

Bases: ABC, BaseModel

A location for a data source.

Methods:

  • add_credentials

    Adds credentials to the location.

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Validate SQL ET logic against this location’s query language.

  • head

    Extract lightweight data sample using ET logic.

  • execute

    Execute ET logic against location and return batches.

Attributes:

type instance-attribute

uri instance-attribute

uri: AnyUrl

credentials class-attribute instance-attribute

credentials: Any | None = Field(exclude=True, default=None)

add_credentials abstractmethod

add_credentials(credentials: Any) -> None

Adds credentials to the location.

connect abstractmethod

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform abstractmethod

validate_extract_transform(extract_transform: str) -> bool

Validate SQL ET logic against this location’s query language.

Raises:

  • MatchboxSourceExtractTransformError

    If the ET logic is invalid.

head abstractmethod

head(extract_transform: str) -> list

Extract lightweight data sample using ET logic.

execute abstractmethod

execute(
    extract_transform: str,
    batch_size: int | None = None,
    rename: dict[str, str] | Callable | None = None,
    return_type: ReturnTypeStr = "polars",
) -> Iterator[QueryReturnType]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.
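The two forms of rename described above can be illustrated with a minimal, standalone sketch. It is not the Matchbox implementation: rename_columns is a hypothetical helper that works on a plain list of column names, and leaving columns that are missing from the dictionary unchanged is an assumption.

```python
from collections.abc import Callable
from typing import Optional, Union


def rename_columns(
    columns: list[str],
    rename: Optional[Union[dict[str, str], Callable[[str], str]]] = None,
) -> list[str]:
    """Return new column names after applying a dict or callable rename."""
    if rename is None:
        # No renaming requested: keep the names as they are.
        return list(columns)
    if callable(rename):
        # Callable form: old name in, new name out.
        return [rename(name) for name in columns]
    # Dict form: assumed here to leave unlisted columns unchanged.
    return [rename.get(name, name) for name in columns]


by_dict = rename_columns(["id", "name"], {"id": "key"})
by_callable = rename_columns(["id", "name"], str.upper)
```

The callable form is convenient when every column follows the same rule, such as adding a prefix, while the dictionary form suits one-off renames.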

Raises:

RelationalDBLocation

Bases: Location

A location for a relational database.

Methods:

  • validate_uri

    Ensure no credentials, query params, or fragments are in the URI.

  • add_credentials

    Adds credentials to the location.

  • connect

    Establish connection to the data location.

  • validate_extract_transform

    Validate SQL ET logic against this location’s query language.

  • head

    Extract lightweight data sample using ET logic.

  • execute

    Execute ET logic against location and return batches.

  • from_engine

    Create a RelationalDBLocation from a SQLAlchemy Engine.

Attributes:

type class-attribute instance-attribute

type: Literal['rdbms'] = 'rdbms'

uri instance-attribute

uri: AnyUrl

credentials class-attribute instance-attribute

credentials: Engine | None = Field(
    exclude=True,
    default=None,
    description="The credentials for a relational database are a SQLAlchemy Engine.",
)

validate_uri classmethod

validate_uri(value: AnyUrl) -> AnyUrl

Ensure no credentials, query params, or fragments are in the URI.
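As an illustration of the kind of check this validator describes, here is a minimal sketch using only the standard library. The function name check_clean_uri and the use of ValueError are assumptions for the example; the real validator operates on a Pydantic AnyUrl and may differ.

```python
from urllib.parse import urlsplit


def check_clean_uri(uri: str) -> str:
    """Return the URI unchanged, or raise ValueError if it carries extras."""
    parts = urlsplit(uri)
    if parts.username or parts.password:
        raise ValueError("URI must not contain credentials")
    if parts.query:
        raise ValueError("URI must not contain query parameters")
    if parts.fragment:
        raise ValueError("URI must not contain a fragment")
    return uri


# A bare host, port and database path passes the check.
clean = check_clean_uri("postgresql://db.example.com:5432/warehouse")
```

Keeping credentials out of the URI fits the model above, where credentials are held in a separate field that is excluded from serialisation.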

add_credentials

add_credentials(credentials: Engine) -> None

Adds credentials to the location.

connect

connect() -> bool

Establish connection to the data location.

Raises:

validate_extract_transform

validate_extract_transform(extract_transform: str) -> bool

Validate SQL ET logic against this location’s query language.

Raises:

  • MatchboxSourceExtractTransformError

    If the ET logic is invalid.

head

head(extract_transform: str) -> DataFrame

Extract lightweight data sample using ET logic.

execute

execute(
    extract_transform: str,
    batch_size: int | None = None,
    rename: dict[str, str] | Callable | None = None,
    return_type: ReturnTypeStr = "polars",
) -> Generator[QueryReturnType, None, None]

Execute ET logic against location and return batches.

Parameters:

  • extract_transform
    (str) –

    The ET logic to execute.

  • batch_size
    (int | None, default: None ) –

    The size of the batches to return.

  • rename
    (dict[str, str] | Callable | None, default: None ) –

    Renaming to apply after the ET logic is executed.

    • If a dictionary is provided, it will be used to rename the columns.
    • If a callable is provided, it will take the old name as input and return the new name.
  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.

Raises:

from_engine classmethod

from_engine(engine: Engine) -> RelationalDBLocation

Create a RelationalDBLocation from a SQLAlchemy Engine.

SourceField

Bases: BaseModel

A field in a source that can be indexed in the Matchbox database.

Attributes:

name class-attribute instance-attribute

name: str = Field(
    description="The name of the field in the source after the extract/transform logic has been applied."
)

type class-attribute instance-attribute

type: DataTypes = Field(
    description="The cached field type. Used to ensure a stable hash."
)

SourceConfig

Bases: BaseModel

Configuration of a source that can be, or has been, indexed in the backend.

SourceConfigs are used to configure source resolutions, the foundational processes on top of which linking and deduplication models can build new resolutions.

Methods:

  • f

    Qualify one or more field names with the source name.

  • validate_name

    Ensure the name is a valid source resolution name.

  • validate_key_field

    Ensure that the key field is a string and not in the index fields.

  • new

    Create a new SourceConfig for an indexing operation.

  • query

    Applies the extract/transform logic to the source and returns the results.

  • hash_data

    Retrieve and hash a dataset from its warehouse, ready to be inserted.

Attributes:

location class-attribute instance-attribute

location: LocationType = Field(
    discriminator="type",
    description="The location of the source. Used to run the extract/transform logic.",
)

name class-attribute instance-attribute

name: SourceResolutionName = Field(
    description="A unique, human-readable name of the source resolution this object configures."
)

extract_transform class-attribute instance-attribute

extract_transform: str = Field(
    description="Logic to extract and transform data from the source. Language is location dependent."
)

key_field class-attribute instance-attribute

key_field: SourceField = Field(
    description=dedent(
        """
        The key field. This is the source's key for unique
        entities, such as a primary key in a relational database.

        Keys must ALWAYS be a string.

        For example, if the source describes companies, it may have used
        a Companies House number as its key.

        This key is ALWAYS correct. It should be something generated and
        owned by the source being indexed.

        For example, your organisation's CRM ID is a key field within the CRM.

        A CRM ID entered by hand in another dataset shouldn't be used
        as a key field.
        """
    )
)

index_fields class-attribute instance-attribute

index_fields: tuple[SourceField, ...] = Field(
    default=None,
    description=dedent(
        """
        The fields to index in this source, after the extract/transform logic
        has been applied.

        This is usually set manually, and should map onto the columns that the
        extract/transform logic returns.
        """
    ),
)

prefix property

prefix: str

Get the prefix for the source.

qualified_key property

qualified_key: str

Get the qualified key for the source.

qualified_fields property

qualified_fields: list[str]

Get the qualified fields for the source.

f

f(fields: str | Iterable[str]) -> str | list[str]

Qualify one or more field names with the source name.

Parameters:

  • fields
    (str | Iterable[str]) –

    The field name to qualify, or a list of field names.

Returns:

  • str | list[str]

    A single qualified field, or a list of qualified field names.
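The str-or-iterable behaviour can be sketched as below. This is an illustration only: qualify is a hypothetical free function, and the prefix format (source name followed by an underscore) is an assumption that this page does not confirm.

```python
from collections.abc import Iterable
from typing import Union


def qualify(source_name: str, fields: Union[str, Iterable[str]]) -> Union[str, list[str]]:
    """Prefix one field name, or each name in an iterable, with the source name."""
    prefix = f"{source_name}_"  # assumed prefix format, not taken from Matchbox
    if isinstance(fields, str):
        # A single name comes back as a single string.
        return prefix + fields
    # Any other iterable comes back as a list of qualified names.
    return [prefix + field for field in fields]


single = qualify("crm", "company_name")
several = qualify("crm", ["company_name", "postcode"])
```

Checking for str first matters because a string is itself iterable; without that branch a single name would be split into characters.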

validate_name classmethod

validate_name(value: str) -> str

Ensure the name is a valid source resolution name.

Raises:

  • ValueError

    If the name is not a valid source resolution name.

validate_key_field

validate_key_field() -> Self

Ensure that the key field is a string and not in the index fields.

new classmethod

new(
    location: Location,
    name: str,
    extract_transform: str,
    key_field: str,
    index_fields: list[str],
) -> SourceConfig

Create a new SourceConfig for an indexing operation.

query

query(
    qualify_names: bool = False,
    batch_size: int | None = None,
    return_type: ReturnTypeStr = "polars",
) -> Generator[QueryReturnType, None, None]

Applies the extract/transform logic to the source and returns the results.

Parameters:

  • qualify_names
    (bool, default: False ) –

    If True, qualify the names of the columns with the source name.

  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when processing data in batches.

  • return_type
    (ReturnTypeStr, default: 'polars' ) –

    The type of data to return. Defaults to “polars”.

Returns:

  • Generator[QueryReturnType, None, None]

    The requested data in the specified format, as an iterator of tables.
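To show what consuming results batch by batch looks like, here is a small generator sketch that is independent of Matchbox. The helper in_batches is hypothetical, and treating batch_size=None as "yield everything in one batch" is an assumption rather than documented behaviour.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Optional


def in_batches(rows: Iterable[dict], batch_size: Optional[int] = None) -> Iterator[list[dict]]:
    """Yield rows in lists of at most batch_size items."""
    iterator = iter(rows)
    if batch_size is None:
        # Assumed behaviour: no batch size means a single batch.
        yield list(iterator)
        return
    # Keep slicing until the iterator is exhausted.
    while batch := list(islice(iterator, batch_size)):
        yield batch


demo_rows = [{"i": n} for n in range(5)]
batch_lengths = [len(batch) for batch in in_batches(demo_rows, batch_size=2)]
```

Because the result is a generator, nothing is read until the caller iterates, so a loop over the batches keeps only one batch in memory at a time.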

hash_data

hash_data(batch_size: int | None = None) -> Table

Retrieve and hash a dataset from its warehouse, ready to be inserted.

Hashes the index fields defined in the source based on the extract/transform logic.

Does not hash the key field.

Parameters:

  • batch_size
    (int | None, default: None ) –

    If set, process data in batches internally. Indicates the size of each batch.

Returns:

  • Table

    A PyArrow Table containing source keys and their hashes.
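The idea of hashing the index fields while leaving the key untouched can be sketched as follows. Everything specific here is an assumption for illustration: the helper hash_rows, the SHA-256 algorithm, the separator, and the plain list-of-dicts output stand in for whatever Matchbox actually does and for the PyArrow Table it returns.

```python
import hashlib


def hash_rows(rows: list[dict], key_field: str, index_fields: list[str]) -> list[dict]:
    """Pair each row's key with a hash computed from its index fields only."""
    hashed = []
    for row in rows:
        # Join the index values with a separator unlikely to occur in data.
        payload = "\x1f".join(str(row[name]) for name in index_fields)
        digest = hashlib.sha256(payload.encode("utf-8")).digest()
        # The key is carried through as-is and is not part of the hash.
        hashed.append({"key": row[key_field], "hash": digest})
    return hashed


demo = [
    {"id": "1", "name": "Acme", "postcode": "AB1"},
    {"id": "2", "name": "Acme", "postcode": "AB1"},
    {"id": "3", "name": "Other", "postcode": "AB1"},
]
hashed_demo = hash_rows(demo, key_field="id", index_fields=["name", "postcode"])
```

Since the key is excluded, two rows with identical index values produce the same hash even when their keys differ.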

Match

Bases: BaseModel

A match between primary keys in the Matchbox database.

Methods:

  • found_or_none

    Ensure that a match has sources and a cluster if target was found.

Attributes:

cluster instance-attribute

cluster: int | None

source instance-attribute

source_id class-attribute instance-attribute

source_id: set[str] = Field(default_factory=set)

target instance-attribute

target_id class-attribute instance-attribute

target_id: set[str] = Field(default_factory=set)

found_or_none

found_or_none() -> Match

Ensure that a match has sources and a cluster if target was found.
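One way to read this rule is sketched below with a plain dataclass instead of a Pydantic model. MatchSketch is hypothetical, the str types for source and target are assumptions (this page does not show them), and the exact condition and error type in Matchbox may differ.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MatchSketch:
    cluster: Optional[int]
    source: str  # assumed type
    target: str  # assumed type
    source_id: set = field(default_factory=set)
    target_id: set = field(default_factory=set)

    def __post_init__(self) -> None:
        # If the target was found, sources and a cluster must be present too.
        if self.target_id and not (self.source_id and self.cluster is not None):
            raise ValueError("A match must have sources and a cluster if target was found.")


found = MatchSketch(cluster=7, source="crm", target="erp", source_id={"a1"}, target_id={"b9"})
not_found = MatchSketch(cluster=None, source="crm", target="erp")
```

An empty match (no cluster, no IDs) is accepted under this reading, while a target ID without a cluster or source IDs is rejected.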

requires_credentials

requires_credentials(
    method: Callable[..., T],
) -> Callable[..., T]

Decorator that checks if credentials are set before executing a method.

A helper method for Location subclasses.
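The general shape of such a decorator is sketched below. This is not the Matchbox source: requires_credentials_sketch and DemoLocation are hypothetical names, and RuntimeError is a placeholder because this page does not list the exception actually raised.

```python
import functools
from collections.abc import Callable
from typing import Any, TypeVar

T = TypeVar("T")


def requires_credentials_sketch(method: Callable[..., T]) -> Callable[..., T]:
    """Refuse to run the wrapped method until credentials have been set."""

    @functools.wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> T:
        if self.credentials is None:
            # Placeholder error type; the real exception is not documented here.
            raise RuntimeError("Credentials are not set")
        return method(self, *args, **kwargs)

    return wrapper


class DemoLocation:
    """Stand-in for a Location subclass, with only what the sketch needs."""

    def __init__(self) -> None:
        self.credentials = None

    def add_credentials(self, credentials: Any) -> None:
        self.credentials = credentials

    @requires_credentials_sketch
    def connect(self) -> bool:
        return True


loc = DemoLocation()
```

Calling loc.connect() before loc.add_credentials(...) raises in this sketch; afterwards it returns True. Using functools.wraps keeps the wrapped method's name and docstring intact.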

Raises: