Skip to content

Pipeline

Pipeline decorator module.

pipeline(entrypoint=None, name=None, rerun=False, cache=True)

Pipeline decorator.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `entrypoint` | `Optional[Callable \| Awaitable]` | Pipeline entrypoint. | `None` |
| `name` | `Optional[str]` | Pipeline name. Defaults to `None`. If not provided, the function name will be used. | `None` |
| `rerun` | `bool` | Re-run pipeline even if cache exists. | `False` |
| `cache` | `bool` | Use cache. | `True` |

Returns:

| Type | Description |
|------|-------------|
| `Callable \| Awaitable` | Decorator or decorated function. |

Example
import devpipe as dp

# Decorator without arguments
@dp.pipeline
def my_pipeline():
    ...

# Decorator with arguments, equivalent to the previous example
@dp.pipeline(name="my_pipeline", rerun=False, cache=True)
def my_pipeline():
    ...
Source code in devpipe/core/pipeline.py
def pipeline(
    entrypoint: Optional[Callable | Awaitable] = None,
    name: Optional[str] = None,
    rerun: bool = False,
    cache: bool = True,
) -> Callable | Awaitable:
    """Pipeline decorator.

    Supports both bare usage (``@dp.pipeline``) and parameterized usage
    (``@dp.pipeline(name=..., rerun=..., cache=...)``), and wraps both
    synchronous and asynchronous functions.

    Args:
        entrypoint (Optional[Callable | Awaitable], optional): Pipeline
            entrypoint. Set automatically when the decorator is applied
            without arguments.
        name (Optional[str], optional): Pipeline name. Defaults to None. If
            not provided, the function name will be used.
        rerun (bool, optional): Re-run pipeline even if cache exists.
        cache (bool, optional): Use cache.

    Returns:
        Decorator or decorated function.

    Example:
        ```python
        import devpipe as dp

        # Decorator without arguments
        @dp.pipeline
        def my_pipeline():
            ...

        # Decorator with arguments, equivalent to the previous example
        @dp.pipeline(name="my_pipeline", rerun=False, cache=True)
        def my_pipeline():
            ...
        ```
    """

    def decorator(fn: Callable | Awaitable) -> Callable | Awaitable:
        # Fall back to the wrapped function's name when no explicit name given.
        pname = name or fn.__name__

        if not iscoroutinefunction(fn):

            @wraps(fn)
            def wrapper(*args, **kwargs):
                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    pipe = Pipeline.get(pname, session)
                    if not cache:
                        # Caching disabled: execute and return without persisting.
                        logger.debug(
                            f"Pipeline `{pname}` is not using cache. "
                            f"Returning pipeline results directly."
                        )
                        pipe_exec = PipelineExecution(pipeline=pipe)
                        return pipe_exec._run(fn, *args, **kwargs)
                    inputs, i_bytes = PipelineInputs.from_object((args, kwargs))
                    if not rerun:
                        # Look for a previous execution with identical inputs.
                        pipe_exec = PipelineExecution.get(pipe, inputs, session)
                        if pipe_exec:
                            logger.info(f"Cache hit for {repr(pipe)}. ")
                            logger.info(f"Returning from {repr(pipe_exec)}.")
                            return pipe_exec.outputs.read()

                # Cache miss or forced rerun: execute, then persist the
                # inputs/outputs and the execution record.
                pipe_exec = PipelineExecution(pipeline=pipe)
                logger.info(f"Starting execution for {repr(pipe)}.")
                result = pipe_exec._run(fn, *args, **kwargs)
                logger.info(f"Execution finished for {repr(pipe)}.")

                outputs, o_bytes = PipelineOutputs.from_object(result)
                logger.debug(f"Saving inputs for {repr(pipe_exec)}.")
                inputs.save(i_bytes)
                logger.debug(f"Saving outputs for {repr(pipe_exec)}.")
                outputs.save(o_bytes)
                pipe_exec.inputs = inputs
                pipe_exec.outputs = outputs
                logger.debug(f"Saving {repr(pipe_exec)} to database.")
                with _get_session() as session:
                    session.add(pipe_exec)
                    session.commit()

                return result

        else:

            @wraps(fn)
            async def wrapper(*args, **kwargs):
                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    pipe = Pipeline.get(pname, session)
                    if not cache:
                        # Caching disabled: execute and return without persisting.
                        logger.debug(
                            f"Pipeline `{pname}` is not using cache. "
                            f"Returning pipeline results directly."
                        )
                        pipe_exec = PipelineExecution(pipeline=pipe)
                        return await pipe_exec._arun(fn, *args, **kwargs)
                    inputs, i_bytes = PipelineInputs.from_object((args, kwargs))
                    if not rerun:
                        # Look for a previous execution with identical inputs.
                        pipe_exec = PipelineExecution.get(pipe, inputs, session)
                        if pipe_exec:
                            logger.info(f"Cache hit for {repr(pipe)}. ")
                            logger.info(f"Returning from {repr(pipe_exec)}.")
                            return pipe_exec.outputs.read()

                # Cache miss or forced rerun: execute, then persist the
                # inputs/outputs and the execution record.
                pipe_exec = PipelineExecution(pipeline=pipe)
                logger.info(f"Starting execution for {repr(pipe)}.")
                result = await pipe_exec._arun(fn, *args, **kwargs)
                logger.info(f"Execution finished for {repr(pipe)}.")

                outputs, o_bytes = PipelineOutputs.from_object(result)
                logger.debug(f"Saving inputs for {repr(pipe_exec)}.")
                inputs.save(i_bytes)
                logger.debug(f"Saving outputs for {repr(pipe_exec)}.")
                outputs.save(o_bytes)
                pipe_exec.inputs = inputs
                pipe_exec.outputs = outputs
                logger.debug(f"Saving {repr(pipe_exec)} to database.")
                with _get_session() as session:
                    session.add(pipe_exec)
                    session.commit()

                return result

        return wrapper

    return decorator(entrypoint) if entrypoint else decorator