Spaces:
Paused
Paused
"""
Schema Analyzer Agent: Automatically analyze customer schemas and propose mappings.
"""
import json
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional

from anthropic import Anthropic

from ..models import (
    CustomerSchema,
    SchemaTable,
    SchemaColumn,
    SemanticType,
    ConceptMapping,
)
from ..knowledge_graph import SchemaKnowledgeGraph
from ..config import Config
class SchemaAnalyzerAgent:
    """
    Automatically analyzes new customer database schemas and proposes
    semantic concept mappings using LLM-powered understanding.

    ``analyze_schema`` asks the model for mappings, ``validate_mappings``
    checks proposed mappings against the real schema, and
    ``explain_mappings`` renders them as text for human review.
    """

    # Confidence assumed when the model omits one or returns a non-numeric value.
    DEFAULT_CONFIDENCE = 0.8
    # Mappings below this confidence are hidden by explain_mappings by default.
    LOW_CONFIDENCE_THRESHOLD = 0.5

    # Substrings looked for in the upper-cased SQL type. Boolean markers are
    # tested first because "TINYINT(1)" also contains the numeric marker "INT".
    _BOOLEAN_SQL_MARKERS = ("BOOL", "TINYINT(1)")
    _NUMERIC_SQL_MARKERS = ("INT", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC", "REAL")
    _DATE_SQL_MARKERS = ("DATE", "TIME", "TIMESTAMP")

    def __init__(
        self,
        anthropic_api_key: str,
        knowledge_graph: "SchemaKnowledgeGraph",
        config: Optional["Config"] = None,
    ):
        """
        Initialize the schema analyzer agent.

        Args:
            anthropic_api_key: API key for Anthropic Claude
            knowledge_graph: Schema knowledge graph with semantic concepts
            config: Configuration object (creates new if None)
        """
        self.client = Anthropic(api_key=anthropic_api_key)
        self.kg = knowledge_graph
        self.config = config if config is not None else Config()
        self.model = self.config.model_name
        self.max_tokens = self.config.max_tokens
        self.temperature = self.config.temperature

    def _build_system_prompt(self) -> str:
        """Build the system prompt with available semantic concepts."""
        concept_list = "\n".join(
            f"- {name}: {getattr(data, 'description', 'No description')}"
            for name, data in self.kg.concepts.items()
        )

        # Literal braces of the JSON example are doubled because this is an f-string.
        return f"""You are a database schema analysis assistant. Your job is to analyze customer database schemas and identify which columns map to semantic concepts.

Available Semantic Concepts:
{concept_list}

Your task is to:
1. Examine the customer's table and column names
2. Consider column data types and constraints
3. Identify which columns represent which semantic concepts
4. Determine if any transformations are needed (e.g., converting annual to lifetime values)
5. Provide confidence scores (0.0 to 1.0) for each mapping

Semantic Types:
- "string": Text values (VARCHAR, TEXT, CHAR)
- "number": Numeric values (INT, FLOAT, DECIMAL, BIGINT)
- "date": Date values (DATE, DATETIME, TIMESTAMP)
- "boolean": True/False values (BOOLEAN, TINYINT(1))
- "enum": One of a fixed set of values

Return ONLY valid JSON matching this schema:
{{
  "mappings": [
    {{
      "concept": "semantic_concept_name",
      "table": "table_name",
      "column": "column_name",
      "confidence": 0.95,
      "reasoning": "Why this mapping makes sense",
      "transformation": "Optional: transformation rule if needed"
    }}
  ]
}}"""

    def _build_user_prompt(
        self, customer_id: str, schema: "CustomerSchema"
    ) -> str:
        """Build the user prompt with schema details."""
        # Collect fragments and join once instead of repeated string +=.
        parts = [f"Customer: {customer_id}\n\nTables and Columns:\n"]
        for table in schema.tables:
            parts.append(f"\nTable: {table.name}\n")
            for column in table.columns:
                constraints = []
                if column.is_primary_key:
                    constraints.append("PRIMARY KEY")
                if column.is_foreign_key:
                    constraints.append("FOREIGN KEY")
                constraint_str = ", ".join(constraints)
                parts.append(
                    f" - {column.name} ({column.data_type}) {constraint_str}\n"
                )
        schema_text = "".join(parts)

        return f"""{schema_text}
Analyze this schema and identify mappings to semantic concepts.

Example mappings for reference:

Customer A:
- contract_identifier → contracts.contract_id (confidence: 1.0, exact name match)
- contract_value → contracts.total_value (confidence: 0.95, "total_value" represents contract value)
- contract_expiration → contracts.end_date (confidence: 0.90, "end_date" is when contract expires)

Customer B (multi-table):
- customer_name → customers.customer_name (confidence: 1.0, requires JOIN via contracts.customer_id)

Customer D (transformation needed):
- contract_expiration → agreements.days_remaining (confidence: 0.85, transformation: "Convert days_remaining to actual date using CURRENT_DATE")

Consider:
1. Exact name matches have highest confidence
2. Semantic equivalents (e.g., "end_date" for expiration) have high confidence
3. Multi-table mappings may require JOINs
4. Some fields may need transformations (e.g., days → date, annual → lifetime)

Return ONLY the JSON object with mappings."""

    @staticmethod
    def _response_text(message: Any) -> str:
        """Return the first text block of a model response, or raise ValueError."""
        for block in getattr(message, "content", None) or []:
            text = getattr(block, "text", None)
            if isinstance(text, str):
                return text
        raise ValueError("Model response contained no text content")

    @staticmethod
    def _extract_json_text(response_text: str) -> str:
        """Strip markdown fences and surrounding prose from a JSON-object reply."""
        text = response_text.strip()
        if "```" in text:
            text = "\n".join(
                line for line in text.split("\n")
                if not line.lstrip().startswith("```")
            ).strip()
        # Keep only the outermost {...}; a reply that already is a bare JSON
        # object is returned unchanged by this slice.
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            text = text[start:end + 1]
        return text

    @classmethod
    def _classify_sql_type(cls, sql_type: str) -> str:
        """Classify a SQL type string as "boolean", "number", "date" or "text"."""
        upper = sql_type.upper()
        if any(marker in upper for marker in cls._BOOLEAN_SQL_MARKERS):
            return "boolean"
        if any(marker in upper for marker in cls._NUMERIC_SQL_MARKERS):
            return "number"
        if any(marker in upper for marker in cls._DATE_SQL_MARKERS):
            return "date"
        return "text"

    @classmethod
    def _infer_semantic_type(cls, sql_type: str) -> "SemanticType":
        """Map a SQL type string to the SemanticType member used for mappings."""
        by_category = {
            "boolean": SemanticType.BOOLEAN,
            "number": SemanticType.FLOAT,
            "date": SemanticType.DATE,
        }
        return by_category.get(cls._classify_sql_type(sql_type), SemanticType.TEXT)

    @classmethod
    def _coerce_confidence(cls, value: Any) -> float:
        """Convert a model-supplied confidence to a float clamped to [0.0, 1.0]."""
        try:
            confidence = float(value)
        except (TypeError, ValueError):
            return cls.DEFAULT_CONFIDENCE
        return min(1.0, max(0.0, confidence))

    @staticmethod
    def _find_column(schema: "CustomerSchema", table_name: str, column_name: str):
        """Return the first schema column matching table and column name, else None."""
        for table in schema.tables:
            if table.name != table_name:
                continue
            for column in table.columns:
                if column.name == column_name:
                    return column
        return None

    def _build_mapping(
        self, customer_id: str, schema: "CustomerSchema", entry: Any
    ) -> "ConceptMapping":
        """Turn one entry of the model's "mappings" list into a ConceptMapping."""
        if not isinstance(entry, dict):
            raise ValueError(f"Mapping entry is not a JSON object: {entry!r}")

        concept = entry["concept"]
        table_name = entry["table"]
        column_name = entry["column"]

        if not isinstance(concept, str) or concept not in self.kg.concepts:
            raise ValueError(
                f"Unknown semantic concept: {concept}. "
                f"Available: {list(self.kg.concepts.keys())}"
            )

        # Columns the model invented keep the TEXT defaults; validate_mappings
        # is the place where such mappings get rejected.
        semantic_type = SemanticType.TEXT
        data_type = "TEXT"
        column = self._find_column(schema, table_name, column_name)
        if column is not None:
            data_type = column.data_type
            semantic_type = self._infer_semantic_type(column.data_type)

        mapping = ConceptMapping(
            customer_id=customer_id,
            table_name=table_name,
            column_name=column_name,
            data_type=data_type,
            semantic_type=semantic_type,
            transformation=entry.get("transformation"),
        )
        # Store additional metadata from LLM (not in model)
        mapping._confidence = self._coerce_confidence(
            entry.get("confidence", self.DEFAULT_CONFIDENCE)
        )
        mapping._reasoning = entry.get("reasoning", "")
        mapping._concept = concept
        return mapping

    def _parse_mappings(
        self, message: Any, customer_id: str, schema: "CustomerSchema"
    ) -> List["ConceptMapping"]:
        """Parse a model response into ConceptMapping objects, raising on bad shape."""
        payload = json.loads(self._extract_json_text(self._response_text(message)))
        if not isinstance(payload, dict):
            raise ValueError("Response JSON must be an object with a 'mappings' list")
        entries = payload["mappings"]
        if not isinstance(entries, list):
            raise ValueError("'mappings' must be a list")
        return [self._build_mapping(customer_id, schema, entry) for entry in entries]

    def analyze_schema(
        self, customer_id: str, schema: "CustomerSchema", max_retries: int = 2
    ) -> List["ConceptMapping"]:
        """
        Analyze a customer schema and propose concept mappings.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            max_retries: Maximum number of retries on parsing errors

        Returns:
            List of ConceptMapping objects with proposed mappings

        Raises:
            ValueError: If unable to analyze schema after retries
        """
        system_prompt = self._build_system_prompt()
        user_prompt = self._build_user_prompt(customer_id, schema)

        for attempt in range(max_retries + 1):
            # Errors raised by the API call itself are not retried here.
            message = self.client.messages.create(
                model=self.model,
                max_tokens=self.max_tokens,
                system=system_prompt,
                messages=[{"role": "user", "content": user_prompt}],
                temperature=self.temperature,
            )
            try:
                return self._parse_mappings(message, customer_id, schema)
            except (json.JSONDecodeError, KeyError, ValueError) as e:
                if attempt >= max_retries:
                    raise ValueError(
                        f"Failed to analyze schema after {max_retries + 1} attempts. "
                        f"Last error: {str(e)}"
                    ) from e
                # Feed the failure back so the next attempt can correct it.
                user_prompt += (
                    f"\n\nPrevious attempt failed: {str(e)}\n"
                    "Please try again with valid JSON."
                )

        # Only reachable when max_retries is negative and no attempt was made.
        raise ValueError("Unexpected error in schema analysis")

    def explain_mappings(
        self, mappings: List["ConceptMapping"], include_low_confidence: bool = False
    ) -> str:
        """
        Generate a human-readable explanation of proposed mappings.

        Args:
            mappings: List of concept mappings
            include_low_confidence: Whether to include low-confidence mappings

        Returns:
            Human-readable explanation
        """
        # Group by concept
        by_concept: Dict[str, List["ConceptMapping"]] = defaultdict(list)
        for mapping in mappings:
            confidence = getattr(mapping, "_confidence", 1.0)
            if (
                not include_low_confidence
                and confidence < self.LOW_CONFIDENCE_THRESHOLD
            ):
                continue
            by_concept[getattr(mapping, "_concept", "unknown")].append(mapping)

        lines = ["Proposed Schema Mappings:\n"]
        for concept in sorted(by_concept):
            concept_display = concept.replace("_", " ").title()
            lines.append(f"\n{concept_display}:")
            for mapping in by_concept[concept]:
                confidence = getattr(mapping, "_confidence", 1.0)
                # round, not int: int(0.57 * 100) truncates to 56.
                confidence_pct = round(confidence * 100)
                location = f"{mapping.table_name}.{mapping.column_name}"
                line = f" - {location} ({confidence_pct}% confidence)"
                if mapping.transformation:
                    line += f"\n Transformation: {mapping.transformation}"
                lines.append(line)
        return "\n".join(lines)

    def validate_mappings(
        self,
        customer_id: str,
        schema: "CustomerSchema",
        mappings: List["ConceptMapping"],
    ) -> Tuple[List["ConceptMapping"], List[str]]:
        """
        Validate proposed mappings against the actual schema.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            mappings: Proposed concept mappings

        Returns:
            Tuple of (valid_mappings, error_messages)
        """
        # Build a lookup of available tables and columns
        schema_lookup = {
            table.name: {col.name for col in table.columns}
            for table in schema.tables
        }

        valid_mappings = []
        errors = []
        for mapping in mappings:
            # Check if table exists
            if mapping.table_name not in schema_lookup:
                errors.append(
                    f"Table '{mapping.table_name}' not found in schema for {customer_id}"
                )
                continue

            # Check if column exists
            if mapping.column_name not in schema_lookup[mapping.table_name]:
                errors.append(
                    f"Column '{mapping.column_name}' not found in table "
                    f"'{mapping.table_name}' for {customer_id}"
                )
                continue

            # Check if concept exists (stored as _concept attribute);
            # mappings that carry no concept are not rejected here.
            concept = getattr(mapping, "_concept", None)
            if concept and concept not in self.kg.concepts:
                errors.append(f"Unknown concept: {concept}")
                continue

            # Mapping is valid
            valid_mappings.append(mapping)

        return valid_mappings, errors