import logging
from pathlib import Path

from llama_index.core import (
    VectorStoreIndex,
    Document,
    StorageContext,
    load_index_from_storage,
    Settings,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding

import config
from services.document_store_service import DocumentStoreService

logger = logging.getLogger(__name__)


class LlamaIndexService:
    def __init__(self, document_store: DocumentStoreService):
        self.document_store = document_store
        self.config = config.config
        self.storage_dir = Path(self.config.DATA_DIR) / "llamaindex_storage"
        self.index = None
        self.agent = None
        self.is_initialized = False

        self._initialize_settings()
        # Attempt to load an existing index, but don't fail if storage is empty
        self._try_load_from_storage()

    def _initialize_settings(self):
        """Initialize LlamaIndex global settings (LLM and embedding model)."""
        try:
            # LLM setup: prefer OpenAI, fall back to a Nebius OpenAI-compatible endpoint
            if self.config.OPENAI_API_KEY:
                Settings.llm = OpenAI(
                    model=self.config.OPENAI_MODEL,
                    api_key=self.config.OPENAI_API_KEY,
                )
                logger.info(f"LlamaIndex using OpenAI model: {self.config.OPENAI_MODEL}")
            elif self.config.NEBIUS_API_KEY:
                # Note: the OpenAI class validates model names, so non-OpenAI model
                # identifiers may require llama_index.llms.openai_like.OpenAILike instead.
                Settings.llm = OpenAI(
                    model=self.config.NEBIUS_MODEL,
                    api_key=self.config.NEBIUS_API_KEY,
                    api_base=self.config.NEBIUS_BASE_URL,
                )
                logger.info(f"LlamaIndex using Nebius model: {self.config.NEBIUS_MODEL}")
            else:
                logger.warning("No API key found for LlamaIndex LLM. Agentic features may fail.")

            # Embedding setup: OpenAI embeddings when configured, otherwise HuggingFace
            if self.config.EMBEDDING_MODEL.startswith("text-embedding-"):
                if self.config.OPENAI_API_KEY:
                    Settings.embed_model = OpenAIEmbedding(
                        model=self.config.EMBEDDING_MODEL,
                        api_key=self.config.OPENAI_API_KEY,
                    )
                else:
                    # An OpenAI embedding model was requested but no key is available,
                    # so fall back to a local HuggingFace model
                    Settings.embed_model = HuggingFaceEmbedding(
                        model_name="sentence-transformers/all-MiniLM-L6-v2"
                    )
            else:
                Settings.embed_model = HuggingFaceEmbedding(
                    model_name=self.config.EMBEDDING_MODEL
                )
        except Exception as e:
            logger.error(f"Error initializing LlamaIndex settings: {str(e)}")

    def _try_load_from_storage(self):
        """Try to load the index from persisted storage synchronously."""
        try:
            if self.storage_dir.exists() and any(self.storage_dir.iterdir()):
                logger.info("Loading LlamaIndex from storage...")
                storage_context = StorageContext.from_defaults(persist_dir=str(self.storage_dir))
                self.index = load_index_from_storage(storage_context)
                self._initialize_agent()
                self.is_initialized = True
            else:
                logger.info("No existing LlamaIndex storage found. Waiting for initialization.")
        except Exception as e:
            logger.error(f"Error loading LlamaIndex from storage: {str(e)}")

    async def initialize(self):
        """Async initialization: sync documents and build the index."""
        try:
            logger.info("Starting LlamaIndex async initialization...")
            if self.index is None:
                await self.sync_from_document_store()
            self.is_initialized = True
            logger.info("LlamaIndex async initialization complete.")
        except Exception as e:
            logger.error(f"Error during LlamaIndex async initialization: {str(e)}")

    async def sync_from_document_store(self):
        """Sync documents from the DocumentStore into LlamaIndex."""
        try:
            logger.info("Syncing documents from DocumentStore to LlamaIndex...")
            docs = await self.document_store.list_documents(limit=1000)

            if not docs:
                logger.warning("No documents found in DocumentStore. Creating empty index.")
                # FIX: Handle the empty state gracefully
                self.index = None
                self.agent = None
                return

            # Convert to LlamaIndex documents, skipping empty content
            llama_docs = []
            for doc in docs:
                if doc.content and len(doc.content.strip()) > 0:
                    llama_doc = Document(
                        text=doc.content,
                        metadata={
                            "filename": doc.filename,
                            "document_id": doc.id,
                            **(doc.metadata or {}),  # guard against metadata being None
                        },
                    )
                    llama_docs.append(llama_doc)

            if not llama_docs:
                logger.warning("Documents found but content was empty.")
                return

            logger.info(f"Building LlamaIndex with {len(llama_docs)} documents...")
            self.index = VectorStoreIndex.from_documents(llama_docs)

            # Persist storage
            self.storage_dir.mkdir(parents=True, exist_ok=True)
            self.index.storage_context.persist(persist_dir=str(self.storage_dir))

            # Re-initialize the agent with the new index
            self._initialize_agent()
            logger.info("LlamaIndex sync complete.")
        except Exception as e:
            logger.error(f"Error syncing LlamaIndex: {str(e)}")

    async def sync_on_demand(self):
        """Manual trigger for syncing documents."""
        await self.sync_from_document_store()
        return True

    def _initialize_agent(self):
        """Initialize the ReAct agent with a query-engine tool."""
        try:
            if not self.index:
                return

            query_engine = self.index.as_query_engine()
            query_engine_tool = QueryEngineTool(
                query_engine=query_engine,
                metadata=ToolMetadata(
                    name="document_search",
                    description=(
                        "Search and retrieve information from the document library. "
                        "Use this for specific questions about content."
                    ),
                ),
            )

            # The ReAct agent requires an LLM
            self.agent = ReActAgent.from_tools(
                [query_engine_tool],
                llm=Settings.llm,
                verbose=True,
            )
            logger.info("LlamaIndex ReAct agent initialized")
        except Exception as e:
            logger.error(f"Error initializing LlamaIndex agent: {str(e)}")

    async def query(self, query_text: str) -> str:
        """Process a query using the agent."""
        # 1. Auto-recovery: if the agent is missing, try to initialize it now
        if not self.agent:
            logger.info("Agent not found during query. Attempting to initialize...")
            await self.initialize()

        # 2. Check whether it is still missing after the attempt
        if not self.agent:
            # Explain why it failed
            if not self.index:
                return (
                    "I can't answer that yet because there are no documents in the "
                    "library. Please upload a document first."
                )
            return (
                "System Error: The AI agent failed to start. Please check if your "
                "OPENAI_API_KEY is correct in the .env file."
            )

        try:
            # 3. Run the query
            response = await self.agent.achat(query_text)
            return str(response)
        except Exception as e:
            logger.error(f"Error querying LlamaIndex agent: {str(e)}")
            return f"I encountered an error searching the documents: {str(e)}"
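

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the service itself. It assumes
# DocumentStoreService can be constructed without arguments; adapt the
# construction to however the document store is actually wired up.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        store = DocumentStoreService()  # hypothetical no-arg construction
        service = LlamaIndexService(store)
        await service.initialize()
        # Route a question through the ReAct agent and its document_search tool
        answer = await service.query("What topics do my documents cover?")
        print(answer)

    asyncio.run(_demo())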