Skip to content

Base

Base models module.

BaseArtifact

Bases: BaseSQLModel

Base artifact model.

Attributes:

Name Type Description
storage_key str

Storage key.

size int

Size.

hash str

Hash.

Source code in devpipe/models/base.py
class BaseArtifact(BaseSQLModel):
    """Metadata record for a pickled object kept in storage.

    Attributes:
        storage_key (str): Key under which the pickled bytes are stored.
        size (int): Length of the pickled payload in bytes.
        hash (str): SHA-256 hex digest of the pickled payload.
    """

    storage_key: str = Field(max_length=1024)
    size: int = Field(ge=0)
    hash: str = Field(max_length=64)

    @classmethod
    def from_object(cls, obj: Any) -> tuple["BaseArtifact", bytes]:
        """Build an artifact describing ``obj`` without storing it.

        The object is pickled and hashed, and a fresh random storage key
        (``<uuid4 hex>.pkl``) is assigned. Nothing is written to storage
        here; pass the returned bytes to ``save`` for that.

        Args:
            obj: Object to pickle.

        Returns:
            The new artifact.
            The pickled object bytes.
        """
        payload, digest = cls._pickle_and_hash(obj)
        return (
            cls(
                storage_key=f"{uuid4().hex}.pkl",
                size=len(payload),
                hash=digest,
            ),
            payload,
        )

    def read(self) -> Any:
        """Load the stored object back from storage.

        The bytes found under ``storage_key`` are unpickled, so the
        storage backend must be trusted.

        Returns:
            The unpickled object.
        """
        raw = _get_storage().read(self.storage_key)
        return self._unpickle(raw)

    def save(self, obj_bytes: bytes) -> "BaseArtifact":
        """Write already-pickled bytes to storage under ``storage_key``.

        Args:
            obj_bytes: Pickled object bytes.

        Returns:
            This artifact, to allow chaining.
        """
        _get_storage().save(self.storage_key, obj_bytes)
        return self

    @staticmethod
    def _pickle_and_hash(obj: Any) -> tuple[bytes, str]:
        """Pickle an object and compute the SHA-256 of the result.

        Args:
            obj (Any): Object to pickle.

        Returns:
            The pickled object bytes.
            The hex digest of those bytes.
        """
        payload = pickle.dumps(obj)
        return payload, sha256(payload).hexdigest()

    @staticmethod
    def _unpickle(obj: bytes) -> Any:
        """Unpickle bytes produced by `_pickle_and_hash`.

        Args:
            obj (bytes): Pickled object.

        Returns:
            The unpickled object.
        """
        # NOTE: pickle.loads can execute arbitrary code embedded in the
        # payload; only feed it bytes that come from trusted storage.
        return pickle.loads(obj)

from_object(obj) classmethod

Create artifact from object.

Parameters:

Name Type Description Default
obj Any

Object.

required

Returns:

Type Description
'BaseArtifact'

Artifact.

bytes

Pickled object bytes.

Source code in devpipe/models/base.py
@classmethod
def from_object(cls, obj: Any) -> tuple["BaseArtifact", bytes]:
    """Build an artifact describing ``obj`` without storing it.

    The object is pickled and hashed, and a fresh random storage key
    (``<uuid4 hex>.pkl``) is assigned. Nothing is written to storage
    here; pass the returned bytes to ``save`` for that.

    Args:
        obj: Object to pickle.

    Returns:
        The new artifact.
        The pickled object bytes.
    """
    payload, digest = cls._pickle_and_hash(obj)
    return (
        cls(
            storage_key=f"{uuid4().hex}.pkl",
            size=len(payload),
            hash=digest,
        ),
        payload,
    )

read()

Read object from storage.

Returns:

Type Description
Any

Object.

Source code in devpipe/models/base.py
def read(self) -> Any:
    """Load the stored object back from storage.

    The bytes found under ``storage_key`` are unpickled, so the
    storage backend must be trusted.

    Returns:
        The unpickled object.
    """
    raw = _get_storage().read(self.storage_key)
    return self._unpickle(raw)

save(obj_bytes)

Save object to storage.

Parameters:

Name Type Description Default
obj_bytes bytes

Object bytes.

required

Returns:

Type Description
'BaseArtifact'

Artifact.

Source code in devpipe/models/base.py
def save(self, obj_bytes: bytes) -> "BaseArtifact":
    """Write already-pickled bytes to storage under ``storage_key``.

    Args:
        obj_bytes: Pickled object bytes.

    Returns:
        This artifact, to allow chaining.
    """
    _get_storage().save(self.storage_key, obj_bytes)
    return self

BaseExecution

Bases: BaseSQLModel

Base execution model.

Attributes:

Name Type Description
started_at Optional[datetime]

Execution start time.

finished_at Optional[datetime]

Execution finish time.

