Spaces:
Runtime error
Runtime error
| # app.py — MCP POC updated to use Kimi (Moonshot) tool_calls flow (HTTP-based) | |
| # IMPORTANT: | |
| # - Put keys in config.py (do NOT paste keys in chat). | |
| # - requirements.txt should include: mcp (provides mcp.server.fastmcp), gradio, requests | |
| from mcp.server.fastmcp import FastMCP | |
| from typing import Optional, List, Tuple, Any, Dict | |
| import requests | |
| import os | |
| import gradio as gr | |
| import json | |
| import time | |
| import traceback | |
| import inspect | |
| import uuid | |
| # ---------------------------- | |
| # Load secrets/config - edit config.py accordingly | |
| # ---------------------------- | |
# Settings are read from config.py. The module is imported as a whole (instead
# of importing each name) so that a missing KIMI_MODEL, which is optional, does
# not abort startup; only the required names trigger the SystemExit below.
_CONFIG_HELP = (
    "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, "
    "API_BASE, KIMI_API_KEY. Optionally set KIMI_MODEL."
)
try:
    import config as _config

    CLIENT_ID = _config.CLIENT_ID
    CLIENT_SECRET = _config.CLIENT_SECRET
    REFRESH_TOKEN = _config.REFRESH_TOKEN
    API_BASE = _config.API_BASE
    KIMI_API_KEY = _config.KIMI_API_KEY  # Moonshot Kimi API key (put it in config.py)
except Exception:
    # Covers a missing/broken config.py as well as a missing required name.
    raise SystemExit(_CONFIG_HELP)
KIMI_BASE_URL = "https://api.moonshot.ai/v1"
# Optional setting: fall back to the default model when config.py omits it.
KIMI_MODEL = getattr(_config, "KIMI_MODEL", "moonshot-v1-8k")
| # ---------------------------- | |
| # Initialize FastMCP | |
| # ---------------------------- | |
# Server object for this app.
# NOTE(review): none of the functions in the visible code are registered on it
# and nothing visible starts it — confirm whether that is intended.
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics / KPI logging (simple local JSON file)
# ----------------------------
# Relative path: the file is resolved against the process's working directory.
ANALYTICS_PATH = "mcp_analytics.json"
def _init_analytics():
    """Create the analytics JSON file with zeroed counters unless it already exists."""
    if os.path.exists(ANALYTICS_PATH):
        return
    initial_state = {
        "tool_calls": {},
        "llm_calls": 0,
        "last_llm_confidence": None,
        "created_at": time.time(),
    }
    with open(ANALYTICS_PATH, "w") as fh:
        json.dump(initial_state, fh, indent=2)
def _log_tool_call(tool_name: str, success: bool = True):
    """Record one tool invocation in the analytics file.

    Increments the per-tool ``count`` and either ``success`` or ``fail``, then
    rewrites the whole file.

    Args:
        tool_name: Key under ``tool_calls`` to update.
        success: Whether the invocation succeeded.

    An unreadable file, or one whose content is not the expected structure
    (not a JSON object, or lacking a ``tool_calls`` object), is repaired in
    memory instead of raising, so a damaged analytics file cannot make the
    calling tool fail.
    """
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    tool_calls = data.get("tool_calls")
    if not isinstance(tool_calls, dict):
        tool_calls = data["tool_calls"] = {}

    stats = tool_calls.get(tool_name)
    if not isinstance(stats, dict):
        stats = tool_calls[tool_name] = {}
    # Make sure every counter exists before incrementing.
    for counter in ("count", "success", "fail"):
        stats.setdefault(counter, 0)
    stats["count"] += 1
    stats["success" if success else "fail"] += 1

    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)
