File size: 5,618 Bytes
88f3fce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
"""File operation interfaces and implementations for local and sandbox environments."""
import asyncio
from pathlib import Path
from typing import Optional, Protocol, Tuple, Union, runtime_checkable
from app.config import SandboxSettings
from app.exceptions import ToolError
from app.sandbox.client import SANDBOX_CLIENT
PathLike = Union[str, Path]
@runtime_checkable
class FileOperator(Protocol):
"""Interface for file operations in different environments."""
async def read_file(self, path: PathLike) -> str:
"""Read content from a file."""
...
async def write_file(self, path: PathLike, content: str) -> None:
"""Write content to a file."""
...
async def is_directory(self, path: PathLike) -> bool:
"""Check if path points to a directory."""
...
async def exists(self, path: PathLike) -> bool:
"""Check if path exists."""
...
async def run_command(
self, cmd: str, timeout: Optional[float] = 120.0
) -> Tuple[int, str, str]:
"""Run a shell command and return (return_code, stdout, stderr)."""
...
class LocalFileOperator(FileOperator):
"""File operations implementation for local filesystem."""
encoding: str = "utf-8"
async def read_file(self, path: PathLike) -> str:
"""Read content from a local file."""
try:
return Path(path).read_text(encoding=self.encoding)
except Exception as e:
raise ToolError(f"Failed to read {path}: {str(e)}") from None
async def write_file(self, path: PathLike, content: str) -> None:
"""Write content to a local file."""
try:
Path(path).write_text(content, encoding=self.encoding)
except Exception as e:
raise ToolError(f"Failed to write to {path}: {str(e)}") from None
async def is_directory(self, path: PathLike) -> bool:
"""Check if path points to a directory."""
return Path(path).is_dir()
async def exists(self, path: PathLike) -> bool:
"""Check if path exists."""
return Path(path).exists()
async def run_command(
self, cmd: str, timeout: Optional[float] = 120.0
) -> Tuple[int, str, str]:
"""Run a shell command locally."""
process = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=timeout
)
return (
process.returncode or 0,
stdout.decode(),
stderr.decode(),
)
except asyncio.TimeoutError as exc:
try:
process.kill()
except ProcessLookupError:
pass
raise TimeoutError(
f"Command '{cmd}' timed out after {timeout} seconds"
) from exc
class SandboxFileOperator(FileOperator):
"""File operations implementation for sandbox environment."""
def __init__(self):
self.sandbox_client = SANDBOX_CLIENT
async def _ensure_sandbox_initialized(self):
"""Ensure sandbox is initialized."""
if not self.sandbox_client.sandbox:
await self.sandbox_client.create(config=SandboxSettings())
async def read_file(self, path: PathLike) -> str:
"""Read content from a file in sandbox."""
await self._ensure_sandbox_initialized()
try:
return await self.sandbox_client.read_file(str(path))
except Exception as e:
raise ToolError(f"Failed to read {path} in sandbox: {str(e)}") from None
async def write_file(self, path: PathLike, content: str) -> None:
"""Write content to a file in sandbox."""
await self._ensure_sandbox_initialized()
try:
await self.sandbox_client.write_file(str(path), content)
except Exception as e:
raise ToolError(f"Failed to write to {path} in sandbox: {str(e)}") from None
async def is_directory(self, path: PathLike) -> bool:
"""Check if path points to a directory in sandbox."""
await self._ensure_sandbox_initialized()
result = await self.sandbox_client.run_command(
f"test -d {path} && echo 'true' || echo 'false'"
)
return result.strip() == "true"
async def exists(self, path: PathLike) -> bool:
"""Check if path exists in sandbox."""
await self._ensure_sandbox_initialized()
result = await self.sandbox_client.run_command(
f"test -e {path} && echo 'true' || echo 'false'"
)
return result.strip() == "true"
async def run_command(
self, cmd: str, timeout: Optional[float] = 120.0
) -> Tuple[int, str, str]:
"""Run a command in sandbox environment."""
await self._ensure_sandbox_initialized()
try:
stdout = await self.sandbox_client.run_command(
cmd, timeout=int(timeout) if timeout else None
)
return (
0, # Always return 0 since we don't have explicit return code from sandbox
stdout,
"", # No stderr capture in the current sandbox implementation
)
except TimeoutError as exc:
raise TimeoutError(
f"Command '{cmd}' timed out after {timeout} seconds in sandbox"
) from exc
except Exception as exc:
return 1, "", f"Error executing command in sandbox: {str(exc)}"
|