# Zoho_mcp_client / app.py
# vachaspathi's picture
# Update app.py
# 028d1d6 verified
# raw
# history blame
# 17.2 kB
# app.py — MCP POC updated to use Kimi (Moonshot) tool_calls flow (HTTP-based)
# IMPORTANT:
# - Put keys in config.py (do NOT paste keys in chat).
# - requirements.txt should include: fastmcp, gradio, requests
from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import inspect
import uuid
# ----------------------------
# Load secrets/config - edit config.py accordingly
# ----------------------------
# Settings are read off the imported config module one attribute at a time so
# that the optional KIMI_MODEL really can be left out: a single
# `from config import (..., KIMI_MODEL)` fails as a whole when that one name
# is missing, which made the fallback below unreachable.
_CONFIG_HELP = (
    "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, "
    "API_BASE, KIMI_API_KEY. Optionally set KIMI_MODEL."
)
try:
    import config as _config

    CLIENT_ID = _config.CLIENT_ID
    CLIENT_SECRET = _config.CLIENT_SECRET
    REFRESH_TOKEN = _config.REFRESH_TOKEN
    API_BASE = _config.API_BASE
    KIMI_API_KEY = _config.KIMI_API_KEY  # Moonshot Kimi API key (put it in config.py)
except Exception as exc:
    # Missing file, missing required name, or an error inside config.py:
    # exit with the setup hint and keep the original error as the cause.
    raise SystemExit(_CONFIG_HELP) from exc

KIMI_BASE_URL = "https://api.moonshot.ai/v1"
# Optional setting; "moonshot-v1-8k" is used when config.py does not define it.
KIMI_MODEL = getattr(_config, "KIMI_MODEL", "moonshot-v1-8k")
# ----------------------------
# Initialize FastMCP
# ----------------------------
# FastMCP instance named "ZohoCRMAgent"; the @mcp.tool() decorators further
# down register their functions on it.
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics / KPI logging (simple local JSON file)
# ----------------------------
# Relative path of the JSON file that the _init_analytics / _log_* helpers
# read and rewrite.
ANALYTICS_PATH = "mcp_analytics.json"
def _init_analytics():
    """Create the analytics JSON file with zeroed counters if it is absent.

    An existing file is left untouched. The seed document holds an empty
    per-tool map, an LLM call counter, the last reported LLM confidence and
    the creation timestamp.
    """
    if os.path.exists(ANALYTICS_PATH):
        return
    seed = {
        "tool_calls": {},
        "llm_calls": 0,
        "last_llm_confidence": None,
        "created_at": time.time(),
    }
    with open(ANALYTICS_PATH, "w") as handle:
        json.dump(seed, handle, indent=2)
def _log_tool_call(tool_name: str, success: bool = True):
try:
with open(ANALYTICS_PATH, "r") as f:
data = json.load(f)
except Exception:
data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
data["tool_calls"].setdefault(tool_name, {"count": 0, "success": 0, "fail": 0})
data["tool_calls"][tool_name]["count"] += 1
if success:
data["tool_calls"][tool_name]["success"] += 1
else:
data["tool_calls"][tool_name]["fail"] += 1
with open(ANALYTICS_PATH, "w") as f:
json.dump(data, f, indent=2)
def _log_llm_call(confidence: Optional[float] = None):
    """Bump the LLM call counter in the analytics file.

    Args:
        confidence: When not None, stored as ``last_llm_confidence``;
            otherwise the previously stored value is kept.
    """
    try:
        with open(ANALYTICS_PATH, "r") as source:
            stats = json.load(source)
    except Exception:
        # Missing or unreadable file: fall back to an empty document.
        stats = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    previous = stats.get("llm_calls", 0)
    stats["llm_calls"] = previous + 1
    if confidence is not None:
        stats["last_llm_confidence"] = confidence

    with open(ANALYTICS_PATH, "w") as target:
        json.dump(stats, target, indent=2)
# Executed at import time: creates the analytics file if it does not exist yet.
_init_analytics()
# ----------------------------
# Kimi HTTP helpers (calls Moonshot Kimi API)
# ----------------------------
def _kimi_headers():
    """Return the HTTP headers for Kimi requests: bearer auth plus JSON content type."""
    bearer = "Bearer {}".format(KIMI_API_KEY)
    return {
        "Authorization": bearer,
        "Content-Type": "application/json",
    }
