File size: 8,777 Bytes
88f3fce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
from typing import Dict, List, Optional
from pydantic import Field, model_validator
from app.agent.browser import BrowserContextHelper
from app.agent.toolcall import ToolCallAgent
from app.config import config
from app.daytona.sandbox import create_sandbox, delete_sandbox
from app.daytona.tool_base import SandboxToolsBase
from app.logger import logger
from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.tool import Terminate, ToolCollection
from app.tool.ask_human import AskHuman
from app.tool.mcp import MCPClients, MCPClientTool
from app.tool.sandbox.sb_browser_tool import SandboxBrowserTool
from app.tool.sandbox.sb_files_tool import SandboxFilesTool
from app.tool.sandbox.sb_shell_tool import SandboxShellTool
from app.tool.sandbox.sb_vision_tool import SandboxVisionTool
class SandboxManus(ToolCallAgent):
    """A versatile general-purpose agent with support for sandbox and MCP tools.

    The agent starts with ``AskHuman`` and ``Terminate`` in its tool
    collection. Sandbox-backed tools (browser, files, shell, vision) are
    appended by :meth:`initialize_sandbox_tools`, and tools exposed by MCP
    servers are appended by :meth:`connect_mcp_server`.

    Use :meth:`create` to obtain a fully initialized instance; call
    :meth:`cleanup` when done to release the browser helper, the MCP
    connections and the sandbox.
    """

    name: str = "SandboxManus"
    description: str = "A versatile agent that can solve various tasks using multiple sandbox-tools including MCP-based tools"

    system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root)
    next_step_prompt: str = NEXT_STEP_PROMPT

    max_observe: int = 10000
    max_steps: int = 20

    # MCP clients for remote tool access
    mcp_clients: MCPClients = Field(default_factory=MCPClients)

    # General-purpose tools available from the start. Sandbox-backed tools are
    # appended later by initialize_sandbox_tools().
    # (PythonExecute, BrowserUseTool and StrReplaceEditor are disabled here.)
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            AskHuman(),
            Terminate(),
        )
    )

    special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])
    browser_context_helper: Optional[BrowserContextHelper] = None

    # Track connected MCP servers: server_id -> url/command
    connected_servers: Dict[str, str] = Field(default_factory=dict)
    _initialized: bool = False

    # sandbox_id -> {"vnc": <url>, "website": <url>}
    sandbox_link: Optional[dict[str, dict[str, str]]] = Field(default_factory=dict)

    @model_validator(mode="after")
    def initialize_helper(self) -> "SandboxManus":
        """Initialize basic components synchronously."""
        self.browser_context_helper = BrowserContextHelper(self)
        return self

    @classmethod
    async def create(cls, **kwargs) -> "SandboxManus":
        """Factory method to create and properly initialize an instance.

        Builds the instance from ``kwargs``, connects the configured MCP
        servers, sets up the sandbox tools, and marks the instance as
        initialized.
        """
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        await instance.initialize_sandbox_tools()
        instance._initialized = True
        return instance

    async def initialize_sandbox_tools(
        self,
        password: str = config.daytona.VNC_password,
    ) -> None:
        """Create a sandbox and register the sandbox-backed tools.

        Records the sandbox's VNC (port 6080) and website (port 8080) preview
        URLs in ``sandbox_link`` and adds the browser, files, shell and vision
        sandbox tools to ``available_tools``.

        Args:
            password: Password passed to ``create_sandbox``; must be non-empty.

        Raises:
            ValueError: If ``password`` is empty.
            Exception: Any error raised during setup is logged and re-raised.
        """
        try:
            if not password:
                raise ValueError("password must be provided")

            # Create a new sandbox
            sandbox = create_sandbox(password=password)
            # NOTE(review): `sandbox` is not declared as a field on this class;
            # this assignment presumably relies on a base model that permits
            # extra attributes - confirm.
            self.sandbox = sandbox

            vnc_link = sandbox.get_preview_link(6080)
            website_link = sandbox.get_preview_link(8080)
            # The preview link may or may not expose `.url`; fall back to its
            # string form when it does not.
            vnc_url = vnc_link.url if hasattr(vnc_link, "url") else str(vnc_link)
            website_url = (
                website_link.url if hasattr(website_link, "url") else str(website_link)
            )

            # Get the actual sandbox_id from the created sandbox
            actual_sandbox_id = sandbox.id if hasattr(sandbox, "id") else "new_sandbox"
            if not self.sandbox_link:
                self.sandbox_link = {}
            self.sandbox_link[actual_sandbox_id] = {
                "vnc": vnc_url,
                "website": website_url,
            }
            logger.info(f"VNC URL: {vnc_url}")
            logger.info(f"Website URL: {website_url}")
            # Presumably stops the sandbox tools from logging the URLs again -
            # verify against SandboxToolsBase.
            SandboxToolsBase._urls_printed = True

            sb_tools = [
                SandboxBrowserTool(sandbox),
                SandboxFilesTool(sandbox),
                SandboxShellTool(sandbox),
                SandboxVisionTool(sandbox),
            ]
            self.available_tools.add_tools(*sb_tools)
        except Exception as e:
            logger.error(f"Error initializing sandbox tools: {e}")
            raise

    async def initialize_mcp_servers(self) -> None:
        """Initialize connections to configured MCP servers.

        Iterates over ``config.mcp_config.servers`` and connects each "sse"
        server that has a URL and each "stdio" server that has a command.
        A failure for one server is logged and does not stop the others.
        """
        for server_id, server_config in config.mcp_config.servers.items():
            try:
                if server_config.type == "sse":
                    if server_config.url:
                        await self.connect_mcp_server(server_config.url, server_id)
                        logger.info(
                            f"Connected to MCP server {server_id} at {server_config.url}"
                        )
                elif server_config.type == "stdio":
                    if server_config.command:
                        await self.connect_mcp_server(
                            server_config.command,
                            server_id,
                            use_stdio=True,
                            stdio_args=server_config.args,
                        )
                        logger.info(
                            f"Connected to MCP server {server_id} using command {server_config.command}"
                        )
            except Exception as e:
                # Best effort: one unreachable server must not block the rest.
                logger.error(f"Failed to connect to MCP server {server_id}: {e}")

    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: Optional[List[str]] = None,
    ) -> None:
        """Connect to an MCP server and add its tools.

        Args:
            server_url: SSE URL, or the command to run when ``use_stdio`` is set.
            server_id: Identifier for the server; ``server_url`` is used as the
                key in ``connected_servers`` when this is empty.
            use_stdio: Connect over stdio instead of SSE.
            stdio_args: Arguments for the stdio command; ``None`` means none.
        """
        if use_stdio:
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
        else:
            await self.mcp_clients.connect_sse(server_url, server_id)
        self.connected_servers[server_id or server_url] = server_url

        # Update available tools with only the new tools from this server.
        # NOTE(review): the filter compares against `server_id` as passed in;
        # if MCPClients substitutes another id when it is empty, no tools will
        # match here - confirm against MCPClients.
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        self.available_tools.add_tools(*new_tools)

    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools.

        Args:
            server_id: Server to disconnect. When empty, all entries in
                ``connected_servers`` are cleared.
        """
        await self.mcp_clients.disconnect(server_id)
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            self.connected_servers.clear()

        # Rebuild available tools without the disconnected server's tools:
        # keep every non-MCP tool, then re-add whatever MCP tools remain.
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        self.available_tools = ToolCollection(*base_tools)
        self.available_tools.add_tools(*self.mcp_clients.tools)

    async def delete_sandbox(self, sandbox_id: str) -> None:
        """Delete a sandbox by ID and drop its entry from ``sandbox_link``.

        Args:
            sandbox_id: Identifier of the sandbox to delete.

        Raises:
            Exception: Any error from the deletion is logged and re-raised.
        """
        try:
            # Inside the method body this bare name resolves to the
            # module-level `delete_sandbox` import, not to this method.
            await delete_sandbox(sandbox_id)
            logger.info(f"Sandbox {sandbox_id} deleted successfully")
            # `sandbox_link` is Optional, so guard against None before use.
            if self.sandbox_link:
                self.sandbox_link.pop(sandbox_id, None)
        except Exception as e:
            logger.error(f"Error deleting sandbox {sandbox_id}: {e}")
            raise

    async def cleanup(self):
        """Clean up agent resources.

        Always cleans up the browser helper. If the agent was initialized, it
        also disconnects all MCP servers and deletes the sandbox, if one was
        created. The sandbox deletion is attempted even when the MCP
        disconnect fails, so that the sandbox is not left behind.
        """
        if self.browser_context_helper:
            await self.browser_context_helper.cleanup_browser()

        # Disconnect from all MCP servers only if we were initialized
        if not self._initialized:
            return

        try:
            await self.disconnect_mcp_server()
        finally:
            # `sandbox` is only set by initialize_sandbox_tools(); think() can
            # mark the agent initialized without ever creating one.
            sandbox = getattr(self, "sandbox", None)
            if sandbox is not None:
                await self.delete_sandbox(sandbox.id)
        self._initialized = False

    async def think(self) -> bool:
        """Process current state and decide next actions with appropriate context.

        If the agent has not been initialized yet, the configured MCP servers
        are connected first (sandbox tools are not set up here). When one of
        the last three messages contains a call to the sandbox browser tool,
        ``next_step_prompt`` is temporarily replaced by the prompt produced by
        the browser context helper; the original prompt is always restored.

        Returns:
            The result of the parent class's ``think()``.
        """
        if not self._initialized:
            await self.initialize_mcp_servers()
            self._initialized = True

        original_prompt = self.next_step_prompt
        recent_messages = self.memory.messages[-3:] if self.memory.messages else []

        # Look the tool name up once instead of once per inspected tool call.
        browser_tool_name = SandboxBrowserTool().name
        browser_in_use = any(
            tc.function.name == browser_tool_name
            for msg in recent_messages
            if msg.tool_calls
            for tc in msg.tool_calls
        )

        try:
            if browser_in_use:
                self.next_step_prompt = (
                    await self.browser_context_helper.format_next_step_prompt()
                )
            result = await super().think()
        finally:
            # Restore original prompt, even if thinking raised.
            self.next_step_prompt = original_prompt

        return result
|