
API reference

matchbox.server.uploads

Worker logic to process user uploads.

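Classes:

  • UploadEntry

    Entry in upload tracker, combining private metadata and public upload status.

  • UploadTracker

    Abstract class for upload tracker.

  • InMemoryUploadTracker

    In-memory upload tracker, only usable with a single server instance.

  • RedisUploadTracker

    Upload tracker backed by Redis.

Functions:

  • settings_to_upload_tracker

    Initialise an upload tracker from server settings.

  • table_to_s3

    Upload a PyArrow Table to S3 and return the key.

  • s3_to_recordbatch

    Download a PyArrow Table from S3 and stream it as RecordBatches.

  • initialise_celery_worker

    Initialise the backend and tracker for the Celery worker.

  • process_upload

    Generic task to process an uploaded file, usable with FastAPI BackgroundTasks.

  • process_upload_celery

    Celery task to process an uploaded file, taking only serialisable arguments.

Attributes:

  • celery_logger

  • CELERY_SETTINGS

  • CELERY_BACKEND

  • CELERY_TRACKER

  • celery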

celery_logger module-attribute

celery_logger = get_task_logger(__name__)

CELERY_SETTINGS module-attribute

CELERY_SETTINGS = get_backend_settings(backend_type)()

CELERY_BACKEND module-attribute

CELERY_BACKEND: MatchboxDBAdapter | None = None

CELERY_TRACKER module-attribute

CELERY_TRACKER: UploadTracker | None = None

celery module-attribute

celery = Celery('matchbox', broker=redis_uri)

UploadEntry

Bases: BaseModel

Entry in upload tracker, combining private metadata and public upload status.

Attributes:

status instance-attribute

status: UploadStatus

path instance-attribute

UploadTracker

Bases: ABC

Abstract class for upload tracker.

Methods:

  • add_source

    Register source resolution and return ID.

  • add_model

    Register model resolution and return ID.

  • get

    Retrieve entry by ID if not expired.

  • update

    Update the stage and details for an upload.

add_source

add_source(path: ResolutionPath) -> str

Register source resolution and return ID.

add_model

add_model(path: ResolutionPath) -> str

Register model resolution and return ID.

get abstractmethod

get(upload_id: str) -> UploadEntry | None

Retrieve entry by ID if not expired.

update abstractmethod

update(upload_id: str, stage: str, details: str | None = None) -> None

Update the stage and details for an upload.

Raises:

InMemoryUploadTracker

InMemoryUploadTracker()

Bases: UploadTracker

In-memory upload tracker, only usable with a single server instance.

Methods:

  • get

    Retrieve entry by ID if not expired.

  • update

    Update the stage and details for an upload.

  • add_source

    Register source resolution and return ID.

  • add_model

    Register model resolution and return ID.

get

get(upload_id: str) -> UploadEntry | None

Retrieve entry by ID if not expired.

update

update(upload_id: str, stage: str, details: str | None = None) -> None

Update the stage and details for an upload.

Raises:

add_source

add_source(path: ResolutionPath) -> str

Register source resolution and return ID.

add_model

add_model(path: ResolutionPath) -> str

Register model resolution and return ID.

RedisUploadTracker

RedisUploadTracker(redis_url: str, expiry_minutes: int, key_space: str = 'upload')

Bases: UploadTracker

Upload tracker backed by Redis.

Methods:

  • get

    Retrieve entry by ID if not expired.

  • update

    Update the stage and details for an upload.

  • add_source

    Register source resolution and return ID.

  • add_model

    Register model resolution and return ID.

Attributes:

expiry_minutes instance-attribute

expiry_minutes = expiry_minutes

redis instance-attribute

redis = from_url(redis_url)

key_prefix instance-attribute

key_prefix = f'{key_space}:'

get

get(upload_id: str) -> UploadEntry | None

Retrieve entry by ID if not expired.

update

update(upload_id: str, stage: str, details: str | None = None) -> None

Update the stage and details for an upload.

Raises:

add_source

add_source(path: ResolutionPath) -> str

Register source resolution and return ID.

add_model

add_model(path: ResolutionPath) -> str

Register model resolution and return ID.

settings_to_upload_tracker

settings_to_upload_tracker(settings: MatchboxServerSettings) -> UploadTracker

Initialise an upload tracker from server settings.

table_to_s3

table_to_s3(client: S3Client, bucket: str, key: str, file: UploadFile, expected_schema: Schema) -> str

Upload a PyArrow Table to S3 and return the key.

Parameters:

  • client

    (S3Client) –

    The S3 client to use.

  • bucket

    (str) –

    The S3 bucket to upload to.

  • key

    (str) –

    The key to upload to.

  • file

    (UploadFile) –

    The file to upload.

  • expected_schema

    (Schema) –

    The schema that the file should match.

Raises:

  • MatchboxServerFileError

    If the file is not a valid Parquet file or the schema does not match the expected schema.

Returns:

  • str

    The key of the uploaded file.
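The validate-then-upload order described above can be sketched without external dependencies. Assumptions: schemas are modelled as lists of `(column name, type name)` pairs, whereas in practice they would be PyArrow schemas read from the file (for example with `pyarrow.parquet.read_schema`) and compared with `Schema.equals`; `FileError` is a hypothetical stand-in for `MatchboxServerFileError`; and `FakeS3` only mirrors the keyword names of boto3's `put_object`. The validity check looks only for the `PAR1` magic bytes that begin and end every Parquet file, which is a cheap filter rather than full validation.

```python
class FileError(Exception):
    """Hypothetical stand-in for MatchboxServerFileError."""


class FakeS3:
    """Dict-backed stand-in mirroring boto3's put_object(Bucket=, Key=, Body=) keywords."""

    def __init__(self) -> None:
        self.objects: dict[tuple[str, str], bytes] = {}

    def put_object(self, *, Bucket: str, Key: str, Body: bytes) -> dict:
        self.objects[(Bucket, Key)] = Body
        return {}


Schema = list[tuple[str, str]]  # (column name, type name) pairs


def validate_and_upload(
    client, bucket: str, key: str, data: bytes, schema: Schema, expected: Schema
) -> str:
    # Parquet files start and end with the 4-byte magic b"PAR1"
    if len(data) < 12 or data[:4] != b"PAR1" or data[-4:] != b"PAR1":
        raise FileError("Not a valid Parquet file")
    # Reject mismatches before any bytes reach S3, so a bad upload leaves nothing behind
    if schema != expected:
        raise FileError(f"Schema mismatch: expected {expected}, got {schema}")
    client.put_object(Bucket=bucket, Key=key, Body=data)
    return key
```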

s3_to_recordbatch

s3_to_recordbatch(client: S3Client, bucket: str, key: str, batch_size: int = 1000) -> Generator[RecordBatch, None, None]

Download a PyArrow Table from S3 and stream it as RecordBatches.
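Streaming in fixed-size batches means only one batch needs to be held in memory at a time. The stdlib sketch below shows that generator shape over plain rows; `stream_batches` is a hypothetical name. With PyArrow, `pyarrow.parquet.ParquetFile.iter_batches(batch_size=...)` yields `RecordBatch` objects in a similar way, though whether `s3_to_recordbatch` uses it is not stated here.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def stream_batches(rows: Iterable[T], batch_size: int = 1000) -> Iterator[list[T]]:
    """Yield rows in lists of at most batch_size, consuming the input lazily."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(rows)
    # islice pulls at most batch_size items; an empty list means the input is exhausted
    while batch := list(islice(iterator, batch_size)):
        yield batch
```

With the default `batch_size` of 1000, 2,500 rows arrive as batches of 1000, 1000 and 500.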

initialise_celery_worker

initialise_celery_worker()

Initialise the backend and tracker for the Celery worker.

process_upload

process_upload(backend: MatchboxDBAdapter, tracker: UploadTracker, s3_client: S3Client, upload_type: str, resolution_name: str, upload_id: str, bucket: str, filename: str) -> None

Generic task to process an uploaded file, usable with FastAPI BackgroundTasks.

process_upload_celery

process_upload_celery(self: Task, upload_type: str, resolution_name: str, upload_id: str, bucket: str, filename: str) -> None

Celery task to process an uploaded file, taking only serialisable arguments.
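Celery sends task arguments to workers as broker messages, and its default serializer has been JSON since Celery 4.0. Live objects such as a database adapter, tracker or S3 client therefore cannot be passed, which is presumably why this task takes only strings and relies on the module-level `CELERY_BACKEND` and `CELERY_TRACKER` instead. The `self: Task` parameter suggests the task is registered with `bind=True`, though that is an inference. The hypothetical helper below checks whether a set of arguments would survive JSON serialisation.

```python
import json


def check_task_args(*args: object) -> str:
    """Return the JSON payload for args, raising TypeError if any are not serialisable."""
    return json.dumps(args)
```

For example, `check_task_args("source", "my_source", "abc123", "my-bucket", "abc123.parquet")` succeeds, while `check_task_args(object())` raises `TypeError`.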