File size: 14,958 Bytes
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c89e824
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72dcf2b
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72dcf2b
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72dcf2b
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72dcf2b
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""Finance Tracker Agent using MCP Toolbox Docker server via HTTP."""
import json
from typing import Any, Dict, List, Optional, Literal
from pydantic import BaseModel, Field

from src.core.config import config
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from toolbox_langchain import ToolboxClient


def _to_text(payload: Any) -> str:
    """Convert model or tool output into a printable string."""
    if isinstance(payload, str):
        return payload
    try:
        return json.dumps(payload, ensure_ascii=False)
    except TypeError:
        return str(payload)


# Pydantic schemas for structured output
class PortfolioHolding(BaseModel):
    """A single stock holding in the portfolio."""
    symbol: str = Field(description="Stock ticker symbol (e.g., AAPL, MSFT)")
    quantity: float = Field(description="Number of shares held")
    avg_cost_basis: float = Field(description="Average cost per share")
    total_invested: float = Field(description="Total amount invested (quantity * avg_cost_basis)")
    realized_gains: Optional[float] = Field(default=None, description="Realized gains/losses from sales")


class TransactionRecord(BaseModel):
    """A stock transaction record."""
    symbol: str = Field(description="Stock or Crypto Coin ticker symbol")
    transaction_type: Literal["BUY", "SELL"] = Field(description="Type of transaction")
    quantity: float = Field(description="Number of shares")
    price: float = Field(description="Price per share")
    transaction_date: str = Field(description="Date of transaction")
    notes: Optional[str] = Field(default=None, description="Additional notes")


class PortfolioResponse(BaseModel):
    """Structured response for portfolio queries."""
    response_type: Literal["portfolio_view", "transaction_added", "transaction_history", "general_response"] = Field(
        description="Type of response being provided"
    )
    summary: str = Field(description="Brief summary of the response")
    holdings: Optional[List[PortfolioHolding]] = Field(
        default=None,
        description="Current portfolio holdings (for portfolio_view)"
    )
    transaction: Optional[TransactionRecord] = Field(
        default=None,
        description="Transaction details (for transaction_added)"
    )
    transactions: Optional[List[TransactionRecord]] = Field(
        default=None,
        description="List of transactions (for transaction_history)"
    )
    total_portfolio_value: Optional[float] = Field(
        default=None,
        description="Total portfolio value"
    )
    insights: Optional[List[str]] = Field(
        default=None,
        description="Key insights or recommendations"
    )


def format_portfolio_response(response: PortfolioResponse) -> str:
    """Format structured portfolio response into readable text."""
    output = []

    # Add summary
    output.append(response.summary)
    output.append("")

    # Format holdings if present
    if response.holdings:
        output.append("## Current Portfolio Holdings")
        output.append("")
        # Create table header
        output.append("| Symbol | Shares | Avg Cost | Total Invested | Realized Gains |")
        output.append("|--------|--------|----------|----------------|----------------|")
        # Add each holding as a table row
        for holding in response.holdings:
            realized_gains = f"${holding.realized_gains:.2f}" if holding.realized_gains is not None else "$0.00"
            output.append(
                f"| {holding.symbol} | {holding.quantity} | "
                f"${holding.avg_cost_basis:.2f} | ${holding.total_invested:.2f} | {realized_gains} |"
            )
        output.append("")

    # Format single transaction if present
    if response.transaction:
        output.append("## Transaction Details")
        output.append("")
        t = response.transaction
        output.append(f"- **Type**: {t.transaction_type}")
        output.append(f"- **Symbol**: {t.symbol}")
        output.append(f"- **Quantity**: {t.quantity} shares")
        output.append(f"- **Price**: ${t.price:.2f} per share")
        output.append(f"- **Date**: {t.transaction_date}")
        output.append(f"- **Total Amount**: ${t.quantity * t.price:.2f}")
        if t.notes:
            output.append(f"- **Notes**: {t.notes}")
        output.append("")

    # Format transaction history if present
    if response.transactions:
        output.append("## Transaction History")
        output.append("")
        for t in response.transactions:
            output.append(f"**{t.transaction_date}** - {t.transaction_type} {t.quantity} shares of {t.symbol} @ ${t.price:.2f}")
            if t.notes:
                output.append(f"  Notes: {t.notes}")
        output.append("")

    # Add total portfolio value if present
    if response.total_portfolio_value is not None:
        output.append(f"**Total Portfolio Value**: ${response.total_portfolio_value:.2f}")
        output.append("")

    # Add insights if present
    if response.insights:
        output.append("## Key Insights")
        output.append("")
        for insight in response.insights:
            output.append(f"- {insight}")
        output.append("")

    return "\n".join(output).strip()


class FinanceTrackerMCP:
    """
    Agent for managing personal investment portfolio using MCP Toolbox HTTP server.

    Features:
    - Portfolio Management: Track stock purchases/sales via Cloud SQL
    - Position Analysis: View holdings, cost basis, realized gains
    - Performance Metrics: Calculate gains/losses with current prices
    - Database Operations: Full CRUD via MCP Toolbox Docker container
    - Secure Access: Uses Google Cloud SQL with MCP Toolbox connector
    """

    def __init__(self):
        self.name = "Finance Tracker (MCP)"
        self.description = "Personal portfolio tracking and analysis using Cloud SQL via MCP Toolbox"
        self.toolbox_client: Optional[ToolboxClient] = None
        self.model: Optional[ChatGoogleGenerativeAI] = None
        self.model_with_tools = None
        self.model_structured = None
        self.tools: List[Any] = []
        self.tool_map: Dict[str, Any] = {}
        self.initialized = False

    async def initialize(self) -> None:
        """Initialize the agent with MCP Toolbox HTTP server."""
        print(f"πŸ’Ό Initializing {self.name}...")

        try:
            # Connect to MCP Toolbox HTTP server (Docker container)
            print(f"  πŸ“‘ Connecting to MCP Toolbox HTTP server...")
            print(f"    Server URL: {config.MCP_TOOLBOX_SERVER_URL}")

            # Create toolbox client and enter async context manager
            self.toolbox_client = ToolboxClient(config.MCP_TOOLBOX_SERVER_URL)
            # Manually enter the async context manager to keep connection alive
            await self.toolbox_client.__aenter__()

            # Load database tools from toolbox
            print("    Loading tools from MCP Toolbox...")
            self.tools = self.toolbox_client.load_toolset()

            if not self.tools:
                raise RuntimeError(
                    f"No tools available from MCP Toolbox server at {config.MCP_TOOLBOX_SERVER_URL}\n"
                    f"Make sure the Docker container is running: docker-compose up -d"
                )

            self.tool_map = {tool.name: tool for tool in self.tools}

            # Initialize Gemini chat model bound to tools
            self.model = ChatGoogleGenerativeAI(
                model="gemini-2.5-flash",
                temperature=0.1,
                google_api_key=config.GOOGLE_API_KEY,
            )
            self.model_with_tools = self.model.bind_tools(self.tools)

            # Initialize model with structured output for final responses
            self.model_structured = self.model.with_structured_output(PortfolioResponse)

            print(f"  βœ… Connected to MCP Toolbox with {len(self.tools)} tools")
            print(f"  πŸ“‹ Available MCP Toolbox capabilities:")
            print(f"    - Database queries (SELECT, INSERT, UPDATE, DELETE)")
            print(f"    - Schema introspection (tables, columns, relationships)")
            print(f"    - Transaction management")
            print(f"    - Natural language to SQL conversion")
            print(f"    Total: {len(self.tools)} tools available")

            self.initialized = True
            print(f"  βœ… {self.name} ready!")

        except Exception as e:
            import traceback
            print(f"  ❌ Error initializing {self.name}: {e}")
            print(f"  πŸ“‹ Full error details:")
            traceback.print_exc()
            print(f"\nπŸ’‘ Troubleshooting:")
            print(f"   1. Make sure Docker is running")
            print(f"   2. Start MCP Toolbox container: docker-compose up -d")
            print(f"   3. Check container status: docker-compose ps")
            print(f"   4. View container logs: docker-compose logs mcp-toolbox")
            print(f"   5. Verify server is accessible: curl {config.MCP_TOOLBOX_SERVER_URL}/health")
            raise

    async def process(self, query: str, history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
        """Process a query using MCP Toolbox for Cloud SQL operations."""
        try:
            if not self.initialized:
                await self.initialize()

            print(f"\nπŸ’Ό {self.name} processing: '{query}'")

            # Build system message with portfolio context
            system_message = """You are a Finance Tracker Agent specialized in portfolio management.

