import logging
from pathlib import Path

from llama_index.core import (
    VectorStoreIndex,
    Document,
    StorageContext,
    load_index_from_storage,
    Settings,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding

import config
from services.document_store_service import DocumentStoreService

logger = logging.getLogger(__name__)


class LlamaIndexService:
    def __init__(self, document_store: DocumentStoreService):
        self.document_store = document_store
        self.config = config.config
        self.storage_dir = Path(self.config.DATA_DIR) / "llamaindex_storage"
        self.index = None
        self.agent = None
        self.is_initialized = False

        self._initialize_settings()

        # We don't fully initialize the index here because we need async access
        # to the document store, but we try to load existing storage if available.
        self._try_load_from_storage()

    def _initialize_settings(self):
        """Initialize LlamaIndex settings (LLM, embeddings)."""
        try:
            # LLM setup
            if self.config.OPENAI_API_KEY:
                # Use the configured OpenAI model (gpt-5.1-chat-latest or similar)
                Settings.llm = OpenAI(
                    model=self.config.OPENAI_MODEL,
                    api_key=self.config.OPENAI_API_KEY,
                )
                logger.info(f"LlamaIndex using OpenAI model: {self.config.OPENAI_MODEL}")
            elif self.config.NEBIUS_API_KEY:
                # Use Nebius as an OpenAI-compatible provider
                Settings.llm = OpenAI(
                    model=self.config.NEBIUS_MODEL,
                    api_key=self.config.NEBIUS_API_KEY,
                    api_base=self.config.NEBIUS_BASE_URL,
                )
                logger.info(f"LlamaIndex using Nebius model: {self.config.NEBIUS_MODEL}")
            else:
                logger.warning(
                    "No API key found for LlamaIndex LLM (OpenAI or Nebius). "
                    "Agentic features may fail."
                )

            # Embedding setup
            if self.config.EMBEDDING_MODEL.startswith("text-embedding-"):
                if self.config.OPENAI_API_KEY:
                    Settings.embed_model = OpenAIEmbedding(
                        model=self.config.EMBEDDING_MODEL,
                        api_key=self.config.OPENAI_API_KEY,
                    )
                    logger.info(f"LlamaIndex using OpenAI embeddings: {self.config.EMBEDDING_MODEL}")
                else:
                    logger.warning(
                        "OpenAI embedding model requested but no API key found. "
                        "Falling back to HuggingFace."
                    )
                    Settings.embed_model = HuggingFaceEmbedding(
                        model_name="sentence-transformers/all-MiniLM-L6-v2"
                    )
            else:
                Settings.embed_model = HuggingFaceEmbedding(
                    model_name=self.config.EMBEDDING_MODEL
                )
                logger.info(f"LlamaIndex using HuggingFace embeddings: {self.config.EMBEDDING_MODEL}")
        except Exception as e:
            logger.error(f"Error initializing LlamaIndex settings: {str(e)}")

    def _try_load_from_storage(self):
        """Try to load the index from storage synchronously."""
        try:
            if self.storage_dir.exists():
                logger.info("Loading LlamaIndex from storage...")
                storage_context = StorageContext.from_defaults(persist_dir=str(self.storage_dir))
                self.index = load_index_from_storage(storage_context)
                self._initialize_agent()
                self.is_initialized = True
            else:
                logger.info("No existing LlamaIndex storage found. Waiting for async initialization.")
        except Exception as e:
            logger.error(f"Error loading LlamaIndex from storage: {str(e)}")

    async def initialize(self):
        """Async initialization: sync documents and build the index."""
        try:
            logger.info("Starting LlamaIndex async initialization...")
            # If we already have an index, we might still want to sync if it's
            # empty or stale. For now, if no index exists, we definitely need
            # to build one.
            if self.index is None:
                await self.sync_from_document_store()
            self.is_initialized = True
            logger.info("LlamaIndex async initialization complete.")
        except Exception as e:
            logger.error(f"Error during LlamaIndex async initialization: {str(e)}")

    async def sync_from_document_store(self):
        """Sync documents from the DocumentStore into LlamaIndex."""
        try:
            logger.info("Syncing documents from DocumentStore to LlamaIndex...")
            # Fetch documents from the async document store.
            # Limit to 1000 for now to avoid memory issues.
            docs = await self.document_store.list_documents(limit=1000)

            if not docs:
                logger.warning("No documents found in DocumentStore to sync.")
                # Create an empty index if there are no documents
                self.index = VectorStoreIndex.from_documents([])
            else:
                # Convert to LlamaIndex documents
                llama_docs = []
                for doc in docs:
                    llama_doc = Document(
                        text=doc.content,
                        metadata={
                            "filename": doc.filename,
                            "document_id": doc.id,
                            **doc.metadata,
                        },
                    )
                    llama_docs.append(llama_doc)

                logger.info(f"Building LlamaIndex with {len(llama_docs)} documents...")
                self.index = VectorStoreIndex.from_documents(llama_docs)

            # Persist storage
            self.storage_dir.mkdir(parents=True, exist_ok=True)
            self.index.storage_context.persist(persist_dir=str(self.storage_dir))

            # Re-initialize the agent with the new index
            self._initialize_agent()
            logger.info("LlamaIndex sync complete.")
        except Exception as e:
            logger.error(f"Error syncing LlamaIndex: {str(e)}")

    async def sync_on_demand(self):
        """Manually trigger a document sync."""
        await self.sync_from_document_store()
        return True

    def _initialize_agent(self):
        """Initialize the ReAct agent with query engine tools."""
        try:
            if not self.index:
                return

            query_engine = self.index.as_query_engine()
            query_engine_tool = QueryEngineTool(
                query_engine=query_engine,
                metadata=ToolMetadata(
                    name="document_search",
                    description=(
                        "Search and retrieve information from the document library. "
                        "Use this for specific questions about content."
                    ),
                ),
            )

            self.agent = ReActAgent.from_tools(
                [query_engine_tool],
                llm=Settings.llm,
                verbose=True,
            )
            logger.info("LlamaIndex ReAct agent initialized")
        except Exception as e:
            logger.error(f"Error initializing LlamaIndex agent: {str(e)}")

    async def query(self, query_text: str) -> str:
        """Process a query using the agent."""
        if not self.agent:
            if not self.is_initialized:
                return "Agent is initializing, please try again in a moment."
            return "Agent failed to initialize. Please check the logs."
        try:
            response = await self.agent.achat(query_text)
            return str(response)
        except Exception as e:
            logger.error(f"Error querying LlamaIndex agent: {str(e)}")
            return f"Error processing query: {str(e)}"
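

# Minimal usage sketch (not part of the service): how a caller might wire the
# two-phase startup — synchronous construction (which attempts to load any
# persisted index) followed by async initialize() to sync from the document
# store. The no-argument DocumentStoreService() constructor and the sample
# query string are assumptions for illustration; adjust to the real app wiring.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        store = DocumentStoreService()  # hypothetical constructor signature
        service = LlamaIndexService(store)
        await service.initialize()  # builds/syncs the index if none was loaded
        answer = await service.query("What topics do the stored documents cover?")
        print(answer)

    asyncio.run(_demo())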