ccflow_celery.app

Classes

CeleryConfig

Configuration for a Celery application.

CeleryApp

Wrapper around a Celery application instance.

Module Contents

class ccflow_celery.app.CeleryConfig(/, **data: Any)

Bases: ccflow.BaseModel

Configuration for a Celery application.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

broker_url: str = None
result_backend: str | None = None
task_serializer: str = 'json'
result_serializer: str = 'json'
accept_content: list = None
task_track_started: bool = True
task_default_queue: str = 'default'
task_routes: Dict[str, str] | None = None
worker_concurrency: int | None = None
task_always_eager: bool = None
task_eager_propagates: bool = None
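
The field names above match Celery's lowercase setting names, so one plausible use of the model is to turn it into a settings dictionary in which unset (None) fields are omitted and Celery's own defaults apply. The following is a minimal sketch under that assumption. CeleryConfigSketch and to_celery_settings are hypothetical stand-ins built on dataclasses so that the example runs without ccflow or Celery installed; they are not part of ccflow_celery, and the broker URL is a placeholder.

```python
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional


@dataclass
class CeleryConfigSketch:
    """Hypothetical stand-in for CeleryConfig; names and defaults copied from the listing."""

    broker_url: Optional[str] = None
    result_backend: Optional[str] = None
    task_serializer: str = "json"
    result_serializer: str = "json"
    accept_content: Optional[List[str]] = None
    task_track_started: bool = True
    task_default_queue: str = "default"
    task_routes: Optional[Dict[str, str]] = None
    worker_concurrency: Optional[int] = None
    task_always_eager: Optional[bool] = None
    task_eager_propagates: Optional[bool] = None


def to_celery_settings(config: CeleryConfigSketch) -> dict:
    """Keep only fields that were set (assumption: None means 'use Celery's default')."""
    return {key: value for key, value in asdict(config).items() if value is not None}


# Placeholder broker URL; eager mode runs tasks in-process, which is handy in tests.
settings = to_celery_settings(
    CeleryConfigSketch(broker_url="redis://localhost:6379/0", task_always_eager=True)
)
print(sorted(settings))
```

With the real class, which derives from a pydantic-based model, a comparable dictionary could presumably be obtained through the model's own serialization rather than a helper like this one.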

class ccflow_celery.app.CeleryApp(/, **data: Any)

Bases: ccflow.BaseModel

Wrapper around a Celery application instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

name: str = None
config: CeleryConfig = None
_app: Any = None
get_app()
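
The listing does not document what get_app() does. The private _app attribute suggests, as an assumption only, that the Celery instance is built on first access and cached afterwards. The sketch below illustrates that lazy-construction pattern. CeleryAppSketch and fake_celery are hypothetical names, and fake_celery replaces celery.Celery so that the example runs without Celery installed.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


class CeleryAppSketch:
    """Hypothetical stand-in for CeleryApp showing lazy creation and caching."""

    def __init__(self, name: str, settings: Dict[str, Any], factory: Callable[[str], Any]):
        self.name = name
        self.settings = settings
        self._factory = factory
        self._app: Optional[Any] = None  # nothing is built until get_app() is called

    def get_app(self) -> Any:
        # Build the application once, apply the settings, then reuse the same object.
        if self._app is None:
            app = self._factory(self.name)
            app.conf.update(self.settings)
            self._app = app
        return self._app


def fake_celery(name: str) -> SimpleNamespace:
    """Stand-in for celery.Celery(name); exposes only the attributes used here."""
    return SimpleNamespace(main=name, conf={})


wrapper = CeleryAppSketch("demo", {"task_default_queue": "default"}, fake_celery)
app = wrapper.get_app()
print(app.main, app.conf)
```

Passing celery.Celery as the factory instead of fake_celery should behave the same way, since a Celery application also accepts a name as its first argument and exposes conf.update(). Whether the real get_app() caches its result is not confirmed by this page.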