Skip to content

Reference

devpipe is a simple pipeline framework for Python.

pipeline(entrypoint=None, name=None, rerun=False, cache=True)

Pipeline decorator.

Parameters:

Name Type Description Default
entrypoint Optional[Callable | Awaitable]

Pipeline entrypoint.

None
name Optional[str]

Pipeline name. Defaults to None. If not provided, the function name will be used.

None
rerun bool

Re-run pipeline even if cache exists.

False
cache bool

Use cache.

True

Returns:

Type Description
Callable | Awaitable

Decorator or decorated function.

Example
import devpipe as dp

# Decorator without arguments
@dp.pipeline
def my_pipeline():
    ...

# Decorator with arguments, equivalent to the previous example
@dp.pipeline(name="my_pipeline", rerun=False, cache=True)
def my_pipeline():
    ...
Source code in devpipe/core/pipeline.py
def pipeline(
    entrypoint: Optional[Callable | Awaitable] = None,
    name: Optional[str] = None,
    rerun: bool = False,
    cache: bool = True,
) -> Callable | Awaitable:
    """Pipeline decorator.

    Args:
        entrypoint (Optional[Callable | Awaitable], optional): Pipeline entrypoint.
        name (Optional[str], optional): Pipeline name. Defaults to None. If
            not provided, the function name will be used.
        rerun (bool, optional): Re-run pipeline even if cache exists.
        cache (bool, optional): Use cache.

    Returns:
        Decorator or decorated function.

    Example:
        ```python
        import devpipe as dp

        # Decorator without arguments
        @dp.pipeline
        def my_pipeline():
            ...

        # Decorator with arguments, equivalent to the previous example
        @dp.pipeline(name="my_pipeline", rerun=False, cache=True)
        def my_pipeline():
            ...
        ```
    """

    def decorator(fn: Callable | Awaitable) -> Callable | Awaitable:
        pname = name or f"{fn.__name__}"

        if not iscoroutinefunction(fn):

            @wraps(fn)
            def wrapper(*args, **kwargs):
                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    pipe = Pipeline.get(pname, session)
                    if not cache:
                        logger.debug(
                            f"Pipeline `{pname}` is not using cache. "
                            f"Returning pipeline results directly."
                        )
                        pipe_exec = PipelineExecution(pipeline=pipe)
                        return pipe_exec._run(fn, *args, **kwargs)
                    inputs, i_bytes = PipelineInputs.from_object((args, kwargs))
                    pipe = Pipeline.get(pname, session)
                    if not rerun:
                        pipe_exec = PipelineExecution.get(pipe, inputs, session)
                        if pipe_exec:
                            logger.info(f"Cache hit for {repr(pipe)}. ")
                            logger.info(f"Returning from {repr(pipe_exec)}.")
                            return pipe_exec.outputs.read()

                pipe_exec = PipelineExecution(pipeline=pipe)
                logger.info(f"Starting execution for {repr(pipe)}.")
                result = pipe_exec._run(fn, *args, **kwargs)
                logger.info(f"Execution finished for {repr(pipe)}.")

                outputs, o_bytes = PipelineOutputs.from_object(result)
                logger.debug(f"Saving inputs for {repr(pipe_exec)}.")
                inputs.save(i_bytes)
                logger.debug(f"Saving outputs for {repr(pipe_exec)}.")
                outputs.save(o_bytes)
                pipe_exec.inputs = inputs
                pipe_exec.outputs = outputs
                logger.debug(f"Saving {repr(pipe_exec)} to database.")
                with _get_session() as session:
                    session.add(pipe_exec)
                    session.commit()

                return result

        else:

            @wraps(fn)
            async def wrapper(*args, **kwargs):
                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    pipe = Pipeline.get(pname, session)
                    if not cache:
                        logger.debug(
                            f"Pipeline `{pname}` is not using cache. "
                            f"Returning pipeline results directly."
                        )
                        pipe_exec = PipelineExecution(pipeline=pipe)
                        return await pipe_exec._arun(fn, *args, **kwargs)
                    inputs, i_bytes = PipelineInputs.from_object((args, kwargs))
                    pipe = Pipeline.get(pname, session)
                    if not rerun:
                        pipe_exec = PipelineExecution.get(pipe, inputs, session)
                        if pipe_exec:
                            logger.info(f"Cache hit for pipeline {pipe.uuid}.")
                            return pipe_exec.outputs.read()

                pipe_exec = PipelineExecution(pipeline=pipe)
                logger.info(f"Starting pipeline execution {pipe_exec.uuid}.")
                result = await pipe_exec._arun(fn, *args, **kwargs)
                logger.info(f"Pipeline execution {pipe_exec.uuid} finished.")

                outputs, o_bytes = PipelineOutputs.from_object(result)
                inputs.save(i_bytes)
                outputs.save(o_bytes)
                pipe_exec.inputs = inputs
                pipe_exec.outputs = outputs
                with _get_session() as session:
                    session.add(pipe_exec)
                    session.commit()

                return result

        return wrapper

    return decorator(entrypoint) if entrypoint else decorator

step(entrypoint=None, name=None, rerun=False, cache=True)

Step decorator.

Parameters:

Name Type Description Default
entrypoint Optional[Callable | Awaitable]

Step entrypoint.

None
name Optional[str]

Step name. Defaults to None. If not provided, the function name will be used.

None
rerun bool

Re-run step even if cache exists.