def _log_llm_call(confidence: Optional[float] = None):
    """Increment the LLM call counter in the analytics file.

    When ``confidence`` is given it is also stored as ``last_llm_confidence``.
    If the file cannot be read or parsed, counting restarts from a blank record.
    """
    try:
        with open(ANALYTICS_PATH, "r") as fh:
            stats = json.load(fh)
    except Exception:
        stats = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    previous_total = stats.get("llm_calls", 0)
    stats["llm_calls"] = previous_total + 1
    if confidence is not None:
        stats["last_llm_confidence"] = confidence
    with open(ANALYTICS_PATH, "w") as fh:
        json.dump(stats, fh, indent=2)
# Runs at import time: writes the analytics file if it does not exist yet.
_init_analytics()
| # ---------------------------- | |
| # Kimi HTTP helpers (calls Moonshot Kimi API) | |
| # ---------------------------- | |
def _kimi_headers():
    """Return the HTTP headers for Kimi requests: bearer auth plus a JSON content type."""
    bearer = f"Bearer {KIMI_API_KEY}"
    return {"Authorization": bearer, "Content-Type": "application/json"}
def _kimi_chat_completion(messages: List[Dict], tools: Optional[List[Dict]] = None, model: str = KIMI_MODEL):
    """
    POST one chat-completion request to Kimi and return the decoded JSON body.

    ``tools`` is added to the request only when it is non-empty. Any status
    other than 200/201 raises RuntimeError carrying the status and body text.
    """
    payload = {"model": model, "messages": messages}
    # Tool declarations are optional; an empty/None value is left out entirely.
    if tools:
        payload["tools"] = tools
    endpoint = f"{KIMI_BASE_URL}/chat/completions"
    response = requests.post(endpoint, headers=_kimi_headers(), json=payload, timeout=60)
    if response.status_code in (200, 201):
        return response.json()
    raise RuntimeError(f"Kimi API error: {response.status_code} {response.text}")
