import os
import io
import json
import asyncio
import base64
import time
from typing import Optional

import gradio as gr
from pydantic import BaseModel

# Optional: use openai if available for transcription and image captioning
try:
    import openai
    OPENAI_AVAILABLE = True
except Exception:
    OPENAI_AVAILABLE = False

# -----------------------------
# Configuration
# -----------------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
HUGGINGFACE_API_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
GOOGLE_GEMINI_API_KEY = os.environ.get("GOOGLE_GEMINI_API_KEY")

if OPENAI_API_KEY and OPENAI_AVAILABLE:
    openai.api_key = OPENAI_API_KEY

# ElevenLabs defaults
ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL")  # placeholder
ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"

# Hugging Face Inference API endpoint (for image captioning fallback)
HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"

# -----------------------------
# Minimal MCP Server shim
# -----------------------------
class ToolResult(BaseModel):
    content: str
    meta: Optional[dict] = None


class MCPServer:
    def __init__(self, name: str, version: str = "0.1.0"):
        self.name = name
        self.version = version
        self.tools = {}

    def tool(self, name: str, description: str = ""):
        def decorator(fn):
            self.tools[name] = {
                "fn": fn,
                "description": description,
            }
            return fn
        return decorator

    async def run_tool(self, name: str, *args, **kwargs):
        tool = self.tools.get(name)
        if not tool:
            raise ValueError(f"Tool {name} not found")
        fn = tool["fn"]
        if asyncio.iscoroutinefunction(fn):
            res = await fn(*args, **kwargs)
        else:
            res = fn(*args, **kwargs)
        if isinstance(res, ToolResult):
            return res
        return ToolResult(content=str(res))


server = MCPServer("accessibility_voice_mcp")
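
# Minimal usage sketch (illustration only; not called anywhere in this module). run_tool
# dispatches sync and async tools alike and always wraps the result in a ToolResult. The
# tool names it takes ("transcribe_audio", "speak_text", ...) are registered further below.
async def _example_call_tool(name: str, *args) -> ToolResult:
    result = await server.run_tool(name, *args)
    print(f"{name}: {result.content[:120]!r} meta={result.meta}")
    return result
# e.g. asyncio.run(_example_call_tool("transcribe_audio", "/path/to/clip.wav"))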

# -----------------------------
# Utilities: STT, TTS, Image describe
# -----------------------------
def transcribe_with_openai(audio_file_path: str) -> str:
    """Transcribe audio using OpenAI Whisper (if available)."""
    if not OPENAI_AVAILABLE:
        return "OpenAI library not available"
    try:
        with open(audio_file_path, "rb") as f:
            try:
                # Modern SDK (openai>=1.0)
                client = openai.OpenAI()
                transcript = client.audio.transcriptions.create(model="whisper-1", file=f)
            except AttributeError:
                # Legacy SDK fallback
                f.seek(0)
                transcript = openai.Audio.transcribe("whisper-1", f)
        if isinstance(transcript, dict):
            return transcript.get("text", "")
        return getattr(transcript, "text", "")
    except Exception as e:
        return f"OpenAI transcription error: {e}"


def transcribe_fallback(audio_file_path: str) -> str:
    """Fallback: invoke whisper from the local package (if installed)."""
    try:
        import whisper
        model = whisper.load_model("small")
        res = model.transcribe(audio_file_path)
        return res.get("text", "")
    except Exception as e:
        return f"Local transcription fallback failed: {e}"


def tts_elevenlabs(text: str) -> bytes:
    """Call the ElevenLabs API to synthesize speech.

    Returns raw audio bytes."""
    if not ELEVENLABS_API_KEY:
        raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
    import requests
    url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "text": text,
        "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
    }
    resp = requests.post(url, headers=headers, json=payload, stream=True)
    if resp.status_code != 200:
        raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
    return resp.content


def describe_image_hf(image_path: str) -> str:
    """Describe an image using the Hugging Face Inference API (hosted BLIP model)."""
    try:
        import requests
        if not HUGGINGFACE_API_TOKEN:
            return "HUGGINGFACE_API_TOKEN not set"
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        headers = {"Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}"}
        # The HF Inference API accepts files as binary
        resp = requests.post(HF_INFERENCE_URL, headers=headers, data=image_bytes)
        if resp.status_code != 200:
            return f"HF Inference error: {resp.status_code} {resp.text}"
        # Model returns JSON with 'generated_text' or a simple string depending on the model
        try:
            j = resp.json()
            # Some endpoints return [{'generated_text': '...'}]
            if isinstance(j, list) and j and 'generated_text' in j[0]:
                return j[0]['generated_text']
            if isinstance(j, dict) and 'generated_text' in j:
                return j['generated_text']
            # Otherwise return the raw payload
            return str(j)
        except Exception:
            return resp.text
    except Exception as e:
        return f"HF describe error: {e}"


def describe_image_openai(image_path: str) -> str:
    """Describe an image using OpenAI vision models (modern SDK compatible)."""
    if not OPENAI_AVAILABLE:
        return "OpenAI not available for image captioning"
    try:
        # Read image bytes
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        # Convert to base64 for safe transport in older SDKs
        b64_image = base64.b64encode(image_bytes).decode("utf-8")
        # Modern prompt content
        prompt = (
            "You are an accessibility assistant that describes images for visually impaired users. "
            "Provide a clear, helpful, vivid, human-friendly description of the image.\n"
        )
        # Some OpenAI SDK versions require: client = openai.OpenAI()
        try:
            client = openai.OpenAI()
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You describe images for visually impaired users."},
                    {"role": "user", "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{b64_image}"},
                        },
                    ]},
                ],
                max_tokens=300,
            )
            return response.choices[0].message.content.strip()
        except Exception:
            # Fallback for legacy SDKs
            legacy_prompt = (
                "You are an assistant that describes images for visually impaired users.\n"
                "Provide a concise, vivid, accessible description.\n"
                "Image(base64): " + b64_image
            )
            resp = openai.ChatCompletion.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": legacy_prompt}],
                max_tokens=300,
            )
            return resp.choices[0].message.content.strip()
    except Exception as e:
        return f"OpenAI image describe error: {e}"


# -----------------------------
# MCP Tools
# -----------------------------
@server.tool(name="speak_text", description="Convert text to speech using ElevenLabs")
def speak_text_tool(text: str) -> ToolResult:
    try:
        audio_bytes = tts_elevenlabs(text)
        encoded = base64.b64encode(audio_bytes).decode("utf-8")
        return ToolResult(content=encoded, meta={"format": "base64-audio"})
    except Exception as e:
        return ToolResult(content=f"TTS Error: {e}")


@server.tool(name="describe_image", description="Describe an uploaded image for visually impaired users")
def describe_image_tool(image_path: str) -> ToolResult:
    # Priority: OpenAI -> Gemini -> Hugging Face Inference -> error
    if OPENAI_AVAILABLE:
        desc = describe_image_openai(image_path)
        if desc and not desc.startswith("OpenAI image describe error"):
            return ToolResult(content=desc, meta={"backend": "openai"})
    # Gemini (if configured)
    if GOOGLE_GEMINI_API_KEY:
        try:
            import google.generativeai as genai
            genai.configure(api_key=GOOGLE_GEMINI_API_KEY)
            model = genai.GenerativeModel("gemini-1.5-flash")
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            response = model.generate_content([
                "Describe this image for a visually impaired user.",
                {"mime_type": "image/jpeg", "data": image_bytes},
            ])
            return ToolResult(content=response.text, meta={"backend": "gemini"})
        except Exception:
            pass
    # Hugging Face Inference (skip if it only returned an error string)
    desc = describe_image_hf(image_path)
    if desc and not desc.startswith(("HF Inference error", "HF describe error", "HUGGINGFACE_API_TOKEN")):
        return ToolResult(content=desc, meta={"backend": "huggingface"})
    return ToolResult(content="No image captioning backend available. "
                              "Set OPENAI_API_KEY, GOOGLE_GEMINI_API_KEY, or HUGGINGFACE_API_TOKEN.")

@server.tool(name="transcribe_audio", description="Transcribe user audio to text")
def transcribe_audio_tool(audio_path: str) -> ToolResult:
    start = time.time()
    if OPENAI_AVAILABLE:
        text = transcribe_with_openai(audio_path)
        duration = time.time() - start
        return ToolResult(content=text, meta={"backend": "openai", "duration": duration})
    else:
        text = transcribe_fallback(audio_path)
        duration = time.time() - start
        return ToolResult(content=text, meta={"backend": "local_whisper", "duration": duration})


# -----------------------------
# Gradio UI (client)
# -----------------------------
def decode_base64_audio(b64: str) -> bytes:
    return base64.b64decode(b64)


app_theme = {
    "primary_hue": "blue",
    "secondary_hue": "slate",
}


# Helper to format tool-call explanations
def format_tool_log(tool_name, reason, meta, output, style="A"):
    backend = meta.get("backend") if meta else "unknown"
    duration = meta.get("duration") if meta else None

    # ---------------------------
    # Style A: Simple
    # ---------------------------
    if style == "A":
        return f"[{tool_name}] {backend} -> {str(output)[:200]}"

    # ---------------------------
    # Style B: Detailed Human-Readable
    # ---------------------------
    if style == "B":
        lines = [
            f"🔧 Tool: {tool_name}",
            f"🎯 Why: {reason}",
            f"⚙️ Backend: {backend}",
        ]
        if duration is not None:
            try:
                lines.append(f"⏱ Duration: {float(duration):.2f}s")
            except Exception:
                lines.append(f"⏱ Duration: {duration}")
        lines.append(f"📝 Output: {str(output)}")
        return "\n".join(lines)

    # ---------------------------
    # Style C: Ultra-visual
    # ---------------------------
    if style == "C":
        parts = [
            f"🔧 {tool_name}",
            f"• Reason: {reason}",
            f"• Backend: {backend}",
        ]
        if duration is not None:
            try:
                parts.append(f"• {float(duration):.2f}s")
            except Exception:
                parts.append(f"• {duration}")
        return " ".join(parts) + "\n" + f"→ {str(output)}"

    # ---------------------------
    # Style D: Both Simple + Detailed
    # ---------------------------
    return {
        "simple": f"[{tool_name}] {backend} -> {str(output)[:200]}",
        "detailed": format_tool_log(tool_name, reason, meta, output, style="B"),
    }


# Conversion helpers for chat history between 'messages' (new Gradio format) and the tuple
# list used in the logic below
def messages_to_tuples(messages):
    # messages is a list of dicts {"role": "user"/"assistant", "content": "..."}
    tuples = []
    if not messages:
        return tuples
    for m in messages:
        if isinstance(m, dict):
            role = m.get("role", "user")
            content = m.get("content", "")
            if role == "user":
                tuples.append((content, ""))
            else:
                tuples.append(("", content))
        elif isinstance(m, (list, tuple)) and len(m) == 2:
            tuples.append((m[0], m[1]))
        else:
            # fallback: treat as an assistant reply
            tuples.append(("", str(m)))
    return tuples


def tuples_to_messages(tuples):
    messages = []
    for user_text, assistant_text in tuples:
        if user_text:
            messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": assistant_text})
    return messages
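
# Illustration of the round trip performed above (shapes assumed from Gradio's 'messages'
# format; the example values are hypothetical):
#   messages_to_tuples([{"role": "user", "content": "hi"},
#                       {"role": "assistant", "content": "hello"}])
#       -> [("hi", ""), ("", "hello")]
#   tuples_to_messages([("hi", ""), ("", "hello")])
#       -> [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]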

with gr.Blocks(css=".gradio-container {background:#f7fafc}") as demo:
    gr.Markdown("# Accessibility Voice Agent — MCP Tools")
    with gr.Row():
        with gr.Column(scale=3):
            # Set type='messages' to avoid the deprecation warning, and convert inside handlers.
            chatbox = gr.Chatbot(label="Assistant", elem_id="chatbox", type="messages")
            user_input = gr.Textbox(placeholder="Type or press the microphone to speak...", show_label=False)
            with gr.Row():
                # Some Gradio versions don't accept the 'source' kwarg; omit it for broad compatibility.
                mic = gr.Audio(type="filepath", label="Record voice (press to record)")
                send_btn = gr.Button("Send")
            with gr.Accordion("Advanced / Tools", open=False):
                tts_text = gr.Textbox(label="Text to speak (ElevenLabs)")
                tts_btn = gr.Button("Speak (TTS)")
                img_upload = gr.File(label="Upload image (for description)")
                img_btn = gr.Button("Describe image")
        with gr.Column(scale=2):
            gr.Markdown("### Tool Call Log & Explanations")
            log_style = gr.Radio(choices=["A", "B", "C", "D"], value="B",
                                 label="Log style (A:Simple B:Detailed C:Visual D:Both)")
            tools_log = gr.Textbox(value="Ready.", lines=20, interactive=False, label="Tools Log")
            tools_panel = gr.HTML("")
            gr.Markdown("---")
            gr.Markdown("**Tool explanations appear here each time a tool runs.**")

    # Callbacks
    def on_send_text(text, chat_history, mic_file, style):
        tools_entries = []
        # convert incoming chat_history (messages) into tuples for the internal logic
        tuples = messages_to_tuples(chat_history)
        if mic_file:
            # transcribe audio
            tr = transcribe_audio_tool(mic_file)
            user_text = tr.content
            log = format_tool_log("transcribe_audio", "User provided microphone audio",
                                  tr.meta or {}, tr.content, style)
            tools_entries.append(log)
        else:
            user_text = text or ""
        # Append the user message to tuples plus a placeholder assistant reply
        tuples.append((user_text, "..."))
        # demo assistant behavior
        if user_text and user_text.strip().lower().startswith("describe image:"):
            # expects: "describe image: filename"
            _, _, fname = user_text.partition(":")
            fname = fname.strip()
            if fname:
                # We assume the image was uploaded earlier and its path provided
                res = describe_image_tool(fname)
                assistant = res.content
                log = format_tool_log("describe_image", "User requested image description",
                                      res.meta or {}, res.content, style)
                tools_entries.append(log)
            else:
                assistant = ("Please upload an image using the Describe Image tool or provide a "
                             "path like: describe image: /path/to/image.jpg")
        else:
            assistant = "I heard: " + (user_text or "(empty)")
        # replace the placeholder assistant reply
        tuples[-1] = (tuples[-1][0], assistant)
        # update the tools panel content; style D entries are dicts with 'simple'/'detailed'
        panel_html = ""
        for e in tools_entries:
            if isinstance(e, dict):
                panel_html += f"<div>{e['detailed']}</div>"
                panel_html += f"<div>{e['simple']}</div>"
            else:
                panel_html += f"<div>{e}</div>"
        # convert back to messages for gr.Chatbot
        new_messages = tuples_to_messages(tuples)
        log_text = "\n".join(str(e) for e in tools_entries) or "Ready."
        return new_messages, gr.update(value=log_text), gr.update(value=panel_html)
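
    # The handler above returns (chat messages, plain-text log, panel HTML); wire the send
    # button so all three components refresh together after each tool call.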
    send_btn.click(on_send_text, inputs=[user_input, chatbox, mic, log_style],
                   outputs=[chatbox, tools_log, tools_panel])
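
    # Hypothetical helper (an assumption, not part of the original flow): gr.Audio can play a
    # file path, so one option for surfacing decoded TTS bytes is to write them to a temp file.
    # ElevenLabs returns MPEG audio by default, hence the ".mp3" suffix here.
    def _audio_bytes_to_tempfile(audio_bytes: bytes, suffix: str = ".mp3") -> str:
        import tempfile
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(audio_bytes)
            return tmp.name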

    def on_tts(text, style):
        if not text:
            return None, gr.update(value="No text provided")
        res = speak_text_tool(text)
        if res.meta and res.meta.get("format") == "base64-audio":
            audio_bytes = decode_base64_audio(res.content)
            log = format_tool_log("speak_text", "User requested text-to-speech", res.meta or {}, "