False
cache bool

Use cache.

True

Returns:

Type Description
Callable | Awaitable

Decorator or decorated function.

Example
import devpipe as dp

# Decorator without arguments
@dp.step
def my_step():
    ...

# Decorator with arguments, equivalent to the previous example
@dp.step(name="my_step", rerun=False, cache=True)
def my_step():
    ...

Info

A step with rerun=True will not run if there is a cache hit for its parent pipeline. This is because the pipeline cache is checked first.

Warning

A step will not be cached if it is not called from a pipeline.

Source code in devpipe/core/step.py
def step(
    entrypoint: Optional[Callable | Awaitable] = None,
    name: Optional[str] = None,
    rerun: bool = False,
    cache: bool = True,
) -> Callable | Awaitable:
    """Step decorator. Pico

    Args:
        entrypoint (Optional[Callable | Awaitable], optional): Step entrypoint.
        name (Optional[str], optional): Step name. Defaults to None. If
            not provided, the function name will be used.
        rerun (bool, optional): Re-run step even if cache exists.
        cache (bool, optional): Use cache.

    Returns:
        Decorator or decorated function.

    Example:
        ```python
        import devpipe as dp

        # Decorator without arguments
        @dp.step
        def my_step():
            ...

        # Decorator with arguments, equivalent to the previous example
        @dp.step(name="my_step", rerun=False, cache=True)
        def my_step():
            ...
        ```

    !!! info
        A step with `rerun=True` will not run if there is a cache hit for its
        parent pipeline. This is because the pipeline cache is checked first.

    !!! warning
        A step will not be cached if it is not called from a pipeline.
    """

    def decorator(fn: Callable | Awaitable) -> Callable | Awaitable:
        sname = name or f"{fn.__name__}"

        if not iscoroutinefunction(fn):

            @wraps(fn)
            def wrapper(*args, **kwargs):
                pipe_exec = PipelineExecution._from_context()
                if pipe_exec is None:
                    logger.warning(
                        f"Step `{sname}` not called from a pipeline. "
                        f"The result will not be cached."
                    )
                    return fn(*args, **kwargs)

                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    step = Step.get(pipe_exec.pipeline, sname, session)
                    if not cache:
                        logger.debug(
                            f"Step `{sname}` is not using cache. "
                            f"Returning step results directly."
                        )
                        return fn(*args, **kwargs)
                    inputs, i_bytes = StepInputs.from_object((args, kwargs))
                    step = Step.get(pipe_exec.pipeline, sname, session)
                    if not rerun:
                        step_exec = StepExecution.get(step, inputs, session)
                        if step_exec:
                            logger.info(f"Cache hit for {repr(step)}.")
                            logger.info(f"Returning from {repr(step_exec)}.")
                            pipe_exec.step_executions.append(step_exec)
                            return step_exec.outputs.read()

                step_exec = StepExecution(step_id=step.uuid)
                logger.info(f"Starting execution for {repr(step)}.")
                result = step_exec._run(fn, *args, **kwargs)
                logger.info(f"Execution finished for {repr(step)}.")

                outputs, o_bytes = StepOutputs.from_object(result)
                logger.debug(f"Saving inputs for {repr(step_exec)}.")
                inputs.save(i_bytes)
                logger.debug(f"Saving outputs for {repr(step_exec)}.")
                outputs.save(o_bytes)
                step_exec.inputs = inputs
                step_exec.outputs = outputs
                logger.debug(f"Saving {repr(step_exec)} to database.")
                with _get_session() as session:
                    session.add(step_exec)
                    session.commit()
                    session.refresh(step_exec)

                pipe_exec.step_executions.append(step_exec)

                return result

        else:

            @wraps(fn)
            async def wrapper(*args, **kwargs):
                pipe_exec = PipelineExecution._from_context()
                if pipe_exec is None:
                    logger.warning(
                        f"Step `{sname}` not called from a pipeline. "
                        f"The result will not be cached."
                    )
                    return await fn(*args, **kwargs)

                _init_devpipe()
                with _get_session(autoflush=False) as session:
                    step = Step.get(pipe_exec.pipeline, sname, session)
                    if not cache:
                        logger.debug(
                            f"Step `{sname}` is not using cache. "
                            f"Returning step results directly."
                        )
                        return await fn(*args, **kwargs)
                    inputs, i_bytes = StepInputs.from_object((args, kwargs))
                    step = Step.get(pipe_exec.pipeline, sname, session)
                    if not rerun:
                        step_exec = StepExecution.get(step, inputs, session)
                        if step_exec:
                            logger.info(f"Cache hit for step {step.uuid}.")
                            pipe_exec.step_executions.append(step_exec)
                            return step_exec.outputs.read()

                step_exec = StepExecution(step_id=step.uuid)
                logger.info(f"Starting step execution {step_exec.uuid}.")
                result = await step_exec._arun(fn, *args, **kwargs)
                logger.info(f"Step execution {step_exec.uuid} finished.")

                outputs, o_bytes = StepOutputs.from_object(result)
                inputs.save(i_bytes)
                outputs.save(o_bytes)
                step_exec.inputs = inputs
                step_exec.outputs = outputs
                with _get_session() as session:
                    session.add(step_exec)
                    session.commit()
                    session.refresh(step_exec)

                pipe_exec.step_executions.append(step_exec)

                return result

        return wrapper

    return decorator(entrypoint) if entrypoint else decorator