File size: 7,013 Bytes
6fe7c36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""Crypto Agent using CoinGecko MCP Server via LangChain MCP adapters."""
import json
import shutil
from typing import Any, Dict, List, Optional

from src.core.config import config
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.tools import BaseTool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_mcp_adapters.client import MultiServerMCPClient


def _resolve_executable(candidates: List[str]) -> str:
    """Return first executable path found in PATH."""
    for name in candidates:
        resolved = shutil.which(name)
        if resolved:
            return resolved
    raise FileNotFoundError(f"Unable to locate any of: {', '.join(candidates)}")


def _to_text(payload: Any) -> str:
    """Convert model or tool output into a printable string."""
    if isinstance(payload, str):
        return payload
    try:
        return json.dumps(payload, ensure_ascii=False)
    except TypeError:
        return str(payload)


class CryptoAgentMCP:
    """Agent specialized in cryptocurrency data using CoinGecko MCP Server.

    Wraps a Gemini chat model bound to the tools exposed by the CoinGecko
    MCP server (public SSE endpoint, or the Pro stdio server launched via
    npx when an API key is configured) and drives a tool-calling loop to
    answer user queries.
    """

    def __init__(self, use_public_endpoint: bool = False):
        # Human-readable identity used in logs and result payloads.
        self.name = "Crypto Agent (MCP)"
        self.description = "Cryptocurrency market data and analysis expert using CoinGecko MCP Server"
        self.use_public_endpoint = use_public_endpoint
        # All of the below are populated by initialize(); empty until then.
        self.mcp_client: Optional[MultiServerMCPClient] = None
        self.model: Optional[ChatGoogleGenerativeAI] = None
        self.model_with_tools = None
        self.tools: List[BaseTool] = []
        self.tool_map: Dict[str, BaseTool] = {}

    async def initialize(self) -> None:
        """Connect to the CoinGecko MCP Server and bind its tools to Gemini.

        Raises:
            RuntimeError: If the server exposes no tools.
            FileNotFoundError: If the Pro path is taken and npx is not on PATH.
        """
        print(f"πŸ”§ Initializing {self.name}...")

        try:
            # Connect to CoinGecko MCP Server
            print("  πŸ“‘ Connecting to CoinGecko MCP Server...")

            connection_name = "coingecko"
            connections: Dict[str, Dict[str, Any]] = {}

            api_key = (config.COINGECKO_API_KEY or "").strip()
            # Demo keys are not valid for the Pro server; force the public path.
            if api_key.lower().startswith("demo"):
                print("    Demo API key detected. Using public endpoint with limited access...")
                self.use_public_endpoint = True

            if self.use_public_endpoint or not api_key:
                print("    Using public SSE endpoint...")
                connections[connection_name] = {
                    "transport": "sse",
                    "url": "https://mcp.api.coingecko.com/sse",
                }
            else:
                print("    Using Pro endpoint with API key...")
                # Windows ships npx as npx.cmd / npx.exe; try those first.
                npx_executable = _resolve_executable(["npx.cmd", "npx.exe", "npx"])
                env = {
                    "COINGECKO_PRO_API_KEY": api_key,
                    "COINGECKO_ENVIRONMENT": "pro",
                }
                connections[connection_name] = {
                    "transport": "stdio",
                    "command": npx_executable,
                    "args": ["-y", "@coingecko/coingecko-mcp"],
                    "env": env,
                }

            self.mcp_client = MultiServerMCPClient(connections)

            # Load MCP tools as LangChain tools
            self.tools = await self.mcp_client.get_tools(server_name=connection_name)
            if not self.tools:
                raise RuntimeError("No tools available from CoinGecko MCP Server")

            self.tool_map = {tool.name: tool for tool in self.tools}

            # Initialize Gemini chat model bound to the MCP tools
            self.model = ChatGoogleGenerativeAI(
                model="gemini-2.5-flash",
                temperature=0.1,
                google_api_key=config.GOOGLE_API_KEY,
            )
            self.model_with_tools = self.model.bind_tools(self.tools)

            print(f"  βœ… Connected to CoinGecko MCP Server with {len(self.tools)} tools")

            print(f"  βœ… {self.name} ready!")

        except Exception as e:
            import traceback
            print(f"  ❌ Error initializing {self.name}: {e}")
            print("  πŸ“‹ Full error details:")
            traceback.print_exc()
            raise

    @staticmethod
    def _build_messages(query: str, history: Optional[List[Dict[str, str]]]) -> List[Any]:
        """Build the LangChain message list: recent history turns plus the query.

        Only the last 10 history turns are kept to bound prompt size; each
        turn is a dict with optional "user" and "assistant" keys.
        """
        messages: List[Any] = []
        for turn in (history or [])[-10:]:
            user_text = turn.get("user")
            if user_text:
                messages.append(HumanMessage(content=user_text))
            assistant_text = turn.get("assistant")
            if assistant_text:
                messages.append(AIMessage(content=assistant_text))
        messages.append(HumanMessage(content=query))
        return messages

    async def process(
        self,
        query: str,
        history: Optional[List[Dict[str, str]]] = None,
        *,
        max_tool_rounds: int = 25,
    ) -> Dict[str, Any]:
        """Process a query using CoinGecko MCP Server tools.

        Runs a model/tool loop: the model may request tool calls, whose
        results are fed back as ToolMessages until the model produces a
        plain answer (an AIMessage without tool_calls).

        Args:
            query: The user's question.
            history: Optional prior turns as {"user": ..., "assistant": ...} dicts.
            max_tool_rounds: Safety cap on model/tool iterations so a
                misbehaving model cannot loop forever (the loop was
                previously unbounded).

        Returns:
            On success: {"success": True, "agent", "response", "query", "tool_calls"}.
            On failure: {"success": False, "agent", "error", "query"}.
        """
        try:
            print(f"\nπŸ’° {self.name} processing: '{query}'")
            messages = self._build_messages(query, history)
            final_response = ""
            tool_calls_info: List[str] = []

            for _round in range(max_tool_rounds):
                if not self.model_with_tools:
                    raise RuntimeError("Model not initialized with tools")

                ai_message = await self.model_with_tools.ainvoke(messages)
                messages.append(ai_message)

                # Some message objects expose tool_calls as None rather than
                # an empty list; normalize before testing.
                tool_calls = getattr(ai_message, "tool_calls", None) or []
                if not tool_calls:
                    final_response = _to_text(ai_message.content)
                    break

                for call in tool_calls:
                    tool_name = call.get("name")
                    tool_args = call.get("args", {})
                    tool_call_id = call.get("id")
                    print(f"  πŸ”§ MCP Tool call: {tool_name}({tool_args})")
                    tool_calls_info.append(f"πŸ”§ MCP Tool call: {tool_name}({tool_args})")

                    tool = self.tool_map.get(tool_name)
                    if not tool:
                        tool_result: Any = {"error": f"Tool '{tool_name}' not found"}
                    else:
                        try:
                            tool_result = await tool.ainvoke(tool_args)
                        except Exception as tool_error:
                            # Surface the failure to the model as a tool result
                            # instead of aborting the whole conversation.
                            tool_result = {"error": str(tool_error)}

                    messages.append(
                        ToolMessage(
                            content=_to_text(tool_result),
                            tool_call_id=tool_call_id or "",
                        )
                    )
            else:
                # Cap reached without a final answer; fail via the outer handler.
                raise RuntimeError(
                    f"No final answer after {max_tool_rounds} tool-call rounds"
                )

            return {
                "success": True,
                "agent": self.name,
                "response": final_response,
                "query": query,
                "tool_calls": tool_calls_info
            }

        except Exception as e:
            print(f"  ❌ Error in {self.name}: {e}")
            import traceback
            traceback.print_exc()
            return {
                "success": False,
                "agent": self.name,
                "error": str(e),
                "query": query
            }

    async def cleanup(self) -> None:
        """Cleanup resources.

        NOTE(review): currently log-only; MultiServerMCPClient sessions are
        created per get_tools() call here, so there appears to be nothing
        persistent to close — confirm against the adapter version in use.
        """
        print(f"🧹 {self.name} cleaned up")