import os
import io
import json
import asyncio
import base64
import time
from typing import Optional

import gradio as gr
from pydantic import BaseModel

# Optional: use openai if available for transcription and image captioning
try:
    import openai
    OPENAI_AVAILABLE = True
except Exception:
    OPENAI_AVAILABLE = False

# -----------------------------
# Configuration
# -----------------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
HUGGINGFACE_API_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
GOOGLE_GEMINI_API_KEY = os.environ.get("GOOGLE_GEMINI_API_KEY")

if OPENAI_API_KEY and OPENAI_AVAILABLE:
    openai.api_key = OPENAI_API_KEY

# ElevenLabs defaults
ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL")  # placeholder voice ID
ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"

# Hugging Face Inference API endpoint (for image captioning fallback)
HF_INFERENCE_URL = "https://huggingface.co/proxy/api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
# -----------------------------
# Minimal MCP Server shim
# -----------------------------
class ToolResult(BaseModel):
    content: str
    meta: Optional[dict] = None


class MCPServer:
    def __init__(self, name: str, version: str = "0.1.0"):
        self.name = name
        self.version = version
        self.tools = {}

    def tool(self, name: str, description: str = ""):
        def decorator(fn):
            self.tools[name] = {
                "fn": fn,
                "description": description,
            }
            return fn
        return decorator

    async def run_tool(self, name: str, *args, **kwargs):
        tool = self.tools.get(name)
        if not tool:
            raise ValueError(f"Tool {name} not found")
        fn = tool["fn"]
        if asyncio.iscoroutinefunction(fn):
            res = await fn(*args, **kwargs)
        else:
            res = fn(*args, **kwargs)
        if isinstance(res, ToolResult):
            return res
        return ToolResult(content=str(res))


server = MCPServer("accessibility_voice_mcp")
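
# Illustrative sketch (not part of the original shim): a small synchronous wrapper
# around MCPServer.run_tool, convenient for calling registered tools from
# non-async code such as Gradio callbacks. The helper name `run_tool_sync` is an
# assumption, not an established API.
def run_tool_sync(name: str, *args, **kwargs) -> ToolResult:
    """Run a registered tool synchronously by driving the async run_tool call."""
    return asyncio.run(server.run_tool(name, *args, **kwargs))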
# -----------------------------
# Utilities: STT, TTS, Image describe
# -----------------------------
def transcribe_with_openai(audio_file_path: str) -> str:
    """Transcribe audio using OpenAI Whisper (if available)."""
    if not OPENAI_AVAILABLE:
        return "OpenAI library not available"
    try:
        with open(audio_file_path, "rb") as f:
            try:
                # Modern SDK (openai>=1.0): client-based audio transcription
                client = openai.OpenAI()
                transcript = client.audio.transcriptions.create(model="whisper-1", file=f)
            except AttributeError:
                # Legacy SDK (<1.0)
                f.seek(0)
                transcript = openai.Audio.transcribe("whisper-1", f)
        if isinstance(transcript, dict):
            return transcript.get("text", "")
        return getattr(transcript, "text", "")
    except Exception as e:
        return f"OpenAI transcription error: {e}"
def transcribe_fallback(audio_file_path: str) -> str:
    """Fallback: invoke whisper from the local package (if installed)."""
    try:
        import whisper
        model = whisper.load_model("small")
        res = model.transcribe(audio_file_path)
        return res.get("text", "")
    except Exception as e:
        return f"Local transcription fallback failed: {e}"
def tts_elevenlabs(text: str) -> bytes:
    """Call the ElevenLabs API to synthesize speech. Returns raw audio bytes (MP3 by default)."""
    # Read the key at call time so keys set via the UI accordion take effect immediately
    api_key = os.environ.get("ELEVENLABS_API_KEY") or ELEVENLABS_API_KEY
    if not api_key:
        raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
    import requests

    url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
    headers = {
        "xi-api-key": api_key,
        "Content-Type": "application/json",
    }
    payload = {
        "text": text,
        "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
    }
    resp = requests.post(url, headers=headers, json=payload)
    if resp.status_code != 200:
        raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
    return resp.content
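
# Illustrative convenience helper (not in the original code, assuming a valid
# ELEVENLABS_API_KEY): synthesize text and write the returned audio bytes to disk
# so they can be played back or attached elsewhere.
def save_tts_to_file(text: str, out_path: str = "tts_output.mp3") -> str:
    """Sketch: synthesize `text` with tts_elevenlabs and save the MP3 bytes to out_path."""
    audio = tts_elevenlabs(text)
    with open(out_path, "wb") as fh:
        fh.write(audio)
    return out_path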
def describe_image_hf(image_path: str) -> str:
    """Describe an image using the Hugging Face Inference API (hosted BLIP model)."""
    try:
        import requests

        # Read the token at call time so keys set via the UI accordion take effect immediately
        hf_token = os.environ.get("HUGGINGFACE_API_TOKEN") or HUGGINGFACE_API_TOKEN
        if not hf_token:
            return "HUGGINGFACE_API_TOKEN not set"
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        headers = {
            "Authorization": f"Bearer {hf_token}"
        }
        # The HF Inference API accepts the raw image bytes as the request body
        resp = requests.post(HF_INFERENCE_URL, headers=headers, data=image_bytes)
        if resp.status_code != 200:
            return f"HF Inference error: {resp.status_code} {resp.text}"
        # The model returns JSON with 'generated_text', or a plain string depending on the model
        try:
            j = resp.json()
            # Some endpoints return [{'generated_text': '...'}]
            if isinstance(j, list) and j and 'generated_text' in j[0]:
                return j[0]['generated_text']
            if isinstance(j, dict) and 'generated_text' in j:
                return j['generated_text']
            # Otherwise return the raw payload as text
            return str(j)
        except Exception:
            return resp.text
    except Exception as e:
        return f"HF describe error: {e}"
def describe_image_openai(image_path: str) -> str:
    """Describe an image using an OpenAI vision-capable chat model (modern SDK)."""
    if not OPENAI_AVAILABLE:
        return "OpenAI not available for image captioning"
    try:
        # Read image bytes and encode as base64 for a data: URL
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        b64_image = base64.b64encode(image_bytes).decode("utf-8")
        prompt = (
            "You are an accessibility assistant that describes images for visually impaired users. "
            "Provide a clear, helpful, vivid, human-friendly description of the image.\n"
        )
        # The modern SDK (openai>=1.0) requires a client object
        try:
            client = openai.OpenAI()
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You describe images for visually impaired users."},
                    {"role": "user", "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{b64_image}"},
                        },
                    ]},
                ],
                max_tokens=300,
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            # Legacy SDKs (<1.0) cannot send image content parts, so there is no
            # meaningful text-only fallback for vision; report the error instead.
            return f"OpenAI image describe error: {e}"
    except Exception as e:
        return f"OpenAI image describe error: {e}"
# -----------------------------
# MCP Tools
# -----------------------------
def speak_text_tool(text: str) -> ToolResult:
    try:
        audio_bytes = tts_elevenlabs(text)
        encoded = base64.b64encode(audio_bytes).decode("utf-8")
        return ToolResult(content=encoded, meta={"format": "base64-audio"})
    except Exception as e:
        return ToolResult(content=f"TTS Error: {e}")


def describe_image_tool(image_path: str) -> ToolResult:
    # Priority: OpenAI -> Gemini -> Hugging Face Inference -> error
    if OPENAI_AVAILABLE:
        desc = describe_image_openai(image_path)
        if desc and not desc.startswith("OpenAI image describe error"):
            return ToolResult(content=desc, meta={"backend": "openai"})
    # Gemini (if configured)
    if GOOGLE_GEMINI_API_KEY:
        try:
            import google.generativeai as genai
            genai.configure(api_key=GOOGLE_GEMINI_API_KEY)
            model = genai.GenerativeModel("gemini-1.5-flash")
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            response = model.generate_content([
                "Describe this image for a visually impaired user.",
                {"mime_type": "image/jpeg", "data": image_bytes},
            ])
            return ToolResult(content=response.text, meta={"backend": "gemini"})
        except Exception:
            pass
    # Hugging Face Inference
    desc = describe_image_hf(image_path)
    if desc:
        return ToolResult(content=desc, meta={"backend": "huggingface"})
    return ToolResult(content="No image captioning backend available. Set OPENAI_API_KEY, GOOGLE_GEMINI_API_KEY, or HUGGINGFACE_API_TOKEN.")


def transcribe_audio_tool(audio_path: str) -> ToolResult:
    start = time.time()
    if OPENAI_AVAILABLE:
        text = transcribe_with_openai(audio_path)
        duration = time.time() - start
        return ToolResult(content=text, meta={"backend": "openai", "duration": duration})
    text = transcribe_fallback(audio_path)
    duration = time.time() - start
    return ToolResult(content=text, meta={"backend": "local_whisper", "duration": duration})
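
# Register the tool functions on the MCP server shim so they are reachable via
# server.run_tool(). The original file defines the @server.tool decorator but never
# applies it, so this wiring is an assumed completion; the tool names and
# descriptions below are illustrative.
server.tool("speak_text", "Synthesize speech from text via ElevenLabs")(speak_text_tool)
server.tool("describe_image", "Describe an image for visually impaired users")(describe_image_tool)
server.tool("transcribe_audio", "Transcribe spoken audio to text")(transcribe_audio_tool)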
# -----------------------------
# Gradio UI (client)
# -----------------------------
def decode_base64_audio(b64: str) -> bytes:
    return base64.b64decode(b64)


# Theme hints (currently unused by the Blocks layout below)
app_theme = {
    "primary_hue": "blue",
    "secondary_hue": "slate",
}


# Helper to format tool-call explanations
def format_tool_log(tool_name, reason, meta, output, style="A"):
    backend = meta.get("backend") if meta else "unknown"
    duration = meta.get("duration") if meta else None
    # ---------------------------
    # Style A: Simple
    # ---------------------------
    if style == "A":
        return f"[{tool_name}] {backend} -> {str(output)[:200]}"
    # ---------------------------
    # Style B: Detailed, human-readable
    # ---------------------------
    if style == "B":
        lines = [
            f"🔧 Tool: {tool_name}",
            f"🎯 Why: {reason}",
            f"⚙️ Backend: {backend}",
        ]
        if duration is not None:
            try:
                lines.append(f"⏱ Duration: {float(duration):.2f}s")
            except (TypeError, ValueError):
                lines.append(f"⏱ Duration: {duration}")
        lines.append(f"📝 Output: {str(output)}")
        return "\n".join(lines)
    # ---------------------------
    # Style C: Ultra-visual
    # ---------------------------
    if style == "C":
        parts = [
            f"🔧 {tool_name}",
            f"• Reason: {reason}",
            f"• Backend: {backend}",
        ]
        if duration is not None:
            try:
                parts.append(f"• {float(duration):.2f}s")
            except (TypeError, ValueError):
                parts.append(f"• {duration}")
        return " ".join(parts) + "\n" + f"→ {str(output)}"
    # ---------------------------
    # Style D: Both simple and detailed
    # ---------------------------
    return {
        "simple": f"[{tool_name}] {backend} -> {str(output)[:200]}",
        "detailed": format_tool_log(tool_name, reason, meta, output, style="B"),
    }
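
# Illustrative example (assumed values): with style "A",
#   format_tool_log("transcribe_audio", "mic input", {"backend": "openai", "duration": 1.23}, "hello")
# returns the single line:
#   [transcribe_audio] openai -> hello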
# Conversion helpers for chat history between 'messages' (new Gradio format) and the tuple list used internally
def messages_to_tuples(messages):
    # messages is a list of dicts: {"role": "user"/"assistant", "content": "..."}
    tuples = []
    if not messages:
        return tuples
    for m in messages:
        if isinstance(m, dict):
            role = m.get("role", "user")
            content = m.get("content", "")
            if role == "user":
                tuples.append((content, ""))
            else:
                tuples.append(("", content))
        elif isinstance(m, (list, tuple)) and len(m) == 2:
            tuples.append((m[0], m[1]))
        else:
            # Fallback: treat as an assistant reply
            tuples.append(("", str(m)))
    return tuples


def tuples_to_messages(tuples):
    messages = []
    for user_text, assistant_text in tuples:
        if user_text:
            messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": assistant_text})
    return messages
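
# Illustrative round trip (assumed data): messages_to_tuples turns
#   [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]
# into [("hi", ""), ("", "hello")], and tuples_to_messages maps that back to the
# same message dicts, so history can move between the two representations.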
with gr.Blocks(css=".gradio-container {background:#f7fafc}") as demo:
    gr.Markdown("# Accessibility Voice Agent — MCP Tools")
    with gr.Row():
        with gr.Column(scale=3):
            # type='messages' avoids the Chatbot deprecation warning; handlers convert as needed.
            chatbox = gr.Chatbot(label="Assistant", elem_id="chatbox", type="messages")
            user_input = gr.Textbox(placeholder="Type or press the microphone to speak...", show_label=False)
            with gr.Row():
                # Some Gradio versions don't accept the 'source' kwarg, so it is omitted for compatibility.
                mic = gr.Audio(type="filepath", label="Record voice (press to record)")
                send_btn = gr.Button("Send")
            with gr.Accordion("Advanced / Tools", open=False):
                tts_text = gr.Textbox(label="Text to speak (ElevenLabs)")
                tts_btn = gr.Button("Speak (TTS)")
                img_upload = gr.File(label="Upload image (for description)")
                img_btn = gr.Button("Describe image")
        with gr.Column(scale=2):
            gr.Markdown("### Tool Call Log & Explanations")
            log_style = gr.Radio(choices=["A", "B", "C", "D"], value="B", label="Log style (A: Simple, B: Detailed, C: Visual, D: Both)")
            tools_log = gr.Textbox(value="Ready.", lines=20, interactive=False, label="Tools Log")
            tools_panel = gr.HTML("<div id='tools_panel' style='max-height:400px;overflow:auto;background:#ffffff;padding:8px;border-radius:8px;'></div>")
            gr.Markdown("---")
            gr.Markdown("**Tool explanations appear here each time a tool runs.**")
    # Callbacks
    def on_send_text(text, chat_history, mic_file, style):
        tools_entries = []
        log = None
        # Convert incoming chat_history (messages) into tuples for the internal logic
        tuples = messages_to_tuples(chat_history)
        if mic_file:
            # Transcribe the recorded audio
            tr = transcribe_audio_tool(mic_file)
            user_text = tr.content
            log = format_tool_log("transcribe_audio", "User provided microphone audio", tr.meta or {}, tr.content, style)
            tools_entries.append(log)
        else:
            user_text = text or ""
        # Append the user message with a placeholder assistant reply
        tuples.append((user_text, "..."))
        # Demo assistant behavior
        if user_text and user_text.strip().lower().startswith("describe image:"):
            # Expects: "describe image: filename"
            _, _, fname = user_text.partition(":")
            fname = fname.strip()
            if fname:
                # Assumes the image was uploaded earlier and its path is provided
                res = describe_image_tool(fname)
                assistant = res.content
                log = format_tool_log("describe_image", "User requested image description", res.meta or {}, res.content, style)
                tools_entries.append(log)
            else:
                assistant = "Please upload an image using the Describe Image tool or provide a path like: describe image: /path/to/image.jpg"
        else:
            assistant = "I heard: " + (user_text or "(empty)")
        # Replace the placeholder assistant reply
        tuples[-1] = (tuples[-1][0], assistant)
        # Build the tools panel content
        panel_html = ''
        if isinstance(log, dict):
            # Style D returns a dict with both views
            panel_html += f"<pre>{log['detailed']}</pre>"
            panel_html += f"<hr><pre>{log['simple']}</pre>"
        else:
            for e in tools_entries:
                panel_html += f"<pre style='background:#f1f5f9;border-radius:6px;padding:8px;margin-bottom:8px;'>{e}</pre>"
        # Style D entries are dicts; flatten them so the text log can be joined
        log_lines = [e["detailed"] if isinstance(e, dict) else str(e) for e in tools_entries]
        # Convert back to messages for gr.Chatbot
        new_messages = tuples_to_messages(tuples)
        return new_messages, gr.update(value="\n".join(log_lines) or "Ready."), gr.update(value=panel_html)

    send_btn.click(on_send_text, inputs=[user_input, chatbox, mic, log_style], outputs=[chatbox, tools_log, tools_panel])
    def on_tts(text, style):
        if not text:
            return None, gr.update(value="No text provided")
        res = speak_text_tool(text)
        if res.meta and res.meta.get("format") == "base64-audio":
            audio_bytes = decode_base64_audio(res.content)
            # Write the audio to a temporary file; gr.Audio can play a filepath directly
            import tempfile
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
                tmp.write(audio_bytes)
                audio_path = tmp.name
            log = format_tool_log("speak_text", "User requested text-to-speech", res.meta or {}, "<audio bytes>", style)
            panel_html = f"<pre style='background:#eef2ff;padding:8px;border-radius:6px;'>{log}</pre>"
            return audio_path, gr.update(value=panel_html)
        else:
            log = format_tool_log("speak_text", "User requested text-to-speech", res.meta or {}, res.content, style)
            panel_html = f"<pre style='background:#fee2e2;padding:8px;border-radius:6px;'>{log}</pre>"
            return None, gr.update(value=panel_html)

    tts_audio_out = gr.Audio(label="TTS Output", type="filepath")
    tts_btn.click(on_tts, inputs=[tts_text, log_style], outputs=[tts_audio_out, tools_panel])
    def on_describe_image(file_obj, style):
        if not file_obj:
            return [], gr.update(value="No file uploaded")
        # gr.File may hand back a filepath string, a tempfile-like object, or a dict depending on the Gradio version
        if isinstance(file_obj, str):
            path = file_obj
        else:
            path = getattr(file_obj, 'name', None)
        if isinstance(file_obj, dict) and 'tmp_path' in file_obj:
            path = file_obj['tmp_path']
        if not path:
            # Last resort: read raw bytes and save them to a temporary file
            try:
                contents = file_obj.read()
                tmp_path = "/tmp/gr_uploaded_image.jpg"
                with open(tmp_path, "wb") as f:
                    f.write(contents)
                path = tmp_path
            except Exception as e:
                return [], gr.update(value=f"Failed to read uploaded file: {e}")
        res = describe_image_tool(path)
        log = format_tool_log("describe_image", "User uploaded an image for description", res.meta or {}, res.content, style)
        panel_html = f"<pre style='background:#ecfdf5;padding:8px;border-radius:6px;'>{log}</pre>"
        # Return as messages for the chatbox
        messages = [{"role": "user", "content": "<image uploaded>"}, {"role": "assistant", "content": res.content}]
        return messages, gr.update(value=panel_html)

    img_btn.click(on_describe_image, inputs=[img_upload, log_style], outputs=[chatbox, tools_panel])
    # API Keys accordion (session-only)
    with gr.Accordion("🔑 API Keys (stored only in session)", open=False):
        openai_key = gr.Textbox(label="OpenAI API Key", type="password")
        eleven_key = gr.Textbox(label="ElevenLabs API Key", type="password")
        hf_key = gr.Textbox(label="Hugging Face API Token", type="password")

        # Note: os.environ is process-wide, so keys saved here are visible to every
        # session served by this app instance, not just the current browser session.
        def set_keys(ok, ek, hk):
            if ok:
                os.environ["OPENAI_API_KEY"] = ok
                if OPENAI_AVAILABLE:
                    openai.api_key = ok
            if ek:
                os.environ["ELEVENLABS_API_KEY"] = ek
            if hk:
                os.environ["HUGGINGFACE_API_TOKEN"] = hk
            return "API keys set for this session."

        set_btn = gr.Button("Save API Keys")
        set_output = gr.Textbox(label="Status")
        set_btn.click(set_keys, [openai_key, eleven_key, hf_key], [set_output])

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))