Skip to content

API Reference

openfang

OpenFang - AI agent gateway with pluggable adapters.

Usage

from src.spikes.deps import Deps, User, agent, chat, create_deps

Run modes

python -m src.spikes.deps           # CLI help
python -m src.spikes.deps chat      # Interactive chat
python -m src.spikes.deps telegram  # Telegram bot
python -m src.spikes.deps cron      # Cron runner

FileSessions

File-based session storage using JSONL format.

Each session is stored as a separate .jsonl file with:

- Header line: {"type": "session", "version": 1, "id": "...", "timestamp": "..."}
- Message lines: {"type": "message", "message": {...}}

Similar to OpenClaw's transcript format.

Source code in src/openfang/capabilities/file_sessions.py
class FileSessions:
    """File-based session storage using JSONL format.

    Each session is stored as a separate .jsonl file with:
    - Header line: {"type": "session", "version": 1, "id": "...", "timestamp": "..."}
    - Message lines: {"type": "message", "message": {...}}

    Similar to OpenClaw's transcript format.
    """

    def __init__(self, directory: str | Path) -> None:
        """Initialize file-based sessions.

        Args:
            directory: Directory to store session files (e.g. ~/.openfang/sessions)
        """
        self.directory = Path(directory).expanduser()
        self.directory.mkdir(parents=True, exist_ok=True)

    def _session_path(self, session_id: str) -> Path:
        """Get the file path for a session.

        Sanitizes the session_id to be filesystem-safe.
        """
        # Replace unsafe characters with underscores
        safe_id = session_id.replace("/", "_").replace("\\", "_").replace(":", "_")
        return self.directory / f"{safe_id}.jsonl"

    async def get(self, session_id: str) -> list[ModelMessage]:
        """Get all messages for a session."""
        path = self._session_path(session_id)
        if not path.exists():
            return []

        messages: list[ModelMessage] = []
        try:
            with path.open() as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    record = json.loads(line)
                    if record.get("type") == "message":
                        # Deserialize the message using pydantic-ai's adapter
                        msg_data = record.get("message")
                        if msg_data:
                            # Wrap in list for the type adapter, then extract
                            parsed = _message_adapter.validate_python([msg_data])
                            messages.extend(parsed)
        except Exception as e:
            logger.error(f"Error reading session {session_id}: {e}")
            return []

        return messages

    async def save(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Save (replace) all messages for a session."""
        path = self._session_path(session_id)

        try:
            with path.open("w") as f:
                # Write header
                header = {
                    "type": "session",
                    "version": TRANSCRIPT_VERSION,
                    "id": session_id,
                    "timestamp": datetime.now(UTC).isoformat(),
                }
                f.write(json.dumps(header) + "\n")

                # Write messages - use dump_json for proper datetime handling
                for msg in messages:
                    # Serialize single message by wrapping in list
                    msg_json = _message_adapter.dump_json([msg])
                    # Parse back to get the dict, then extract first item
                    msg_data = json.loads(msg_json)[0]
                    record = {"type": "message", "message": msg_data}
                    f.write(json.dumps(record) + "\n")
        except Exception as e:
            logger.error(f"Error saving session {session_id}: {e}")
            raise

    async def append(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Append messages to a session."""
        path = self._session_path(session_id)

        try:
            # If file doesn't exist, write header first
            if not path.exists():
                with path.open("w") as f:
                    header = {
                        "type": "session",
                        "version": TRANSCRIPT_VERSION,
                        "id": session_id,
                        "timestamp": datetime.now(UTC).isoformat(),
                    }
                    f.write(json.dumps(header) + "\n")

            # Append messages
            with path.open("a") as f:
                for msg in messages:
                    msg_json = _message_adapter.dump_json([msg])
                    msg_data = json.loads(msg_json)[0]
                    record = {"type": "message", "message": msg_data}
                    f.write(json.dumps(record) + "\n")
        except Exception as e:
            logger.error(f"Error appending to session {session_id}: {e}")
            raise

    async def delete(self, session_id: str) -> None:
        """Delete a session and its file."""
        path = self._session_path(session_id)
        if path.exists():
            path.unlink()

    async def list(self, user_id: int | None = None) -> list[str]:
        """List all session IDs.

        Note: user_id filtering is not supported for file-based storage.
        All sessions are returned.
        """
        sessions = []
        for path in self.directory.glob("*.jsonl"):
            # Read the header to get the actual session ID
            try:
                with path.open() as f:
                    first_line = f.readline().strip()
                    if first_line:
                        header = json.loads(first_line)
                        if header.get("type") == "session":
                            sessions.append(header.get("id", path.stem))
                        else:
                            # Old format or no header, use filename
                            sessions.append(path.stem)
            except Exception:
                # Fallback to filename
                sessions.append(path.stem)

        return sessions

__init__(directory)

Initialize file-based sessions.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `directory` | `str \| Path` | Directory to store session files (e.g. ~/.openfang/sessions) | required |

Source code in src/openfang/capabilities/file_sessions.py
def __init__(self, directory: str | Path) -> None:
    """Set up the session store rooted at ``directory``.

    The path has ``~`` expanded and the directory (including parents) is
    created when it does not exist yet.

    Args:
        directory: Directory to store session files (e.g. ~/.openfang/sessions)
    """
    session_root = Path(directory).expanduser()
    self.directory = session_root
    session_root.mkdir(parents=True, exist_ok=True)

get(session_id) async

Get all messages for a session.

Source code in src/openfang/capabilities/file_sessions.py
async def get(self, session_id: str) -> list[ModelMessage]:
    """Load every message stored for ``session_id``.

    Non-message records (such as the header) and blank lines are skipped.

    Returns:
        The messages in file order, or an empty list when the session file is
        missing or cannot be read/parsed (the error is logged).
    """
    transcript = self._session_path(session_id)
    if not transcript.exists():
        return []

    loaded: list[ModelMessage] = []
    try:
        with transcript.open() as handle:
            for raw in handle:
                text = raw.strip()
                if not text:
                    continue
                entry = json.loads(text)
                if entry.get("type") != "message":
                    continue
                payload = entry.get("message")
                if not payload:
                    continue
                # The pydantic-ai adapter validates lists, so wrap the single payload.
                loaded.extend(_message_adapter.validate_python([payload]))
    except Exception as e:
        logger.error(f"Error reading session {session_id}: {e}")
        return []

    return loaded

save(session_id, messages) async

Save (replace) all messages for a session.

Source code in src/openfang/capabilities/file_sessions.py
async def save(self, session_id: str, messages: list[ModelMessage]) -> None:
    """Save (replace) all messages for a session.

    The file is rewritten as one header line followed by one record per message.

    Raises:
        Exception: Any serialization or I/O error is logged and re-raised.
    """
    path = self._session_path(session_id)

    try:
        header = {
            "type": "session",
            "version": TRANSCRIPT_VERSION,
            "id": session_id,
            "timestamp": datetime.now(UTC).isoformat(),
        }
        lines = [json.dumps(header) + "\n"]

        # Serialize every message BEFORE opening the file: "w" truncates on open,
        # so a failure while serializing would otherwise wipe the existing
        # transcript and leave a partial file behind.
        for msg in messages:
            # dump_json gives proper datetime handling; the adapter works on
            # lists, so wrap the message and take the first parsed item.
            msg_data = json.loads(_message_adapter.dump_json([msg]))[0]
            lines.append(json.dumps({"type": "message", "message": msg_data}) + "\n")

        with path.open("w") as f:
            f.writelines(lines)
    except Exception as e:
        logger.error(f"Error saving session {session_id}: {e}")
        raise

append(session_id, messages) async

Append messages to a session.

Source code in src/openfang/capabilities/file_sessions.py
async def append(self, session_id: str, messages: list[ModelMessage]) -> None:
    """Append messages to a session, writing the header first if the file is empty.

    Raises:
        Exception: Any serialization or I/O error is logged and re-raised.
    """
    path = self._session_path(session_id)

    try:
        # Serialize first so a failing message cannot leave a partial append.
        lines = []
        for msg in messages:
            msg_data = json.loads(_message_adapter.dump_json([msg]))[0]
            lines.append(json.dumps({"type": "message", "message": msg_data}) + "\n")

        # One append-mode open replaces the exists-check followed by two opens
        # (which could race with another writer). In append mode the stream
        # starts at end-of-file, so position 0 means the file is new or empty.
        with path.open("a") as f:
            if f.tell() == 0:
                header = {
                    "type": "session",
                    "version": TRANSCRIPT_VERSION,
                    "id": session_id,
                    "timestamp": datetime.now(UTC).isoformat(),
                }
                f.write(json.dumps(header) + "\n")
            f.writelines(lines)
    except Exception as e:
        logger.error(f"Error appending to session {session_id}: {e}")
        raise

delete(session_id) async

Delete a session and its file.

Source code in src/openfang/capabilities/file_sessions.py
async def delete(self, session_id: str) -> None:
    """Delete a session and its file. A missing file is not an error."""
    # missing_ok avoids the exists()/unlink() race: the file may disappear
    # between the check and the removal.
    self._session_path(session_id).unlink(missing_ok=True)

list(user_id=None) async

List all session IDs.

Note: user_id filtering is not supported for file-based storage. All sessions are returned.

Source code in src/openfang/capabilities/file_sessions.py
async def list(self, user_id: int | None = None) -> list[str]:
    """Return the IDs of all stored sessions.

    The ID is taken from each file's header line; files whose first line is not
    a session header, or that cannot be read/parsed, are reported by file stem.
    Files with a blank first line are omitted.

    Note: user_id filtering is not supported for file-based storage.
    All sessions are returned.
    """
    found = []
    for session_file in self.directory.glob("*.jsonl"):
        stem = session_file.stem
        try:
            with session_file.open() as handle:
                head = handle.readline().strip()
                if not head:
                    continue
                parsed = json.loads(head)
                # Old format or missing header falls back to the filename.
                is_header = parsed.get("type") == "session"
                found.append(parsed.get("id", stem) if is_header else stem)
        except Exception:
            # Unreadable or malformed first line: fall back to the filename.
            found.append(stem)

    return found

InMemoryCron

In-memory cron job storage.

Source code in src/openfang/capabilities/cron.py
class InMemoryCron:
    """In-memory cron job storage.

    Jobs are kept in a dict keyed by job ID; IDs are generated from an
    incrementing counter as ``job-<n>``.
    """

    def __init__(self):
        self._counter = 0
        self._jobs: dict[str, CronJob] = {}

    async def create(self, schedule: str, task: str, user_id: int | None = None) -> CronJob:
        """Create, store and return a new enabled job owned by ``user_id``."""
        self._counter += 1
        new_job = CronJob(
            id=f"job-{self._counter}",
            schedule=schedule,
            task=task,
            enabled=True,
            created_by=user_id,
        )
        self._jobs[new_job.id] = new_job
        return new_job

    async def get(self, job_id: str) -> CronJob | None:
        """Return the job with ``job_id``, or None when it is unknown."""
        return self._jobs.get(job_id)

    async def list(self, user_id: int | None = None) -> list[CronJob]:
        """Return all jobs, restricted to those created by ``user_id`` when given."""
        return [
            job
            for job in self._jobs.values()
            if user_id is None or job.created_by == user_id
        ]

    async def delete(self, job_id: str) -> bool:
        """Remove a job. Returns True if a job was removed, False otherwise."""
        removed = self._jobs.pop(job_id, None)
        return removed is not None

    async def enable(self, job_id: str) -> bool:
        """Mark a job enabled. Returns False when the job is not found."""
        return self._set_enabled(job_id, True)

    async def disable(self, job_id: str) -> bool:
        """Mark a job disabled. Returns False when the job is not found."""
        return self._set_enabled(job_id, False)

    def _set_enabled(self, job_id: str, enabled: bool) -> bool:
        """Set the ``enabled`` flag on a stored job; report whether it existed."""
        target = self._jobs.get(job_id)
        if not target:
            return False
        target.enabled = enabled
        return True

InMemoryMemory

In-memory key-value store.

Source code in src/openfang/capabilities/memory.py
class InMemoryMemory:
    """In-memory key-value store backed by a plain dict of strings."""

    def __init__(self):
        self._store: dict[str, str] = {}

    async def get(self, key: str) -> str | None:
        """Return the value stored under ``key``, or None when absent."""
        if key in self._store:
            return self._store[key]
        return None

    async def set(self, key: str, value: str) -> None:
        """Store ``value`` under ``key``, replacing any previous value."""
        self._store.update({key: value})

    async def delete(self, key: str) -> None:
        """Remove ``key`` if present; unknown keys are ignored."""
        if key in self._store:
            del self._store[key]

    async def keys(self, prefix: str = "") -> list[str]:
        """Return the stored keys that start with ``prefix`` (all keys by default)."""
        matching = []
        for name in self._store:
            if name.startswith(prefix):
                matching.append(name)
        return matching

InMemorySessions

In-memory session storage.

Source code in src/openfang/capabilities/sessions.py
class InMemorySessions:
    """In-memory session storage.

    Messages are kept per session ID; a separate mapping tracks which session
    IDs have been associated with which user.
    """

    def __init__(self):
        self._sessions: dict[str, list[ModelMessage]] = {}
        self._user_sessions: dict[int, set[str]] = {}

    async def get(self, session_id: str) -> list[ModelMessage]:
        """Return the messages of a session (empty list when unknown).

        A copy is returned so that callers mutating the result cannot silently
        alter the stored history; ``save`` copies its input for the same reason.
        """
        return list(self._sessions.get(session_id, ()))

    async def save(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Replace the messages of a session with a copy of ``messages``."""
        self._sessions[session_id] = list(messages)

    async def append(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Append messages to a session, creating the session if needed."""
        self._sessions.setdefault(session_id, []).extend(messages)

    async def delete(self, session_id: str) -> None:
        """Drop a session and remove it from every user's association set."""
        self._sessions.pop(session_id, None)
        for user_sessions in self._user_sessions.values():
            user_sessions.discard(session_id)

    async def list(self, user_id: int | None = None) -> list[str]:
        """List session IDs: all stored sessions, or those associated with ``user_id``."""
        if user_id is None:
            return list(self._sessions.keys())
        return list(self._user_sessions.get(user_id, set()))

    def associate_user(self, session_id: str, user_id: int) -> None:
        """Record that ``session_id`` belongs to ``user_id``."""
        self._user_sessions.setdefault(user_id, set()).add(session_id)

LocalFiles

File operations scoped to a root directory.

All paths are resolved relative to root unless absolute. Methods are async for protocol compatibility with remote implementations.

Source code in src/openfang/capabilities/files.py
class LocalFiles:
    """File operations scoped to a root directory.

    All paths are resolved relative to root unless absolute.
    Methods are async for protocol compatibility with remote implementations.
    """

    # Directory names whose contents are hidden from list() and search().
    _EXCLUDED_DIRS = frozenset({".git", ".venv"})

    def __init__(self, root: Path):
        self.root = root

    def _resolve(self, path: str) -> Path:
        """Return ``path`` unchanged if absolute, otherwise joined onto root."""
        p = Path(path)
        return p if p.is_absolute() else self.root / p

    def _listed_relative(self, p: Path) -> Path | None:
        """Return ``p`` relative to root if it is a file outside excluded dirs, else None."""
        if not p.is_file():
            return None
        # Test only the components BELOW root: checking the absolute path would
        # hide every file whenever root itself lives under a .git/.venv directory.
        rel = p.relative_to(self.root)
        return rel if self._EXCLUDED_DIRS.isdisjoint(rel.parts) else None

    async def read(self, path: str) -> str:
        """Read file contents. Async for protocol compliance."""
        return self._resolve(path).read_text()

    async def write(self, path: str, content: str) -> None:
        """Write content to file, creating parent dirs. Async for protocol compliance."""
        p = self._resolve(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content)

    async def list(self, pattern: str = "**/*") -> list[str]:
        """List files matching glob pattern, as root-relative paths. Excludes .git and .venv."""
        found = []
        for p in self.root.glob(pattern):
            rel = self._listed_relative(p)
            if rel is not None:
                found.append(str(rel))
        return found

    async def search(self, pattern: str, file_glob: str = "**/*") -> list[Match]:
        """Search file contents with regex. Returns up to MAX_SEARCH_RESULTS matches.

        Files that cannot be decoded as text or read due to permissions are skipped.
        """
        matches = []
        regex = re.compile(pattern)
        for p in self.root.glob(file_glob):
            rel = self._listed_relative(p)
            if rel is None:
                continue
            try:
                lines = p.read_text().splitlines()
            except (UnicodeDecodeError, PermissionError):
                continue
            for i, line in enumerate(lines, 1):
                if regex.search(line):
                    matches.append(Match(str(rel), i, line.strip()))
                    # Stop scanning once the cap is reached instead of reading
                    # the rest of the tree only to discard the surplus.
                    if len(matches) >= MAX_SEARCH_RESULTS:
                        return matches[:MAX_SEARCH_RESULTS]
        return matches[:MAX_SEARCH_RESULTS]

read(path) async

Read file contents. Async for protocol compliance.

Source code in src/openfang/capabilities/files.py
async def read(self, path: str) -> str:
    """Return the text of the file at ``path``. Async for protocol compliance."""
    target = self._resolve(path)
    return target.read_text()

write(path, content) async

Write content to file, creating parent dirs. Async for protocol compliance.

Source code in src/openfang/capabilities/files.py
async def write(self, path: str, content: str) -> None:
    """Write ``content`` to ``path``, creating missing parent directories first.

    Async for protocol compliance.
    """
    target = self._resolve(path)
    parent_dir = target.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    target.write_text(content)

list(pattern='**/*') async

List files matching glob pattern. Excludes .git and .venv.

Source code in src/openfang/capabilities/files.py
async def list(self, pattern: str = "**/*") -> list[str]:
    """List files matching glob pattern, as root-relative paths. Excludes .git and .venv."""
    excluded = {".git", ".venv"}
    found = []
    for p in self.root.glob(pattern):
        if not p.is_file():
            continue
        # Test only the components BELOW root: checking the absolute path would
        # hide every file whenever root itself lives under a .git/.venv directory.
        rel = p.relative_to(self.root)
        if excluded.isdisjoint(rel.parts):
            found.append(str(rel))
    return found

search(pattern, file_glob='**/*') async

Search file contents with regex. Returns up to MAX_SEARCH_RESULTS matches.

Source code in src/openfang/capabilities/files.py
async def search(self, pattern: str, file_glob: str = "**/*") -> list[Match]:
    """Search file contents with regex. Returns up to MAX_SEARCH_RESULTS matches.

    Each match records the root-relative path, the 1-based line number and the
    stripped line. Files under .git/.venv, files that cannot be decoded as text
    and files that cannot be read due to permissions are skipped.
    """
    excluded = {".git", ".venv"}
    matches = []
    regex = re.compile(pattern)
    for p in self.root.glob(file_glob):
        if not p.is_file():
            continue
        # Test only the components BELOW root: checking the absolute path would
        # skip every file whenever root itself lives under a .git/.venv directory.
        rel = p.relative_to(self.root)
        if not excluded.isdisjoint(rel.parts):
            continue
        try:
            lines = p.read_text().splitlines()
        except (UnicodeDecodeError, PermissionError):
            continue
        for i, line in enumerate(lines, 1):
            if regex.search(line):
                matches.append(Match(str(rel), i, line.strip()))
                # Stop scanning once the cap is reached instead of reading the
                # rest of the tree only to discard the surplus.
                if len(matches) >= MAX_SEARCH_RESULTS:
                    return matches[:MAX_SEARCH_RESULTS]
    return matches[:MAX_SEARCH_RESULTS]

LocalProjects

Manage multiple local projects.

Source code in src/openfang/capabilities/projects.py
class LocalProjects:
    """Manage multiple local projects.

    Keeps a registry of project ID -> resolved path plus the ID of the
    currently selected project.
    """

    def __init__(self):
        self._current: str | None = None
        self._projects: dict[str, Path] = {}

    async def current(self) -> tuple[str, Path] | None:
        """Return ``(project_id, path)`` of the current project, or None if none is selected."""
        active = self._current
        if active is not None:
            return (active, self._projects[active])
        return None

    async def switch(self, project_id: str) -> Path:
        """Make ``project_id`` the current project and return its path.

        Raises:
            KeyError: If the project has not been registered.
        """
        if project_id in self._projects:
            self._current = project_id
            return self._projects[project_id]
        raise KeyError(f"Project not found: {project_id}")

    async def list(self) -> list[str]:
        """Return the IDs of all registered projects."""
        return [project_id for project_id in self._projects]

    async def register(self, project_id: str, path: Path) -> None:
        """Register a project under its resolved path.

        The first registered project becomes the current one.
        """
        resolved = path.resolve()
        self._projects[project_id] = resolved
        if self._current is None:
            self._current = project_id

LocalShell

Execute commands locally.

Source code in src/openfang/capabilities/shell.py
class LocalShell:
    """Execute commands locally through the system shell."""

    def __init__(self, default_cwd: Path | None = None):
        # Fall back to the process working directory when no default is given.
        self._cwd = default_cwd or Path.cwd()

    async def exec(self, cmd: str, cwd: Path | None = None) -> Result:
        """Run ``cmd`` in a shell and capture its output.

        Args:
            cmd: Command line passed to the shell.
            cwd: Working directory for this call; defaults to the instance's directory.

        Returns:
            A Result with the exit code (0 when the return code is falsy) and
            stdout/stderr decoded with undecodable bytes replaced.
        """
        workdir = cwd or self._cwd
        proc = await asyncio.create_subprocess_shell(
            cmd,
            cwd=workdir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        out_bytes, err_bytes = await proc.communicate()
        exit_code = proc.returncode or 0
        out_text = out_bytes.decode(errors="replace")
        err_text = err_bytes.decode(errors="replace")
        return Result(code=exit_code, stdout=out_text, stderr=err_text)

PlaywrightSession

Stateful browser session using Playwright.

Source code in src/openfang/capabilities/web.py
class PlaywrightSession:
    """Stateful browser session using Playwright.

    Wraps a started Playwright driver, a browser and a single page; the page
    keeps its state between calls until ``close`` is awaited.
    """

    def __init__(self, playwright, browser, page):
        self._pw = playwright
        self._browser = browser
        self._page = page

    async def _snapshot(self) -> Page:
        """Capture URL, title, body text and HTML of the current page."""
        current = self._page
        return Page(
            url=current.url,
            title=await current.title(),
            text=await current.inner_text("body"),
            html=await current.content(),
        )

    async def goto(self, url: str) -> Page:
        """Navigate to ``url``, wait for DOMContentLoaded and return the page snapshot."""
        await self._page.goto(url, wait_until="domcontentloaded")
        return await self._snapshot()

    async def click(self, selector: str) -> Page:
        """Click ``selector``, wait for DOMContentLoaded and return the page snapshot."""
        await self._page.click(selector)
        await self._page.wait_for_load_state("domcontentloaded")
        return await self._snapshot()

    async def type(self, selector: str, text: str) -> None:
        """Fill the element matching ``selector`` with ``text``."""
        await self._page.fill(selector, text)

    async def screenshot(self) -> bytes:
        """Return a screenshot of the current page as bytes."""
        image = await self._page.screenshot()
        return image

    async def close(self) -> None:
        """Close the browser, then stop the Playwright driver."""
        await self._browser.close()
        await self._pw.stop()

PlaywrightWeb

Web access using Playwright.

Source code in src/openfang/capabilities/web.py
class PlaywrightWeb:
    """Web access using Playwright.

    ``fetch`` is a plain HTTP GET via httpx; the other methods drive a headless
    Chromium instance.
    """

    async def fetch(self, url: str) -> str:
        """GET ``url`` (following redirects, 30s timeout) and return the body text."""
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            resp = await client.get(url)
            return resp.text

    async def browse(self, url: str) -> Page:
        """Load ``url`` in a fresh headless browser and return a page snapshot."""
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            # try/finally so the browser is closed even when navigation or
            # content extraction raises.
            try:
                page = await browser.new_page()
                await page.goto(url, wait_until="domcontentloaded")
                return Page(
                    url=page.url,
                    title=await page.title(),
                    text=await page.inner_text("body"),
                    html=await page.content(),
                )
            finally:
                await browser.close()

    async def screenshot(self, url: str) -> bytes:
        """Load ``url`` in a fresh headless browser and return a screenshot as bytes."""
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                await page.goto(url, wait_until="domcontentloaded")
                return await page.screenshot()
            finally:
                await browser.close()

    async def session(self) -> PlaywrightSession:
        """Start a stateful browser session; the caller must ``close()`` it."""
        pw = await async_playwright().start()
        # The driver is started manually here (no context manager), so it must
        # be stopped explicitly if launching the browser or page fails;
        # otherwise the driver process is leaked.
        try:
            browser = await pw.chromium.launch(headless=True)
            page = await browser.new_page()
        except BaseException:
            await pw.stop()
            raise
        return PlaywrightSession(pw, browser, page)

CommsAdapters dataclass

Communication and capabilities.

Source code in src/openfang/deps.py
@dataclass
class CommsAdapters:
    """Communication and capabilities: the channel and skill adapters of an agent."""

    channels: Channels
    skills: Skills

    @classmethod
    def default(cls) -> CommsAdapters:
        """Build the adapters from the default channel and skill registries."""
        channel_registry = ChannelRegistry.default()
        skill_registry = SkillRegistry.default()
        return cls(channels=channel_registry, skills=skill_registry)

Deps dataclass

All adapters + user context for an agent.

Source code in src/openfang/deps.py
@dataclass
class Deps:
    """All adapters + user context for an agent."""

    user: User
    storage: StorageAdapters
    web_adapters: WebAdapters
    workspace: WorkspaceAdapters
    scheduling: SchedulingAdapters
    comms: CommsAdapters
    current_page: str | None = None
    session_id: str | None = None

    # Subagent context
    is_subagent: bool = False
    """True if this is a subagent execution (prevents recursive spawning)."""

    parent_run_id: str | None = None
    """If subagent, the parent's run ID."""

    subagents: SubagentRegistry | None = None
    """Registry of available subagents."""

    subagent_executor: SubagentExecutor | None = None
    """Executor for spawning subagents."""

    # Web-session bookkeeping is forwarded unchanged to web_adapters.
    def new_session_id(self) -> str:
        """Return a fresh web session ID from the web adapters."""
        adapters = self.web_adapters
        return adapters.new_session_id()

    def get_session(self, session_id: str) -> WebSession | None:
        """Look up a stored web session by ID (None when unknown)."""
        adapters = self.web_adapters
        return adapters.get_session(session_id)

    def store_session(self, session_id: str, session: WebSession) -> None:
        """Store a web session under ``session_id``."""
        adapters = self.web_adapters
        adapters.store_session(session_id, session)

    def remove_session(self, session_id: str) -> WebSession | None:
        """Remove and return a stored web session (None when unknown)."""
        adapters = self.web_adapters
        return adapters.remove_session(session_id)

    async def switch_project(self, project_id: str) -> None:
        """Switch to a different project, updating file/shell roots.

        Memory, sessions and the project registry are carried over; only the
        file adapter and the shell are rebuilt for the new project path.
        """
        project_root = await self.workspace.projects.switch(project_id)
        self.storage = StorageAdapters(
            files=LocalFiles(project_root),
            memory=self.storage.memory,
            sessions=self.storage.sessions,
        )
        self.workspace = WorkspaceAdapters(
            shell=LocalShell(project_root),  # nosec B604
            projects=self.workspace.projects,
        )

    async def close(self) -> None:
        """Close the web adapters."""
        adapters = self.web_adapters
        await adapters.close()

is_subagent = False class-attribute instance-attribute

True if this is a subagent execution (prevents recursive spawning).

parent_run_id = None class-attribute instance-attribute

If subagent, the parent's run ID.

subagents = None class-attribute instance-attribute

Registry of available subagents.

subagent_executor = None class-attribute instance-attribute

Executor for spawning subagents.

switch_project(project_id) async

Switch to a different project, updating file/shell roots.

Source code in src/openfang/deps.py
async def switch_project(self, project_id: str) -> None:
    """Switch to a different project, updating file/shell roots.

    Memory, sessions and the project registry are carried over; only the file
    adapter and the shell are rebuilt for the new project path.
    """
    project_root = await self.workspace.projects.switch(project_id)

    new_files = LocalFiles(project_root)
    self.storage = StorageAdapters(
        files=new_files,
        memory=self.storage.memory,
        sessions=self.storage.sessions,
    )

    new_shell = LocalShell(project_root)  # nosec B604
    self.workspace = WorkspaceAdapters(
        shell=new_shell,
        projects=self.workspace.projects,
    )

SchedulingAdapters dataclass

Time-based operations.

Source code in src/openfang/deps.py
@dataclass
class SchedulingAdapters:
    """Time-based operations."""

    cron: Cron

    @classmethod
    def default(cls, cron: Cron | None = None) -> SchedulingAdapters:
        """Build the adapters, using an in-memory cron store when none is given.

        Args:
            cron: Cron implementation to use; None selects ``InMemoryCron``.
        """
        # Compare against None rather than relying on truthiness: a supplied
        # Cron whose __len__/__bool__ makes it falsy (e.g. an empty job store)
        # must not be silently replaced by a fresh in-memory one.
        return cls(cron=cron if cron is not None else InMemoryCron())

StorageAdapters dataclass

Data persistence adapters.

Source code in src/openfang/deps.py
@dataclass
class StorageAdapters:
    """Data persistence adapters: files, key-value memory and chat sessions."""

    files: Files
    memory: Memory
    sessions: Sessions

    @classmethod
    def default(cls, root: Path, sessions: Sessions | None = None) -> StorageAdapters:
        """Build adapters rooted at ``root``.

        Args:
            root: Root directory for the file adapter.
            sessions: Session store to use; when None, file-based sessions in
                ``settings.sessions_dir`` are used.
        """
        session_store = sessions if sessions is not None else FileSessions(settings.sessions_dir)
        file_adapter = LocalFiles(root)
        memory_store = InMemoryMemory()
        return cls(files=file_adapter, memory=memory_store, sessions=session_store)

WebAdapters dataclass

Browser-based operations with session management.

Source code in src/openfang/deps.py
@dataclass
class WebAdapters:
    """Browser-based operations with session management."""

    web: Web
    _sessions: dict[str, WebSession] = field(default_factory=dict)
    _session_counter: int = 0

    @classmethod
    def default(cls) -> WebAdapters:
        """Build the adapters with a Playwright-backed web implementation."""
        return cls(web=PlaywrightWeb())

    def new_session_id(self) -> str:
        """Return a new session ID of the form ``session-<n>``."""
        self._session_counter += 1
        return f"session-{self._session_counter}"

    def get_session(self, session_id: str) -> WebSession | None:
        """Return the stored session for ``session_id``, or None."""
        return self._sessions.get(session_id)

    def store_session(self, session_id: str, session: WebSession) -> None:
        """Store ``session`` under ``session_id``, replacing any existing entry."""
        self._sessions[session_id] = session

    def remove_session(self, session_id: str) -> WebSession | None:
        """Remove and return the session for ``session_id`` (None when unknown)."""
        return self._sessions.pop(session_id, None)

    async def close(self) -> None:
        """Close every stored session and clear the registry.

        All sessions are attempted even if one fails; the first error is
        re-raised once the remaining sessions have been closed.
        """
        # Snapshot and clear up front: iterating the live dict across awaits
        # breaks if it changes meanwhile, and a failing close() must not leave
        # the other sessions open or the registry populated.
        pending = list(self._sessions.values())
        self._sessions.clear()
        first_error: Exception | None = None
        for session in pending:
            try:
                await session.close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

WorkspaceAdapters dataclass

Local environment adapters.

Source code in src/openfang/deps.py
@dataclass
class WorkspaceAdapters:
    """Local environment adapters: a shell and the project registry."""

    shell: Shell
    projects: Projects

    @classmethod
    def default(cls, root: Path) -> WorkspaceAdapters:
        """Build adapters with a local shell rooted at ``root`` and an empty project registry."""
        local_shell = LocalShell(root)  # nosec B604
        project_registry = LocalProjects()
        return cls(shell=local_shell, projects=project_registry)

CronJob dataclass

A scheduled job.

Source code in src/openfang/models.py
@dataclass
class CronJob:
    """A scheduled job.

    Plain data record; it carries no behavior of its own.
    """

    id: str  # unique job identifier
    schedule: str  # schedule expression; NOTE(review): format is not defined here — confirm
    task: str  # task to run; NOTE(review): presumably a prompt/command string — confirm
    enabled: bool = True  # whether the job is active; new jobs default to enabled
    created_by: int | None = None  # ID of the user who created the job, if any

User dataclass

Current user context.

Source code in src/openfang/models.py
@dataclass
class User:
    """Current user context: identity, roles and optional contact handles."""

    id: int
    email: str
    roles: set[str] = field(default_factory=set)
    org_id: int | None = None
    phone: str | None = None
    telegram_id: str | None = None

    def has_role(self, role: str) -> bool:
        """Return True when ``role`` is among the user's roles."""
        granted = self.roles
        return role in granted

    @property
    def is_admin(self) -> bool:
        """True when the user holds the admin role."""
        granted = self.roles
        return Role.ADMIN in granted

Cron

Bases: Protocol

Schedule and manage cron jobs.

Source code in src/openfang/protocols.py
@runtime_checkable
class Cron(Protocol):
    """Schedule and manage cron jobs.

    Structural interface for cron job stores. All operations are async.
    Being ``runtime_checkable``, it can be used with ``isinstance`` checks.
    """

    async def create(self, schedule: str, task: str, user_id: int | None = None) -> CronJob:
        """Create a new scheduled job.

        Args:
            schedule: Schedule for the job (format is implementation-defined).
            task: Task associated with the job.
            user_id: Optional ID of the user creating the job.
        """
        ...

    async def get(self, job_id: str) -> CronJob | None:
        """Get a job by ID, or None if not found."""
        ...

    async def list(self, user_id: int | None = None) -> list[CronJob]:
        """List all jobs, optionally filtered by user."""
        ...

    async def delete(self, job_id: str) -> bool:
        """Delete a job. Returns True if deleted, False if not found."""
        ...

    async def enable(self, job_id: str) -> bool:
        """Enable a job. Returns True if enabled, False if not found."""
        ...

    async def disable(self, job_id: str) -> bool:
        """Disable a job. Returns True if disabled, False if not found."""
        ...

create(schedule, task, user_id=None) async

Create a new scheduled job.

Source code in src/openfang/protocols.py
async def create(self, schedule: str, task: str, user_id: int | None = None) -> CronJob:
    """Create a new scheduled job.

    Args:
        schedule: Schedule for the job (format is implementation-defined).
        task: Task associated with the job.
        user_id: Optional ID of the user creating the job.

    Returns:
        The newly created job.
    """
    ...

get(job_id) async

Get a job by ID, or None if not found.

Source code in src/openfang/protocols.py
async def get(self, job_id: str) -> CronJob | None:
    """Get a job by ID, or None if not found.

    Args:
        job_id: ID of the job to look up.
    """
    ...

list(user_id=None) async

List all jobs, optionally filtered by user.

Source code in src/openfang/protocols.py
async def list(self, user_id: int | None = None) -> list[CronJob]:
    """List all jobs, optionally filtered by user.

    Args:
        user_id: When given, only jobs for this user are returned.
    """
    ...

delete(job_id) async

Delete a job. Returns True if deleted, False if not found.

Source code in src/openfang/protocols.py
async def delete(self, job_id: str) -> bool:
    """Delete a job. Returns True if deleted, False if not found.

    Args:
        job_id: ID of the job to delete.
    """
    ...

enable(job_id) async

Enable a job. Returns True if enabled, False if not found.

Source code in src/openfang/protocols.py
async def enable(self, job_id: str) -> bool:
    """Enable a job. Returns True if enabled, False if not found."""
    ...

disable(job_id) async

Disable a job. Returns True if disabled, False if not found.

Source code in src/openfang/protocols.py
async def disable(self, job_id: str) -> bool:
    """Disable a job. Returns True if disabled, False if not found."""
    ...

Files

Bases: Protocol

File operations scoped to a root directory.

Source code in src/openfang/protocols.py
@runtime_checkable
class Files(Protocol):
    """File operations scoped to a root directory.

    This is a structural interface: any object exposing ``root`` and these
    async methods satisfies it. Because of ``@runtime_checkable``,
    ``isinstance(obj, Files)`` works at runtime, but it only verifies that the
    members exist, not their signatures or types.
    """

    # Directory that relative paths are resolved against.
    root: Path

    async def read(self, path: str) -> str:
        """Read file contents as text.

        Args:
            path: Relative to root, or absolute path.

        Returns:
            The file's contents as a string.
        """
        ...

    async def write(self, path: str, content: str) -> None:
        """Write content to file, creating parent directories if needed.

        Args:
            path: Relative to root, or absolute path.
            content: Text content to write.
        """
        ...

    # NOTE: ``search`` is deliberately declared before ``list``. Once the
    # ``list`` method is bound in the class namespace it shadows the builtin,
    # so a later ``list[Match]`` annotation would resolve to the method: a
    # TypeError when annotations are evaluated eagerly, and a type-checker
    # error ("function is not valid as a type") otherwise.
    async def search(self, pattern: str, file_glob: str = "**/*") -> list[Match]:
        """Search file contents using regex.

        Args:
            pattern: Regular expression to search for.
            file_glob: Glob pattern to filter which files to search.

        Returns:
            One ``Match`` per hit.
        """
        ...

    async def list(self, pattern: str = "**/*") -> list[str]:
        """List files matching a glob pattern.

        Args:
            pattern: Glob pattern like "*.py" or "**/*.ts".

        Returns:
            The matching file paths as strings.
        """
        ...

read(path) async

Read file contents as text.

Parameters:

Name Type Description Default
path str

Relative to root, or absolute path.

required
Source code in src/openfang/protocols.py
async def read(self, path: str) -> str:
    """Read file contents as text.

    Args:
        path: Relative to root, or absolute path.
    """
    ...

write(path, content) async

Write content to file, creating parent directories if needed.

Parameters:

Name Type Description Default
path str

Relative to root, or absolute path.

required
content str

Text content to write.

required
Source code in src/openfang/protocols.py
async def write(self, path: str, content: str) -> None:
    """Write content to file, creating parent directories if needed.

    Args:
        path: Relative to root, or absolute path.
        content: Text content to write.
    """
    ...

list(pattern='**/*') async

List files matching a glob pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern like "*.py" or "**/*.ts".

'**/*'
Source code in src/openfang/protocols.py
async def list(self, pattern: str = "**/*") -> list[str]:
    """List files matching a glob pattern.

    Args:
        pattern: Glob pattern like "*.py" or "**/*.ts".
    """
    ...

search(pattern, file_glob='**/*') async

Search file contents using regex.

Parameters:

Name Type Description Default
pattern str

Regular expression to search for.

required
file_glob str

Glob pattern to filter which files to search.

'**/*'
Source code in src/openfang/protocols.py
async def search(self, pattern: str, file_glob: str = "**/*") -> list[Match]:
    """Search file contents using regex.

    Args:
        pattern: Regular expression to search for.
        file_glob: Glob pattern to filter which files to search.
    """
    ...

Memory

Bases: Protocol

Key-value store.

Source code in src/openfang/protocols.py
@runtime_checkable
class Memory(Protocol):
    """Key-value store.

    Keys and values are both plain strings. This is a structural interface;
    because of ``@runtime_checkable``, ``isinstance(obj, Memory)`` works at
    runtime, but it only verifies that the method names exist.
    """

    async def get(self, key: str) -> str | None:
        """Retrieve a value by key, or None if not found."""
        ...

    async def set(self, key: str, value: str) -> None:
        """Store a value under the given key.

        NOTE(review): presumably overwrites any existing value for ``key`` --
        confirm against the adapters.
        """
        ...

    async def delete(self, key: str) -> None:
        """Delete a key from memory.

        Returns nothing, so the caller cannot tell whether the key existed.
        NOTE(review): behavior for a missing key is not specified here.
        """
        ...

    async def keys(self, prefix: str = "") -> list[str]:
        """List all keys, optionally filtered by prefix.

        Args:
            prefix: Only keys starting with this string are returned; the
                default empty string applies no filtering.
        """
        ...

get(key) async

Retrieve a value by key, or None if not found.

Source code in src/openfang/protocols.py
async def get(self, key: str) -> str | None:
    """Retrieve a value by key, or None if not found."""
    ...

set(key, value) async

Store a value under the given key.

Source code in src/openfang/protocols.py
async def set(self, key: str, value: str) -> None:
    """Store a value under the given key."""
    ...

delete(key) async

Delete a key from memory.

Source code in src/openfang/protocols.py
async def delete(self, key: str) -> None:
    """Delete a key from memory."""
    ...

keys(prefix='') async

List all keys, optionally filtered by prefix.

Source code in src/openfang/protocols.py
async def keys(self, prefix: str = "") -> list[str]:
    """List all keys, optionally filtered by prefix."""
    ...

Projects

Bases: Protocol

Project management.

Source code in src/openfang/protocols.py
@runtime_checkable
class Projects(Protocol):
    """Project management.

    Tracks registered projects (an ID mapped to a path) and which one is
    currently selected. This is a structural interface; because of
    ``@runtime_checkable``, ``isinstance(obj, Projects)`` works at runtime,
    but it only verifies that the method names exist.
    """

    async def current(self) -> tuple[str, Path] | None:
        """Get the current project ID and path, or None if none selected.

        Returns:
            A ``(project_id, path)`` tuple, or ``None``.
        """
        ...

    async def switch(self, project_id: str) -> Path:
        """Switch to a project and return its root path.

        NOTE(review): behavior for an unknown ``project_id`` is not specified
        here (presumably an exception) -- confirm against the adapters.
        """
        ...

    async def list(self) -> list[str]:
        """List all registered project IDs."""
        ...

    async def register(self, project_id: str, path: Path) -> None:
        """Register a new project with the given ID and path.

        NOTE(review): behavior when ``project_id`` is already registered is
        not specified here.
        """
        ...

current() async

Get the current project ID and path, or None if none selected.

Source code in src/openfang/protocols.py
async def current(self) -> tuple[str, Path] | None:
    """Get the current project ID and path, or None if none selected."""
    ...

switch(project_id) async

Switch to a project and return its root path.

Source code in src/openfang/protocols.py
async def switch(self, project_id: str) -> Path:
    """Switch to a project and return its root path."""
    ...

list() async

List all registered project IDs.

Source code in src/openfang/protocols.py
async def list(self) -> list[str]:
    """List all registered project IDs."""
    ...

register(project_id, path) async

Register a new project with the given ID and path.

Source code in src/openfang/protocols.py
async def register(self, project_id: str, path: Path) -> None:
    """Register a new project with the given ID and path."""
    ...

Sessions

Bases: Protocol

Store and retrieve session message histories (transcripts).

Source code in src/openfang/protocols.py
@runtime_checkable
class Sessions(Protocol):
    """Store and retrieve session message histories (transcripts).

    Each session is identified by a string ID and holds a list of
    ``ModelMessage`` objects. This is a structural interface; because of
    ``@runtime_checkable``, ``isinstance(obj, Sessions)`` works at runtime,
    but it only verifies that the method names exist.
    """

    async def get(self, session_id: str) -> list[ModelMessage]:
        """Get all messages for a session.

        NOTE(review): presumably returns an empty list for an unknown
        session -- confirm against the adapters.
        """
        ...

    async def save(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Save (replace) all messages for a session."""
        ...

    async def append(self, session_id: str, messages: list[ModelMessage]) -> None:
        """Append messages to a session.

        Unlike ``save``, this adds to the stored history instead of
        replacing it.
        """
        ...

    async def delete(self, session_id: str) -> None:
        """Delete a session and all its messages."""
        ...

    async def list(self, user_id: int | None = None) -> list[str]:
        """List all session IDs, optionally filtered by user.

        Args:
            user_id: When given, restrict the result to that user's sessions;
                ``None`` (the default) applies no user filter.
                NOTE(review): how a session becomes associated with a user is
                adapter-specific and not part of this protocol.
        """
        ...

get(session_id) async

Get all messages for a session.

Source code in src/openfang/protocols.py
async def get(self, session_id: str) -> list[ModelMessage]:
    """Get all messages for a session."""
    ...

save(session_id, messages) async

Save (replace) all messages for a session.

Source code in src/openfang/protocols.py
async def save(self, session_id: str, messages: list[ModelMessage]) -> None:
    """Save (replace) all messages for a session."""
    ...

append(session_id, messages) async

Append messages to a session.

Source code in src/openfang/protocols.py
async def append(self, session_id: str, messages: list[ModelMessage]) -> None:
    """Append messages to a session."""
    ...

delete(session_id) async

Delete a session and all its messages.

Source code in src/openfang/protocols.py
async def delete(self, session_id: str) -> None:
    """Delete a session and all its messages."""
    ...

list(user_id=None) async

List all session IDs, optionally filtered by user.

Source code in src/openfang/protocols.py
async def list(self, user_id: int | None = None) -> list[str]:
    """List all session IDs, optionally filtered by user."""
    ...

Shell

Bases: Protocol

Command execution.

Source code in src/openfang/protocols.py
@runtime_checkable
class Shell(Protocol):
    """Command execution.

    This is a structural interface; because of ``@runtime_checkable``,
    ``isinstance(obj, Shell)`` works at runtime, but it only verifies that
    the method name exists.
    """

    async def exec(self, cmd: str, cwd: Path | None = None) -> Result:
        """Execute a shell command and return the result.

        Args:
            cmd: The command line, passed as a single string.
            cwd: Directory to run the command in. NOTE(review): the directory
                used when this is ``None`` is adapter-defined -- confirm.

        Returns:
            A ``Result`` carrying the exit code, stdout and stderr.
        """
        ...

exec(cmd, cwd=None) async

Execute a shell command and return the result.

Source code in src/openfang/protocols.py
async def exec(self, cmd: str, cwd: Path | None = None) -> Result:
    """Execute a shell command and return the result."""
    ...

Web

Bases: Protocol

Web access - stateless + session factory.

Source code in src/openfang/protocols.py
@runtime_checkable
class Web(Protocol):
    """Web access - stateless + session factory.

    ``fetch``, ``browse`` and ``screenshot`` are one-shot calls that take a
    URL; ``session`` hands out a ``WebSession`` for multi-step interaction.
    This is a structural interface; because of ``@runtime_checkable``,
    ``isinstance(obj, Web)`` works at runtime, but it only verifies that the
    method names exist.
    """

    async def fetch(self, url: str) -> str:
        """Fetch URL content as plain text (no JavaScript rendering)."""
        ...

    async def browse(self, url: str) -> Page:
        """Browse URL with full JavaScript rendering.

        Returns:
            A ``Page`` snapshot of the rendered page.
        """
        ...

    async def screenshot(self, url: str) -> bytes:
        """Take a screenshot of a URL.

        Returns:
            The image as raw bytes. NOTE(review): the image format is not
            specified here -- confirm against the adapters.
        """
        ...

    async def session(self) -> WebSession:
        """Create a new stateful browser session.

        NOTE(review): ``WebSession`` exposes ``close()``; the caller is
        presumably responsible for calling it when done -- confirm.
        """
        ...

fetch(url) async

Fetch URL content as plain text (no JavaScript rendering).

Source code in src/openfang/protocols.py
async def fetch(self, url: str) -> str:
    """Fetch URL content as plain text (no JavaScript rendering)."""
    ...

browse(url) async

Browse URL with full JavaScript rendering.

Source code in src/openfang/protocols.py
async def browse(self, url: str) -> Page:
    """Browse URL with full JavaScript rendering."""
    ...

screenshot(url) async

Take a screenshot of a URL.

Source code in src/openfang/protocols.py
async def screenshot(self, url: str) -> bytes:
    """Take a screenshot of a URL."""
    ...

session() async

Create a new stateful browser session.

Source code in src/openfang/protocols.py
async def session(self) -> WebSession:
    """Create a new stateful browser session."""
    ...

WebSession

Bases: Protocol

Stateful browser session.

Source code in src/openfang/protocols.py
@runtime_checkable
class WebSession(Protocol):
    """Stateful browser session.

    Obtained from ``Web.session()``. This is a structural interface; because
    of ``@runtime_checkable``, ``isinstance(obj, WebSession)`` works at
    runtime, but it only verifies that the method names exist.
    """

    async def goto(self, url: str) -> Page:
        """Navigate to a URL and return the page content."""
        ...

    async def click(self, selector: str) -> Page:
        """Click an element and return the updated page content.

        Args:
            selector: Identifies the element to click. NOTE(review):
                presumably a CSS selector -- confirm against the adapters.
        """
        ...

    async def type(self, selector: str, text: str) -> None:
        """Type text into an input element.

        Args:
            selector: Identifies the input element. NOTE(review): presumably
                a CSS selector -- confirm against the adapters.
            text: The text to enter.
        """
        ...

    async def screenshot(self) -> bytes:
        """Take a screenshot of the current page.

        Returns:
            The image as raw bytes. NOTE(review): the image format is not
            specified here.
        """
        ...

    async def close(self) -> None:
        """Close the browser session and free resources."""
        ...

goto(url) async

Navigate to a URL and return the page content.

Source code in src/openfang/protocols.py
async def goto(self, url: str) -> Page:
    """Navigate to a URL and return the page content."""
    ...

click(selector) async

Click an element and return the updated page content.

Source code in src/openfang/protocols.py
async def click(self, selector: str) -> Page:
    """Click an element and return the updated page content."""
    ...

type(selector, text) async

Type text into an input element.

Source code in src/openfang/protocols.py
async def type(self, selector: str, text: str) -> None:
    """Type text into an input element."""
    ...

screenshot() async

Take a screenshot of the current page.

Source code in src/openfang/protocols.py
async def screenshot(self) -> bytes:
    """Take a screenshot of the current page."""
    ...

close() async

Close the browser session and free resources.

Source code in src/openfang/protocols.py
async def close(self) -> None:
    """Close the browser session and free resources."""
    ...

Match dataclass

Search result from grep.

Source code in src/openfang/types.py
@dataclass
class Match:
    """Search result from grep.

    This is the element type of the list returned by ``Files.search``.
    """

    # File in which the hit was found (assumed relative to the Files root -- confirm).
    path: str
    # Line number of the hit (presumably 1-based -- confirm).
    line: int
    # Text of the hit (presumably the whole matching line -- confirm).
    text: str

Page dataclass

Snapshot of a web page.

Source code in src/openfang/types.py
@dataclass
class Page:
    """Snapshot of a web page.

    Returned by ``Web.browse`` and by the navigating methods of
    ``WebSession`` (``goto``, ``click``).
    """

    # Address of the page (presumably the final URL after redirects -- confirm).
    url: str
    # Page title.
    title: str
    # Text content of the page (presumably with markup stripped -- confirm).
    text: str
    # HTML of the page (presumably the rendered DOM -- confirm).
    html: str

Result dataclass

Shell command result.

Source code in src/openfang/types.py
@dataclass
class Result:
    """Outcome of a shell command: its exit code plus stdout and stderr text."""

    code: int
    stdout: str
    stderr: str

    @property
    def ok(self) -> bool:
        """Whether the command succeeded, i.e. its exit code equals zero."""
        succeeded = self.code == 0
        return succeeded

create_agent(model=None, system_prompt='You are a helpful assistant. Be concise.')

Create a configured agent with all tools registered.

Parameters:

Name Type Description Default
model str | None

Model string (e.g. 'openai:gpt-4o'). If None, uses settings.get_model() which auto-prefixes with 'gateway/' if PYDANTIC_AI_GATEWAY_API_KEY is set.

None
system_prompt str

Base system prompt for the agent.

'You are a helpful assistant. Be concise.'
Source code in src/openfang/agent.py
def create_agent(
    model: str | None = None,
    system_prompt: str = "You are a helpful assistant. Be concise.",
) -> Agent[Deps, str]:
    """Build an agent with every tool and the dynamic context instructions attached.

    Args:
        model: Model string (e.g. 'openai:gpt-4o'). A falsy value (None or an
               empty string) falls back to settings.get_model(), which
               auto-prefixes with 'gateway/' if PYDANTIC_AI_GATEWAY_API_KEY is set.
        system_prompt: Base system prompt for the agent.

    Returns:
        The agent, with every entry of ALL_TOOLS registered and a per-run
        instructions callback that describes the user, project, page, access
        level, skills and (for non-subagents) available subagents.
    """
    agent: Agent[Deps, str] = Agent(
        model or settings.get_model(),
        deps_type=Deps,
        system_prompt=system_prompt,
        prepare_tools=filter_tools_by_permission,
    )

    # Every known tool is attached here; filter_tools_by_permission is handed
    # to the agent as its prepare_tools hook.
    for tool in ALL_TOOLS:
        agent.tool(tool)

    @agent.instructions
    async def context(ctx: RunContext[Deps]) -> str:
        """Build dynamic context from user, page, project, permissions, and skills."""
        deps = ctx.deps
        user = deps.user

        # Who is talking to the agent.
        role_names = ", ".join(user.roles) if user.roles else "none"
        parts = [f"User: {user.email} (roles: {role_names})"]

        # Which project (if any) is selected.
        project = await deps.workspace.projects.current()
        if project:
            parts.append(f"Project: {project[0]} at {project[1]}")
        else:
            parts.append("No project selected.")

        # What the user is currently looking at.
        if deps.current_page:
            parts.append(f"Viewing: {deps.current_page}")

        # One-line hint about the user's access level.
        if user.is_admin:
            access = "Access: admin (all tools including shell)"
        elif user.has_role(Role.DEVELOPER):
            access = "Access: developer (file and project tools)"
        else:
            access = "Access: standard (web and memory tools)"
        parts.append(access)

        # Skills section, separated from the header lines by a blank line.
        skills_prompt = deps.comms.skills.format_for_prompt()
        if skills_prompt:
            parts.extend(("", skills_prompt))

        # Subagent roster is only advertised to the main agent.
        if not deps.is_subagent and deps.subagents:
            roster = deps.subagents.list_for_prompt()
            if roster:
                parts.extend(("", roster))

        return "\n".join(parts)

    return agent

start_session(deps, session_id) async

Start or resume a session.

Source code in src/openfang/chat.py
async def start_session(deps: Deps, session_id: str) -> None:
    """Start or resume a session.

    Records ``session_id`` on ``deps``. When the configured session store is
    an ``InMemorySessions``, the session is also associated with the current
    user's ID; other stores are left untouched.
    """
    deps.session_id = session_id

    store = deps.storage.sessions
    if not isinstance(store, InMemorySessions):
        return
    store.associate_user(session_id, deps.user.id)

create_deps(user, root=None, sessions=None, cron=None, current_page=None, session_id=None, enable_subagents=True) async

Create Deps with sensible defaults. Use as async context manager.

Parameters:

Name Type Description Default
user User

User context for the agent.

required
root Path | None

Root directory for file operations.

None
sessions Sessions | None

Session storage adapter.

None
cron Cron | None

Cron adapter.

None
current_page str | None

Current page context.

None
session_id str | None

Current session ID.

None
enable_subagents bool

Whether to enable subagent delegation.

True
Source code in src/openfang/deps.py
@asynccontextmanager
async def create_deps(
    user: User,
    root: Path | None = None,
    sessions: Sessions | None = None,
    cron: Cron | None = None,
    current_page: str | None = None,
    session_id: str | None = None,
    enable_subagents: bool = True,
) -> AsyncIterator[Deps]:
    """Assemble a Deps from default adapters; use with ``async with``.

    The yielded Deps is closed (``await deps.close()``) when the ``async with``
    block exits, whether normally or through an exception.

    Args:
        user: User context for the agent.
        root: Root directory for file operations; defaults to the current
            working directory.
        sessions: Session storage adapter.
        cron: Cron adapter.
        current_page: Current page context.
        session_id: Current session ID.
        enable_subagents: Whether to enable subagent delegation.
    """
    # Imported inside the function rather than at module level (presumably to
    # avoid a circular import with the subagents module).
    from .subagents import SubagentExecutor, SubagentRegistry

    if not root:
        root = Path.cwd()

    # Both subagent pieces are created together, or not at all.
    if enable_subagents:
        registry = SubagentRegistry.default()
        executor = SubagentExecutor()
    else:
        registry = None
        executor = None

    deps = Deps(
        user=user,
        storage=StorageAdapters.default(root, sessions),
        web_adapters=WebAdapters.default(),
        workspace=WorkspaceAdapters.default(root),
        scheduling=SchedulingAdapters.default(cron),
        comms=CommsAdapters.default(),
        current_page=current_page,
        session_id=session_id,
        subagents=registry,
        subagent_executor=executor,
    )
    try:
        yield deps
    finally:
        await deps.close()