Source code in devpipe/models/base.py
class BaseExecution(BaseSQLModel):
    """Base execution model.

    Wraps the call of an entrypoint and records when it started and
    finished.

    Attributes:
        started_at (Optional[datetime]): Execution start time.
        finished_at (Optional[datetime]): Execution finish time.
    """

    started_at: Optional[datetime] = Field(default=None)
    finished_at: Optional[datetime] = Field(default=None)

    def _run(
        self,
        entrypoint: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Run a synchronous entrypoint and time it.

        ``started_at`` is set (UTC) right before the call and
        ``finished_at`` right after it returns. If the entrypoint raises,
        the exception propagates and ``finished_at`` is left untouched.

        Args:
            entrypoint (Callable): Entrypoint.
            *args (Any): Arguments for entrypoint.
            **kwargs (Any): Keyword arguments for entrypoint.

        Returns:
            Result of the entrypoint.
        """
        self.started_at = datetime.now(timezone.utc)
        result = entrypoint(*args, **kwargs)
        self.finished_at = datetime.now(timezone.utc)
        return result

    async def _arun(
        self,
        entrypoint: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Run an asynchronous entrypoint and time it.

        ``started_at`` is set (UTC) right before the call and
        ``finished_at`` right after the awaited result is available. If
        the entrypoint raises, the exception propagates and
        ``finished_at`` is left untouched.

        Args:
            entrypoint (Callable): Callable returning an awaitable (for
                example an ``async def`` function). It is called with
                the given arguments and its result is awaited.
            *args (Any): Arguments for entrypoint.
            **kwargs (Any): Keyword arguments for entrypoint.

        Returns:
            Result of the entrypoint.
        """
        self.started_at = datetime.now(timezone.utc)
        result = await entrypoint(*args, **kwargs)
        self.finished_at = datetime.now(timezone.utc)
        return result

    @classmethod
    def _from_context(cls) -> Optional["BaseExecution"]:
        """Get execution from the current stack.

        Walks the call stack from the innermost frame outwards looking
        for a frame of a function named ``_run`` or ``_arun`` whose
        local ``self`` is an instance of ``cls``.

        Returns:
            The first matching execution, or ``None`` when the caller is
            not running inside such a frame.
        """
        # context=0: only the function name and frame are needed, so skip
        # loading source lines for every frame on the stack.
        for rec in inspect.stack(context=0):
            if rec.function not in ("_run", "_arun"):
                continue
            execution = rec.frame.f_locals.get("self")
            if isinstance(execution, cls):
                return execution
        return None

BaseSQLModel

Bases: SQLModel

Base SQL model.

Attributes:

Name Type Description
uuid str

UUID.

created_at datetime

Created at.

Source code in devpipe/models/base.py
class BaseSQLModel(SQLModel):
    """Common base for the SQL models of this module.

    Attributes:
        uuid (str): Primary key; defaults to a random UUID4 in hex form.
        created_at (datetime): Creation timestamp; the column carries
            ``func.now()`` as its server default.
    """

    uuid: str = Field(
        primary_key=True,
        max_length=36,
        default_factory=lambda: uuid4().hex,
    )
    created_at: datetime = Field(
        sa_column_kwargs=dict(server_default=func.now()),
    )

Tables

Bases: BaseModel

Table names.

Attributes:

Name Type Description
pipe str

Pipeline table.

pipe_exec str

Pipeline execution table.

pipe_inps str

Pipeline inputs table.

pipe_outs str

Pipeline outputs table.

step str

Step table.

step_exec str

Step execution table.

step_inps str

Step inputs table.

step_outs str

Step outputs table.

pipe_step_link str

Link table for pipelines and steps.

pipe_step_exec_link str

Link table for pipeline executions and step executions.

Source code in devpipe/models/base.py
class Tables(BaseModel):
    """Table names.

    Every name is prefixed with ``CONFIG.database_prefix`` followed by an
    underscore once the model has been validated.

    Attributes:
        pipe (str): Pipeline table.
        pipe_exec (str): Pipeline execution table.
        pipe_inps (str): Pipeline inputs table.
        pipe_outs (str): Pipeline outputs table.
        step (str): Step table.
        step_exec (str): Step execution table.
        step_inps (str): Step inputs table.
        step_outs (str): Step outputs table.
        pipe_step_link (str): Link table for pipelines and steps.
        pipe_step_exec_link (str): Link table for pipeline executions and
            step executions.
    """

    pipe: str = "pipeline"
    pipe_exec: str = "pipeline_execution"
    pipe_inps: str = "pipeline_inputs"
    pipe_outs: str = "pipeline_outputs"
    step: str = "step"
    step_exec: str = "step_execution"
    step_inps: str = "step_inputs"
    step_outs: str = "step_outputs"
    pipe_step_link: str = "link_pipeline_step"
    pipe_step_exec_link: str = "link_pipeline_step_executions"

    @model_validator(mode="after")
    def _add_table_prefix(self) -> "Tables":
        """Prepend the configured database prefix to every table name.

        Returns:
            The same instance, with each field rewritten to
            ``"<CONFIG.database_prefix>_<name>"``.
        """
        # Read the field registry from the class: accessing
        # `model_fields` on an instance is deprecated in recent Pydantic
        # releases.
        for table in type(self).model_fields:
            base = getattr(self, table)
            name = f"{CONFIG.database_prefix}_{base}"
            setattr(self, table, name)
        return self