def _kimi_chat_completion(messages: List[Dict], tools: Optional[List[Dict]] = None, model: str = KIMI_MODEL):
    """POST one chat-completion request to Kimi and return the decoded JSON.

    Args:
        messages: Conversation so far, as role/content dicts.
        tools: Optional tool declarations; only sent when non-empty.
        model: Model identifier placed in the request body.

    Returns:
        The parsed JSON response body.

    Raises:
        RuntimeError: If the HTTP status is neither 200 nor 201.
    """
    payload = {"model": model, "messages": messages}
    # Tool declarations are attached only when the caller supplied some.
    if tools:
        payload["tools"] = tools

    endpoint = f"{KIMI_BASE_URL}/chat/completions"
    response = requests.post(endpoint, headers=_kimi_headers(), json=payload, timeout=60)
    if response.status_code in (200, 201):
        return response.json()
    raise RuntimeError(f"Kimi API error: {response.status_code} {response.text}")
# ----------------------------
# Zoho token refresh & headers
# ----------------------------
def _get_valid_token_headers() -> dict:
    """Exchange the refresh token for an access token and build the auth header.

    A new token request is made on every call (nothing is cached here).

    Returns:
        ``{"Authorization": "Zoho-oauthtoken <access_token>"}``.

    Raises:
        RuntimeError: If the token endpoint does not answer 200, or answers
            200 without a usable ``access_token`` (for example an error
            payload). Previously that case produced the header
            ``Zoho-oauthtoken None`` and only failed later on the API call.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")

    try:
        payload = r.json()
    except ValueError as exc:
        # 200 with a body that is not JSON: report it like any other failure.
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}") from exc

    token = payload.get("access_token") if isinstance(payload, dict) else None
    if not token:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    return {"Authorization": f"Zoho-oauthtoken {token}"}
# ----------------------------
# MCP tools: Zoho CRM and Books (CRUD + documents)
# (same as earlier; use these function names as tool names in the Kimi tools definitions)
# ----------------------------
@mcp.tool()
def authenticate_zoho() -> str:
    """Refresh the Zoho access token and report the outcome as text.

    Returns:
        A confirmation string on success, or a "Failed to authenticate: ..."
        string carrying the error; exceptions from the refresh are not
        propagated.
    """
    tool = "authenticate_zoho"
    try:
        _get_valid_token_headers()
        _log_tool_call(tool, True)
        outcome = "Zoho token refreshed (ok)."
    except Exception as err:
        _log_tool_call(tool, False)
        outcome = f"Failed to authenticate: {err}"
    return outcome
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create one record in a Zoho module.

    POSTs ``{"data": [record_data]}`` to ``{API_BASE}/{module_name}``.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_data: Field/value mapping for the new record.

    Returns:
        The JSON response serialised as a string on HTTP 200/201; otherwise
        an "Error creating record: ..." or "Exception: ..." string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        payload = {"data": [record_data]}
        r = requests.post(url, headers=headers, json=payload, timeout=20)
        if r.status_code not in (200, 201):
            _log_tool_call("create_record", False)
            return f"Error creating record: {r.status_code} {r.text}"
        # Decode the body before logging success, so an undecodable response
        # is counted once (as a failure) rather than as success and failure.
        body = json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_record", True)
        return body
    except Exception as e:
        _log_tool_call("create_record", False)
        return f"Exception: {e}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from a Zoho module.

    Issues a GET on ``{API_BASE}/{module_name}`` with ``page`` and
    ``per_page`` as query parameters.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        page: Page number to request.
        per_page: Number of records requested per page.

    Returns:
        The ``data`` list from the response (empty when the key is absent or
        the server answers 204 No Content). On failure, a one-element list
        holding an error string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 204:
            # No Content means "no records", not a failure; there is no body
            # to decode.
            _log_tool_call("get_records", True)
            return []
        if r.status_code != 200:
            _log_tool_call("get_records", False)
            return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
        # Decode before logging success so a bad body is counted once, as a
        # failure, by the handler below.
        records = r.json().get("data", [])
        _log_tool_call("get_records", True)
        return records
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update one record in a Zoho module.

    PUTs ``{"data": [data]}`` to ``{API_BASE}/{module_name}/{record_id}``.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record to update.
        data: Field/value mapping with the changes.

    Returns:
        The JSON response serialised as a string on HTTP 200; otherwise an
        "Error updating: ..." or "Exception: ..." string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        payload = {"data": [data]}
        r = requests.put(url, headers=headers, json=payload, timeout=20)
        if r.status_code != 200:
            _log_tool_call("update_record", False)
            return f"Error updating: {r.status_code} {r.text}"
        # Decode the body before logging success, so an undecodable response
        # is counted once (as a failure) rather than as success and failure.
        body = json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("update_record", True)
        return body
    except Exception as e:
        _log_tool_call("update_record", False)
        return f"Exception: {e}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Delete one record from a Zoho module.

    Sends DELETE to ``{API_BASE}/{module_name}/{record_id}``.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record to delete.

    Returns:
        The JSON response serialised as a string on HTTP 200; otherwise an
        "Error deleting: ..." or "Exception: ..." string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        r = requests.delete(url, headers=headers, timeout=20)
        if r.status_code != 200:
            _log_tool_call("delete_record", False)
            return f"Error deleting: {r.status_code} {r.text}"
        # Decode the body before logging success, so an undecodable response
        # is counted once (as a failure) rather than as success and failure.
        body = json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("delete_record", True)
        return body
    except Exception as e:
        _log_tool_call("delete_record", False)
        return f"Exception: {e}"
@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice.

    POSTs ``{"data": [data]}`` to ``{API_BASE}/invoices``.

    Args:
        data: Field/value mapping describing the invoice.

    Returns:
        The JSON response serialised as a string on HTTP 200/201; otherwise
        an "Error creating invoice: ..." or "Exception: ..." string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/invoices"
        r = requests.post(url, headers=headers, json={"data": [data]}, timeout=20)
        if r.status_code not in (200, 201):
            _log_tool_call("create_invoice", False)
            return f"Error creating invoice: {r.status_code} {r.text}"
        # Decode the body before logging success, so an undecodable response
        # is counted once (as a failure) rather than as success and failure.
        body = json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_invoice", True)
        return body
    except Exception as e:
        _log_tool_call("create_invoice", False)
        return f"Exception: {e}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Return extraction results for a document (simulated for the POC).

    No file content is read: when ``file_path`` exists locally a fixed set of
    sample fields is returned, otherwise a note saying the file was not found.

    Args:
        file_path: Path to check on the local filesystem.
        target_module: Echoed back in the result as ``target_module``.

    Returns:
        A dict with ``status`` "success" plus file name, source path, target
        module and ``extracted_data``; or ``status`` "error" with the error
        text if anything raised.
    """
    try:
        if not os.path.exists(file_path):
            fields = {"note": "file not found locally; treat as URL in production", "path": file_path}
        else:
            # For POC: simulated extraction; replace with real OCR and parsing
            fields = {
                "Name": "ACME Corp (simulated)",
                "Email": "[email protected]",
                "Phone": "+91-99999-00000",
                "Total": "1234.00",
                "Confidence": 0.87,
            }
        _log_tool_call("process_document", True)
        report = {"status": "success"}
        report["file"] = os.path.basename(file_path)
        report["source_path"] = file_path
        report["target_module"] = target_module
        report["extracted_data"] = fields
        return report
    except Exception as err:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(err)}