| # ---------------------------- | |
| # Zoho token refresh & headers | |
| # ---------------------------- | |
def _get_valid_token_headers() -> dict:
    """Exchange the refresh token for an access token and return auth headers.

    Returns:
        ``{"Authorization": "Zoho-oauthtoken <access_token>"}``.

    Raises:
        RuntimeError: if the token endpoint answers with a non-200 status, a
            body that is not JSON, or a body without an ``access_token``.

    A fresh token is requested on every call; nothing is cached here.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token"
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        payload = r.json()
    except ValueError as exc:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}") from exc
    # A 200 status alone is not proof of success: without this check a body
    # lacking "access_token" would produce the header "Zoho-oauthtoken None".
    token = payload.get("access_token") if isinstance(payload, dict) else None
    if not token:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    return {"Authorization": f"Zoho-oauthtoken {token}"}
| # ---------------------------- | |
| # MCP tools: Zoho CRM and Books (CRUD + documents) | |
| # (same as earlier; use these function names as tool names in the Kimi tools definitions) | |
| # ---------------------------- | |
def authenticate_zoho() -> str:
    """Try a token refresh and report the outcome as a human-readable string.

    The attempt is logged to analytics as a success or a failure; errors are
    returned as text rather than raised.
    """
    tool = "authenticate_zoho"
    try:
        _get_valid_token_headers()
        _log_tool_call(tool, True)
        outcome = "Zoho token refreshed (ok)."
    except Exception as exc:
        _log_tool_call(tool, False)
        outcome = f"Failed to authenticate: {exc}"
    return outcome
def create_record(module_name: str, record_data: dict) -> str:
    """POST a single record to ``{API_BASE}/{module_name}``.

    Returns the response JSON as a string on 200/201, otherwise an error
    string. Exceptions are caught and returned as text; every outcome is
    logged to analytics.
    """
    tool = "create_record"
    try:
        auth_headers = _get_valid_token_headers()
        endpoint = f"{API_BASE}/{module_name}"
        body = {"data": [record_data]}  # the record is wrapped in a one-element "data" list
        response = requests.post(endpoint, headers=auth_headers, json=body, timeout=20)
        if response.status_code not in (200, 201):
            _log_tool_call(tool, False)
            return f"Error creating record: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from ``{API_BASE}/{module_name}``.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        page: Page number sent as the ``page`` query parameter.
        per_page: Page size sent as the ``per_page`` query parameter.

    Returns:
        The ``data`` list from the response on HTTP 200, an empty list on
        HTTP 204 (no content), or a one-element list holding an error string
        for any other status or exception.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 204:
            # No content means "no records", not a failure; there is no body to parse.
            _log_tool_call("get_records", True)
            return []
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """PUT new field values to ``{API_BASE}/{module_name}/{record_id}``.

    Returns the response JSON as a string on HTTP 200, otherwise an error
    string. Exceptions are caught and returned as text; every outcome is
    logged to analytics.
    """
    tool = "update_record"
    try:
        auth_headers = _get_valid_token_headers()
        endpoint = f"{API_BASE}/{module_name}/{record_id}"
        response = requests.put(endpoint, headers=auth_headers, json={"data": [data]}, timeout=20)
        if response.status_code != 200:
            _log_tool_call(tool, False)
            return f"Error updating: {response.status_code} {response.text}"
        _log_tool_call(tool, True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
def delete_record(module_name: str, record_id: str) -> str:
    """Send DELETE to ``{API_BASE}/{module_name}/{record_id}``.

    Returns the response JSON as a string on HTTP 200, otherwise an error
    string. Exceptions are caught and returned as text; every outcome is
    logged to analytics.
    """
    tool = "delete_record"
    try:
        auth_headers = _get_valid_token_headers()
        endpoint = f"{API_BASE}/{module_name}/{record_id}"
        response = requests.delete(endpoint, headers=auth_headers, timeout=20)
        succeeded = response.status_code == 200
        _log_tool_call(tool, succeeded)
        if succeeded:
            return json.dumps(response.json(), ensure_ascii=False)
        return f"Error deleting: {response.status_code} {response.text}"
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
def create_invoice(data: dict) -> str:
    """POST one invoice payload to ``{API_BASE}/invoices``.

    Returns the response JSON as a string on 200/201, otherwise an error
    string. Exceptions are caught and returned as text; every outcome is
    logged to analytics.
    """
    tool = "create_invoice"
    try:
        auth_headers = _get_valid_token_headers()
        endpoint = f"{API_BASE}/invoices"
        body = {"data": [data]}
        response = requests.post(endpoint, headers=auth_headers, json=body, timeout=20)
        created = response.status_code in (200, 201)
        _log_tool_call(tool, created)
        if not created:
            return f"Error creating invoice: {response.status_code} {response.text}"
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call(tool, False)
        return f"Exception: {exc}"
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Return a simulated extraction result for a document path.

    No real parsing happens: when ``file_path`` exists locally a fixed set of
    sample fields is returned, otherwise a note saying the file was not found.
    On an exception the result is ``{"status": "error", "error": ...}``.
    """
    try:
        # For POC: simulated extraction; replace with real OCR and parsing
        if os.path.exists(file_path):
            fields = {
                "Name": "ACME Corp (simulated)",
                "Email": "[email protected]",
                "Phone": "+91-99999-00000",
                "Total": "1234.00",
                "Confidence": 0.87,
            }
        else:
            fields = {"note": "file not found locally; treat as URL in production", "path": file_path}
        _log_tool_call("process_document", True)
        summary = {
            "status": "success",
            "file": os.path.basename(file_path),
            "source_path": file_path,
            "target_module": target_module,
            "extracted_data": fields,
        }
        return summary
    except Exception as exc:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(exc)}
