Source code for ccflow_celery.tasks

import importlib
from logging import getLogger

_log = getLogger(__name__)

__all__ = ("execute_model_task",)


[docs] def execute_model_task(model_class: str, model_config: dict, context_class: str, context_config: dict) -> dict: """Execute a ccflow CallableModel on a Celery worker. This is the Celery task function that reconstructs a model and context from their serialized configurations and executes the model. Args: model_class: Fully qualified class name (e.g., "ccflow_s3.S3Model") model_config: Dict of model configuration (pydantic model_dump) context_class: Fully qualified class name for the context context_config: Dict of context configuration (pydantic model_dump) Returns: Dict representation of the result (model_dump) """ def _import_class(class_path: str): module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) return getattr(module, class_name) model_cls = _import_class(model_class) context_cls = _import_class(context_class) model = model_cls(**model_config) context = context_cls(**context_config) _log.info("Executing %s with context %s", model_class, context_class) result = model(context) if hasattr(result, "model_dump"): # Exclude type_ to avoid ccflow's polymorphic validator interfering on reconstruction return result.model_dump(exclude={"type_"}) return {"value": result}