# ----------------------------
# Tool map for local execution (used to run tool_calls returned by Kimi)
# ----------------------------
# Keys should match the "name" you place in the tools JSON you send to Kimi
# Dispatch table: tool name (as it appears in a Kimi tool_call) -> the local
# function that kimi_chat_with_tools invokes for it.
tool_map = {
    "authenticate_zoho": authenticate_zoho,
    "create_record": create_record,
    "get_records": get_records,
    "update_record": update_record,
    "delete_record": delete_record,
    "create_invoice": create_invoice,
    "process_document": process_document,
}
# ----------------------------
# Build the "tools" JSON to send to Kimi (simple schema per doc)
# For the POC, declare only a subset or declare all tools. Each tool is a JSON schema.
# Below is an example declaration for create_record; expand as needed.
# ----------------------------
def _tool_schema(name, description, properties, required=()):
    """Build one function-tool declaration in the shape sent to Kimi."""
    parameters = {"type": "object", "properties": properties}
    # "required" is left out entirely when there is nothing to require.
    if required:
        parameters["required"] = list(required)
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        },
    }


def build_tool_definitions():
    """Return the tool declarations to send to Kimi with each request.

    One declaration is produced for every entry in ``tool_map``, so every
    locally executable tool can be requested by the model. Previously only
    ``create_record`` and ``process_document`` were declared, which left the
    other tools unreachable from chat. Descriptions are kept short to limit
    prompt size.

    Returns:
        A list of ``{"type": "function", "function": {...}}`` dicts whose
        ``name`` values match the keys of ``tool_map``.
    """
    string = {"type": "string"}
    integer = {"type": "integer"}
    json_object = {"type": "object"}
    return [
        _tool_schema(
            "create_record",
            "Create a record in a Zoho CRM module. Args: module_name (str), record_data (json).",
            {"module_name": dict(string), "record_data": dict(json_object)},
            ["module_name", "record_data"],
        ),
        _tool_schema(
            "process_document",
            "Process an uploaded document (local path or URL). Args: file_path, target_module.",
            {"file_path": dict(string), "target_module": dict(string)},
            ["file_path"],
        ),
        _tool_schema(
            "get_records",
            "List records from a Zoho CRM module. Args: module_name (str), page (int), per_page (int).",
            {"module_name": dict(string), "page": dict(integer), "per_page": dict(integer)},
            ["module_name"],
        ),
        _tool_schema(
            "update_record",
            "Update a record in a Zoho CRM module. Args: module_name (str), record_id (str), data (json).",
            {"module_name": dict(string), "record_id": dict(string), "data": dict(json_object)},
            ["module_name", "record_id", "data"],
        ),
        _tool_schema(
            "delete_record",
            "Delete a record from a Zoho CRM module. Args: module_name (str), record_id (str).",
            {"module_name": dict(string), "record_id": dict(string)},
            ["module_name", "record_id"],
        ),
        _tool_schema(
            "create_invoice",
            "Create an invoice. Args: data (json).",
            {"data": dict(json_object)},
            ["data"],
        ),
        _tool_schema(
            "authenticate_zoho",
            "Refresh the Zoho access token and report whether it succeeded. Takes no arguments.",
            {},
        ),
    ]
