import logging
import asyncio
from typing import Optional, Dict, Any, List
import json

# Created before the optional import so the fallback branch can log with it.
logger = logging.getLogger(__name__)

try:
    from elevenlabs.client import ElevenLabs
    from elevenlabs.conversational_ai.conversation import Conversation, ClientTools
    from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False
    logger.warning("ElevenLabs SDK not available. Install: pip install elevenlabs")

import config
from services.llamaindex_service import LlamaIndexService


class ElevenLabsService:
    """
    Service wrapper around ElevenLabs Conversational AI with a RAG client tool.

    What this class does:
    - Creates an ElevenLabs client when the SDK and API key are available
    - Registers a ``query_documents`` client tool that forwards questions to
      the injected LlamaIndex service
    - Creates conversations (with or without an audio interface) and tracks
      them per session ID
    - Keeps an in-memory message history per session
    - Offers a text-only chat path that queries the RAG handler directly
    """

    def __init__(self, llamaindex_service: LlamaIndexService):
        """
        Initialize the service and, when possible, the ElevenLabs client.

        If the SDK is not installed or no API key is configured, the instance
        is still created but ``self.client`` stays ``None``.

        Args:
            llamaindex_service: LlamaIndex service used for document queries
        """
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.client = None
        self.client_tools = None
        # SDK type names are quoted so they are never evaluated when the SDK
        # is missing.
        self.active_conversations: Dict[str, "Conversation"] = {}
        self.conversation_history: Dict[str, List[Dict]] = {}

        if not ELEVENLABS_AVAILABLE:
            logger.error("ElevenLabs SDK not installed. Run: pip install elevenlabs")
            return

        if not self.config.ELEVENLABS_API_KEY:
            logger.warning("ELEVENLABS_API_KEY not configured.")
            return

        try:
            self.client = ElevenLabs(api_key=self.config.ELEVENLABS_API_KEY)
            logger.info("ElevenLabs client initialized successfully")

            # Called synchronously; _init_client_tools obtains or creates an
            # event loop itself if the SDK's ClientTools requires one.
            self._init_client_tools()

            logger.info("ElevenLabs service initialized")
        except Exception as e:
            # Best effort: a failure here leaves the service constructed;
            # is_available() reports whether a client exists.
            logger.error(f"Error initializing ElevenLabs service: {str(e)}")

    def _init_client_tools(self):
        """
        Create the ClientTools container and register the RAG query tool.

        On any failure ``self.client_tools`` is reset to ``None`` so callers
        can tell that registration did not succeed.
        """
        try:
            # Try the argument-less constructor first; fall back to passing an
            # explicit event loop for SDK versions that raise TypeError.
            try:
                self.client_tools = ClientTools()
            except TypeError:
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                self.client_tools = ClientTools(loop=loop)

            tool_description = (
                "Search through the user's uploaded documents. "
                "Use this tool whenever the user asks questions about their "
                "documents, files, or content in their library."
            )
            tool_parameters = {
                "query": {
                    "type": "string",
                    "description": "The search query or question to find information in the documents"
                }
            }

            try:
                self.client_tools.register(
                    "query_documents",
                    handler=self._rag_query_handler,
                    description=tool_description,
                    parameters=tool_parameters,
                    is_async=True,
                )
            except TypeError:
                # NOTE(review): the published SDK signature appears to be
                # register(tool_name, handler, is_async) with no metadata
                # keywords -- confirm against the installed version. Without
                # this fallback a signature mismatch would disable the tool.
                self.client_tools.register(
                    "query_documents",
                    handler=self._rag_query_handler,
                    is_async=True,
                )

            logger.info("Client tools registered: query_documents")

        except Exception as e:
            logger.error(f"Error initializing client tools: {str(e)}")
            # Keep client_tools as None so we know it failed
            self.client_tools = None

    async def _rag_query_handler(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Answer a document question by querying the LlamaIndex service.

        Registered as the ``query_documents`` client tool and also used
        directly by ``send_text_message`` and ``test_connection``.

        Args:
            params: Dictionary whose ``"query"`` key holds the question

        Returns:
            Dictionary that always contains ``"answer"``; on success it also
            contains ``"confidence"`` and ``"source"``. Errors and timeouts are
            reported through the ``"answer"`` text rather than raised.
        """
        try:
            query = params.get("query", "")

            if not query or not query.strip():
                return {
                    "answer": "I didn't receive a question to search for. Could you please ask again?"
                }

            logger.info(f"RAG query: {query}")

            # Use the configured timeout when present, otherwise 30 seconds.
            timeout = getattr(self.config, "CONVERSATION_TIMEOUT", 30)
            result = await asyncio.wait_for(
                self.llamaindex_service.query(query),
                timeout=timeout,
            )

            logger.info(f"RAG query successful: {len(result)} chars")

            return {
                "answer": result,
                "confidence": "high",
                "source": "document_library"
            }

        except asyncio.TimeoutError:
            logger.error("RAG query timeout")
            return {
                "answer": "The search is taking longer than expected. Could you try rephrasing your question?"
            }

        except Exception as e:
            logger.error(f"RAG query error: {str(e)}", exc_info=True)
            return {
                "answer": f"I encountered an error while searching: {str(e)}. Please try again."
            }

    def create_conversation(
        self,
        agent_id: Optional[str] = None,
        session_id: Optional[str] = None,
        use_audio: bool = True
    ) -> Optional["Conversation"]:
        """
        Create a new conversation object.

        Args:
            agent_id: ElevenLabs agent ID (falls back to
                ``config.ELEVENLABS_AGENT_ID`` when not provided)
            session_id: Optional session ID; when given, the conversation is
                stored under it and its history is reset to an empty list
            use_audio: If True, attach a ``DefaultAudioInterface``; if False,
                pass ``None`` as the audio interface

        Returns:
            The Conversation object, or None if the client is missing, no
            agent ID is available, or construction fails
        """
        if not self.client:
            logger.error("ElevenLabs client not initialized")
            return None

        try:
            agent_id = agent_id or self.config.ELEVENLABS_AGENT_ID
            if not agent_id:
                logger.error("No agent ID provided or configured")
                return None

            audio_interface = DefaultAudioInterface() if use_audio else None

            conversation = Conversation(
                client=self.client,
                agent_id=agent_id,
                requires_auth=True,
                audio_interface=audio_interface,
                client_tools=self.client_tools,
                # Callbacks record both sides of the dialogue in the history.
                callback_agent_response=lambda response: self._on_agent_response(session_id, response),
                callback_user_transcript=lambda transcript: self._on_user_message(session_id, transcript)
            )

            if session_id:
                self.active_conversations[session_id] = conversation
                self.conversation_history[session_id] = []

            logger.info(f"Created conversation for agent: {agent_id}")
            return conversation

        except Exception as e:
            logger.error(f"Error creating conversation: {str(e)}")
            return None

    def _on_agent_response(self, session_id: Optional[str], response: str):
        """Append an agent response to the session history, if tracked, and log it."""
        if session_id and session_id in self.conversation_history:
            self.conversation_history[session_id].append({
                "role": "assistant",
                "content": response
            })
        logger.debug(f"Agent response: {response[:100]}...")

    def _on_user_message(self, session_id: Optional[str], message: str):
        """Append a user message to the session history, if tracked, and log it."""
        if session_id and session_id in self.conversation_history:
            self.conversation_history[session_id].append({
                "role": "user",
                "content": message
            })
        logger.debug(f"User message: {message[:100]}...")

    async def start_conversation(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a conversation without an audio interface and report the result.

        Args:
            session_id: Optional session ID for tracking

        Returns:
            Dictionary with ``"success"`` and either ``"session_id"`` and
            ``"message"`` or ``"error"``
        """
        try:
            conversation = self.create_conversation(session_id=session_id, use_audio=False)

            if not conversation:
                return {
                    "success": False,
                    "error": "Failed to create conversation. Check API configuration."
                }

            return {
                "success": True,
                "session_id": session_id,
                "message": "Voice assistant ready. Ask me anything about your documents!"
            }
        except Exception as e:
            logger.error(f"Error starting conversation: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def send_text_message(
        self,
        message: str,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Answer a text message by querying the RAG handler directly.

        This path does not go through an ElevenLabs conversation; it calls
        ``_rag_query_handler`` and records both turns in the session history
        when the session is tracked.

        Args:
            message: User's text message
            session_id: Session identifier

        Returns:
            Dictionary with ``"success"`` and either ``"answer"`` and
            ``"session_id"`` or ``"error"``
        """
        try:
            if not message or not message.strip():
                return {
                    "success": False,
                    "error": "Empty message"
                }

            tracked = session_id in self.conversation_history

            if tracked:
                self.conversation_history[session_id].append({
                    "role": "user",
                    "content": message
                })

            response = await self._rag_query_handler({"query": message})
            answer = response["answer"]

            if tracked:
                self.conversation_history[session_id].append({
                    "role": "assistant",
                    "content": answer
                })

            return {
                "success": True,
                "answer": answer,
                "session_id": session_id
            }

        except Exception as e:
            logger.error(f"Error sending message: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def end_conversation(self, session_id: str) -> bool:
        """
        End an active conversation session.

        The conversation is removed from ``active_conversations``; the stored
        history for the session is left in place.

        Args:
            session_id: Session identifier

        Returns:
            True if the session was active and has been removed, else False
        """
        try:
            if session_id not in self.active_conversations:
                return False

            conversation = self.active_conversations[session_id]

            # Graceful shutdown is best effort: a failure is logged and the
            # session is still dropped from the active set.
            try:
                if hasattr(conversation, 'end_session'):
                    conversation.end_session()
            except Exception as e:
                logger.warning(f"Error during session cleanup: {str(e)}")

            del self.active_conversations[session_id]
            logger.info(f"Ended conversation: {session_id}")
            return True

        except Exception as e:
            logger.error(f"Error ending conversation: {str(e)}")
            return False

    def get_conversation_history(self, session_id: str) -> List[Dict]:
        """Return the stored history for a session, or an empty list if unknown."""
        return self.conversation_history.get(session_id, [])

    def get_available_voices(self) -> List[Dict[str, str]]:
        """
        Fetch the voices exposed by the ElevenLabs client.

        Returns:
            List of dictionaries with ``voice_id``, ``name`` and ``category``
            (``"general"`` when the voice has no category attribute); an empty
            list when the client is missing or the request fails
        """
        try:
            if not self.client:
                return []

            voices = self.client.voices.get_all()
            return [
                {
                    "voice_id": voice.voice_id,
                    "name": voice.name,
                    "category": getattr(voice, 'category', "general")
                }
                for voice in voices.voices
            ]
        except Exception as e:
            logger.error(f"Error getting voices: {str(e)}")
            return []

    def is_available(self) -> bool:
        """Return True when the SDK was imported and a client was created."""
        return ELEVENLABS_AVAILABLE and self.client is not None

    async def test_connection(self) -> Dict[str, Any]:
        """
        Run a basic self-check of the client, voices call and RAG handler.

        Returns:
            Dictionary with ``"status"`` and ``"message"``; on success it also
            reports the number of voices, whether the RAG handler returned an
            answer, and whether client tools are registered
        """
        try:
            if not self.client:
                return {
                    "status": "error",
                    "message": "Client not initialized"
                }

            # get_available_voices returns [] on failure, so a count of 0 may
            # also mean the voices request itself failed.
            voices = self.get_available_voices()

            test_result = await self._rag_query_handler({"query": "test"})

            return {
                "status": "success",
                "message": "ElevenLabs API connected",
                "voices_available": len(voices),
                "rag_tool_working": "answer" in test_result,
                "client_tools_registered": self.client_tools is not None
            }
        except Exception as e:
            logger.error(f"Connection test failed: {str(e)}")
            return {
                "status": "error",
                "message": str(e)
            }