# VisionPro / app.py
import os
import io
import json
import asyncio
import base64
import time
from typing import Optional
import gradio as gr
from pydantic import BaseModel
# Optional: use openai if available for transcription and image captioning
try:
import openai
OPENAI_AVAILABLE = True
except Exception:
OPENAI_AVAILABLE = False
# -----------------------------
# Configuration
# -----------------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
HUGGINGFACE_API_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
GOOGLE_GEMINI_API_KEY = os.environ.get("GOOGLE_GEMINI_API_KEY")
if OPENAI_API_KEY and OPENAI_AVAILABLE:
openai.api_key = OPENAI_API_KEY
# ElevenLabs defaults
ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL") # placeholder
ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"
# Hugging Face Inference API endpoint (for image captioning fallback)
HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
# -----------------------------
# Minimal MCP Server shim
# -----------------------------
class ToolResult(BaseModel):
content: str
meta: Optional[dict] = None
class MCPServer:
def __init__(self, name: str, version: str = "0.1.0"):
self.name = name
self.version = version
self.tools = {}
def tool(self, name: str, description: str = ""):
def decorator(fn):
self.tools[name] = {
"fn": fn,
"description": description,
}
return fn
return decorator
async def run_tool(self, name: str, *args, **kwargs):
tool = self.tools.get(name)
if not tool:
raise ValueError(f"Tool {name} not found")
fn = tool["fn"]
if asyncio.iscoroutinefunction(fn):
res = await fn(*args, **kwargs)
else:
res = fn(*args, **kwargs)
if isinstance(res, ToolResult):
return res
return ToolResult(content=str(res))
server = MCPServer("accessibility_voice_mcp")
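# A minimal usage sketch of the shim above (illustrative only, not used by the
# app): the "echo" tool and the asyncio.run call below are hypothetical and are
# shown just to document how registration and invocation fit together.
#
#   @server.tool(name="echo", description="Return the input text unchanged")
#   def echo_tool(text: str) -> ToolResult:
#       return ToolResult(content=text, meta={"backend": "none"})
#
#   result = asyncio.run(server.run_tool("echo", "hello"))
#   assert result.content == "hello"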
# -----------------------------
# Utilities: STT, TTS, Image describe
# -----------------------------
def transcribe_with_openai(audio_file_path: str) -> str:
    """Transcribe audio using OpenAI Whisper (if available)."""
    if not OPENAI_AVAILABLE:
        return "OpenAI library not available"
    try:
        with open(audio_file_path, "rb") as f:
            try:
                # Modern SDK (openai>=1.0): client-based transcription endpoint
                client = openai.OpenAI()
                transcript = client.audio.transcriptions.create(model="whisper-1", file=f)
            except AttributeError:
                # Legacy SDK (<1.0)
                f.seek(0)
                transcript = openai.Audio.transcribe("whisper-1", f)
        if isinstance(transcript, dict):
            return transcript.get("text", "")
        return getattr(transcript, "text", "")
    except Exception as e:
        return f"OpenAI transcription error: {e}"
def transcribe_fallback(audio_file_path: str) -> str:
"""Fallback: invoke whisper from local package (if installed)."""
try:
import whisper
model = whisper.load_model("small")
res = model.transcribe(audio_file_path)
return res.get("text", "")
except Exception as e:
return f"Local transcription fallback failed: {e}"
def tts_elevenlabs(text: str) -> bytes:
"""Call ElevenLabs API to synthesize speech. Returns raw audio bytes."""
if not ELEVENLABS_API_KEY:
raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
import requests
url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
headers = {
"xi-api-key": ELEVENLABS_API_KEY,
"Content-Type": "application/json",
}
payload = {
"text": text,
"voice_settings": {"stability": 0.5, "similarity_boost": 0.75}
}
resp = requests.post(url, headers=headers, json=payload, stream=True)
if resp.status_code != 200:
raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
return resp.content
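# Sketch (assumes ElevenLabs' default MP3 output): the raw bytes returned by
# tts_elevenlabs() can simply be written to disk for playback; the filename is
# illustrative.
#
#   audio = tts_elevenlabs("Hello from the accessibility agent")
#   with open("tts_output.mp3", "wb") as out:
#       out.write(audio)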
def describe_image_hf(image_path: str) -> str:
"""Describe an image using Hugging Face Inference API (BLIP model hosted)."""
try:
import requests
if not HUGGINGFACE_API_TOKEN:
return "HUGGINGFACE_API_TOKEN not set"
with open(image_path, "rb") as f:
image_bytes = f.read()
headers = {
"Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}"
}
# The HF Inference API accepts files as binary
resp = requests.post(HF_INFERENCE_URL, headers=headers, data=image_bytes)
if resp.status_code != 200:
return f"HF Inference error: {resp.status_code} {resp.text}"
# Model returns JSON with 'generated_text' or a simple string depending on model
try:
j = resp.json()
# Some endpoints return [{'generated_text': '...'}]
if isinstance(j, list) and j and 'generated_text' in j[0]:
return j[0]['generated_text']
if isinstance(j, dict) and 'generated_text' in j:
return j['generated_text']
# Otherwise return text
return str(j)
except Exception:
return resp.text
except Exception as e:
return f"HF describe error: {e}"
def describe_image_openai(image_path: str) -> str:
"""Describe an image using OpenAI Vision (modern SDK compatible)."""
if not OPENAI_AVAILABLE:
return "OpenAI not available for image captioning"
try:
# Read image bytes
with open(image_path, "rb") as f:
image_bytes = f.read()
        # Encode as base64 so the image can be embedded inline as a data URL
b64_image = base64.b64encode(image_bytes).decode("utf-8")
# Modern prompt content
prompt = (
"You are an accessibility assistant that describes images for visually impaired users. "
"Provide a clear, helpful, vivid, human-friendly description of the image.\n"
)
# Some OpenAI SDK versions require: client = openai.OpenAI()
try:
client = openai.OpenAI()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You describe images for visually impaired users."},
{"role": "user", "content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": f"data:image/jpeg;base64,{b64_image}"
}
]}
],
max_tokens=300,
)
return response.choices[0].message.content.strip()
        except AttributeError:
            # Legacy SDKs (<1.0) expose no vision-capable chat endpoint; embedding
            # the base64 string in a plain text prompt cannot convey the image and
            # usually exceeds the context window, so report the limitation instead.
            return ("OpenAI image describe error: the installed openai SDK has no "
                    "vision support; upgrade to openai>=1.0 or use another backend.")
except Exception as e:
return f"OpenAI image describe error: {e}"
# -----------------------------
# MCP Tools
# -----------------------------
@server.tool(name="speak_text", description="Convert text to speech using ElevenLabs")
def speak_text_tool(text: str) -> ToolResult:
try:
audio_bytes = tts_elevenlabs(text)
encoded = base64.b64encode(audio_bytes).decode("utf-8")
return ToolResult(content=encoded, meta={"format": "base64-audio"})
except Exception as e:
return ToolResult(content=f"TTS Error: {e}")
@server.tool(name="describe_image", description="Describe an uploaded image for visually impaired users")
def describe_image_tool(image_path: str) -> ToolResult:
# Priority: OpenAI -> Gemini -> Hugging Face Inference -> error
if OPENAI_AVAILABLE:
desc = describe_image_openai(image_path)
if desc and not desc.startswith("OpenAI image describe error"):
return ToolResult(content=desc, meta={"backend":"openai"})
# Gemini (if configured)
if GOOGLE_GEMINI_API_KEY:
try:
import google.generativeai as genai
genai.configure(api_key=GOOGLE_GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-1.5-flash")
with open(image_path, "rb") as f:
image_bytes = f.read()
response = model.generate_content(["Describe this image for a visually impaired user.", {"mime_type":"image/jpeg", "data": image_bytes}])
return ToolResult(content=response.text, meta={"backend":"gemini"})
except Exception:
pass
# Hugging Face Inference
    desc = describe_image_hf(image_path)
    if desc and not desc.startswith(("HF Inference error", "HF describe error", "HUGGINGFACE_API_TOKEN")):
        return ToolResult(content=desc, meta={"backend":"huggingface"})
return ToolResult(content="No image captioning backend available. Set OPENAI_API_KEY, GOOGLE_GEMINI_API_KEY, or HUGGINGFACE_API_TOKEN.")
@server.tool(name="transcribe_audio", description="Transcribe user audio to text")
def transcribe_audio_tool(audio_path: str) -> ToolResult:
start = time.time()
if OPENAI_AVAILABLE:
text = transcribe_with_openai(audio_path)
duration = time.time() - start
return ToolResult(content=text, meta={"backend":"openai","duration":duration})
else:
text = transcribe_fallback(audio_path)
duration = time.time() - start
return ToolResult(content=text, meta={"backend":"local_whisper","duration":duration})
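# The three tools above can also be reached through the MCP shim (illustrative
# only; the image path is a placeholder). The Gradio callbacks further down call
# the tool functions directly instead of going through run_tool.
#
#   result = asyncio.run(server.run_tool("describe_image", "/path/to/photo.jpg"))
#   print(result.content, result.meta)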
# -----------------------------
# Gradio UI (client)
# -----------------------------
def decode_base64_audio(b64: str) -> bytes:
return base64.b64decode(b64)
app_theme = {
"primary_hue": "blue",
"secondary_hue": "slate",
}
# Helper to format tool-call explanations
def format_tool_log(tool_name, reason, meta, output, style="A"):
backend = meta.get("backend") if meta else "unknown"
duration = meta.get("duration") if meta else None
# ---------------------------
# Style A: Simple
# ---------------------------
if style == "A":
return f"[{tool_name}] {backend} -> {str(output)[:200]}"
# ---------------------------
# Style B: Detailed Human-Readable
# ---------------------------
if style == "B":
lines = [
f"🔧 Tool: {tool_name}",
f"🎯 Why: {reason}",
f"⚙️ Backend: {backend}",
]
if duration is not None:
try:
lines.append(f"⏱ Duration: {float(duration):.2f}s")
            except Exception:
lines.append(f"⏱ Duration: {duration}")
lines.append(f"📝 Output: {str(output)}")
return "\n".join(lines)
# ---------------------------
# Style C: Ultra-visual
# ---------------------------
if style == "C":
parts = [
f"🔧 {tool_name}",
f"• Reason: {reason}",
f"• Backend: {backend}",
]
if duration is not None:
try:
parts.append(f"• {float(duration):.2f}s")
            except Exception:
parts.append(f"• {duration}")
visual = " ".join(parts) + "\n" + f"→ {str(output)}"
return visual
# ---------------------------
# Style D: Both Simple + Detailed
# ---------------------------
return {
"simple": f"[{tool_name}] {backend} -> {str(output)[:200]}",
"detailed": format_tool_log(tool_name, reason, meta, output, style="B"),
}
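# Illustrative only (not executed): how the UI callbacks below typically invoke
# this helper; the meta values here are made up for the example.
#
#   sample = format_tool_log(
#       "transcribe_audio",
#       "User provided microphone audio",
#       {"backend": "openai", "duration": 1.42},
#       "turn on the kitchen lights",
#       style="B",
#   )
#   # sample is a multi-line string: tool name, reason, backend, duration, output.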
# Conversion helpers between Gradio's "messages" chat format and the (user, assistant) tuple list used internally
def messages_to_tuples(messages):
# messages is a list of dicts {"role": "user"/"assistant", "content": "..."}
tuples = []
if not messages:
return tuples
for m in messages:
if isinstance(m, dict):
role = m.get("role", "user")
content = m.get("content", "")
            if role == "user":
                tuples.append((content, ""))
            else:
                tuples.append(("", content))
elif isinstance(m, (list, tuple)) and len(m) == 2:
tuples.append((m[0], m[1]))
else:
# fallback: treat as assistant reply
tuples.append(("", str(m)))
return tuples
def tuples_to_messages(tuples):
messages = []
for user_text, assistant_text in tuples:
if user_text:
messages.append({"role":"user","content":user_text})
if assistant_text:
messages.append({"role":"assistant","content":assistant_text})
return messages
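# Round-trip sketch for the two helpers above (illustrative, not executed):
#
#   msgs = [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]
#   pairs = messages_to_tuples(msgs)   # -> [("hi", ""), ("", "hello")]
#   back = tuples_to_messages(pairs)   # -> same list of dicts as msgs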
with gr.Blocks(css=".gradio-container {background:#f7fafc}") as demo:
gr.Markdown("# Accessibility Voice Agent — MCP Tools")
with gr.Row():
with gr.Column(scale=3):
# Set type='messages' to avoid the deprecation warning, and convert inside handlers.
chatbox = gr.Chatbot(label="Assistant", elem_id="chatbox", type="messages")
user_input = gr.Textbox(placeholder="Type or press the microphone to speak...", show_label=False)
with gr.Row():
# Some gradio versions don't accept 'source' kw; remove it to be broadly compatible.
mic = gr.Audio(type="filepath", label="Record voice (press to record)")
send_btn = gr.Button("Send")
with gr.Accordion("Advanced / Tools", open=False):
tts_text = gr.Textbox(label="Text to speak (ElevenLabs)")
                tts_btn = gr.Button("Speak (TTS)")
                tts_audio_out = gr.Audio(label="TTS Output", interactive=False)
img_upload = gr.File(label="Upload image (for description)")
img_btn = gr.Button("Describe image")
with gr.Column(scale=2):
gr.Markdown("### Tool Call Log & Explanations")
log_style = gr.Radio(choices=["A","B","C","D"], value="B", label="Log style (A:Simple B:Detailed C:Visual D:Both)")
tools_log = gr.Textbox(value="Ready.", lines=20, interactive=False, label="Tools Log")
tools_panel = gr.HTML("<div id='tools_panel' style='max-height:400px;overflow:auto;background:#ffffff;padding:8px;border-radius:8px;'></div>")
gr.Markdown("---")
gr.Markdown("**Tool explanations appear here each time a tool runs.**")
# Callbacks
def on_send_text(text, chat_history, mic_file, style):
        tools_entries = []
        log = None  # most recent tool log entry (a dict when style "D" is selected)
# convert incoming chat_history (messages) into tuples for internal logic
tuples = messages_to_tuples(chat_history)
if mic_file:
# transcribe audio
tr = transcribe_audio_tool(mic_file)
user_text = tr.content
log = format_tool_log("transcribe_audio", "User provided microphone audio", tr.meta or {}, tr.content, style)
tools_entries.append(log)
else:
user_text = text or ""
# Append user message to tuples and placeholder assistant
tuples.append((user_text, "..."))
# demo assistant behavior
if user_text and user_text.strip().lower().startswith("describe image:"):
# expects: "describe image: filename"
_, _, fname = user_text.partition(":")
fname = fname.strip()
if fname:
# We assume the image was uploaded earlier and path provided
res = describe_image_tool(fname)
assistant = res.content
log = format_tool_log("describe_image", "User requested image description", res.meta or {}, res.content, style)
tools_entries.append(log)
else:
assistant = "Please upload an image using the Describe Image tool or provide a path like: describe image: /path/to/image.jpg"
else:
assistant = "I heard: " + (user_text or "(empty)")
# replace placeholder assistant
tuples[-1] = (tuples[-1][0], assistant)
# update tools panel content
panel_html = ''
if isinstance(log, dict):
# D style returns dict
panel_html += f"<pre>{log['detailed']}</pre>"
panel_html += f"<hr><pre>{log['simple']}</pre>"
else:
for e in tools_entries:
panel_html += f"<pre style='background:#f1f5f9;border-radius:6px;padding:8px;margin-bottom:8px;'>{e}</pre>"
# convert back to messages for gr.Chatbot
new_messages = tuples_to_messages(tuples)
return new_messages, gr.update(value="\n".join(tools_entries) or "Ready."), gr.update(value=panel_html)
send_btn.click(on_send_text, inputs=[user_input, chatbox, mic, log_style], outputs=[chatbox, tools_log, tools_panel])
def on_tts(text, style):
if not text:
return None, gr.update(value="No text provided")
res = speak_text_tool(text)
        if res.meta and res.meta.get("format") == "base64-audio":
            audio_bytes = decode_base64_audio(res.content)
            # gr.Audio cannot play raw bytes; write them to a temp file and return the path.
            import tempfile
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
                tmp.write(audio_bytes)
                audio_path = tmp.name
            log = format_tool_log("speak_text", "User requested text-to-speech", res.meta or {}, "<audio bytes>", style)
            panel_html = f"<pre style='background:#eef2ff;padding:8px;border-radius:6px;'>{log}</pre>"
            return audio_path, gr.update(value=panel_html)
else:
log = format_tool_log("speak_text", "User requested text-to-speech", res.meta or {}, res.content, style)
panel_html = f"<pre style='background:#fee2e2;padding:8px;border-radius:6px;'>{log}</pre>"
return None, gr.update(value=panel_html)
    tts_btn.click(on_tts, inputs=[tts_text, log_style], outputs=[tts_audio_out, tools_panel])
def on_describe_image(file_obj, style):
if not file_obj:
return [], gr.update(value="No file uploaded")
        # file_obj may be a filepath string, an UploadFile-like object, or a legacy
        # dict, depending on the Gradio version; resolve it to a path we can open.
        if isinstance(file_obj, str):
            path = file_obj
        elif isinstance(file_obj, dict) and 'tmp_path' in file_obj:
            path = file_obj['tmp_path']
        else:
            path = getattr(file_obj, 'name', None)
if not path:
# try to save bytes
try:
contents = file_obj.read()
tmp_path = "/tmp/gr_uploaded_image.jpg"
with open(tmp_path, "wb") as f:
f.write(contents)
path = tmp_path
except Exception as e:
return [], gr.update(value=f"Failed to read uploaded file: {e}")
res = describe_image_tool(path)
log = format_tool_log("describe_image", "User uploaded an image for description", res.meta or {}, res.content, style)
panel_html = f"<pre style='background:#ecfdf5;padding:8px;border-radius:6px;'>{log}</pre>"
# Return as messages for chatbox
messages = [{"role":"user","content":"<image uploaded>"}, {"role":"assistant","content":res.content}]
return messages, gr.update(value=panel_html)
img_btn.click(on_describe_image, inputs=[img_upload, log_style], outputs=[chatbox, tools_panel])
# API Keys accordion (session-only)
with gr.Accordion("🔑 API Keys (stored only in session)", open=False):
openai_key = gr.Textbox(label="OpenAI API Key", type="password")
eleven_key = gr.Textbox(label="ElevenLabs API Key", type="password")
hf_key = gr.Textbox(label="Hugging Face API Token", type="password")
        def set_keys(ok, ek, hk):
            # Update both the environment and the module-level globals so the
            # helpers above pick the keys up without a restart.
            global OPENAI_API_KEY, ELEVENLABS_API_KEY, HUGGINGFACE_API_TOKEN
            if ok:
                os.environ["OPENAI_API_KEY"] = ok
                OPENAI_API_KEY = ok
                if OPENAI_AVAILABLE:
                    openai.api_key = ok
            if ek:
                os.environ["ELEVENLABS_API_KEY"] = ek
                ELEVENLABS_API_KEY = ek
            if hk:
                os.environ["HUGGINGFACE_API_TOKEN"] = hk
                HUGGINGFACE_API_TOKEN = hk
            return "API keys updated for this session."
set_btn = gr.Button("Save API Keys")
set_output = gr.Textbox(label="Status")
set_btn.click(set_keys, [openai_key, eleven_key, hf_key], [set_output])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))