# ----------------------------
# Kimi tool_calls orchestration loop (follows Moonshot docs)
# ----------------------------
def _history_to_messages(history):
    """Convert chat history into a list of user/assistant message dicts."""
    converted = []
    for item in history or []:
        if isinstance(item, dict):
            role = item.get("role")
            if role in ("user", "assistant"):
                # Role/content style entry: forward it directly.
                content = item.get("content")
                if content:
                    converted.append({"role": role, "content": content})
                continue
            user_turn = item.get("user", "")
            assistant_turn = item.get("assistant", "")
        else:
            try:
                user_turn, assistant_turn = item[0], item[1]
            except (TypeError, IndexError, KeyError):
                user_turn, assistant_turn = "", ""
        if user_turn:
            converted.append({"role": "user", "content": user_turn})
        if assistant_turn:
            converted.append({"role": "assistant", "content": assistant_turn})
    return converted


def _execute_tool_call(tc):
    """Run one tool_call locally and return the role="tool" reply message."""
    func_meta = tc.get("function") or {}
    tool_name = func_meta.get("name")
    # The arguments arrive as a serialized JSON string; empty means "no args".
    raw_args = func_meta.get("arguments") or "{}"
    tool_fn = tool_map.get(tool_name)
    if not callable(tool_fn):
        result = {"error": f"tool '{tool_name}' not found locally."}
    else:
        try:
            parsed_args = json.loads(raw_args)
        except (TypeError, ValueError) as e:
            # Tell the model its arguments were unusable instead of silently
            # invoking the tool with no arguments.
            result = {"error": f"invalid JSON arguments for tool '{tool_name}': {e}"}
        else:
            try:
                result = tool_fn(**parsed_args) if isinstance(parsed_args, dict) else tool_fn(parsed_args)
            except Exception as e:
                result = {"error": str(e)}
    return {
        "role": "tool",
        "tool_call_id": tc.get("id") or str(uuid.uuid4()),
        "name": tool_name,
        "content": json.dumps(result, ensure_ascii=False),
    }