| # ---------------------------- | |
| # Tool map for local execution (used to run tool_calls returned by Kimi) | |
| # ---------------------------- | |
| # Keys should match the "name" you place in the tools JSON you send to Kimi | |
# Dispatch table: tool name (as it appears in a Kimi tool call) -> local function.
tool_map = dict(
    authenticate_zoho=authenticate_zoho,
    create_record=create_record,
    get_records=get_records,
    update_record=update_record,
    delete_record=delete_record,
    create_invoice=create_invoice,
    process_document=process_document,
)
| # ---------------------------- | |
| # Build the "tools" JSON to send to Kimi (simple schema per doc) | |
| # For the POC, declare only a subset or declare all tools. Each tool is a JSON schema. | |
| # Below is an example declaration for create_record; expand as needed. | |
| # ---------------------------- | |
def build_tool_definitions():
    """Return the list of tool declarations sent to Kimi with each request.

    Only ``create_record`` and ``process_document`` are declared. Each entry is
    a ``{"type": "function", "function": {...}}`` dict whose ``parameters`` is
    a JSON-schema object.
    """

    def declare(name, description, properties, required):
        # One declaration in the shape shared by every tool.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    # Keep definitions concise to avoid token blowup.
    create_record_tool = declare(
        "create_record",
        "Create a record in a Zoho CRM module. Args: module_name (str), record_data (json).",
        {
            "module_name": {"type": "string"},
            "record_data": {"type": "object"},
        },
        ["module_name", "record_data"],
    )
    process_document_tool = declare(
        "process_document",
        "Process an uploaded document (local path or URL). Args: file_path, target_module.",
        {
            "file_path": {"type": "string"},
            "target_module": {"type": "string"},
        },
        ["file_path"],
    )
    # Add more tool definitions (get_records, update_record, create_invoice, etc.) similarly if needed
    return [create_record_tool, process_document_tool]
| # ---------------------------- | |
| # Kimi tool_calls orchestration loop (follows Moonshot docs) | |
| # ---------------------------- | |
def _history_to_messages(history: Optional[List[Any]]) -> List[Dict]:
    """Convert chat history into a list of user/assistant chat messages.

    Accepted item shapes:
      - ``(user, assistant)`` pairs (any indexable with two entries),
      - ``{"user": ..., "assistant": ...}`` dicts,
      - ``{"role": "user"|"assistant", "content": "..."}`` message dicts.
    Empty turns and items of any other shape are skipped.
    """
    messages: List[Dict] = []
    for item in history or []:
        if isinstance(item, dict):
            role = item.get("role")
            if role in ("user", "assistant"):
                content = item.get("content")
                # Only plain-text content can be forwarded as-is.
                if isinstance(content, str) and content:
                    messages.append({"role": role, "content": content})
                continue
            user_turn = item.get("user", "")
            assistant_turn = item.get("assistant", "")
        else:
            try:
                user_turn, assistant_turn = item[0], item[1]
            except (TypeError, IndexError, KeyError):
                user_turn, assistant_turn = "", ""
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})
    return messages


def _execute_tool(tool_name: Optional[str], raw_args: Any) -> Any:
    """Run the local tool named ``tool_name`` with JSON-encoded ``raw_args``.

    Returns the tool's result, or an ``{"error": ...}`` dict when the tool is
    unknown, the arguments cannot be decoded into a JSON object, or the tool
    raises.
    """
    tool_fn = tool_map.get(tool_name)
    if not callable(tool_fn):
        return {"error": f"tool '{tool_name}' not found locally."}
    if isinstance(raw_args, dict):
        parsed_args = raw_args
    else:
        try:
            parsed_args = json.loads(raw_args) if raw_args else {}
        except (TypeError, ValueError) as exc:
            # Report the problem so the model can retry with valid JSON,
            # instead of calling the tool with no arguments.
            return {"error": f"could not parse arguments for '{tool_name}': {exc}"}
    if parsed_args is None:
        parsed_args = {}
    if not isinstance(parsed_args, dict):
        return {"error": f"arguments for '{tool_name}' must be a JSON object."}
    try:
        return tool_fn(**parsed_args)
    except Exception as exc:
        return {"error": str(exc)}


