orynxml-agents / app /agent /sandbox_agent.py
Speedofmastery's picture
Upload folder using huggingface_hub
88f3fce verified
from typing import Dict, List, Optional
from pydantic import Field, model_validator
from app.agent.browser import BrowserContextHelper
from app.agent.toolcall import ToolCallAgent
from app.config import config
from app.daytona.sandbox import create_sandbox, delete_sandbox
from app.daytona.tool_base import SandboxToolsBase
from app.logger import logger
from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.tool import Terminate, ToolCollection
from app.tool.ask_human import AskHuman
from app.tool.mcp import MCPClients, MCPClientTool
from app.tool.sandbox.sb_browser_tool import SandboxBrowserTool
from app.tool.sandbox.sb_files_tool import SandboxFilesTool
from app.tool.sandbox.sb_shell_tool import SandboxShellTool
from app.tool.sandbox.sb_vision_tool import SandboxVisionTool
class SandboxManus(ToolCallAgent):
"""A versatile general-purpose agent with support for both local and MCP tools."""
name: str = "SandboxManus"
description: str = "A versatile agent that can solve various tasks using multiple sandbox-tools including MCP-based tools"
system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root)
next_step_prompt: str = NEXT_STEP_PROMPT
max_observe: int = 10000
max_steps: int = 20
# MCP clients for remote tool access
mcp_clients: MCPClients = Field(default_factory=MCPClients)
# Add general-purpose tools to the tool collection
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
# PythonExecute(),
# BrowserUseTool(),
# StrReplaceEditor(),
AskHuman(),
Terminate(),
)
)
special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])
browser_context_helper: Optional[BrowserContextHelper] = None
# Track connected MCP servers
connected_servers: Dict[str, str] = Field(
default_factory=dict
) # server_id -> url/command
_initialized: bool = False
sandbox_link: Optional[dict[str, dict[str, str]]] = Field(default_factory=dict)
@model_validator(mode="after")
def initialize_helper(self) -> "SandboxManus":
"""Initialize basic components synchronously."""
self.browser_context_helper = BrowserContextHelper(self)
return self
@classmethod
async def create(cls, **kwargs) -> "SandboxManus":
"""Factory method to create and properly initialize a Manus instance."""
instance = cls(**kwargs)
await instance.initialize_mcp_servers()
await instance.initialize_sandbox_tools()
instance._initialized = True
return instance
async def initialize_sandbox_tools(
self,
password: str = config.daytona.VNC_password,
) -> None:
try:
# 创建新沙箱
if password:
sandbox = create_sandbox(password=password)
self.sandbox = sandbox
else:
raise ValueError("password must be provided")
vnc_link = sandbox.get_preview_link(6080)
website_link = sandbox.get_preview_link(8080)
vnc_url = vnc_link.url if hasattr(vnc_link, "url") else str(vnc_link)
website_url = (
website_link.url if hasattr(website_link, "url") else str(website_link)
)
# Get the actual sandbox_id from the created sandbox
actual_sandbox_id = sandbox.id if hasattr(sandbox, "id") else "new_sandbox"
if not self.sandbox_link:
self.sandbox_link = {}
self.sandbox_link[actual_sandbox_id] = {
"vnc": vnc_url,
"website": website_url,
}
logger.info(f"VNC URL: {vnc_url}")
logger.info(f"Website URL: {website_url}")
SandboxToolsBase._urls_printed = True
sb_tools = [
SandboxBrowserTool(sandbox),
SandboxFilesTool(sandbox),
SandboxShellTool(sandbox),
SandboxVisionTool(sandbox),
]
self.available_tools.add_tools(*sb_tools)
except Exception as e:
logger.error(f"Error initializing sandbox tools: {e}")
raise
async def initialize_mcp_servers(self) -> None:
"""Initialize connections to configured MCP servers."""
for server_id, server_config in config.mcp_config.servers.items():
try:
if server_config.type == "sse":
if server_config.url:
await self.connect_mcp_server(server_config.url, server_id)
logger.info(
f"Connected to MCP server {server_id} at {server_config.url}"
)
elif server_config.type == "stdio":
if server_config.command:
await self.connect_mcp_server(
server_config.command,
server_id,
use_stdio=True,
stdio_args=server_config.args,
)
logger.info(
f"Connected to MCP server {server_id} using command {server_config.command}"
)
except Exception as e:
logger.error(f"Failed to connect to MCP server {server_id}: {e}")
async def connect_mcp_server(
self,
server_url: str,
server_id: str = "",
use_stdio: bool = False,
stdio_args: List[str] = None,
) -> None:
"""Connect to an MCP server and add its tools."""
if use_stdio:
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id
)
self.connected_servers[server_id or server_url] = server_url
else:
await self.mcp_clients.connect_sse(server_url, server_id)
self.connected_servers[server_id or server_url] = server_url
# Update available tools with only the new tools from this server
new_tools = [
tool for tool in self.mcp_clients.tools if tool.server_id == server_id
]
self.available_tools.add_tools(*new_tools)
async def disconnect_mcp_server(self, server_id: str = "") -> None:
"""Disconnect from an MCP server and remove its tools."""
await self.mcp_clients.disconnect(server_id)
if server_id:
self.connected_servers.pop(server_id, None)
else:
self.connected_servers.clear()
# Rebuild available tools without the disconnected server's tools
base_tools = [
tool
for tool in self.available_tools.tools
if not isinstance(tool, MCPClientTool)
]
self.available_tools = ToolCollection(*base_tools)
self.available_tools.add_tools(*self.mcp_clients.tools)
async def delete_sandbox(self, sandbox_id: str) -> None:
"""Delete a sandbox by ID."""
try:
await delete_sandbox(sandbox_id)
logger.info(f"Sandbox {sandbox_id} deleted successfully")
if sandbox_id in self.sandbox_link:
del self.sandbox_link[sandbox_id]
except Exception as e:
logger.error(f"Error deleting sandbox {sandbox_id}: {e}")
raise e
async def cleanup(self):
"""Clean up Manus agent resources."""
if self.browser_context_helper:
await self.browser_context_helper.cleanup_browser()
# Disconnect from all MCP servers only if we were initialized
if self._initialized:
await self.disconnect_mcp_server()
await self.delete_sandbox(self.sandbox.id if self.sandbox else "unknown")
self._initialized = False
async def think(self) -> bool:
"""Process current state and decide next actions with appropriate context."""
if not self._initialized:
await self.initialize_mcp_servers()
self._initialized = True
original_prompt = self.next_step_prompt
recent_messages = self.memory.messages[-3:] if self.memory.messages else []
browser_in_use = any(
tc.function.name == SandboxBrowserTool().name
for msg in recent_messages
if msg.tool_calls
for tc in msg.tool_calls
)
if browser_in_use:
self.next_step_prompt = (
await self.browser_context_helper.format_next_step_prompt()
)
result = await super().think()
# Restore original prompt
self.next_step_prompt = original_prompt
return result