Sources

matchbox.common.sources

Classes and functions for working with data sources in Matchbox.

Classes:

  • SourceColumn

    A column in a dataset that can be indexed in the Matchbox database.

  • SourceAddress

    A unique identifier for a dataset in a warehouse.

  • Source

    A dataset that can be, or has been, indexed on the backend.

  • Match

    A match between primary keys in the Matchbox database.

Functions:

  • b64_bytes_validator

    Ensure that a value is a base64 encoded string or bytes.

  • needs_engine

    Decorator to check that an engine is set.

Attributes:

SerialisableBytes module-attribute

SerialisableBytes = Annotated[
    bytes,
    PlainValidator(b64_bytes_validator),
    PlainSerializer(lambda v: hash_to_base64(v)),
    WithJsonSchema(
        {
            "type": "string",
            "format": "base64",
            "description": "Base64 encoded bytes",
        }
    ),
]
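
Example:

A minimal sketch of the alias in use; the Demo model is hypothetical, and the round-trip assumes b64_bytes_validator decodes base64 strings while hash_to_base64 re-encodes bytes.

from pydantic import BaseModel

from matchbox.common.sources import SerialisableBytes


class Demo(BaseModel):
    # Hypothetical model, for illustration only
    payload: SerialisableBytes


# Accepts raw bytes (or a base64 string, via b64_bytes_validator)
demo = Demo(payload=b"hello")
# Serialises back to a base64 string via hash_to_base64
print(demo.model_dump_json())  # {"payload":"aGVsbG8="}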

SourceColumn

Bases: BaseModel

A column in a dataset that can be indexed in the Matchbox database.

Attributes:

name instance-attribute

name: str

type class-attribute instance-attribute

type: str | None = Field(
    default=None,
    description="The type to cast the column to before hashing data.",
)
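
Example:

A one-line sketch; the column name and cast type are placeholders.

from matchbox.common.sources import SourceColumn

# Cast "company_name" to text before its data is hashed
column = SourceColumn(name="company_name", type="TEXT")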

SourceAddress

Bases: BaseModel

A unique identifier for a dataset in a warehouse.

Methods:

  • compose

    Generate a SourceAddress from a SQLAlchemy Engine and full source name.

  • format_column

    Outputs a full SQLAlchemy column representation.

Attributes:

full_name instance-attribute

full_name: str

warehouse_hash instance-attribute

warehouse_hash: SerialisableBytes

pretty property

pretty: str

Return a pretty representation of the address.

warehouse_hash_b64 property

warehouse_hash_b64: str

Return warehouse hash as a base64 encoded string.

compose classmethod

compose(engine: Engine, full_name: str) -> SourceAddress

Generate a SourceAddress from a SQLAlchemy Engine and full source name.
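
Example:

A sketch of composing an address; the connection string and table name are placeholders.

from sqlalchemy import create_engine

from matchbox.common.sources import SourceAddress

# Placeholder DSN and fully qualified table name
engine = create_engine("postgresql://user:pass@localhost/warehouse")
address = SourceAddress.compose(engine=engine, full_name="dbt.companies")
print(address.pretty)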

format_column

format_column(column: str) -> str

Outputs a full SQLAlchemy column representation.

Parameters:

  • column
    (str) –

    the name of the column

Returns:

  • str

    A string representing the table name and column
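
Example:

Continuing the sketch above; "company_name" is a placeholder, and the exact output format is determined by the method.

# Qualify a bare column name with its table's name
qualified = address.format_column("company_name")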

Source

Bases: BaseModel

A dataset that can be, or has been, indexed on the backend.

Methods:

  • set_engine

    Adds an engine and uses it to validate current columns.

  • get_remote_columns

    Returns a dictionary of column names and SQLAlchemy types.

  • default_columns

    Returns a new source with default columns.

  • to_table

    Returns the dataset as a SQLAlchemy Table object.

  • check_columns

    Check that columns are available in the warehouse and correctly typed.

  • to_arrow

    Returns the dataset as a PyArrow Table or an iterator of PyArrow Tables.

  • to_polars

    Returns the dataset as a Polars DataFrame or an iterator of Polars DataFrames.

  • to_pandas

    Returns the dataset as a pandas DataFrame or an iterator of DataFrames.

  • hash_data

    Retrieve and hash a dataset from its warehouse, ready to be inserted.

Attributes:

address instance-attribute

address: SourceAddress

resolution_name class-attribute instance-attribute

resolution_name: str = Field(
    default_factory=lambda data: str(data["address"])
)

db_pk instance-attribute

db_pk: str

columns class-attribute instance-attribute

columns: tuple[SourceColumn, ...] | None = None

engine property

engine: Engine | None

The SQLAlchemy Engine used to connect to the dataset.

set_engine

set_engine(engine: Engine)

Adds an engine and uses it to validate current columns.
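
Example:

A sketch of constructing a Source and attaching an engine; the address and engine come from the SourceAddress sketch above, and "id" is a placeholder primary key.

from matchbox.common.sources import Source

source = Source(
    address=address,
    db_pk="id",
)
# Attaches the engine and validates self.columns against the warehouse
source.set_engine(engine)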

get_remote_columns

get_remote_columns(exclude_pk=False) -> dict[str, str]

Returns a dictionary of column names and SQLAlchemy types.

default_columns

default_columns() -> Source

Returns a new source with default columns.

Default columns are all columns in the source warehouse table other than self.db_pk. All other attributes are copied, and the engine (if present) is set on the new source.
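
Example:

Continuing the sketch above: replace the source's columns with every warehouse column except its primary key.

# Returns a new Source indexing all columns other than "id"
source = source.default_columns()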

to_table

to_table() -> Table

Returns the dataset as a SQLAlchemy Table object.

check_columns

check_columns(columns: list[str] | None = None) -> None

Check that columns are available in the warehouse and correctly typed.

Parameters:

  • columns
    (list[str] | None, default: None ) –

    List of column names to check. If None, checks self.columns.

to_arrow

to_arrow(
    fields: list[str] | None = None,
    pks: list[T] | None = None,
    limit: int | None = None,
    *,
    return_batches: bool = False,
    batch_size: int | None = None,
    schema_overrides: dict[str, Any] | None = None,
    execute_options: dict[str, Any] | None = None,
) -> Table | Iterator[Table]

Returns the dataset as a PyArrow Table or an iterator of PyArrow Tables.

Parameters:

  • fields
    (list[str] | None, default: None ) –

    List of column names to retrieve. If None, retrieves all columns.

  • pks
    (list[T] | None, default: None ) –

    List of primary keys to filter by. If None, retrieves all rows.

  • limit
    (int | None, default: None ) –

    Maximum number of rows to retrieve. If None, retrieves all rows.

  • return_batches
    (bool, default: False ) –
    • If True, return an iterator that yields each batch separately
    • If False, return a single Table with all results
  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when processing data in batches.

  • schema_overrides
    (dict[str, Any] | None, default: None ) –

    A dictionary mapping column names to dtypes.

  • execute_options
    (dict[str, Any] | None, default: None ) –

    These options will be passed through into the underlying query execution method as kwargs.

Returns:

  • Table | Iterator[Table]

    The requested data in PyArrow format.

    • If return_batches is False: a PyArrow Table
    • If return_batches is True: an iterator of PyArrow Tables
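
Example:

A sketch of both retrieval modes; the field name, limit and batch size are placeholders.

# Materialise the result as a single PyArrow Table
table = source.to_arrow(fields=["company_name"], limit=1_000)

# Or stream it as an iterator of smaller Tables
for batch in source.to_arrow(return_batches=True, batch_size=10_000):
    print(batch.num_rows)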

to_polars

to_polars(
    fields: list[str] | None = None,
    pks: list[T] | None = None,
    limit: int | None = None,
    *,
    return_batches: bool = False,
    batch_size: int | None = None,
    schema_overrides: dict[str, Any] | None = None,
    execute_options: dict[str, Any] | None = None,
) -> DataFrame | Iterator[DataFrame]

Returns the dataset as a Polars DataFrame or an iterator of Polars DataFrames.

Parameters:

  • fields
    (list[str] | None, default: None ) –

    List of column names to retrieve. If None, retrieves all columns.

  • pks
    (list[T] | None, default: None ) –

    List of primary keys to filter by. If None, retrieves all rows.

  • limit
    (int | None, default: None ) –

    Maximum number of rows to retrieve. If None, retrieves all rows.

  • return_batches
    (bool, default: False ) –
    • If True, return an iterator that yields each batch separately
    • If False, return a single DataFrame with all results
  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when processing data in batches.

  • schema_overrides
    (dict[str, Any] | None, default: None ) –

    A dictionary mapping column names to dtypes.

  • execute_options
    (dict[str, Any] | None, default: None ) –

    These options will be passed through into the underlying query execution method as kwargs.

Returns:

  • DataFrame | Iterator[DataFrame]

    The requested data in Polars format.

    • If return_batches is False: a Polars DataFrame
    • If return_batches is True: an iterator of Polars DataFrames
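
Example:

The same call in Polars, here with a dtype override; the column name is a placeholder.

import polars as pl

# Read "company_number" as a string rather than an inferred integer
df = source.to_polars(schema_overrides={"company_number": pl.Utf8})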

to_pandas

to_pandas(
    fields: list[str] | None = None,
    pks: list[T] | None = None,
    limit: int | None = None,
    *,
    return_batches: bool = False,
    batch_size: int | None = None,
    schema_overrides: dict[str, Any] | None = None,
    execute_options: dict[str, Any] | None = None,
) -> DataFrame | Iterator[DataFrame]

Returns the dataset as a pandas DataFrame or an iterator of DataFrames.

Parameters:

  • fields
    (list[str] | None, default: None ) –

    List of column names to retrieve. If None, retrieves all columns.

  • pks
    (list[T] | None, default: None ) –

    List of primary keys to filter by. If None, retrieves all rows.

  • limit
    (int | None, default: None ) –

    Maximum number of rows to retrieve. If None, retrieves all rows.

  • return_batches
    (bool, default: False ) –
    • If True, return an iterator that yields each batch separately
    • If False, return a single DataFrame with all results
  • batch_size
    (int | None, default: None ) –

    Indicate the size of each batch when processing data in batches.

  • schema_overrides
    (dict[str, Any] | None, default: None ) –

    A dictionary mapping column names to dtypes.

  • execute_options
    (dict[str, Any] | None, default: None ) –

    These options will be passed through into the underlying query execution method as kwargs.

Returns:

  • DataFrame | Iterator[DataFrame]

    The requested data in pandas format.

    • If return_batches is False: a pandas DataFrame
    • If return_batches is True: an iterator of pandas DataFrames
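
Example:

A sketch of filtering by primary key; the key values are placeholders.

# Fetch only the rows whose primary keys are listed
df = source.to_pandas(pks=["0001", "0002"])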

hash_data

hash_data(
    *,
    batch_size: int | None = None,
    schema_overrides: dict[str, Any] | None = None,
    execute_options: dict[str, Any] | None = None,
) -> Table

Retrieve and hash a dataset from its warehouse, ready to be inserted.

Parameters:

  • batch_size
    (int | None, default: None ) –

    If set, process data in batches internally. Indicates the size of each batch.

  • schema_overrides
    (dict[str, Any] | None, default: None ) –

    A dictionary mapping column names to dtypes.

  • execute_options
    (dict[str, Any] | None, default: None ) –

    These options will be passed through into the underlying query execution method as kwargs.

Returns:

  • Table

    A PyArrow Table containing source primary keys and their hashes.
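
Example:

A sketch of hashing in batches; the batch size is a placeholder.

# Hash the dataset 50,000 rows at a time
hashes = source.hash_data(batch_size=50_000)
print(hashes.column_names)  # source primary keys and their hashes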

Match

Bases: BaseModel

A match between primary keys in the Matchbox database.

Methods:

  • found_or_none

    Ensure that a match has sources and a cluster if a target was found.

Attributes:

cluster instance-attribute

cluster: int | None

source instance-attribute

source: SourceAddress

source_id class-attribute instance-attribute

source_id: set[str] = Field(default_factory=set)

target instance-attribute

target: SourceAddress

target_id class-attribute instance-attribute

target_id: set[str] = Field(default_factory=set)

found_or_none

found_or_none() -> Match

Ensure that a match has sources and a cluster if a target was found.
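
Example:

A sketch of a populated match; every identifier is a placeholder, and the same address is reused for brevity.

from matchbox.common.sources import Match

match = Match(
    cluster=42,
    source=address,
    source_id={"a1"},
    target=address,
    target_id={"b7"},
)
# Assumed to raise if, say, target_id is set but cluster is None
match.found_or_none()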

b64_bytes_validator

b64_bytes_validator(val: bytes | str) -> bytes

Ensure that a value is a base64 encoded string or bytes.
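
Example:

A sketch of the assumed behaviour: bytes pass through unchanged, while base64 strings are decoded.

from matchbox.common.sources import b64_bytes_validator

assert b64_bytes_validator(b"hello") == b"hello"
assert b64_bytes_validator("aGVsbG8=") == b"hello"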

needs_engine

needs_engine(func: Callable[P, R]) -> Callable[P, R]

Decorator to check that an engine is set.
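
Example:

A hypothetical use on a Source subclass; the decorator presumably raises before the body runs if no engine has been set.

from matchbox.common.sources import Source, needs_engine


class MySource(Source):
    # Hypothetical subclass, for illustration only
    @needs_engine
    def preview(self) -> None:
        # Reached only once set_engine() has been called
        print(self.to_table())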