def kimi_chat_with_tools(user_message: str, history: Optional[List[Dict]] = None, max_tool_rounds: int = 8):
    """Run one user turn through Kimi, executing any tool calls it requests.

    The loop:
      1. call Kimi with the messages and tool declarations;
      2. if ``finish_reason == "tool_calls"``, append the assistant message,
         execute each tool call locally, append one role="tool" message per
         call, and call Kimi again;
      3. on any other ``finish_reason``, return the assistant content.

    Args:
        user_message: The new user text.
        history: Earlier turns. Accepted entry shapes are (user, assistant)
            pairs, ``{"user": ..., "assistant": ...}`` dicts, and
            ``{"role": ..., "content": ...}`` dicts with role "user" or
            "assistant". Unrecognised entries are skipped.
        max_tool_rounds: Upper bound on tool-execution rounds for this turn,
            so a model that keeps asking for tools cannot loop forever.

    Returns:
        The assistant's reply text, "(no content)" when the reply is empty,
        or a "(stopped: ...)" notice when the round limit is reached.

    Raises:
        RuntimeError: Propagated from the Kimi HTTP helper on API errors.
    """
    system_prompt = (
        "You are Zoho Assistant. Use available tools when needed. "
        "When you want to perform an action, return tool_calls. Otherwise, return normal assistant text."
    )
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": user_message})

    tools = build_tool_definitions()
    rounds = 0
    while True:
        resp_json = _kimi_chat_completion(messages, tools=tools, model=KIMI_MODEL)
        # An absent or empty "choices" list is treated as an empty reply.
        choices = resp_json.get("choices") or [{}]
        choice = choices[0] or {}
        message = choice.get("message") or {}

        if choice.get("finish_reason") != "tool_calls":
            # Final answer. The response carries no confidence value, so
            # None is logged.
            _log_llm_call(None)
            return message.get("content") or "(no content)"

        if rounds >= max_tool_rounds:
            _log_llm_call(None)
            return f"(stopped: tool-call limit of {max_tool_rounds} rounds reached)"
        rounds += 1

        # Keep the assistant message (with its tool_calls) in the transcript
        # so the following tool messages have something to refer to.
        messages.append(message)
        for tc in message.get("tool_calls") or []:
            messages.append(_execute_tool_call(tc))
# ----------------------------
# Chat handler + Gradio UI
# ----------------------------
def chat_handler(message, history):
    """Handle one chat message from the UI and return the reply text.

    Messages starting with "/mnt/data/" take a dev shortcut and are passed
    straight to ``process_document``; everything else goes through the Kimi
    tool-calling loop. Errors on either path are returned as text.
    """
    past_turns = history or []
    text = (message or "").strip()
    dev_prefix = "/mnt/data/"

    if not text.startswith(dev_prefix):
        try:
            return kimi_chat_with_tools(text, past_turns)
        except Exception as err:
            return f"(Kimi error) {err}"

    try:
        result = process_document(text)
        file_name = result.get('file')
        extracted = json.dumps(result.get('extracted_data'), ensure_ascii=False)
        return f"Processed file {file_name}. Extracted: {extracted}"
    except Exception as err:
        return f"Error processing document: {err}"
def chat_interface():
    """Build the Gradio ChatInterface wired to ``chat_handler``."""
    prompt_box = gr.Textbox(
        placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."
    )
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
# ----------------------------
# Launch
# ----------------------------
# Script entry point: build the chat UI and serve it on all interfaces, port 7860.
if __name__ == "__main__":
    # NOTE(review): the banner mentions a FastMCP server, but nothing visible
    # here starts one (no mcp.run() call) — only the Gradio app is launched.
    # Confirm whether that is intended.
    print("[startup] Launching Gradio UI + FastMCP server (Kimi tool_calls integration).")
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)