"""Finance Tracker Agent using MCP Toolbox Docker server via HTTP.""" import json from typing import Any, Dict, List, Optional, Literal from pydantic import BaseModel, Field from src.core.config import config from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import AIMessage, HumanMessage, ToolMessage from toolbox_langchain import ToolboxClient def _to_text(payload: Any) -> str: """Convert model or tool output into a printable string.""" if isinstance(payload, str): return payload try: return json.dumps(payload, ensure_ascii=False) except TypeError: return str(payload) # Pydantic schemas for structured output class PortfolioHolding(BaseModel): """A single stock holding in the portfolio.""" symbol: str = Field(description="Stock ticker symbol (e.g., AAPL, MSFT)") quantity: float = Field(description="Number of shares held") avg_cost_basis: float = Field(description="Average cost per share") total_invested: float = Field(description="Total amount invested (quantity * avg_cost_basis)") realized_gains: Optional[float] = Field(default=None, description="Realized gains/losses from sales") class TransactionRecord(BaseModel): """A stock transaction record.""" symbol: str = Field(description="Stock or Crypto Coin ticker symbol") transaction_type: Literal["BUY", "SELL"] = Field(description="Type of transaction") quantity: float = Field(description="Number of shares") price: float = Field(description="Price per share") transaction_date: str = Field(description="Date of transaction") notes: Optional[str] = Field(default=None, description="Additional notes") class PortfolioResponse(BaseModel): """Structured response for portfolio queries.""" response_type: Literal["portfolio_view", "transaction_added", "transaction_history", "general_response"] = Field( description="Type of response being provided" ) summary: str = Field(description="Brief summary of the response") holdings: Optional[List[PortfolioHolding]] = Field( default=None, description="Current portfolio holdings (for portfolio_view)" ) transaction: Optional[TransactionRecord] = Field( default=None, description="Transaction details (for transaction_added)" ) transactions: Optional[List[TransactionRecord]] = Field( default=None, description="List of transactions (for transaction_history)" ) total_portfolio_value: Optional[float] = Field( default=None, description="Total portfolio value" ) insights: Optional[List[str]] = Field( default=None, description="Key insights or recommendations" ) def format_portfolio_response(response: PortfolioResponse) -> str: """Format structured portfolio response into readable text.""" output = [] # Add summary output.append(response.summary) output.append("") # Format holdings if present if response.holdings: output.append("## Current Portfolio Holdings") output.append("") # Create table header output.append("| Symbol | Shares | Avg Cost | Total Invested | Realized Gains |") output.append("|--------|--------|----------|----------------|----------------|") # Add each holding as a table row for holding in response.holdings: realized_gains = f"${holding.realized_gains:.2f}" if holding.realized_gains is not None else "$0.00" output.append( f"| {holding.symbol} | {holding.quantity} | " f"${holding.avg_cost_basis:.2f} | ${holding.total_invested:.2f} | {realized_gains} |" ) output.append("") # Format single transaction if present if response.transaction: output.append("## Transaction Details") output.append("") t = response.transaction output.append(f"- **Type**: {t.transaction_type}") output.append(f"- **Symbol**: {t.symbol}") output.append(f"- **Quantity**: {t.quantity} shares") output.append(f"- **Price**: ${t.price:.2f} per share") output.append(f"- **Date**: {t.transaction_date}") output.append(f"- **Total Amount**: ${t.quantity * t.price:.2f}") if t.notes: output.append(f"- **Notes**: {t.notes}") output.append("") # Format transaction history if present if response.transactions: output.append("## Transaction History") output.append("") for t in response.transactions: output.append(f"**{t.transaction_date}** - {t.transaction_type} {t.quantity} shares of {t.symbol} @ ${t.price:.2f}") if t.notes: output.append(f" Notes: {t.notes}") output.append("") # Add total portfolio value if present if response.total_portfolio_value is not None: output.append(f"**Total Portfolio Value**: ${response.total_portfolio_value:.2f}") output.append("") # Add insights if present if response.insights: output.append("## Key Insights") output.append("") for insight in response.insights: output.append(f"- {insight}") output.append("") return "\n".join(output).strip() class FinanceTrackerMCP: """ Agent for managing personal investment portfolio using MCP Toolbox HTTP server. Features: - Portfolio Management: Track stock purchases/sales via Cloud SQL - Position Analysis: View holdings, cost basis, realized gains - Performance Metrics: Calculate gains/losses with current prices - Database Operations: Full CRUD via MCP Toolbox Docker container - Secure Access: Uses Google Cloud SQL with MCP Toolbox connector """ def __init__(self): self.name = "Finance Tracker (MCP)" self.description = "Personal portfolio tracking and analysis using Cloud SQL via MCP Toolbox" self.toolbox_client: Optional[ToolboxClient] = None self.model: Optional[ChatGoogleGenerativeAI] = None self.model_with_tools = None self.model_structured = None self.tools: List[Any] = [] self.tool_map: Dict[str, Any] = {} self.initialized = False async def initialize(self) -> None: """Initialize the agent with MCP Toolbox HTTP server.""" print(f"๐Ÿ’ผ Initializing {self.name}...") try: # Connect to MCP Toolbox HTTP server (Docker container) print(f" ๐Ÿ“ก Connecting to MCP Toolbox HTTP server...") print(f" Server URL: {config.MCP_TOOLBOX_SERVER_URL}") # Create toolbox client and enter async context manager self.toolbox_client = ToolboxClient(config.MCP_TOOLBOX_SERVER_URL) # Manually enter the async context manager to keep connection alive await self.toolbox_client.__aenter__() # Load database tools from toolbox print(" Loading tools from MCP Toolbox...") self.tools = self.toolbox_client.load_toolset() if not self.tools: raise RuntimeError( f"No tools available from MCP Toolbox server at {config.MCP_TOOLBOX_SERVER_URL}\n" f"Make sure the Docker container is running: docker-compose up -d" ) self.tool_map = {tool.name: tool for tool in self.tools} # Initialize Gemini chat model bound to tools self.model = ChatGoogleGenerativeAI( model="gemini-2.5-flash", temperature=0.1, google_api_key=config.GOOGLE_API_KEY, ) self.model_with_tools = self.model.bind_tools(self.tools) # Initialize model with structured output for final responses self.model_structured = self.model.with_structured_output(PortfolioResponse) print(f" โœ… Connected to MCP Toolbox with {len(self.tools)} tools") print(f" ๐Ÿ“‹ Available MCP Toolbox capabilities:") print(f" - Database queries (SELECT, INSERT, UPDATE, DELETE)") print(f" - Schema introspection (tables, columns, relationships)") print(f" - Transaction management") print(f" - Natural language to SQL conversion") print(f" Total: {len(self.tools)} tools available") self.initialized = True print(f" โœ… {self.name} ready!") except Exception as e: import traceback print(f" โŒ Error initializing {self.name}: {e}") print(f" ๐Ÿ“‹ Full error details:") traceback.print_exc() print(f"\n๐Ÿ’ก Troubleshooting:") print(f" 1. Make sure Docker is running") print(f" 2. Start MCP Toolbox container: docker-compose up -d") print(f" 3. Check container status: docker-compose ps") print(f" 4. View container logs: docker-compose logs mcp-toolbox") print(f" 5. Verify server is accessible: curl {config.MCP_TOOLBOX_SERVER_URL}/health") raise async def process(self, query: str, history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]: """Process a query using MCP Toolbox for Cloud SQL operations.""" try: if not self.initialized: await self.initialize() print(f"\n๐Ÿ’ผ {self.name} processing: '{query}'") # Build system message with portfolio context system_message = """You are a Finance Tracker Agent specialized in portfolio management. You have access to a Cloud SQL PostgreSQL database through MCP Toolbox with the following schema: Tables: - stock_transactions: Stores all BUY/SELL transactions (symbol, transaction_type, quantity, price, transaction_date, notes) - portfolio_positions: Current aggregated positions (symbol, total_quantity, avg_cost_basis, total_invested, realized_gains) - portfolio_snapshots: Historical portfolio value tracking - stock_metadata: Cached stock information (company_name, sector, industry, market_cap) When users want to: - ADD a transaction: INSERT into stock_transactions (triggers will update portfolio_positions automatically) - VIEW portfolio: SELECT from portfolio_positions - CHECK transaction history: SELECT from stock_transactions - ANALYZE performance: Query portfolio_positions with calculations Always use the MCP Toolbox database tools to: 1. Query the database for current data 2. Insert/update records as needed 3. Calculate metrics (gains, losses, allocations) 4. Provide clear, actionable insights Be helpful, accurate, and provide investment insights based on their data.""" messages: List[Any] = [HumanMessage(content=system_message)] # Add conversation history if history: trimmed_history = history[-10:] for turn in trimmed_history: user_text = turn.get("user") if user_text: messages.append(HumanMessage(content=user_text)) assistant_text = turn.get("assistant") if assistant_text: messages.append(AIMessage(content=assistant_text)) messages.append(HumanMessage(content=query)) # Tool calling loop - gather data from database tool_calls_info = [] while True: if not self.model_with_tools: raise RuntimeError("Model not initialized with tools") ai_message = await self.model_with_tools.ainvoke(messages) messages.append(ai_message) tool_calls = getattr(ai_message, "tool_calls", []) if not tool_calls: # No more tool calls - exit loop break for call in tool_calls: tool_name = call.get("name") tool_args = call.get("args", {}) tool_call_id = call.get("id") print(f" ๐Ÿ”ง MCP Toolbox call: {tool_name}({json.dumps(tool_args, indent=2)})") tool_calls_info.append(f"๐Ÿ”ง MCP Toolbox call: {tool_name}({json.dumps(tool_args)})") tool = self.tool_map.get(tool_name) if not tool: tool_result = {"error": f"Tool '{tool_name}' not found"} else: tool_result = await tool.ainvoke(tool_args) messages.append( ToolMessage( content=_to_text(tool_result), tool_call_id=tool_call_id or "", ) ) # Generate structured response based on all gathered information print(" ๐Ÿ“Š Generating structured response...") structured_prompt = """Based on the database query results, provide a structured response. Guidelines: - For portfolio viewing: Set response_type to "portfolio_view" and populate holdings list - For adding transactions: Set response_type to "transaction_added" and populate transaction field - For transaction history: Set response_type to "transaction_history" and populate transactions list - For other queries: Set response_type to "general_response" - Always provide a clear summary - Include relevant insights when possible""" messages.append(HumanMessage(content=structured_prompt)) if not self.model_structured: raise RuntimeError("Structured output model not initialized") structured_response = await self.model_structured.ainvoke(messages) # Format the structured response into readable text final_response = format_portfolio_response(structured_response) return { "success": True, "agent": self.name, "response": final_response, "query": query, "structured_data": structured_response.model_dump(), # Include raw structured data "tool_calls": tool_calls_info } except Exception as e: print(f" โŒ Error in {self.name}: {e}") import traceback traceback.print_exc() return { "success": False, "agent": self.name, "error": str(e), "query": query } async def cleanup(self) -> None: """Cleanup resources.""" if self.toolbox_client: try: # Exit the async context manager await self.toolbox_client.__aexit__(None, None, None) except Exception as e: print(f" โš ๏ธ Warning during cleanup: {e}") print(f"๐Ÿงน {self.name} cleaned up")