Source code for ccflow_celery.examples.fibonacci

"""Fibonacci example using ccflow-celery with local (eager) execution.

Demonstrates:
- Defining a CallableModel with __deps__ for dependency-graph discovery
- Using CeleryGraphEvaluator in eager mode (no message broker required)
- Graph-parallel dispatch of Fibonacci sub-problems

Usage:
    python -m ccflow_celery.examples.fibonacci
"""

from ccflow import CallableModel, Flow, FlowOptionsOverride, GenericContext, GenericResult
from ccflow.evaluators import MemoryCacheEvaluator, MultiEvaluator

from ccflow_celery import CeleryApp, CeleryConfig, CeleryGraphEvaluator

__all__ = (
    "FibonacciModel",
    "main",
)


[docs] class FibonacciModel(CallableModel): """Compute Fibonacci numbers via ccflow dependency graphs. The graph evaluator calls __deps__ to discover the full dependency tree, then dispatches each node as a Celery task in topological order. """
[docs] @Flow.call def __call__(self, context: GenericContext[int]) -> GenericResult[int]: n = context.value if n <= 1: return GenericResult[int](value=n) a = self(GenericContext[int](value=n - 1)).value b = self(GenericContext[int](value=n - 2)).value return GenericResult[int](value=a + b)
[docs] @Flow.deps def __deps__(self, context: GenericContext[int]): if context.value <= 1: return [] return [ ( self, [ GenericContext[int](value=context.value - 2), GenericContext[int](value=context.value - 1), ], ) ]
[docs] def main(): # Eager mode: tasks run in-process, no Redis/RabbitMQ needed config = CeleryConfig( broker_url="memory://", result_backend="cache+memory://", task_always_eager=True, ) app = CeleryApp(name="fibonacci", config=config) evaluator = MultiEvaluator( evaluators=[ CeleryGraphEvaluator(app=app), MemoryCacheEvaluator(), ] ) model = FibonacciModel() with FlowOptionsOverride(options={"cacheable": True, "evaluator": evaluator}): for n in range(11): result = model(GenericContext[int](value=n)) print(f"fib({n}) = {result.value}")
if __name__ == "__main__": main()