You have access to a Cloud SQL PostgreSQL database through MCP Toolbox with the following schema:

Tables:
- stock_transactions: Stores all BUY/SELL transactions (symbol, transaction_type, quantity, price, transaction_date, notes)
- portfolio_positions: Current aggregated positions (symbol, total_quantity, avg_cost_basis, total_invested, realized_gains)
- portfolio_snapshots: Historical portfolio value tracking
- stock_metadata: Cached stock information (company_name, sector, industry, market_cap)

When users want to:
- ADD a transaction: INSERT into stock_transactions (triggers will update portfolio_positions automatically)
- VIEW portfolio: SELECT from portfolio_positions
- CHECK transaction history: SELECT from stock_transactions
- ANALYZE performance: Query portfolio_positions with calculations

Always use the MCP Toolbox database tools to:
1. Query the database for current data
2. Insert/update records as needed
3. Calculate metrics (gains, losses, allocations)
4. Provide clear, actionable insights

Be helpful, accurate, and provide investment insights based on their data."""

            messages: List[Any] = [HumanMessage(content=system_message)]

            # Add conversation history
            if history:
                trimmed_history = history[-10:]
                for turn in trimmed_history:
                    user_text = turn.get("user")
                    if user_text:
                        messages.append(HumanMessage(content=user_text))
                    assistant_text = turn.get("assistant")
                    if assistant_text:
                        messages.append(AIMessage(content=assistant_text))

            messages.append(HumanMessage(content=query))

            # Tool calling loop - gather data from database
            tool_calls_info = []
            while True:
                if not self.model_with_tools:
                    raise RuntimeError("Model not initialized with tools")

                ai_message = await self.model_with_tools.ainvoke(messages)
                messages.append(ai_message)

                tool_calls = getattr(ai_message, "tool_calls", [])
                if not tool_calls:
                    # No more tool calls - exit loop
                    break

                for call in tool_calls:
                    tool_name = call.get("name")
                    tool_args = call.get("args", {})
                    tool_call_id = call.get("id")
                    print(f"  πŸ”§ MCP Toolbox call: {tool_name}({json.dumps(tool_args, indent=2)})")
                    tool_calls_info.append(f"πŸ”§ MCP Toolbox call: {tool_name}({json.dumps(tool_args)})")

                    tool = self.tool_map.get(tool_name)
                    if not tool:
                        tool_result = {"error": f"Tool '{tool_name}' not found"}
                    else:
                        tool_result = await tool.ainvoke(tool_args)

                    messages.append(
                        ToolMessage(
                            content=_to_text(tool_result),
                            tool_call_id=tool_call_id or "",
                        )
                    )

            # Generate structured response based on all gathered information
            print("  πŸ“Š Generating structured response...")
            structured_prompt = """Based on the database query results, provide a structured response.

