Spaces:
Paused
Paused
File size: 13,123 Bytes
a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 f2c8ef1 a584f85 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
"""
Schema Analyzer Agent: Automatically analyze customer schemas and propose mappings.
"""
import json
from typing import List, Dict, Any, Tuple, Optional
from anthropic import Anthropic
from ..models import (
CustomerSchema,
SchemaTable,
SchemaColumn,
SemanticType,
ConceptMapping,
)
from ..knowledge_graph import SchemaKnowledgeGraph
from ..config import Config
class SchemaAnalyzerAgent:
    """
    Automatically analyzes new customer database schemas and proposes
    semantic concept mappings using LLM-powered understanding.

    The agent sends a description of the schema to the Anthropic Messages
    API, parses the JSON reply into ``ConceptMapping`` objects, and offers
    helpers to explain and validate the proposed mappings.
    """

    def __init__(
        self,
        anthropic_api_key: str,
        knowledge_graph: "SchemaKnowledgeGraph",
        config: "Optional[Config]" = None,
    ):
        """
        Initialize the schema analyzer agent.

        Args:
            anthropic_api_key: API key for Anthropic Claude
            knowledge_graph: Schema knowledge graph with semantic concepts
            config: Configuration object (creates new if None)
        """
        self.client = Anthropic(api_key=anthropic_api_key)
        self.kg = knowledge_graph
        self.config = config or Config()
        self.model = self.config.model_name
        self.max_tokens = self.config.max_tokens
        self.temperature = self.config.temperature

    def _build_system_prompt(self) -> str:
        """Build the system prompt with available semantic concepts.

        Each concept in ``self.kg.concepts`` is listed as
        ``- name: description``; concepts without a ``description``
        attribute are listed with ``No description``.
        """
        concept_list = "\n".join(
            f"- {name}: {getattr(data, 'description', 'No description')}"
            for name, data in self.kg.concepts.items()
        )
        return f"""You are a database schema analysis assistant. Your job is to analyze customer database schemas and identify which columns map to semantic concepts.
Available Semantic Concepts:
{concept_list}
Your task is to:
1. Examine the customer's table and column names
2. Consider column data types and constraints
3. Identify which columns represent which semantic concepts
4. Determine if any transformations are needed (e.g., converting annual to lifetime values)
5. Provide confidence scores (0.0 to 1.0) for each mapping
Semantic Types:
- "string": Text values (VARCHAR, TEXT, CHAR)
- "number": Numeric values (INT, FLOAT, DECIMAL, BIGINT)
- "date": Date values (DATE, DATETIME, TIMESTAMP)
- "boolean": True/False values (BOOLEAN, TINYINT(1))
- "enum": One of a fixed set of values
Return ONLY valid JSON matching this schema:
{{
"mappings": [
{{
"concept": "semantic_concept_name",
"table": "table_name",
"column": "column_name",
"confidence": 0.95,
"reasoning": "Why this mapping makes sense",
"transformation": "Optional: transformation rule if needed"
}}
]
}}"""

    def _build_user_prompt(
        self, customer_id: str, schema: "CustomerSchema"
    ) -> str:
        """Build the user prompt with schema details.

        Args:
            customer_id: The customer identifier, echoed in the prompt header
            schema: The customer's database schema; every table and column is
                listed together with its data type and key constraints

        Returns:
            The schema listing followed by instructions and example mappings
        """
        # Collect the pieces and join once instead of repeated string +=.
        parts = [f"Customer: {customer_id}\n\nTables and Columns:\n"]
        for table in schema.tables:
            parts.append(f"\nTable: {table.name}\n")
            for column in table.columns:
                constraints = []
                if column.is_primary_key:
                    constraints.append("PRIMARY KEY")
                if column.is_foreign_key:
                    constraints.append("FOREIGN KEY")
                # Only add the separating space when there is something to
                # show, so unconstrained columns get no trailing whitespace.
                suffix = f" {', '.join(constraints)}" if constraints else ""
                parts.append(f" - {column.name} ({column.data_type}){suffix}\n")
        schema_text = "".join(parts)

        # NOTE: the arrows below are plain ASCII "->"; the previous text
        # contained a mis-encoded character in their place.
        return f"""{schema_text}
Analyze this schema and identify mappings to semantic concepts.
Example mappings for reference:
Customer A:
- contract_identifier -> contracts.contract_id (confidence: 1.0, exact name match)
- contract_value -> contracts.total_value (confidence: 0.95, "total_value" represents contract value)
- contract_expiration -> contracts.end_date (confidence: 0.90, "end_date" is when contract expires)
Customer B (multi-table):
- customer_name -> customers.customer_name (confidence: 1.0, requires JOIN via contracts.customer_id)
Customer D (transformation needed):
- contract_expiration -> agreements.days_remaining (confidence: 0.85, transformation: "Convert days_remaining to actual date using CURRENT_DATE")
Consider:
1. Exact name matches have highest confidence
2. Semantic equivalents (e.g., "end_date" for expiration) have high confidence
3. Multi-table mappings may require JOINs
4. Some fields may need transformations (e.g., days -> date, annual -> lifetime)
Return ONLY the JSON object with mappings."""

    @staticmethod
    def _strip_code_fences(response_text: str) -> str:
        """Return the response with surrounding markdown code fences removed.

        The text is stripped of outer whitespace; if it then starts with a
        fence marker, every line that is a fence marker is dropped.
        """
        text = response_text.strip()
        if text.startswith("```"):
            text = "\n".join(
                line
                for line in text.split("\n")
                if not line.lstrip().startswith("```")
            )
        return text

    @staticmethod
    def _infer_semantic_type(sql_type: str) -> "SemanticType":
        """Infer the semantic type from a SQL column type string.

        Matching is by substring on the upper-cased type. The boolean check
        runs first because "TINYINT(1)" also contains "INT" and would
        otherwise always be classified as numeric.
        """
        normalized = sql_type.upper()
        if any(token in normalized for token in ("BOOL", "TINYINT(1)")):
            return SemanticType.BOOLEAN
        if any(
            token in normalized
            for token in ("INT", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC", "REAL")
        ):
            return SemanticType.FLOAT
        if any(token in normalized for token in ("DATE", "TIME", "TIMESTAMP")):
            return SemanticType.DATE
        return SemanticType.TEXT

    def analyze_schema(
        self, customer_id: str, schema: "CustomerSchema", max_retries: int = 2
    ) -> "List[ConceptMapping]":
        """
        Analyze a customer schema and propose concept mappings.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            max_retries: Maximum number of retries on parsing errors

        Returns:
            List of ConceptMapping objects with proposed mappings

        Raises:
            ValueError: If unable to analyze schema after retries
        """
        system_prompt = self._build_system_prompt()
        user_prompt = self._build_user_prompt(customer_id, schema)

        # Index (table, column) -> declared SQL type once, rather than
        # rescanning the whole schema for every proposed mapping. If a name
        # pair occurs more than once the last occurrence wins.
        column_types = {
            (table.name, column.name): column.data_type
            for table in schema.tables
            for column in table.columns
        }

        for attempt in range(max_retries + 1):
            try:
                # Call Claude API
                message = self.client.messages.create(
                    model=self.model,
                    max_tokens=self.max_tokens,
                    system=system_prompt,
                    messages=[{"role": "user", "content": user_prompt}],
                    temperature=self.temperature,
                )
                # Extract and parse the JSON payload from the response
                response_text = self._strip_code_fences(message.content[0].text)
                result = json.loads(response_text)

                # Convert to ConceptMapping objects
                mappings = []
                for m in result["mappings"]:
                    key = (m["table"], m["column"])
                    if key in column_types:
                        data_type = column_types[key]
                        semantic_type = self._infer_semantic_type(data_type)
                    else:
                        # Column not present in the schema: fall back to text
                        # defaults; validate_mappings() reports such entries.
                        data_type = "TEXT"
                        semantic_type = SemanticType.TEXT
                    mapping = ConceptMapping(
                        customer_id=customer_id,
                        table_name=m["table"],
                        column_name=m["column"],
                        data_type=data_type,
                        semantic_type=semantic_type,
                        transformation=m.get("transformation"),
                    )
                    # Store additional metadata from LLM (not in model)
                    mapping._confidence = m.get("confidence", 0.8)
                    mapping._reasoning = m.get("reasoning", "")
                    mapping._concept = m["concept"]
                    mappings.append(mapping)

                # Validate that all concepts exist
                for mapping in mappings:
                    if mapping._concept not in self.kg.concepts:
                        raise ValueError(
                            f"Unknown semantic concept: {mapping._concept}. "
                            f"Available: {list(self.kg.concepts.keys())}"
                        )
                return mappings
            except (
                json.JSONDecodeError,
                KeyError,
                TypeError,
                IndexError,
                ValueError,
            ) as exc:
                # TypeError covers well-formed JSON of the wrong shape (e.g. a
                # top-level list); IndexError covers an empty content list.
                if attempt >= max_retries:
                    raise ValueError(
                        f"Failed to analyze schema after {max_retries + 1} attempts. "
                        f"Last error: {str(exc)}"
                    ) from exc
                # Feed the failure back so the next attempt can correct it.
                user_prompt += (
                    f"\n\nPrevious attempt failed: {str(exc)}"
                    "\nPlease try again with valid JSON."
                )

        # Only reachable when max_retries is negative and the loop never runs.
        raise ValueError("Unexpected error in schema analysis")

    def explain_mappings(
        self, mappings: "List[ConceptMapping]", include_low_confidence: bool = False
    ) -> str:
        """
        Generate a human-readable explanation of proposed mappings.

        Mappings are grouped by their ``_concept`` attribute ("unknown" when
        absent) and the groups are listed in sorted order.

        Args:
            mappings: List of concept mappings
            include_low_confidence: Whether to include mappings whose
                confidence is below 0.5

        Returns:
            Human-readable explanation
        """
        lines = ["Proposed Schema Mappings:\n"]

        # Group by concept
        by_concept: Dict[str, List[ConceptMapping]] = {}
        for mapping in mappings:
            confidence = getattr(mapping, '_confidence', 1.0)
            if not include_low_confidence and confidence < 0.5:
                continue
            concept = getattr(mapping, '_concept', 'unknown')
            by_concept.setdefault(concept, []).append(mapping)

        for concept, concept_mappings in sorted(by_concept.items()):
            concept_display = concept.replace("_", " ").title()
            lines.append(f"\n{concept_display}:")
            for mapping in concept_mappings:
                confidence = getattr(mapping, '_confidence', 1.0)
                # round() rather than int(): 0.57 * 100 is 56.999... in
                # floating point and would otherwise be shown as 56%.
                confidence_pct = round(confidence * 100)
                location = f"{mapping.table_name}.{mapping.column_name}"
                line = f" - {location} ({confidence_pct}% confidence)"
                if mapping.transformation:
                    line += f"\n Transformation: {mapping.transformation}"
                lines.append(line)
        return "\n".join(lines)

    def validate_mappings(
        self,
        customer_id: str,
        schema: "CustomerSchema",
        mappings: "List[ConceptMapping]",
    ) -> "Tuple[List[ConceptMapping], List[str]]":
        """
        Validate proposed mappings against the actual schema.

        A mapping is rejected (with one error message) when its table is not
        in the schema, its column is not in that table, or its ``_concept``
        attribute is set but not present in the knowledge graph.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            mappings: Proposed concept mappings

        Returns:
            Tuple of (valid_mappings, error_messages)
        """
        valid_mappings = []
        errors = []

        # Build a lookup of available tables and columns
        schema_lookup = {
            table.name: {col.name for col in table.columns}
            for table in schema.tables
        }

        for mapping in mappings:
            # Check if table exists
            if mapping.table_name not in schema_lookup:
                errors.append(
                    f"Table '{mapping.table_name}' not found in schema for {customer_id}"
                )
                continue
            # Check if column exists
            if mapping.column_name not in schema_lookup[mapping.table_name]:
                errors.append(
                    f"Column '{mapping.column_name}' not found in table "
                    f"'{mapping.table_name}' for {customer_id}"
                )
                continue
            # Check if concept exists (stored as _concept attribute)
            concept = getattr(mapping, '_concept', None)
            if concept and concept not in self.kg.concepts:
                errors.append(f"Unknown concept: {concept}")
                continue
            # Mapping is valid
            valid_mappings.append(mapping)
        return valid_mappings, errors
|