Skip to content

API reference

matchbox.server.uploads

Worker logic to process user uploads.

Classes:

Functions:

Attributes:

CELERY_SETTINGS module-attribute

CELERY_SETTINGS = get_backend_settings(backend_type)()

CELERY_BACKEND module-attribute

CELERY_BACKEND: MatchboxDBAdapter | None = None

CELERY_TRACKER module-attribute

CELERY_TRACKER: UploadTracker | None = None

celery module-attribute

celery = Celery('matchbox', broker=redis_uri)

UploadEntry

Bases: BaseModel

Entry in upload tracker, combining private metadata and public upload status.

Attributes:

status instance-attribute

status: UploadStatus

metadata instance-attribute

UploadTracker

Bases: ABC

Abstract class for upload tracker.

Methods:

  • add_source

    Register source metadata and return ID.

  • add_model

    Register model results metadata and return ID.

  • get

    Retrieve metadata by ID if not expired.

  • update

    Update the stage and details for an upload.

add_source

add_source(metadata: SourceConfig) -> str

Register source metadata and return ID.

add_model

add_model(metadata: ModelConfig) -> str

Register model results metadata and return ID.

get abstractmethod

get(upload_id: str) -> UploadEntry | None

Retrieve metadata by ID if not expired.

update abstractmethod

update(
    upload_id: str, stage: str, details: str | None = None
) -> None

Update the stage and details for an upload.

Raises:

InMemoryUploadTracker

InMemoryUploadTracker()

Bases: UploadTracker

In-memory upload tracker, only usable with single server instance.

Methods:

  • get

    Retrieve metadata by ID if not expired.

  • update

    Update the stage and details for an upload.

  • add_source

    Register source metadata and return ID.

  • add_model

    Register model results metadata and return ID.

get

get(upload_id: str) -> UploadEntry | None

Retrieve metadata by ID if not expired.

update

update(
    upload_id: str, stage: str, details: str | None = None
) -> None

Update the stage and details for an upload.

Raises:

add_source

add_source(metadata: SourceConfig) -> str

Register source metadata and return ID.

add_model

add_model(metadata: ModelConfig) -> str

Register model results metadata and return ID.

RedisUploadTracker

RedisUploadTracker(
    redis_url: str,
    expiry_minutes: int,
    key_space: str = "upload",
)

Bases: UploadTracker

Upload tracker backed by Redis.

Methods:

  • get

    Retrieve metadata by ID if not expired.

  • update

    Update the stage and details for an upload.

  • add_source

    Register source metadata and return ID.

  • add_model

    Register model results metadata and return ID.

Attributes:

expiry_minutes instance-attribute

expiry_minutes = expiry_minutes

redis instance-attribute

redis = from_url(redis_url)

key_prefix instance-attribute

key_prefix = f'{key_space}:'

get

get(upload_id: str) -> UploadEntry | None

Retrieve metadata by ID if not expired.

update

update(
    upload_id: str, stage: str, details: str | None = None
) -> None

Update the stage and details for an upload.

Raises:

add_source

add_source(metadata: SourceConfig) -> str

Register source metadata and return ID.

add_model

add_model(metadata: ModelConfig) -> str

Register model results metadata and return ID.

settings_to_upload_tracker

settings_to_upload_tracker(
    settings: MatchboxServerSettings,
) -> UploadTracker

Initialise an upload tracker from server settings.

table_to_s3

table_to_s3(
    client: S3Client,
    bucket: str,
    key: str,
    file: UploadFile,
    expected_schema: Schema,
) -> str

Upload a PyArrow Table to S3 and return the key.

Parameters:

  • client

    (S3Client) –

    The S3 client to use.

  • bucket

    (str) –

    The S3 bucket to upload to.

  • key

    (str) –

    The key to upload to.

  • file

    (UploadFile) –

    The file to upload.

  • expected_schema

    (Schema) –

    The schema that the file should match.

Raises:

  • MatchboxServerFileError

    If the file is not a valid Parquet file or the schema does not match the expected schema.

Returns:

  • str

    The key of the uploaded file.

s3_to_recordbatch

s3_to_recordbatch(
    client: S3Client,
    bucket: str,
    key: str,
    batch_size: int = 1000,
) -> Generator[RecordBatch, None, None]

Download a PyArrow Table from S3 and stream it as RecordBatches.

initialise_celery_worker

initialise_celery_worker()

Initialise backend and tracker for celery worker.

process_upload

process_upload(
    backend: MatchboxDBAdapter,
    tracker: UploadTracker,
    s3_client: S3Client,
    upload_type: str,
    resolution_name: str,
    upload_id: str,
    bucket: str,
    filename: str,
) -> None

Generic task to process uploaded file, usable by FastAPI BackgroundTasks.

process_upload_celery

process_upload_celery(
    upload_type: str,
    resolution_name: str,
    upload_id: str,
    bucket: str,
    filename: str,
) -> None

Celery task to process uploaded file, with only serialisable arguments.