# Source code for ccflow_celery.app
from typing import Any, Dict, Optional
from ccflow import BaseModel
from celery import Celery
from pydantic import Field
__all__ = (
"CeleryConfig",
"CeleryApp",
)
# [docs]  (documentation-page link marker; not part of the module code)
class CeleryConfig(BaseModel):
    """Settings used to configure a Celery application.

    Every field has a default, so ``CeleryConfig()`` is a usable
    configuration: broker and result backend both point at
    ``redis://localhost:6379/0`` and JSON is used for serialization.
    """

    # --- Connections -------------------------------------------------------
    broker_url: str = Field(
        default="redis://localhost:6379/0",
        description="Celery broker URL",
    )
    result_backend: Optional[str] = Field(
        default="redis://localhost:6379/0",
        description="Celery result backend URL",
    )

    # --- Serialization -----------------------------------------------------
    task_serializer: str = Field(default="json")
    result_serializer: str = Field(default="json")
    # A factory is used so that every instance gets its own list object.
    accept_content: list = Field(default_factory=lambda: ["json"])

    # --- Task tracking and routing -----------------------------------------
    task_track_started: bool = Field(default=True)
    task_default_queue: str = Field(default="default")
    # Optional mapping of str -> str; None means no routes are configured.
    task_routes: Optional[Dict[str, str]] = Field(default=None)

    # --- Worker ------------------------------------------------------------
    # None leaves the worker concurrency unset in this configuration.
    worker_concurrency: Optional[int] = Field(default=None)

    # --- Eager execution ---------------------------------------------------
    task_always_eager: bool = Field(
        default=False,
        description="If True, tasks execute locally without a broker",
    )
    task_eager_propagates: bool = Field(
        default=True,
        description="If True, eager tasks propagate exceptions",
    )
# [docs]  (documentation-page link marker; not part of the module code)
class CeleryApp(BaseModel):
    """Wrapper around a lazily created Celery application instance.

    The underlying ``Celery`` object is built from ``name`` and ``config`` on
    the first call to :meth:`get_app` and reused by every later call.
    """

    name: str = Field(default="ccflow", description="Celery app name")
    config: CeleryConfig = Field(default_factory=CeleryConfig)
    # Cached Celery instance; remains None until get_app() has finished
    # building and configuring it.
    _app: Any = None

    def get_app(self) -> Celery:
        """Return the configured Celery app, creating it on first use.

        Returns:
            The cached ``Celery`` instance, configured from ``self.config``
            and with ``execute_model_task`` registered under the name
            ``"ccflow_celery.tasks.execute_model_task"``.

        Raises:
            Whatever Celery construction/configuration or the import of
            ``.tasks`` raises. In that case nothing is cached, so the next
            call starts again from scratch.
        """
        if self._app is not None:
            return self._app

        cfg = self.config

        # Build into a local and publish to ``self._app`` only once setup has
        # fully succeeded. Assigning first would leave a half-configured app
        # (no task registered) cached if any later step raised.
        app = Celery(self.name)
        app.conf.update(
            broker_url=cfg.broker_url,
            result_backend=cfg.result_backend,
            task_serializer=cfg.task_serializer,
            result_serializer=cfg.result_serializer,
            accept_content=cfg.accept_content,
            task_track_started=cfg.task_track_started,
            task_default_queue=cfg.task_default_queue,
        )

        # Optional settings are only written when they carry a truthy value.
        if cfg.task_routes:
            app.conf.task_routes = cfg.task_routes
        if cfg.worker_concurrency:
            app.conf.worker_concurrency = cfg.worker_concurrency
        if cfg.task_always_eager:
            app.conf.task_always_eager = True
            app.conf.task_eager_propagates = cfg.task_eager_propagates

        # Register the default task so it can be invoked by name
        # (required for eager mode; harmless in broker mode).
        # Imported here rather than at module top, as in the original code.
        from .tasks import execute_model_task

        app.task(execute_model_task, name="ccflow_celery.tasks.execute_model_task")

        self._app = app
        return app