import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import re
import uuid

try:
    from elevenlabs import VoiceSettings
    from elevenlabs.client import ElevenLabs
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False

import config
from services.llamaindex_service import LlamaIndexService
from services.llm_service import LLMService
from services.document_store_service import DocumentStoreService

logger = logging.getLogger(__name__)


@dataclass
class DocumentAnalysis:
    """Analysis results from document(s)"""
    key_insights: List[str]
    topics: List[str]
    complexity_level: str
    estimated_words: int
    source_documents: List[str]
    summary: str


@dataclass
class DialogueLine:
    """Single line of podcast dialogue"""
    speaker: str
    text: str
    pause_after: float = 0.5


@dataclass
class PodcastScript:
    """Complete podcast script"""
    dialogue: List[DialogueLine]
    total_duration_estimate: float
    word_count: int
    style: str

    def to_text(self) -> str:
        lines = []
        for line in self.dialogue:
            lines.append(f"{line.speaker}: {line.text}")
        return "\n\n".join(lines)


@dataclass
class PodcastMetadata:
    """Metadata for generated podcast"""
    podcast_id: str
    title: str
    description: str
    source_documents: List[str]
    style: str
    duration_seconds: float
    file_size_mb: float
    voices: Dict[str, str]
    generated_at: str
    generation_cost: Dict[str, float]
    key_topics: List[str]


@dataclass
class PodcastResult:
    """Complete podcast generation result"""
    podcast_id: str
    audio_file_path: str
    transcript: str
    metadata: Optional[PodcastMetadata]  # None when generation fails
    generation_time: float
    success: bool
    error: Optional[str] = None


class PodcastGeneratorService:
    """Service for generating conversational podcasts from documents."""

    WORDS_PER_MINUTE = 150

    SCRIPT_PROMPTS = {
        "conversational": """You are an expert podcast script writer. Create an engaging 2-host podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Conversational, friendly, and accessible
- Format: Alternating dialogue between HOST1 and HOST2
- Make the content engaging and easy to understand
- Include natural transitions and enthusiasm

DIALOGUE FORMAT (strictly follow):
HOST1: [What they say]
HOST2: [What they say]

STRUCTURE:
1. Opening Hook (30 seconds): Grab attention
2. Introduction (1 minute): Set context
3. Main Discussion (70% of time): Deep dive into insights
4. Wrap-up (1 minute): Summarize key takeaways

Generate the complete podcast script now:""",

        "educational": """Create an educational podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Clear, methodical, educational
- HOST1 acts as teacher, HOST2 as curious learner

DIALOGUE FORMAT:
HOST1: [Expert explanation]
HOST2: [Clarifying question]

Generate the educational podcast script now:""",

        "technical": """Create a technical podcast for an informed audience.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Professional, detailed, technically accurate
- HOST1 is expert, HOST2 is informed interviewer

DIALOGUE FORMAT:
HOST1: [Technical insight]
HOST2: [Probing question]

Generate the technical podcast script now:""",

        "casual": """Create a fun, casual podcast discussing the documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Relaxed, humorous, energetic
- Make it entertaining while informative

DIALOGUE FORMAT:
HOST1: [Casual commentary]
HOST2: [Enthusiastic response]

Generate the casual podcast script now:"""
    }

    def __init__(
        self,
        llamaindex_service: LlamaIndexService,
        llm_service: LLMService,
        elevenlabs_api_key: Optional[str] = None
    ):
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.llm_service = llm_service

        # Get document store from llamaindex service
        self.document_store = llamaindex_service.document_store

        # Initialize ElevenLabs client
        self.elevenlabs_client = None
        if ELEVENLABS_AVAILABLE:
            api_key = elevenlabs_api_key or self.config.ELEVENLABS_API_KEY
            if api_key:
                try:
                    self.elevenlabs_client = ElevenLabs(api_key=api_key)
                    logger.info("ElevenLabs client initialized for podcast generation")
                except Exception as e:
                    logger.error(f"Failed to initialize ElevenLabs client: {e}")

        # Create podcast storage directory
        self.podcast_dir = Path("./data/podcasts")
        self.podcast_dir.mkdir(parents=True, exist_ok=True)

        # Metadata database file
        self.metadata_file = self.podcast_dir / "metadata_db.json"
        self._ensure_metadata_db()

        # Voice cache
        self._voice_cache = {}

    def _ensure_metadata_db(self):
        """Ensure metadata database exists"""
        if not self.metadata_file.exists():
            import json
            self.metadata_file.write_text(json.dumps([], indent=2))

    async def generate_podcast(
        self,
        document_ids: List[str],
        style: str = "conversational",
        duration_minutes: int = 10,
        host1_voice: str = "Rachel",
        host2_voice: str = "Adam"
    ) -> PodcastResult:
        """Generate a complete podcast from documents"""
        start_time = datetime.now()
        podcast_id = str(uuid.uuid4())

        try:
            logger.info(f"Starting podcast generation {podcast_id}")
            logger.info(f"Documents: {document_ids}, Style: {style}, Duration: {duration_minutes}min")

            # Step 1: Retrieve and analyze documents
            logger.info("Step 1: Retrieving and analyzing documents...")
            analysis = await self.analyze_documents(document_ids)

            # Step 2: Generate script
            logger.info("Step 2: Generating podcast script...")
            script = await self.generate_script(analysis, style, duration_minutes)

            # Step 3: Synthesize audio
            logger.info("Step 3: Synthesizing audio with voices...")
            audio_file_path = await self.synthesize_audio(
                podcast_id, script, host1_voice, host2_voice
            )

            # Calculate generation time
            generation_time = (datetime.now() - start_time).total_seconds()

            # Step 4: Create metadata
            logger.info("Step 4: Creating metadata...")
            # Pass voices as an ordered list; a set would lose the host order
            # and collapse duplicates when both hosts use the same voice
            metadata = self._create_metadata(
                podcast_id, analysis, script, audio_file_path,
                [host1_voice, host2_voice], document_ids, style
            )

            # Save metadata
            self._save_metadata(metadata)

            # Save transcript
            transcript_path = self.podcast_dir / f"{podcast_id}_transcript.txt"
            transcript_path.write_text(script.to_text(), encoding="utf-8")

            logger.info(f"Podcast generated successfully: {podcast_id}")

            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path=str(audio_file_path),
                transcript=script.to_text(),
                metadata=metadata,
                generation_time=generation_time,
                success=True
            )

        except Exception as e:
            logger.error(f"Podcast generation failed: {str(e)}", exc_info=True)
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path="",
                transcript="",
                metadata=None,
                generation_time=(datetime.now() - start_time).total_seconds(),
                success=False,
                error=str(e)
            )

    async def analyze_documents(self, document_ids: List[str]) -> DocumentAnalysis:
        """
        Retrieve documents and extract key insights for podcast
        FIXED: Now actually retrieves document content from document store
        """
        try:
            # Step 1: Retrieve actual documents from document store
            logger.info(f"Retrieving {len(document_ids)} documents from store...")

            documents = []
            document_contents = []

            for doc_id in document_ids:
                doc = await self.document_store.get_document(doc_id)
                if doc:
                    documents.append(doc)
                    document_contents.append(doc.content)
                    logger.info(f"Retrieved document: {doc.filename} ({len(doc.content)} chars)")
                else:
                    logger.warning(f"Document {doc_id} not found in store")

            if not documents:
                raise ValueError(f"No documents found for IDs: {document_ids}")

            # Step 2: Combine document content
            combined_content = "\n\n---DOCUMENT SEPARATOR---\n\n".join(document_contents)

            # Truncate if too long (keep first portion for context)
            max_content_length = 15000  # Adjust based on your LLM context window
            if len(combined_content) > max_content_length:
                logger.warning(f"Content too long ({len(combined_content)} chars), truncating to {max_content_length}")
                combined_content = combined_content[:max_content_length] + "\n\n[Content truncated...]"

            # Step 3: Use LLM to analyze the content
            analysis_prompt = f"""Analyze the following document(s) and provide:
1. The 5-7 most important insights or key points (be specific and detailed)
2. Main themes and topics covered
3. The overall complexity level (beginner/intermediate/advanced)
4. A comprehensive summary suitable for podcast discussion

DOCUMENTS:
{combined_content}

Provide a structured analysis optimized for creating an engaging podcast discussion.

Format your response as:
KEY INSIGHTS:
1. [First key insight]
2. [Second key insight]
...

TOPICS:
- [Topic 1]
- [Topic 2]
...

COMPLEXITY: [beginner/intermediate/advanced]

SUMMARY:
[Your comprehensive summary here]
"""

            logger.info("Analyzing content with LLM...")
            result = await self.llm_service.generate_text(
                analysis_prompt,
                max_tokens=2000,
                temperature=0.7
            )

            # Step 4: Parse the structured response
            insights = self._extract_insights(result)
            topics = self._extract_topics(result)
            complexity = self._determine_complexity(result)
            summary = self._extract_summary(result)

            logger.info(f"Analysis complete: {len(insights)} insights, {len(topics)} topics")

            return DocumentAnalysis(
                key_insights=insights[:7],
                topics=topics,
                complexity_level=complexity,
                estimated_words=len(combined_content.split()),
                source_documents=[doc.filename for doc in documents],
                summary=summary or result[:500]
            )

        except Exception as e:
            logger.error(f"Document analysis failed: {str(e)}", exc_info=True)
            raise RuntimeError(f"Failed to analyze documents: {str(e)}")

    def _extract_summary(self, text: str) -> str:
        """Extract summary section from analysis"""
        try:
            if "SUMMARY:" in text:
                parts = text.split("SUMMARY:")
                if len(parts) > 1:
                    summary = parts[1].strip()
                    # Take first 500 chars if too long
                    return summary[:500] if len(summary) > 500 else summary
        except Exception:
            pass

        # Fallback: take first few sentences
        sentences = text.split('.')
        return '. '.join(sentences[:3]) + '.'

    def _extract_insights(self, text: str) -> List[str]:
        """Extract key insights from analysis text"""
        insights = []
        lines = text.split('\n')

        in_insights_section = False
        for line in lines:
            line = line.strip()

            if "KEY INSIGHTS:" in line.upper():
                in_insights_section = True
                continue
            elif line.upper().startswith(("TOPICS:", "COMPLEXITY:", "SUMMARY:")):
                in_insights_section = False

            if in_insights_section and line:
                # Strip a leading list marker like "1.", "2.", "-", "*", "•"
                insight = re.sub(r'^(?:\d+\.|[-*•])\s*', '', line).strip()
                if len(insight) > 20:
                    insights.append(insight)

        # Fallback if no insights found
        if not insights:
            sentences = text.split('.')
            insights = [s.strip() + '.' for s in sentences[:7] if len(s.strip()) > 20]

        return insights

    def _extract_topics(self, text: str) -> List[str]:
        """Extract main topics from analysis"""
        topics = []
        lines = text.split('\n')

        in_topics_section = False
        for line in lines:
            line = line.strip()

            if "TOPICS:" in line.upper():
                in_topics_section = True
                continue
            elif line.upper().startswith(("KEY INSIGHTS:", "COMPLEXITY:", "SUMMARY:")):
                in_topics_section = False

            if in_topics_section and line:
                # Strip a leading "-", "*" or "•" bullet marker
                topic = re.sub(r'^[-*•]\s*', '', line).strip()
                if len(topic) > 2:
                    topics.append(topic)

        # Fallback: simple keyword extraction
        if not topics:
            common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
            words = text.lower().split()
            word_freq = {}
            for word in words:
                word = re.sub(r'[^\w\s]', '', word)
                if len(word) > 4 and word not in common_words:
                    word_freq[word] = word_freq.get(word, 0) + 1
            top_topics = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
            topics = [topic[0].title() for topic in top_topics]

        return topics[:5]

    def _determine_complexity(self, text: str) -> str:
        """Determine content complexity level"""
        text_lower = text.lower()

        if "complexity:" in text_lower:
            for level in ["beginner", "intermediate", "advanced"]:
                if level in text_lower.split("complexity:")[1][:100]:
                    return level

        # Heuristic based on keywords
        if any(word in text_lower for word in ['basic', 'introduction', 'beginner', 'simple']):
            return "beginner"
        elif any(word in text_lower for word in ['advanced', 'complex', 'sophisticated', 'expert']):
            return "advanced"
        else:
            return "intermediate"

    async def generate_script(
        self,
        analysis: DocumentAnalysis,
        style: str,
        duration_minutes: int
    ) -> PodcastScript:
        """Generate podcast script from analysis"""
        target_words = duration_minutes * self.WORDS_PER_MINUTE

        # Prepare context with insights
        insights_text = "\n".join(
            f"{i+1}. {insight}" for i, insight in enumerate(analysis.key_insights)
        )

        # Get prompt template
        prompt_template = self.SCRIPT_PROMPTS.get(style, self.SCRIPT_PROMPTS["conversational"])

        # Fill template
        prompt = prompt_template.format(
            document_content=analysis.summary,
            key_insights=insights_text,
            duration_minutes=duration_minutes,
            word_count=target_words
        )

        # Generate script
        script_text = await self.llm_service.generate_text(
            prompt,
            max_tokens=target_words * 2,
            temperature=0.8
        )

        # Parse into dialogue
        dialogue = self._parse_script(script_text)

        if not dialogue:
            raise ValueError("Failed to parse script into dialogue lines")

        word_count = sum(len(line.text.split()) for line in dialogue)
        duration_estimate = word_count / self.WORDS_PER_MINUTE

        return PodcastScript(
            dialogue=dialogue,
            total_duration_estimate=duration_estimate * 60,
            word_count=word_count,
            style=style
        )

    def _parse_script(self, script_text: str) -> List[DialogueLine]:
        """Parse generated script into dialogue lines"""
        dialogue = []
        lines = script_text.split('\n')

        for line in lines:
            line = line.strip()
            if not line:
                continue

            if line.startswith('HOST1:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST1", text=text))
            elif line.startswith('HOST2:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST2", text=text))

        return dialogue

    def _get_voice_id(self, voice_name: str) -> str:
        """Get voice ID from voice name"""
        try:
            # Use cache if available
            if not self._voice_cache:
                voices = self.elevenlabs_client.voices.get_all()
                if not voices or not voices.voices:
                    raise RuntimeError("No voices available")
                for voice in voices.voices:
                    self._voice_cache[voice.name.lower()] = voice.voice_id

            # Exact match
            if voice_name.lower() in self._voice_cache:
                return self._voice_cache[voice_name.lower()]

            # Partial match
            for name, voice_id in self._voice_cache.items():
                if voice_name.lower() in name:
                    logger.info(f"Partial match for '{voice_name}': {name}")
                    return voice_id

            # Fallback
            first_voice_id = list(self._voice_cache.values())[0]
            logger.warning(f"Voice '{voice_name}' not found, using default")
            return first_voice_id

        except Exception as e:
            logger.error(f"Could not fetch voices: {e}")
            raise RuntimeError(f"Failed to get voice ID: {str(e)}")

    async def synthesize_audio(
        self,
        podcast_id: str,
        script: PodcastScript,
        host1_voice: str,
        host2_voice: str
    ) -> Path:
        """Synthesize audio with alternating voices"""
        if not self.elevenlabs_client:
            raise RuntimeError("ElevenLabs client not initialized")

        audio_file = self.podcast_dir / f"{podcast_id}.mp3"

        try:
            # Get voice IDs
            host1_voice_id = self._get_voice_id(host1_voice)
            host2_voice_id = self._get_voice_id(host2_voice)
            logger.info(f"HOST1: {host1_voice}, HOST2: {host2_voice}")

            voice_map = {
                "HOST1": host1_voice_id,
                "HOST2": host2_voice_id
            }

            audio_chunks = []

            # Process each line with correct voice
            for i, line in enumerate(script.dialogue):
                logger.info(f"Line {i+1}/{len(script.dialogue)}: {line.speaker}")
                voice_id = voice_map.get(line.speaker, host1_voice_id)

                audio_generator = self.elevenlabs_client.text_to_speech.convert(
                    voice_id=voice_id,
                    text=line.text,
                    model_id="eleven_multilingual_v2"
                )

                line_chunks = []
                for chunk in audio_generator:
                    if chunk:
                        line_chunks.append(chunk)

                if line_chunks:
                    audio_chunks.append(b''.join(line_chunks))

            if not audio_chunks:
                raise RuntimeError("No audio chunks generated")

            full_audio = b''.join(audio_chunks)
            with open(audio_file, 'wb') as f:
                f.write(full_audio)

            if audio_file.exists() and audio_file.stat().st_size > 1000:
                logger.info(f"Audio created: {audio_file} ({audio_file.stat().st_size} bytes)")
                return audio_file
            else:
                raise RuntimeError("Audio file too small or empty")

        except Exception as e:
            logger.error(f"Audio synthesis failed: {e}", exc_info=True)
            raise RuntimeError(f"Failed to generate audio: {str(e)}")

    def _create_metadata(
        self,
        podcast_id: str,
        analysis: DocumentAnalysis,
        script: PodcastScript,
        audio_path: Path,
        voices: List[str],  # ordered: [host1_voice, host2_voice]
        document_ids: List[str],
        style: str
    ) -> PodcastMetadata:
        """Create podcast metadata"""
        title = f"Podcast: {analysis.topics[0] if analysis.topics else 'Document Discussion'}"
        description = f"A {style} podcast discussing: {', '.join(analysis.source_documents)}"

        file_size_mb = audio_path.stat().st_size / (1024 * 1024) if audio_path.exists() else 0

        llm_cost = (script.word_count / 1000) * 0.01
        tts_cost = (script.word_count * 5 / 1000) * 0.30

        return PodcastMetadata(
            podcast_id=podcast_id,
            title=title,
            description=description,
            source_documents=analysis.source_documents,
            style=style,
            duration_seconds=script.total_duration_estimate,
            file_size_mb=file_size_mb,
            voices={
                "host1": list(voices)[0] if len(voices) > 0 else "Rachel",
                "host2": list(voices)[1] if len(voices) > 1 else "Adam"
            },
            generated_at=datetime.now().isoformat(),
            generation_cost={"llm_cost": llm_cost, "tts_cost": tts_cost, "total": llm_cost + tts_cost},
            key_topics=analysis.topics
        )

    def _save_metadata(self, metadata: PodcastMetadata):
        """Save metadata to database"""
        try:
            import json
            existing = json.loads(self.metadata_file.read_text())
            existing.append(asdict(metadata))
            self.metadata_file.write_text(json.dumps(existing, indent=2))
            logger.info(f"Metadata saved: {metadata.podcast_id}")
        except Exception as e:
            logger.error(f"Failed to save metadata: {e}")

    def list_podcasts(self, limit: int = 10) -> List[PodcastMetadata]:
        """List generated podcasts"""
        try:
            import json
            data = json.loads(self.metadata_file.read_text())
            podcasts = [PodcastMetadata(**item) for item in data[-limit:]]
            return list(reversed(podcasts))
        except Exception as e:
            logger.error(f"Failed to list podcasts: {e}")
            return []

    def get_podcast(self, podcast_id: str) -> Optional[PodcastMetadata]:
        """Get specific podcast metadata"""
        try:
            import json
            data = json.loads(self.metadata_file.read_text())
            for item in data:
                if item.get('podcast_id') == podcast_id:
                    return PodcastMetadata(**item)
            return None
        except Exception as e:
            logger.error(f"Failed to get podcast: {e}")
            return None
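

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the service itself): shows how the
# generator might be wired up and invoked end-to-end. The constructor
# arguments for LlamaIndexService and LLMService and the example document ID
# are assumptions; adapt them to your application's actual setup.
#
#   import asyncio
#
#   async def _demo():
#       llamaindex = LlamaIndexService(...)   # hypothetical construction
#       llm = LLMService(...)                 # hypothetical construction
#       generator = PodcastGeneratorService(llamaindex, llm)
#       result = await generator.generate_podcast(
#           document_ids=["<document-id>"],
#           style="conversational",
#           duration_minutes=10,
#       )
#       if result.success:
#           print(f"Audio: {result.audio_file_path}")
#           print(result.transcript[:500])
#       else:
#           print(f"Generation failed: {result.error}")
#
#   asyncio.run(_demo())
# ---------------------------------------------------------------------------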