
API reference

matchbox.server.uploads

Worker logic to process user uploads.


celery_logger module-attribute

celery_logger = get_task_logger(__name__)

CELERY_SETTINGS module-attribute

CELERY_SETTINGS = get_backend_settings(backend_type)()

CELERY_BACKEND module-attribute

CELERY_BACKEND: MatchboxDBAdapter | None = None

CELERY_TRACKER module-attribute

CELERY_TRACKER: UploadTracker | None = None

celery module-attribute

celery = Celery('matchbox', broker=redis_uri)

UploadTracker

Bases: ABC

Upload error tracker.

Methods:

  • get

    Retrieve error message from tracker.

  • set

    Add error message to tracker.

get abstractmethod

get(upload_id: str) -> str | None

Retrieve error message from tracker.

set abstractmethod

set(upload_id: str, message: str) -> None

Add error message to tracker.

InMemoryUploadTracker

InMemoryUploadTracker()

In-memory error tracker.


tracker instance-attribute

tracker: dict[str, str] = {}

get

get(upload_id: str) -> str | None

set

set(upload_id: str, message: str) -> None
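The interface above is small: an abstract `get`/`set` pair keyed by upload ID, with an in-memory implementation that stores messages in a `dict`. The following is a minimal sketch of that shape, not the module's own code. The class names `UploadTrackerSketch` and `InMemoryTrackerSketch` are hypothetical, and initialising the dict in `__init__` is an assumption made so that each instance gets its own storage.

```python
from abc import ABC, abstractmethod
from typing import Optional


class UploadTrackerSketch(ABC):
    """Hypothetical mirror of the UploadTracker interface documented above."""

    @abstractmethod
    def get(self, upload_id: str) -> Optional[str]:
        """Return the error message recorded for upload_id, or None."""

    @abstractmethod
    def set(self, upload_id: str, message: str) -> None:
        """Record an error message for upload_id."""


class InMemoryTrackerSketch(UploadTrackerSketch):
    """Dict-backed tracker; its state lives only inside the current process."""

    def __init__(self) -> None:
        # A fresh dict per instance, so separate trackers never share entries.
        self.tracker: dict[str, str] = {}

    def get(self, upload_id: str) -> Optional[str]:
        # dict.get returns None for unknown IDs, matching the Optional return type.
        return self.tracker.get(upload_id)

    def set(self, upload_id: str, message: str) -> None:
        # A later message for the same upload overwrites the earlier one.
        self.tracker[upload_id] = message
```

Because the dict lives in one process, an in-memory tracker cannot see errors recorded by a separate worker process. That limitation is presumably why a Redis-backed variant exists alongside it.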

RedisUploadTracker

RedisUploadTracker(redis_url: str, expiry_minutes: int, key_space: str = 'upload')

Error tracker backed by Redis.


expiry_minutes instance-attribute

expiry_minutes = expiry_minutes

redis instance-attribute

redis = from_url(redis_url)

key_prefix instance-attribute

key_prefix = f'{key_space}:'

get

get(upload_id: str) -> str | None

set

set(upload_id: str, message: str) -> None
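The attributes listed above (a Redis client built with `from_url`, an expiry in minutes, and a `key_space:` prefix) suggest how such a tracker can work. Below is a minimal sketch under stated assumptions: it uses Redis `SETEX` to write each message with a time-to-live, which is an assumption rather than something this page confirms, and it accepts an already-built client instead of a URL so it can run without a Redis server. `RedisTrackerSketch` and `RedisLike` are hypothetical names.

```python
from typing import Optional, Protocol


class RedisLike(Protocol):
    """The two redis-py client methods this sketch relies on."""

    def setex(self, name: str, time: int, value: str) -> object: ...

    def get(self, name: str) -> Optional[bytes]: ...


class RedisTrackerSketch:
    """Hypothetical Redis-backed tracker that takes a ready-made client."""

    def __init__(self, client: RedisLike, expiry_minutes: int, key_space: str = "upload") -> None:
        self.redis = client
        self.expiry_minutes = expiry_minutes
        # Namespaced keys such as "upload:<id>" stay apart from other data in Redis.
        self.key_prefix = f"{key_space}:"

    def set(self, upload_id: str, message: str) -> None:
        # SETEX stores the value and its time-to-live (in seconds) in one command.
        self.redis.setex(self.key_prefix + upload_id, self.expiry_minutes * 60, message)

    def get(self, upload_id: str) -> Optional[str]:
        value = self.redis.get(self.key_prefix + upload_id)
        # redis-py returns bytes unless the client was created with decode_responses=True.
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return value
```

With a real server, the client could come from `redis.Redis.from_url("redis://localhost:6379/0")`. Expiring keys keep stale error messages from accumulating once nobody is polling for them.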

settings_to_upload_tracker

settings_to_upload_tracker(settings: MatchboxServerSettings) -> UploadTracker

Initialise an upload tracker from server settings.

file_to_s3

file_to_s3(client: S3Client, bucket: str, key: str, file: UploadFile) -> str

Upload a Parquet file to S3 and return the key.

Parameters:

  • client

    (S3Client) –

    The S3 client to use.

  • bucket

    (str) –

    The S3 bucket to upload to.

  • key

    (str) –

    The key to upload to.

  • file

    (UploadFile) –

    The file to upload.

Raises:

  • MatchboxServerFileError

    If the file is not a valid Parquet file or the schema does not match the expected schema.

Returns:

  • str

    The key of the uploaded file.
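The validate-then-upload flow described above can be sketched with the standard library alone. This is an illustration, not the module's implementation. It checks only that the bytes begin and end with the Parquet magic number `PAR1`, a much weaker test than the schema validation the real function performs, and it calls `put_object(Bucket=..., Key=..., Body=...)` in the style of a boto3 S3 client. `file_to_s3_sketch` and `ServerFileErrorSketch` are hypothetical names; with FastAPI you would pass the underlying `UploadFile.file` object.

```python
from typing import BinaryIO, Protocol


class S3Like(Protocol):
    """Subset of the boto3 S3 client used in this sketch."""

    def put_object(self, *, Bucket: str, Key: str, Body: bytes) -> dict: ...


class ServerFileErrorSketch(Exception):
    """Hypothetical stand-in for MatchboxServerFileError."""


PARQUET_MAGIC = b"PAR1"


def file_to_s3_sketch(client: S3Like, bucket: str, key: str, file: BinaryIO) -> str:
    data = file.read()
    # A Parquet file starts with "PAR1" and ends with a 4-byte footer length plus "PAR1",
    # so anything shorter than 12 bytes cannot be valid. No schema check happens here.
    if len(data) < 12 or not (data.startswith(PARQUET_MAGIC) and data.endswith(PARQUET_MAGIC)):
        raise ServerFileErrorSketch("File is not a valid Parquet file.")
    # Validate before uploading, so rejected files never reach the bucket.
    client.put_object(Bucket=bucket, Key=key, Body=data)
    return key
```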

s3_to_recordbatch

s3_to_recordbatch(client: S3Client, bucket: str, key: str, batch_size: int = 1000) -> Generator[RecordBatch, None, None]

Download a Parquet file from S3 and stream it as PyArrow RecordBatches.

initialise_celery_worker

initialise_celery_worker() -> None

Initialise the backend and upload tracker for a Celery worker.

process_upload

process_upload(backend: MatchboxDBAdapter, tracker: UploadTracker, s3_client: S3Client, resolution_path: ResolutionPath, upload_id: str, bucket: str, filename: str) -> None

Generic task to process an uploaded file; usable with FastAPI BackgroundTasks.
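Since the task takes both a backend and an upload tracker, a plausible outline is: read the file from S3, hand it to the backend, and record any failure under the upload ID so a later status request can report it. The sketch below follows that assumed outline using injected callables. It is not the module's code. `process_upload_sketch`, `read_batches`, `insert_batches` and `record_error` are hypothetical names, and catching the exception instead of re-raising it is a design assumption for background work, where no caller is waiting for the error.

```python
import logging
from collections.abc import Callable, Iterable
from typing import Any

logger = logging.getLogger(__name__)


def process_upload_sketch(
    read_batches: Callable[[str, str], Iterable[Any]],
    insert_batches: Callable[[Iterable[Any]], None],
    record_error: Callable[[str, str], None],
    upload_id: str,
    bucket: str,
    filename: str,
) -> None:
    """Hypothetical outline: stream from S3, hand to the backend, record failures."""
    try:
        batches = read_batches(bucket, filename)
        insert_batches(batches)
    except Exception as exc:  # background work has no caller to raise to
        logger.exception("Upload %s failed", upload_id)
        # Store the message under the upload ID so a status endpoint can report it.
        record_error(upload_id, str(exc))
```

With FastAPI, a function like this would typically be scheduled via `background_tasks.add_task(process_upload_sketch, ...)`, which runs it after the response has been sent.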

process_upload_celery

process_upload_celery(self: Task, resolution_path_json: str, upload_id: str, bucket: str, filename: str) -> None

Celery task to process an uploaded file, taking only serialisable arguments.
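The split between `process_upload` and `process_upload_celery` reflects a common pattern: task arguments must cross the message broker, so rich objects travel as JSON strings, while non-serialisable dependencies (database adapter, tracker) live in module-level slots such as `CELERY_BACKEND` and `CELERY_TRACKER` and are built inside the worker. The sketch below shows that pattern without importing Celery. Every name in it is hypothetical, including the fields of `ResolutionPathSketch`. It also initialises lazily inside the task, whereas the real module exposes `initialise_celery_worker` for this job, possibly hooked to worker start-up.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional

# Module-level slots, filled once per worker process (cf. CELERY_BACKEND / CELERY_TRACKER).
WORKER_BACKEND: Optional[Any] = None
WORKER_TRACKER: Optional[dict[str, str]] = None

# Records calls so the sketch can be checked without a real backend.
CALLS: list[tuple] = []


@dataclass(frozen=True)
class ResolutionPathSketch:
    """Hypothetical fields; the real ResolutionPath may look different."""

    collection: str
    name: str


def process_upload_stub(backend: Any, tracker: Any, resolution_path: ResolutionPathSketch,
                        upload_id: str, bucket: str, filename: str) -> None:
    """Stand-in for the generic process_upload; it only records its arguments."""
    CALLS.append((backend, tracker, resolution_path, upload_id, bucket, filename))


def initialise_worker_sketch() -> None:
    """Create non-serialisable dependencies once, then reuse them for every task."""
    global WORKER_BACKEND, WORKER_TRACKER
    if WORKER_BACKEND is None:
        WORKER_BACKEND = object()  # placeholder for a database adapter
    if WORKER_TRACKER is None:
        WORKER_TRACKER = {}  # placeholder for an UploadTracker


def enqueue_args_sketch(path: ResolutionPathSketch, upload_id: str,
                        bucket: str, filename: str) -> tuple:
    """Caller side: turn the path into JSON before it crosses the broker."""
    return (json.dumps(asdict(path)), upload_id, bucket, filename)


def process_upload_task_sketch(resolution_path_json: str, upload_id: str,
                               bucket: str, filename: str) -> None:
    """Task body that receives only JSON-friendly arguments."""
    initialise_worker_sketch()
    # Rebuild the rich object from its JSON form inside the worker.
    path = ResolutionPathSketch(**json.loads(resolution_path_json))
    process_upload_stub(WORKER_BACKEND, WORKER_TRACKER, path, upload_id, bucket, filename)
```

In Celery itself, such a function would usually be registered with a decorator like `@celery.task(bind=True)`, which supplies the `self: Task` argument seen in the signature above, and enqueued with `.delay(...)`.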