Spaces:
Running
Running
fixed version
Browse files
app.py
CHANGED
|
@@ -28,8 +28,15 @@ GOOGLE_GEMINI_API_KEY = os.environ.get("GOOGLE_GEMINI_API_KEY")
|
|
| 28 |
if OPENAI_API_KEY and OPENAI_AVAILABLE:
|
| 29 |
openai.api_key = OPENAI_API_KEY
|
| 30 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 31 |
# -----------------------------
|
| 32 |
-
# Minimal MCP Server shim
|
| 33 |
# -----------------------------
|
| 34 |
class ToolResult(BaseModel):
|
| 35 |
content: str
|
|
@@ -66,10 +73,11 @@ class MCPServer:
|
|
| 66 |
server = MCPServer("accessibility_voice_mcp")
|
| 67 |
|
| 68 |
# -----------------------------
|
| 69 |
-
# Utilities: STT, TTS, Image describe
|
| 70 |
# -----------------------------
|
| 71 |
|
| 72 |
def transcribe_with_openai(audio_file_path: str) -> str:
|
|
|
|
| 73 |
if not OPENAI_AVAILABLE:
|
| 74 |
return "OpenAI library not available"
|
| 75 |
try:
|
|
@@ -81,7 +89,9 @@ def transcribe_with_openai(audio_file_path: str) -> str:
|
|
| 81 |
except Exception as e:
|
| 82 |
return f"OpenAI transcription error: {e}"
|
| 83 |
|
|
|
|
| 84 |
def transcribe_fallback(audio_file_path: str) -> str:
|
|
|
|
| 85 |
try:
|
| 86 |
import whisper
|
| 87 |
model = whisper.load_model("small")
|
|
@@ -90,59 +100,152 @@ def transcribe_fallback(audio_file_path: str) -> str:
|
|
| 90 |
except Exception as e:
|
| 91 |
return f"Local transcription fallback failed: {e}"
|
| 92 |
|
|
|
|
| 93 |
def tts_elevenlabs(text: str) -> bytes:
|
|
|
|
| 94 |
if not ELEVENLABS_API_KEY:
|
| 95 |
raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
|
| 96 |
import requests
|
| 97 |
-
ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL")
|
| 98 |
-
ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"
|
| 99 |
url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
|
| 100 |
-
headers = {
|
| 101 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 102 |
resp = requests.post(url, headers=headers, json=payload, stream=True)
|
| 103 |
if resp.status_code != 200:
|
| 104 |
raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
|
| 105 |
return resp.content
|
| 106 |
|
|
|
|
| 107 |
def describe_image_hf(image_path: str) -> str:
|
|
|
|
| 108 |
try:
|
| 109 |
import requests
|
| 110 |
-
|
| 111 |
-
HF_INFERENCE_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
|
| 112 |
-
if not HF_INFERENCE_TOKEN:
|
| 113 |
return "HUGGINGFACE_API_TOKEN not set"
|
| 114 |
with open(image_path, "rb") as f:
|
| 115 |
image_bytes = f.read()
|
| 116 |
-
headers = {
|
|
|
|
|
|
|
|
|
|
| 117 |
resp = requests.post(HF_INFERENCE_URL, headers=headers, data=image_bytes)
|
| 118 |
if resp.status_code != 200:
|
| 119 |
return f"HF Inference error: {resp.status_code} {resp.text}"
|
| 120 |
-
|
| 121 |
-
|
| 122 |
-
|
| 123 |
-
|
| 124 |
-
|
| 125 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 126 |
except Exception as e:
|
| 127 |
return f"HF describe error: {e}"
|
| 128 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 129 |
# -----------------------------
|
| 130 |
-
# MCP Tools
|
| 131 |
# -----------------------------
|
| 132 |
@server.tool(name="speak_text", description="Convert text to speech using ElevenLabs")
|
| 133 |
def speak_text_tool(text: str) -> ToolResult:
|
| 134 |
try:
|
| 135 |
audio_bytes = tts_elevenlabs(text)
|
| 136 |
encoded = base64.b64encode(audio_bytes).decode("utf-8")
|
| 137 |
-
return ToolResult(content=encoded, meta={"format": "base64-audio"
|
| 138 |
except Exception as e:
|
| 139 |
-
return ToolResult(content=f"TTS Error: {e}"
|
|
|
|
| 140 |
|
| 141 |
@server.tool(name="describe_image", description="Describe an uploaded image for visually impaired users")
|
| 142 |
def describe_image_tool(image_path: str) -> ToolResult:
|
| 143 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 144 |
desc = describe_image_hf(image_path)
|
| 145 |
-
|
|
|
|
|
|
|
|
|
|
| 146 |
|
| 147 |
@server.tool(name="transcribe_audio", description="Transcribe user audio to text")
|
| 148 |
def transcribe_audio_tool(audio_path: str) -> ToolResult:
|
|
@@ -157,29 +260,75 @@ def transcribe_audio_tool(audio_path: str) -> ToolResult:
|
|
| 157 |
return ToolResult(content=text, meta={"backend":"local_whisper","duration":duration})
|
| 158 |
|
| 159 |
# -----------------------------
|
| 160 |
-
# UI
|
| 161 |
# -----------------------------
|
| 162 |
|
| 163 |
def decode_base64_audio(b64: str) -> bytes:
|
| 164 |
return base64.b64decode(b64)
|
| 165 |
|
| 166 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 167 |
backend = meta.get("backend") if meta else "unknown"
|
| 168 |
duration = meta.get("duration") if meta else None
|
| 169 |
-
lines = [
|
| 170 |
-
f"🔧 Tool: {tool_name}",
|
| 171 |
-
f"🎯 Reason: {reason}",
|
| 172 |
-
f"⚙️ Backend: {backend}",
|
| 173 |
-
]
|
| 174 |
-
if duration is not None:
|
| 175 |
-
try:
|
| 176 |
-
lines.append(f"⏱ Duration: {float(duration):.2f}s")
|
| 177 |
-
except:
|
| 178 |
-
lines.append(f"⏱ Duration: {duration}")
|
| 179 |
-
lines.append("📝 Output: " + (str(output)[:1000] if output else ""))
|
| 180 |
-
return "\n".join(lines)
|
| 181 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 182 |
def messages_to_tuples(messages):
|
|
|
|
| 183 |
tuples = []
|
| 184 |
if not messages:
|
| 185 |
return tuples
|
|
@@ -191,6 +340,7 @@ def messages_to_tuples(messages):
|
|
| 191 |
elif isinstance(m, (list, tuple)) and len(m) == 2:
|
| 192 |
tuples.append((m[0], m[1]))
|
| 193 |
else:
|
|
|
|
| 194 |
tuples.append(("", str(m)))
|
| 195 |
return tuples
|
| 196 |
|
|
@@ -203,106 +353,113 @@ def tuples_to_messages(tuples):
|
|
| 203 |
messages.append({"role":"assistant","content":assistant_text})
|
| 204 |
return messages
|
| 205 |
|
| 206 |
-
|
| 207 |
-
.gradio-container { background: #f7fafc; font-family: Inter, Roboto, Arial; }
|
| 208 |
-
.tool-panel { background: linear-gradient(180deg,#ffffff,#f8fafc); padding:12px; border-radius:10px; box-shadow: 0 6px 18px rgba(15,23,42,0.06); }
|
| 209 |
-
.tool-badge { display:inline-block; padding:6px 10px; border-radius:999px; font-weight:600; margin-right:8px; background:#eff6ff; color:#0369a1; }
|
| 210 |
-
.tool-name { font-weight:700; margin-bottom:6px; display:block; }
|
| 211 |
-
.log-good { background:#ecfdf5; padding:8px; border-radius:8px; }
|
| 212 |
-
.log-warn { background:#fff7ed; padding:8px; border-radius:8px; }
|
| 213 |
-
.chat-wrap { border-radius:12px; padding:8px; background:#ffffff; box-shadow: 0 4px 10px rgba(2,6,23,0.04); }
|
| 214 |
-
"""
|
| 215 |
-
|
| 216 |
-
with gr.Blocks(css=custom_css, title="Accessibility Voice Agent (Improved UX)") as demo:
|
| 217 |
gr.Markdown("# Accessibility Voice Agent — MCP Tools")
|
|
|
|
| 218 |
with gr.Row():
|
| 219 |
with gr.Column(scale=3):
|
|
|
|
| 220 |
chatbox = gr.Chatbot(label="Assistant", elem_id="chatbox", type="messages")
|
| 221 |
-
user_input = gr.Textbox(placeholder="Type
|
| 222 |
|
| 223 |
with gr.Row():
|
| 224 |
-
#
|
| 225 |
-
mic = gr.
|
| 226 |
send_btn = gr.Button("Send")
|
| 227 |
|
| 228 |
-
|
| 229 |
-
|
| 230 |
-
gr.
|
| 231 |
-
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
tts_text = gr.Textbox(label="Text to speak", placeholder="Enter a sentence to synthesize", lines=2)
|
| 235 |
-
tts_btn = gr.Button("Speak (TTS)")
|
| 236 |
-
with gr.Column(scale=6):
|
| 237 |
-
gr.Markdown("<span class='tool-badge'>IMG</span><span class='tool-name'>Describe Image</span>", elem_id="img_label")
|
| 238 |
-
img_upload = gr.File(label="Upload image (for description)")
|
| 239 |
-
img_btn = gr.Button("Describe Image")
|
| 240 |
|
| 241 |
with gr.Column(scale=2):
|
| 242 |
gr.Markdown("### Tool Call Log & Explanations")
|
| 243 |
-
|
| 244 |
-
|
|
|
|
| 245 |
gr.Markdown("---")
|
| 246 |
-
gr.Markdown("**
|
| 247 |
|
| 248 |
# Callbacks
|
| 249 |
-
def on_send_text(text, chat_history, mic_file):
|
| 250 |
tools_entries = []
|
|
|
|
| 251 |
tuples = messages_to_tuples(chat_history)
|
| 252 |
-
user_text = ""
|
| 253 |
-
|
| 254 |
-
# If mic recorded, prefer that
|
| 255 |
if mic_file:
|
|
|
|
| 256 |
tr = transcribe_audio_tool(mic_file)
|
| 257 |
user_text = tr.content
|
| 258 |
-
|
|
|
|
| 259 |
else:
|
| 260 |
user_text = text or ""
|
| 261 |
|
| 262 |
-
# Append to
|
| 263 |
-
tuples.append((user_text, "..."
|
|
|
|
|
|
|
| 264 |
if user_text and user_text.strip().lower().startswith("describe image:"):
|
|
|
|
| 265 |
_, _, fname = user_text.partition(":")
|
| 266 |
fname = fname.strip()
|
| 267 |
if fname:
|
|
|
|
| 268 |
res = describe_image_tool(fname)
|
| 269 |
assistant = res.content
|
| 270 |
-
|
|
|
|
| 271 |
else:
|
| 272 |
assistant = "Please upload an image using the Describe Image tool or provide a path like: describe image: /path/to/image.jpg"
|
| 273 |
else:
|
| 274 |
assistant = "I heard: " + (user_text or "(empty)")
|
| 275 |
|
|
|
|
| 276 |
tuples[-1] = (tuples[-1][0], assistant)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 277 |
new_messages = tuples_to_messages(tuples)
|
| 278 |
-
|
| 279 |
-
for e in tools_entries:
|
| 280 |
-
panel_html += f"<div class='log-good' style='margin-bottom:8px;'><pre>{e}</pre></div>"
|
| 281 |
-
return new_messages, gr.update(value="\\n\\n".join(tools_entries) or "Ready."), gr.update(value=panel_html)
|
| 282 |
|
| 283 |
-
send_btn.click(on_send_text, inputs=[user_input, chatbox, mic], outputs=[chatbox, tools_log, tools_panel])
|
| 284 |
|
| 285 |
-
def on_tts(text):
|
| 286 |
if not text:
|
| 287 |
-
return None, gr.update(value="
|
| 288 |
res = speak_text_tool(text)
|
| 289 |
if res.meta and res.meta.get("format") == "base64-audio":
|
| 290 |
audio_bytes = decode_base64_audio(res.content)
|
| 291 |
-
|
|
|
|
| 292 |
return (audio_bytes, 16000), gr.update(value=panel_html)
|
| 293 |
else:
|
| 294 |
-
|
|
|
|
| 295 |
return None, gr.update(value=panel_html)
|
| 296 |
|
| 297 |
-
tts_btn.click(on_tts, inputs=[tts_text], outputs=[gr.Audio(label="TTS Output"), tools_panel])
|
| 298 |
|
| 299 |
-
def on_describe_image(file_obj):
|
| 300 |
if not file_obj:
|
| 301 |
-
return [], gr.update(value="
|
|
|
|
| 302 |
path = getattr(file_obj, 'name', None)
|
|
|
|
| 303 |
if isinstance(file_obj, dict) and 'tmp_path' in file_obj:
|
| 304 |
path = file_obj['tmp_path']
|
| 305 |
if not path:
|
|
|
|
| 306 |
try:
|
| 307 |
contents = file_obj.read()
|
| 308 |
tmp_path = "/tmp/gr_uploaded_image.jpg"
|
|
@@ -310,16 +467,19 @@ with gr.Blocks(css=custom_css, title="Accessibility Voice Agent (Improved UX)")
|
|
| 310 |
f.write(contents)
|
| 311 |
path = tmp_path
|
| 312 |
except Exception as e:
|
| 313 |
-
return [], gr.update(value=f"
|
| 314 |
|
| 315 |
res = describe_image_tool(path)
|
| 316 |
-
|
|
|
|
|
|
|
|
|
|
| 317 |
messages = [{"role":"user","content":"<image uploaded>"}, {"role":"assistant","content":res.content}]
|
| 318 |
return messages, gr.update(value=panel_html)
|
| 319 |
|
| 320 |
-
img_btn.click(on_describe_image, inputs=[img_upload], outputs=[chatbox, tools_panel])
|
| 321 |
|
| 322 |
-
#
|
| 323 |
with gr.Accordion("🔑 API Keys (stored only in session)", open=False):
|
| 324 |
openai_key = gr.Textbox(label="OpenAI API Key", type="password")
|
| 325 |
eleven_key = gr.Textbox(label="ElevenLabs API Key", type="password")
|
|
|
|
| 28 |
if OPENAI_API_KEY and OPENAI_AVAILABLE:
|
| 29 |
openai.api_key = OPENAI_API_KEY
|
| 30 |
|
| 31 |
+
# ElevenLabs defaults
# Voice used for synthesis; the ELEVEN_VOICE_ID environment variable overrides
# the built-in fallback ID below.
ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL")  # placeholder
# Base URL of the text-to-speech endpoint; the voice ID is appended as a path segment.
ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"

# Hugging Face Inference API endpoint (for image captioning fallback)
HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
|
| 37 |
+
|
| 38 |
# -----------------------------
|
| 39 |
+
# Minimal MCP Server shim
|
| 40 |
# -----------------------------
|
| 41 |
class ToolResult(BaseModel):
|
| 42 |
content: str
|
|
|
|
| 73 |
server = MCPServer("accessibility_voice_mcp")
|
| 74 |
|
| 75 |
# -----------------------------
|
| 76 |
+
# Utilities: STT, TTS, Image describe
|
| 77 |
# -----------------------------
|
| 78 |
|
| 79 |
def transcribe_with_openai(audio_file_path: str) -> str:
|
| 80 |
+
"""Transcribe audio using OpenAI Whisper (if available)."""
|
| 81 |
if not OPENAI_AVAILABLE:
|
| 82 |
return "OpenAI library not available"
|
| 83 |
try:
|
|
|
|
| 89 |
except Exception as e:
|
| 90 |
return f"OpenAI transcription error: {e}"
|
| 91 |
|
| 92 |
+
|
| 93 |
def transcribe_fallback(audio_file_path: str) -> str:
|
| 94 |
+
"""Fallback: invoke whisper from local package (if installed)."""
|
| 95 |
try:
|
| 96 |
import whisper
|
| 97 |
model = whisper.load_model("small")
|
|
|
|
| 100 |
except Exception as e:
|
| 101 |
return f"Local transcription fallback failed: {e}"
|
| 102 |
|
| 103 |
+
|
| 104 |
def tts_elevenlabs(text: str) -> bytes:
    """Synthesize speech for *text* with the ElevenLabs text-to-speech API.

    Args:
        text: The sentence(s) to speak.

    Returns:
        The raw response body, i.e. the encoded audio bytes.

    Raises:
        RuntimeError: If ELEVENLABS_API_KEY is not set, or the API answers
            with a non-200 status.
        requests.RequestException: On connection failure or timeout.
    """
    if not ELEVENLABS_API_KEY:
        raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
    import requests

    url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "text": text,
        "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
    }
    # The body is consumed in one go below, so streaming buys nothing. The
    # (connect, read) timeout keeps a stalled connection from blocking the
    # calling UI callback forever.
    resp = requests.post(url, headers=headers, json=payload, timeout=(10, 60))
    if resp.status_code != 200:
        raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
    return resp.content
|
| 122 |
|
| 123 |
+
|
| 124 |
def describe_image_hf(image_path: str) -> str:
    """Describe an image using the Hugging Face Inference API (hosted BLIP model).

    Args:
        image_path: Path of the image file to caption.

    Returns:
        The generated caption on success. On failure an explanatory string is
        returned instead of raising; it starts with one of
        "HUGGINGFACE_API_TOKEN not set", "HF Inference error" or
        "HF describe error".
    """
    try:
        import requests

        if not HUGGINGFACE_API_TOKEN:
            return "HUGGINGFACE_API_TOKEN not set"
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        headers = {
            "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",
        }
        # The HF Inference API accepts files as binary. The timeout prevents a
        # cold or unresponsive model endpoint from hanging the caller.
        resp = requests.post(
            HF_INFERENCE_URL, headers=headers, data=image_bytes, timeout=(10, 60)
        )
        if resp.status_code != 200:
            return f"HF Inference error: {resp.status_code} {resp.text}"
        try:
            j = resp.json()
        except ValueError:
            # Body was not JSON; hand back whatever text the endpoint sent.
            return resp.text
        # Some endpoints return [{'generated_text': '...'}]; check the element
        # type so a list of plain strings is not indexed like a dict.
        if isinstance(j, list) and j and isinstance(j[0], dict) and 'generated_text' in j[0]:
            return j[0]['generated_text']
        if isinstance(j, dict) and 'generated_text' in j:
            return j['generated_text']
        # Unknown shape: return its text form.
        return str(j)
    except Exception as e:
        return f"HF describe error: {e}"
|
| 153 |
|
| 154 |
+
|
| 155 |
+
def describe_image_openai(image_path: str) -> str:
    """Describe an image with an OpenAI vision-capable chat model.

    The image is sent inline as a base64 ``data:`` URL. The same multimodal
    message is used for both the client-based SDK (``openai.OpenAI``) and the
    legacy module-level ``openai.ChatCompletion`` interface.

    Args:
        image_path: Path of the image file to describe.

    Returns:
        The model's description (possibly empty), or a string starting with
        "OpenAI image describe error" / "OpenAI not available" on failure.
        This function never raises.
    """
    if not OPENAI_AVAILABLE:
        return "OpenAI not available for image captioning"

    try:
        import mimetypes

        # Read image bytes
        with open(image_path, "rb") as f:
            image_bytes = f.read()

        b64_image = base64.b64encode(image_bytes).decode("utf-8")

        # Use the real image type when the extension reveals it; fall back to
        # JPEG (the previously hard-coded value) otherwise.
        guessed_type = mimetypes.guess_type(image_path)[0]
        mime_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"

        prompt = (
            "You are an accessibility assistant that describes images for visually impaired users. "
            "Provide a clear, helpful, vivid, human-friendly description of the image.\n"
        )

        messages = [
            {"role": "system", "content": "You describe images for visually impaired users."},
            {"role": "user", "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    # The API expects an object with a "url" key here, not a bare string.
                    "image_url": {"url": f"data:{mime_type};base64,{b64_image}"},
                },
            ]},
        ]

        if hasattr(openai, "OpenAI"):
            # Client-based SDK
            client = openai.OpenAI()
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                max_tokens=300,
            )
        else:
            # Legacy SDK: same request body through the module-level interface.
            # Pasting the base64 into a text prompt would not let the model see
            # the picture, so the multimodal message is reused.
            response = openai.ChatCompletion.create(
                model="gpt-4o-mini",
                messages=messages,
                max_tokens=300,
            )

        # content can be None (e.g. on a refusal); normalise to a string.
        content = response.choices[0].message.content
        return (content or "").strip()

    except Exception as e:
        return f"OpenAI image describe error: {e}"
|
| 209 |
+
|
| 210 |
+
|
| 211 |
# -----------------------------
|
| 212 |
+
# MCP Tools
|
| 213 |
# -----------------------------
|
| 214 |
@server.tool(name="speak_text", description="Convert text to speech using ElevenLabs")
def speak_text_tool(text: str) -> ToolResult:
    """Synthesize *text* and wrap the audio as a base64 string in a ToolResult.

    On success the result carries ``meta={"format": "base64-audio"}``; on any
    failure the content is a "TTS Error: ..." message and no meta is passed.
    """
    try:
        payload = base64.b64encode(tts_elevenlabs(text)).decode("utf-8")
        result = ToolResult(content=payload, meta={"format": "base64-audio"})
    except Exception as err:
        result = ToolResult(content=f"TTS Error: {err}")
    return result
|
| 222 |
+
|
| 223 |
|
| 224 |
@server.tool(name="describe_image", description="Describe an uploaded image for visually impaired users")
def describe_image_tool(image_path: str) -> ToolResult:
    """Describe the image at *image_path* using the first backend that works.

    Priority: OpenAI -> Gemini -> Hugging Face Inference -> error.

    Returns:
        A ToolResult whose ``meta["backend"]`` names the backend that produced
        the description. If every backend fails, the content is a
        "No image captioning backend available" message followed by the
        individual failure reasons, and no meta is passed.
    """
    failures = []

    if OPENAI_AVAILABLE:
        desc = describe_image_openai(image_path)
        if desc and not desc.startswith("OpenAI image describe error"):
            return ToolResult(content=desc, meta={"backend": "openai"})
        if desc:
            failures.append(desc)

    # Gemini (if configured)
    if GOOGLE_GEMINI_API_KEY:
        try:
            import google.generativeai as genai
            genai.configure(api_key=GOOGLE_GEMINI_API_KEY)
            model = genai.GenerativeModel("gemini-1.5-flash")
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            response = model.generate_content(["Describe this image for a visually impaired user.", {"mime_type": "image/jpeg", "data": image_bytes}])
            return ToolResult(content=response.text, meta={"backend": "gemini"})
        except Exception as e:
            # Best effort: remember why Gemini failed and try the next backend.
            failures.append(f"Gemini image describe error: {e}")

    # Hugging Face Inference. describe_image_hf reports failures as ordinary
    # strings, so they must be recognised here; otherwise an error message
    # would be presented to the user as if it were the image description.
    hf_failure_prefixes = (
        "HUGGINGFACE_API_TOKEN not set",
        "HF Inference error",
        "HF describe error",
    )
    desc = describe_image_hf(image_path)
    if desc and not desc.startswith(hf_failure_prefixes):
        return ToolResult(content=desc, meta={"backend": "huggingface"})
    if desc:
        failures.append(desc)

    message = "No image captioning backend available. Set OPENAI_API_KEY, GOOGLE_GEMINI_API_KEY, or HUGGINGFACE_API_TOKEN."
    if failures:
        message += " Details: " + "; ".join(failures)
    return ToolResult(content=message)
|
| 248 |
+
|
| 249 |
|
| 250 |
@server.tool(name="transcribe_audio", description="Transcribe user audio to text")
|
| 251 |
def transcribe_audio_tool(audio_path: str) -> ToolResult:
|
|
|
|
| 260 |
return ToolResult(content=text, meta={"backend":"local_whisper","duration":duration})
|
| 261 |
|
| 262 |
# -----------------------------
|
| 263 |
+
# Gradio UI (client)
|
| 264 |
# -----------------------------
|
| 265 |
|
| 266 |
def decode_base64_audio(b64: str) -> bytes:
    """Turn a standard-alphabet Base64 string back into the bytes it encodes."""
    decoded = base64.standard_b64decode(b64)
    return decoded
|
| 268 |
|
| 269 |
+
# Colour hues intended for the UI theme.
# NOTE(review): nothing in the visible part of this file reads app_theme —
# confirm it is passed to the UI somewhere, or remove it.
app_theme = {
    "primary_hue": "blue",
    "secondary_hue": "slate",
}
|
| 273 |
+
|
| 274 |
+
# Helper to format tool-call explanations
|
| 275 |
+
def format_tool_log(tool_name, reason, meta, output, style="A"):
    """Format one tool invocation for display in the tools log.

    Args:
        tool_name: Name of the tool that ran.
        reason: Short human-readable explanation of why it ran.
        meta: Optional dict; the "backend" and "duration" keys are used.
        output: Tool output; rendered with ``str()``.
        style: "A" one-line summary (output cut to 200 chars), "B" detailed
            multi-line text, "C" compact visual form. Any other value
            (documented as "D") returns both the simple and detailed forms.

    Returns:
        A string for styles A, B and C; otherwise a dict with the keys
        "simple" and "detailed".
    """
    meta = meta or {}
    # Fall back to "unknown" whenever the key is absent, not only when meta is
    # empty, so a meta dict without "backend" does not render as "None".
    backend = meta.get("backend") or "unknown"
    duration = meta.get("duration")

    # Pre-render the duration once; non-numeric values are shown verbatim.
    duration_text = None
    if duration is not None:
        try:
            duration_text = f"{float(duration):.2f}s"
        except (TypeError, ValueError, OverflowError):
            duration_text = f"{duration}"

    # ---------------------------
    # Style A: Simple
    # ---------------------------
    simple = f"[{tool_name}] {backend} -> {str(output)[:200]}"
    if style == "A":
        return simple

    # ---------------------------
    # Style C: Ultra-visual
    # ---------------------------
    if style == "C":
        parts = [
            f"🔧 {tool_name}",
            f"• Reason: {reason}",
            f"• Backend: {backend}",
        ]
        if duration_text is not None:
            parts.append(f"• {duration_text}")
        return " ".join(parts) + "\n" + f"→ {str(output)}"

    # ---------------------------
    # Style B: Detailed Human-Readable
    # ---------------------------
    lines = [
        f"🔧 Tool: {tool_name}",
        f"🎯 Why: {reason}",
        f"⚙️ Backend: {backend}",
    ]
    if duration_text is not None:
        lines.append(f"⏱ Duration: {duration_text}")
    lines.append(f"📝 Output: {str(output)}")
    detailed = "\n".join(lines)
    if style == "B":
        return detailed

    # ---------------------------
    # Style D: Both Simple + Detailed
    # ---------------------------
    return {
        "simple": simple,
        "detailed": detailed,
    }
|
| 328 |
+
|
| 329 |
+
# Conversion helpers for chat history between 'messages' (gradio new) and tuple list used in logic
|
| 330 |
def messages_to_tuples(messages):
|
| 331 |
+
# messages is a list of dicts {"role": "user"/"assistant", "content": "..."}
|
| 332 |
tuples = []
|
| 333 |
if not messages:
|
| 334 |
return tuples
|
|
|
|
| 340 |
elif isinstance(m, (list, tuple)) and len(m) == 2:
|
| 341 |
tuples.append((m[0], m[1]))
|
| 342 |
else:
|
| 343 |
+
# fallback: treat as assistant reply
|
| 344 |
tuples.append(("", str(m)))
|
| 345 |
return tuples
|
| 346 |
|
|
|
|
| 353 |
messages.append({"role":"assistant","content":assistant_text})
|
| 354 |
return messages
|
| 355 |
|
| 356 |
+
with gr.Blocks(css=".gradio-container {background:#f7fafc}") as demo:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 357 |
gr.Markdown("# Accessibility Voice Agent — MCP Tools")
|
| 358 |
+
|
| 359 |
with gr.Row():
|
| 360 |
with gr.Column(scale=3):
|
| 361 |
+
# Set type='messages' to avoid the deprecation warning, and convert inside handlers.
|
| 362 |
chatbox = gr.Chatbot(label="Assistant", elem_id="chatbox", type="messages")
|
| 363 |
+
user_input = gr.Textbox(placeholder="Type or press the microphone to speak...", show_label=False)
|
| 364 |
|
| 365 |
with gr.Row():
|
| 366 |
+
# Some gradio versions don't accept 'source' kw; remove it to be broadly compatible.
|
| 367 |
+
mic = gr.Audio(type="filepath", label="Record voice (press to record)")
|
| 368 |
send_btn = gr.Button("Send")
|
| 369 |
|
| 370 |
+
with gr.Accordion("Advanced / Tools", open=False):
|
| 371 |
+
tts_text = gr.Textbox(label="Text to speak (ElevenLabs)")
|
| 372 |
+
tts_btn = gr.Button("Speak (TTS)")
|
| 373 |
+
|
| 374 |
+
img_upload = gr.File(label="Upload image (for description)")
|
| 375 |
+
img_btn = gr.Button("Describe image")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 376 |
|
| 377 |
with gr.Column(scale=2):
|
| 378 |
gr.Markdown("### Tool Call Log & Explanations")
|
| 379 |
+
log_style = gr.Radio(choices=["A","B","C","D"], value="B", label="Log style (A:Simple B:Detailed C:Visual D:Both)")
|
| 380 |
+
tools_log = gr.Textbox(value="Ready.", lines=20, interactive=False, label="Tools Log")
|
| 381 |
+
tools_panel = gr.HTML("<div id='tools_panel' style='max-height:400px;overflow:auto;background:#ffffff;padding:8px;border-radius:8px;'></div>")
|
| 382 |
gr.Markdown("---")
|
| 383 |
+
gr.Markdown("**Tool explanations appear here each time a tool runs.**")
|
| 384 |
|
| 385 |
# Callbacks
|
| 386 |
+
def on_send_text(text, chat_history, mic_file, style):
    """Handle the Send button: take typed or spoken input and reply in the chat.

    Args:
        text: Contents of the text box; ignored when a recording is present.
        chat_history: Current chatbot value in "messages" format.
        mic_file: Path of the recorded audio, or a falsy value if none.
        style: Log style passed through to format_tool_log.

    Returns:
        A 3-tuple: updated chat messages, an update for the plain-text tools
        log, and an update for the HTML tools panel.
    """
    import html

    tools_entries = []
    # convert incoming chat_history (messages) into tuples for internal logic
    tuples = messages_to_tuples(chat_history)

    if mic_file:
        # A recording takes priority over typed text.
        tr = transcribe_audio_tool(mic_file)
        user_text = tr.content
        tools_entries.append(
            format_tool_log("transcribe_audio", "User provided microphone audio", tr.meta or {}, tr.content, style)
        )
    else:
        user_text = text or ""

    # demo assistant behavior
    if user_text and user_text.strip().lower().startswith("describe image:"):
        # expects: "describe image: filename"
        _, _, fname = user_text.partition(":")
        fname = fname.strip()
        if fname:
            # We assume the image was uploaded earlier and path provided
            res = describe_image_tool(fname)
            assistant = res.content
            tools_entries.append(
                format_tool_log("describe_image", "User requested image description", res.meta or {}, res.content, style)
            )
        else:
            assistant = "Please upload an image using the Describe Image tool or provide a path like: describe image: /path/to/image.jpg"
    else:
        assistant = "I heard: " + (user_text or "(empty)")

    tuples.append((user_text, assistant))

    # Build both log views from the collected entries. Iterating the list
    # (rather than inspecting the last log value) keeps this safe when no tool
    # ran, and handles the dict that the combined log style returns.
    text_entries = []
    panel_parts = []
    for entry in tools_entries:
        if isinstance(entry, dict):
            text_entries.append(entry["detailed"])
            panel_parts.append(
                f"<pre>{html.escape(entry['detailed'])}</pre>"
                f"<hr><pre>{html.escape(entry['simple'])}</pre>"
            )
        else:
            text_entries.append(entry)
            # Tool output is user/model-provided text: escape it before
            # embedding it in HTML.
            panel_parts.append(
                f"<pre style='background:#f1f5f9;border-radius:6px;padding:8px;margin-bottom:8px;'>{html.escape(entry)}</pre>"
            )
    panel_html = "".join(panel_parts)

    # convert back to messages for gr.Chatbot
    new_messages = tuples_to_messages(tuples)
    return new_messages, gr.update(value="\n".join(text_entries) or "Ready."), gr.update(value=panel_html)
|
|
|
|
|
|
|
|
|
|
| 434 |
|
| 435 |
+
send_btn.click(on_send_text, inputs=[user_input, chatbox, mic, log_style], outputs=[chatbox, tools_log, tools_panel])
|
| 436 |
|
| 437 |
+
def on_tts(text, style):
    """Handle the Speak button: synthesize *text* and report the tool call.

    Args:
        text: Text to speak.
        style: Log style passed through to format_tool_log.

    Returns:
        A 2-tuple: the value for the audio output (path of a temporary audio
        file, or None when nothing was synthesized) and an update for the
        HTML tools panel.
    """
    import html
    import tempfile

    if not text:
        return None, gr.update(value="No text provided")

    res = speak_text_tool(text)
    succeeded = bool(res.meta and res.meta.get("format") == "base64-audio")

    log = format_tool_log(
        "speak_text",
        "User requested text-to-speech",
        res.meta or {},
        "<audio bytes>" if succeeded else res.content,
        style,
    )
    if isinstance(log, dict):
        # Combined log style returns both forms; show them one after the other.
        log = f"{log['detailed']}\n\n{log['simple']}"
    background = "#eef2ff" if succeeded else "#fee2e2"
    # Escape so "<audio bytes>" and error text are shown literally, not parsed
    # as markup.
    panel_html = f"<pre style='background:{background};padding:8px;border-radius:6px;'>{html.escape(log)}</pre>"

    if not succeeded:
        return None, gr.update(value=panel_html)

    audio_bytes = decode_base64_audio(res.content)
    # The bytes are an encoded audio file, not PCM samples, so they cannot be
    # returned as a (sample_rate, array) pair. Save them and return the path.
    # NOTE(review): suffix assumes the TTS service's default MP3 output; confirm.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp.write(audio_bytes)
    return tmp.name, gr.update(value=panel_html)
|
| 450 |
|
| 451 |
+
tts_btn.click(on_tts, inputs=[tts_text, log_style], outputs=[gr.Audio(label="TTS Output"), tools_panel])
|
| 452 |
|
| 453 |
+
def on_describe_image(file_obj, style):
|
| 454 |
if not file_obj:
|
| 455 |
+
return [], gr.update(value="No file uploaded")
|
| 456 |
+
# file_obj may be an UploadFile-like object; get path or save to tmp file
|
| 457 |
path = getattr(file_obj, 'name', None)
|
| 458 |
+
# If it's a temporary file dict (from gr.File), it might be a dict with 'name' and 'tmp_path'
|
| 459 |
if isinstance(file_obj, dict) and 'tmp_path' in file_obj:
|
| 460 |
path = file_obj['tmp_path']
|
| 461 |
if not path:
|
| 462 |
+
# try to save bytes
|
| 463 |
try:
|
| 464 |
contents = file_obj.read()
|
| 465 |
tmp_path = "/tmp/gr_uploaded_image.jpg"
|
|
|
|
| 467 |
f.write(contents)
|
| 468 |
path = tmp_path
|
| 469 |
except Exception as e:
|
| 470 |
+
return [], gr.update(value=f"Failed to read uploaded file: {e}")
|
| 471 |
|
| 472 |
res = describe_image_tool(path)
|
| 473 |
+
log = format_tool_log("describe_image", "User uploaded an image for description", res.meta or {}, res.content, style)
|
| 474 |
+
panel_html = f"<pre style='background:#ecfdf5;padding:8px;border-radius:6px;'>{log}</pre>"
|
| 475 |
+
|
| 476 |
+
# Return as messages for chatbox
|
| 477 |
messages = [{"role":"user","content":"<image uploaded>"}, {"role":"assistant","content":res.content}]
|
| 478 |
return messages, gr.update(value=panel_html)
|
| 479 |
|
| 480 |
+
img_btn.click(on_describe_image, inputs=[img_upload, log_style], outputs=[chatbox, tools_panel])
|
| 481 |
|
| 482 |
+
# API Keys accordion (session-only)
|
| 483 |
with gr.Accordion("🔑 API Keys (stored only in session)", open=False):
|
| 484 |
openai_key = gr.Textbox(label="OpenAI API Key", type="password")
|
| 485 |
eleven_key = gr.Textbox(label="ElevenLabs API Key", type="password")
|