Guidelines:
- For portfolio viewing: Set response_type to "portfolio_view" and populate holdings list
- For adding transactions: Set response_type to "transaction_added" and populate transaction field
- For transaction history: Set response_type to "transaction_history" and populate transactions list
- For other queries: Set response_type to "general_response"
- Always provide a clear summary
- Include relevant insights when possible"""

            messages.append(HumanMessage(content=structured_prompt))

            if not self.model_structured:
                raise RuntimeError("Structured output model not initialized")

            structured_response = await self.model_structured.ainvoke(messages)

            # Format the structured response into readable text
            final_response = format_portfolio_response(structured_response)

            return {
                "success": True,
                "agent": self.name,
                "response": final_response,
                "query": query,
                "structured_data": structured_response.model_dump(),  # Include raw structured data
                "tool_calls": tool_calls_info
            }

        except Exception as e:
            print(f"  ❌ Error in {self.name}: {e}")
            import traceback
            traceback.print_exc()
            return {
                "success": False,
                "agent": self.name,
                "error": str(e),
                "query": query
            }

    async def cleanup(self) -> None:
        """Cleanup resources."""
        if self.toolbox_client:
            try:
                # Exit the async context manager
                await self.toolbox_client.__aexit__(None, None, None)
            except Exception as e:
                print(f"  ⚠️  Warning during cleanup: {e}")
        print(f"🧹 {self.name} cleaned up")