def kimi_chat_with_tools(user_message: str, history: Optional[List[Any]] = None, max_tool_rounds: int = 8):
    """
    Orchestrates the chat + tool_calls flow with Kimi and returns the reply text.

    Args:
        user_message: The new user turn.
        history: Earlier turns; see ``_history_to_messages`` for accepted shapes.
        max_tool_rounds: Upper bound on how many tool_calls responses are
            executed before giving up, so a model that keeps requesting tools
            cannot loop forever.

    The loop:
      1. call Kimi with messages + tools
      2. if finish_reason == "tool_calls", run each requested tool locally,
         append a role=tool message per call (with its tool_call_id) and repeat
      3. otherwise return the assistant content

    Returns:
        The assistant text, "(no content)" when the final message has none, or
        a notice when the tool-round limit is reached.

    Raises:
        Whatever ``_kimi_chat_completion`` raises (e.g. RuntimeError on a
        non-2xx response).
    """
    system_prompt = (
        "You are Zoho Assistant. Use available tools when needed. "
        "When you want to perform an action, return tool_calls. Otherwise, return normal assistant text."
    )
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": user_message})

    tools = build_tool_definitions()
    rounds_used = 0
    while True:
        resp_json = _kimi_chat_completion(messages, tools=tools, model=KIMI_MODEL)
        # Tolerate a missing or empty "choices" list instead of raising IndexError.
        choices = resp_json.get("choices") or [{}]
        choice = choices[0] or {}
        message = choice.get("message") or {}

        if choice.get("finish_reason") != "tool_calls":
            # Log LLM call (no explicit confidence field in this response shape; leave None)
            _log_llm_call(None)
            return message.get("content") or "(no content)"

        if rounds_used >= max_tool_rounds:
            _log_llm_call(None)
            return "(stopped: tool-call round limit reached without a final answer)"
        rounds_used += 1

        # Append the assistant message as-is so the next call has the tool_calls context.
        messages.append(message)
        for tc in message.get("tool_calls") or []:
            func_meta = tc.get("function") or {}
            tool_name = func_meta.get("name")
            result = _execute_tool(tool_name, func_meta.get("arguments"))
            messages.append({
                "role": "tool",
                # Fall back to a generated id if the call carries none.
                "tool_call_id": tc.get("id") or str(uuid.uuid4()),
                "name": tool_name,
                "content": json.dumps(result, ensure_ascii=False),
            })
| # ---------------------------- | |
| # Chat handler + Gradio UI | |
| # ---------------------------- | |
def chat_handler(message, history):
    """Gradio chat callback: return the reply text for one user message.

    Messages starting with "/mnt/data/" are treated as a dev shortcut and sent
    straight to ``process_document``; everything else goes through the Kimi
    tool_calls loop. Errors are returned as text, never raised.
    """
    past_turns = history or []
    text = (message or "").strip()
    if not text.startswith("/mnt/data/"):
        # Normal path: call Kimi with the tool_calls loop
        try:
            return kimi_chat_with_tools(text, past_turns)
        except Exception as exc:
            return f"(Kimi error) {exc}"
    try:
        doc = process_document(text)
        file_name = doc.get('file')
        extracted_json = json.dumps(doc.get('extracted_data'), ensure_ascii=False)
        return f"Processed file {file_name}. Extracted: {extracted_json}"
    except Exception as exc:
        return f"Error processing document: {exc}"
def chat_interface():
    """Build the Gradio ChatInterface wired to ``chat_handler``."""
    prompt_box = gr.Textbox(
        placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."
    )
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
| # ---------------------------- | |
| # Launch | |
| # ---------------------------- | |
if __name__ == "__main__":
    # NOTE(review): the banner mentions a FastMCP server, but only the Gradio
    # app is launched below; nothing in the visible code starts `mcp`.
    print("[startup] Launching Gradio UI + FastMCP server (Kimi tool_calls integration).")
    demo = chat_interface()
    # Bind to all interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)