"""
Schema Analyzer Agent: Automatically analyze customer schemas and propose mappings.
"""

import json
from typing import List, Dict, Tuple, Optional
from anthropic import Anthropic
from ..models import (
    CustomerSchema,
    SchemaTable,
    SchemaColumn,
    SemanticType,
    ConceptMapping,
)
from ..knowledge_graph import SchemaKnowledgeGraph
from ..config import Config


class SchemaAnalyzerAgent:
    """
    Automatically analyzes new customer database schemas and proposes
    semantic concept mappings using LLM-powered understanding.
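
    Example (a minimal usage sketch; `api_key`, `kg`, and `schema` are assumed
    to be constructed elsewhere from the project's own models):

        agent = SchemaAnalyzerAgent(api_key, kg)
        mappings = agent.analyze_schema("customer_a", schema)
        print(agent.explain_mappings(mappings))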
    """

    def __init__(
        self, 
        anthropic_api_key: str, 
        knowledge_graph: SchemaKnowledgeGraph,
        config: Optional[Config] = None
    ):
        """
        Initialize the schema analyzer agent.

        Args:
            anthropic_api_key: API key for Anthropic Claude
            knowledge_graph: Schema knowledge graph with semantic concepts
            config: Configuration object (creates new if None)
        """
        self.client = Anthropic(api_key=anthropic_api_key)
        self.kg = knowledge_graph
        self.config = config or Config()
        self.model = self.config.model_name
        self.max_tokens = self.config.max_tokens
        self.temperature = self.config.temperature

    def _build_system_prompt(self) -> str:
        """Build the system prompt with available semantic concepts."""
        concepts = []
        for concept_name, concept_data in self.kg.concepts.items():
            description = getattr(concept_data, 'description', 'No description')
            concepts.append(f"- {concept_name}: {description}")
        concept_list = "\n".join(concepts)

        return f"""You are a database schema analysis assistant. Your job is to analyze customer database schemas and identify which columns map to semantic concepts.

Available Semantic Concepts:
{concept_list}

Your task is to:
1. Examine the customer's table and column names
2. Consider column data types and constraints
3. Identify which columns represent which semantic concepts
4. Determine if any transformations are needed (e.g., converting annual to lifetime values)
5. Provide confidence scores (0.0 to 1.0) for each mapping

Semantic Types:
- "string": Text values (VARCHAR, TEXT, CHAR)
- "number": Numeric values (INT, FLOAT, DECIMAL, BIGINT)
- "date": Date values (DATE, DATETIME, TIMESTAMP)
- "boolean": True/False values (BOOLEAN, TINYINT(1))
- "enum": One of a fixed set of values

Return ONLY valid JSON matching this schema:
{{
    "mappings": [
        {{
            "concept": "semantic_concept_name",
            "table": "table_name",
            "column": "column_name",
            "confidence": 0.95,
            "reasoning": "Why this mapping makes sense",
            "transformation": "Optional: transformation rule if needed"
        }}
    ]
}}"""

    def _build_user_prompt(
        self, customer_id: str, schema: CustomerSchema
    ) -> str:
        """Build the user prompt with schema details."""
        # Format schema information
        schema_text = f"Customer: {customer_id}\n\nTables and Columns:\n"
        for table in schema.tables:
            schema_text += f"\nTable: {table.name}\n"
            for column in table.columns:
                constraints = []
                if column.is_primary_key:
                    constraints.append("PRIMARY KEY")
                if column.is_foreign_key:
                    constraints.append("FOREIGN KEY")

                constraint_str = ", ".join(constraints) if constraints else ""
                schema_text += f"  - {column.name} ({column.data_type}) {constraint_str}\n"

        return f"""{schema_text}

Analyze this schema and identify mappings to semantic concepts.

Example mappings for reference:

Customer A:
- contract_identifier β†’ contracts.contract_id (confidence: 1.0, exact name match)
- contract_value β†’ contracts.total_value (confidence: 0.95, "total_value" represents contract value)
- contract_expiration β†’ contracts.end_date (confidence: 0.90, "end_date" is when contract expires)

Customer B (multi-table):
- customer_name β†’ customers.customer_name (confidence: 1.0, requires JOIN via contracts.customer_id)

Customer D (transformation needed):
- contract_expiration β†’ agreements.days_remaining (confidence: 0.85, transformation: "Convert days_remaining to actual date using CURRENT_DATE")

Consider:
1. Exact name matches have highest confidence
2. Semantic equivalents (e.g., "end_date" for expiration) have high confidence
3. Multi-table mappings may require JOINs
4. Some fields may need transformations (e.g., days β†’ date, annual β†’ lifetime)

Return ONLY the JSON object with mappings."""

    def analyze_schema(
        self, customer_id: str, schema: CustomerSchema, max_retries: int = 2
    ) -> List[ConceptMapping]:
        """
        Analyze a customer schema and propose concept mappings.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            max_retries: Maximum number of retries on parsing errors

        Returns:
            List of ConceptMapping objects with proposed mappings

        Raises:
            ValueError: If unable to analyze schema after retries
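
        Example (sketch; callers typically guard against analysis failures):

            try:
                mappings = agent.analyze_schema("customer_a", schema)
            except ValueError as exc:
                print(f"Schema analysis failed: {exc}")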
        """
        system_prompt = self._build_system_prompt()
        user_prompt = self._build_user_prompt(customer_id, schema)

        for attempt in range(max_retries + 1):
            try:
                # Call Claude API
                message = self.client.messages.create(
                    model=self.model,
                    max_tokens=self.max_tokens,
                    system=system_prompt,
                    messages=[{"role": "user", "content": user_prompt}],
                    temperature=self.temperature,
                )

                # Extract JSON from response
                response_text = message.content[0].text.strip()

                # Handle markdown code blocks
                if response_text.startswith("```"):
                    lines = response_text.split("\n")
                    response_text = "\n".join(
                        line for line in lines if not line.startswith("```")
                    )

                # Parse JSON
                result = json.loads(response_text)

                # Convert to ConceptMapping objects
                mappings = []
                for m in result["mappings"]:
                    # Find the actual column to get semantic type and data type
                    semantic_type = SemanticType.TEXT  # Default
                    data_type = "TEXT"  # Default
                    for table in schema.tables:
                        if table.name == m["table"]:
                            for column in table.columns:
                                if column.name == m["column"]:
                                    data_type = column.data_type
                                    # Infer semantic type from SQL type.
                                    # Check boolean types first so that
                                    # TINYINT(1) is not misclassified as
                                    # numeric by the broader "INT" match.
                                    sql_type = column.data_type.upper()
                                    if any(
                                        t in sql_type for t in ["BOOL", "TINYINT(1)"]
                                    ):
                                        semantic_type = SemanticType.BOOLEAN
                                    elif any(
                                        t in sql_type for t in ["DATE", "TIME", "TIMESTAMP"]
                                    ):
                                        semantic_type = SemanticType.DATE
                                    elif any(
                                        t in sql_type
                                        for t in ["INT", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC", "REAL"]
                                    ):
                                        semantic_type = SemanticType.FLOAT
                                    else:
                                        semantic_type = SemanticType.TEXT
                                    break

                    mapping = ConceptMapping(
                        customer_id=customer_id,
                        table_name=m["table"],
                        column_name=m["column"],
                        data_type=data_type,
                        semantic_type=semantic_type,
                        transformation=m.get("transformation"),
                    )
                    # Store additional metadata from LLM (not in model)
                    mapping._confidence = m.get("confidence", 0.8)
                    mapping._reasoning = m.get("reasoning", "")
                    mapping._concept = m["concept"]
                    mappings.append(mapping)

                # Validate that all concepts exist
                for mapping in mappings:
                    if mapping._concept not in self.kg.concepts:
                        raise ValueError(
                            f"Unknown semantic concept: {mapping._concept}. "
                            f"Available: {list(self.kg.concepts.keys())}"
                        )

                return mappings

            except (json.JSONDecodeError, KeyError, ValueError) as e:
                if attempt < max_retries:
                    user_prompt += f"\n\nPrevious attempt failed: {str(e)}\nPlease try again with valid JSON."
                    continue
                else:
                    raise ValueError(
                        f"Failed to analyze schema after {max_retries + 1} attempts. "
                        f"Last error: {str(e)}"
                    ) from e

        raise ValueError("Unexpected error in schema analysis")

    def explain_mappings(
        self, mappings: List[ConceptMapping], include_low_confidence: bool = False
    ) -> str:
        """
        Generate a human-readable explanation of proposed mappings.

        Args:
            mappings: List of concept mappings
            include_low_confidence: Whether to include low-confidence mappings

        Returns:
            Human-readable explanation
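
        Example output (illustrative; actual concepts and confidences come
        from the proposed mappings):

            Proposed Schema Mappings:

            Contract Value:
              - contracts.total_value (95% confidence)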
        """
        lines = ["Proposed Schema Mappings:\n"]

        # Group by concept
        by_concept: Dict[str, List[ConceptMapping]] = {}
        for mapping in mappings:
            confidence = getattr(mapping, '_confidence', 1.0)
            concept = getattr(mapping, '_concept', 'unknown')
            if not include_low_confidence and confidence < 0.5:
                continue
            if concept not in by_concept:
                by_concept[concept] = []
            by_concept[concept].append(mapping)

        for concept, concept_mappings in sorted(by_concept.items()):
            concept_display = concept.replace("_", " ").title()
            lines.append(f"\n{concept_display}:")
            for mapping in concept_mappings:
                confidence = getattr(mapping, '_confidence', 1.0)
                confidence_pct = int(confidence * 100)
                location = f"{mapping.table_name}.{mapping.column_name}"
                line = f"  - {location} ({confidence_pct}% confidence)"
                if mapping.transformation:
                    line += f"\n    Transformation: {mapping.transformation}"
                lines.append(line)

        return "\n".join(lines)

    def validate_mappings(
        self,
        customer_id: str,
        schema: CustomerSchema,
        mappings: List[ConceptMapping],
    ) -> Tuple[List[ConceptMapping], List[str]]:
        """
        Validate proposed mappings against the actual schema.

        Args:
            customer_id: The customer identifier
            schema: The customer's database schema
            mappings: Proposed concept mappings

        Returns:
            Tuple of (valid_mappings, error_messages)
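
        Example (sketch; `proposed` is the output of analyze_schema):

            valid, errors = agent.validate_mappings("customer_a", schema, proposed)
            for error in errors:
                print(error)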
        """
        valid_mappings = []
        errors = []

        # Build a lookup of available tables and columns
        schema_lookup = {}
        for table in schema.tables:
            schema_lookup[table.name] = {
                col.name for col in table.columns
            }

        for mapping in mappings:
            # Check if table exists
            if mapping.table_name not in schema_lookup:
                errors.append(
                    f"Table '{mapping.table_name}' not found in schema for {customer_id}"
                )
                continue

            # Check if column exists
            if mapping.column_name not in schema_lookup[mapping.table_name]:
                errors.append(
                    f"Column '{mapping.column_name}' not found in table "
                    f"'{mapping.table_name}' for {customer_id}"
                )
                continue

            # Check if concept exists (stored as _concept attribute)
            concept = getattr(mapping, '_concept', None)
            if concept and concept not in self.kg.concepts:
                errors.append(f"Unknown concept: {concept}")
                continue

            # Mapping is valid
            valid_mappings.append(mapping)

        return valid_mappings, errors