Skip to content

Local

Local storage module.

LocalStorage

Local storage client.

Source code in devpipe/storage/local.py
class LocalStorage:
    """Storage client backed by files under ``CONFIG.working_dir``."""

    def __init__(self):  # noqa: D107
        # Ensure the working directory exists before anything is stored.
        root = CONFIG.working_dir
        root.mkdir(parents=True, exist_ok=True)

        # Drop a catch-all .gitignore into the working directory, but
        # leave an already existing one untouched.
        ignore_file = root / ".gitignore"
        if ignore_file.exists():
            return
        with open(ignore_file, "w") as handle:
            handle.write("*")

    def read(self, storage_key: str) -> bytes:
        """Return the bytes stored under a key.

        The key is resolved relative to ``CONFIG.working_dir`` and the
        resulting file is read in binary mode.

        Args:
            storage_key (str): Storage key.

        Returns:
            Stored data.

        Raises:
            FileNotFoundError: If no file exists for ``storage_key``.
        """
        target = CONFIG.working_dir / storage_key
        with open(target, "rb") as stream:
            payload = stream.read()
        return payload

    def save(self, storage_key: str, data: bytes) -> None:
        """Write bytes to storage under a key.

        The key is resolved relative to ``CONFIG.working_dir``. Missing
        parent directories are created, and an existing file at that
        location is overwritten.

        Args:
            storage_key (str): Storage key.
            data (bytes): Data to store.
        """
        target = CONFIG.working_dir / storage_key
        # Keys may contain sub-directories; make sure they exist first.
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "wb") as stream:
            stream.write(data)

read(storage_key)

Read data from storage.

Parameters:

Name Type Description Default
storage_key str

Storage key.

required

Returns:

Type Description
bytes

Stored data.

Source code in devpipe/storage/local.py
def read(self, storage_key: str) -> bytes:
    """Return the bytes stored under a key.

    The key is resolved relative to ``CONFIG.working_dir`` and the
    resulting file is read in binary mode.

    Args:
        storage_key (str): Storage key.

    Returns:
        Stored data.

    Raises:
        FileNotFoundError: If no file exists for ``storage_key``.
    """
    target = CONFIG.working_dir / storage_key
    with open(target, "rb") as stream:
        payload = stream.read()
    return payload

save(storage_key, data)

Save data to storage.

Parameters:

Name Type Description Default
storage_key str

Storage key.

required
data bytes

Data to store.

required
Source code in devpipe/storage/local.py
def save(self, storage_key: str, data: bytes) -> None:
    """Write bytes to storage under a key.

    The key is resolved relative to ``CONFIG.working_dir``. Missing
    parent directories are created, and an existing file at that
    location is overwritten.

    Args:
        storage_key (str): Storage key.
        data (bytes): Data to store.
    """
    target = CONFIG.working_dir / storage_key
    # Keys may contain sub-directories; make sure they exist first.
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "wb") as stream:
        stream.write(data)