Spaces:
Paused
Paused
| """Query compiler to generate customer-specific SQL from semantic query plans.""" | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Set | |
| from schema_translator.knowledge_graph import SchemaKnowledgeGraph | |
| from schema_translator.models import ( | |
| QueryFilter, | |
| QueryOperator, | |
| SemanticQueryPlan, | |
| SemanticType, | |
| ) | |
class QueryCompiler:
    """Compiles semantic query plans into customer-specific SQL.

    For every semantic concept a ``SemanticQueryPlan`` references, the
    compiler asks the knowledge graph for the customer's mapping (table,
    column, optional SQL transformation, join requirements) and assembles a
    single SELECT statement from those mappings. Date arithmetic is emitted
    in SQLite syntax (``DATE(CURRENT_DATE, '+N days')``).
    """

    # Short table aliases used in FROM/JOIN clauses and column references.
    # Tables not listed here fall back to their first letter.
    _TABLE_ALIASES = {
        "contracts": "c",
        "contract_headers": "h",
        "contract_status_history": "s",
        "renewal_schedule": "r",
    }

    # Customer B join conditions keyed by (primary_table, join_table);
    # "{p}" is the primary table's alias and "{j}" the joined table's alias.
    _CUSTOMER_B_JOIN_CONDITIONS = {
        ("contract_headers", "renewal_schedule"): "{p}.id = {j}.contract_id",
        ("contract_headers", "contract_status_history"): "{p}.id = {j}.contract_id",
        ("renewal_schedule", "contract_headers"): "{p}.contract_id = {j}.id",
        ("contract_status_history", "contract_headers"): "{p}.contract_id = {j}.id",
    }

    # The only sort directions accepted in ORDER BY.
    _SORT_DIRECTIONS = ("ASC", "DESC")

    # A plain ASCII numeric literal: integer, decimal, or exponent form.
    _NUMERIC_LITERAL = r"[+-]?(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)(?:[eE][+-]?[0-9]+)?"

    def __init__(self, knowledge_graph: SchemaKnowledgeGraph):
        """Initialize the query compiler.

        Args:
            knowledge_graph: Knowledge graph with schema mappings; the compiler
                calls its ``get_mapping(concept_id, customer_id)`` and
                ``get_all_concepts()`` methods.
        """
        self.kg = knowledge_graph

    def compile_for_customer(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
    ) -> str:
        """Compile a semantic query plan to SQL for a specific customer.

        Args:
            query_plan: Semantic query plan to compile
            customer_id: Customer identifier

        Returns:
            SQL query string, one clause per line.

        Raises:
            ValueError: If required mappings are missing, no table can be
                resolved for the query, or the plan uses an unsupported
                operator, sort direction, or non-numeric day count/limit.
        """
        tables_needed = self._get_required_tables(query_plan, customer_id)
        # Resolve the primary table once and share it with every clause so the
        # FROM clause and all column references agree on the same alias.
        primary_table = self._determine_primary_table(query_plan, customer_id, tables_needed)

        sql_parts = [
            self._generate_select(query_plan, customer_id, tables_needed, primary_table),
            self._generate_from(query_plan, customer_id, tables_needed, primary_table),
        ]
        optional_parts = (
            self._generate_where(query_plan, customer_id, primary_table),
            self._generate_group_by(query_plan, customer_id, primary_table),
            self._generate_order_by(query_plan, customer_id, primary_table),
            self._generate_limit(query_plan),
        )
        sql_parts.extend(part for part in optional_parts if part)
        return "\n".join(sql_parts)

    def _get_required_tables(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
    ) -> Set[str]:
        """Get all tables that must appear in the FROM/JOIN clauses.

        Projections and filters whose mapping carries a transformation do not
        contribute their own table: the transformation is rendered inline as
        an expression (a subquery), not via a JOIN. A transformed filter still
        contributes its ``join_requirements``.

        When the plan has neither projections nor aggregations, every mapped
        concept counts, because ``_generate_select`` selects all of them.
        Aggregation plans only select aggregates and group-by columns, so
        pulling in every table would add unneeded one-to-many JOINs that
        inflate the aggregates.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier

        Returns:
            Set of table names needed
        """
        tables: Set[str] = set()

        def add_mapping_tables(mapping) -> None:
            # A mapping needs its own table plus any tables it declares it joins through.
            tables.add(mapping.table_name)
            tables.update(mapping.join_requirements)

        if query_plan.projections:
            projected_ids = list(query_plan.projections)
        elif not query_plan.aggregations:
            projected_ids = [concept.concept_id for concept in self.kg.get_all_concepts()]
        else:
            projected_ids = []

        for concept_id in projected_ids:
            mapping = self.kg.get_mapping(concept_id, customer_id)
            if mapping and not mapping.transformation:
                add_mapping_tables(mapping)

        for query_filter in query_plan.filters or []:
            mapping = self.kg.get_mapping(query_filter.concept, customer_id)
            if not mapping:
                continue
            if mapping.transformation:
                # The transformation goes straight into WHERE; only its joins are needed.
                tables.update(mapping.join_requirements)
            else:
                add_mapping_tables(mapping)

        # NOTE(review): aggregations and group-by add the mapping's table even
        # when it is transformed, unlike projections/filters above. Kept as-is;
        # confirm whether that JOIN is intended.
        aggregated_ids = [agg.concept for agg in query_plan.aggregations or []]
        for concept_id in [*aggregated_ids, *(query_plan.group_by or [])]:
            mapping = self.kg.get_mapping(concept_id, customer_id)
            if mapping:
                add_mapping_tables(mapping)

        return tables

    def _require_mapping(self, concept_id: str, customer_id: str):
        """Return the customer's mapping for ``concept_id``.

        Raises:
            ValueError: If the knowledge graph has no mapping for the concept.
        """
        mapping = self.kg.get_mapping(concept_id, customer_id)
        if not mapping:
            raise ValueError(f"No mapping for concept '{concept_id}' in {customer_id}")
        return mapping

    def _generate_select(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        tables_needed: Optional[Set[str]] = None,
        primary_table: Optional[str] = None,
    ) -> str:
        """Generate the SELECT clause.

        Aggregation plans select the aggregates followed by their group-by
        columns. Projection plans select the projected concepts. Otherwise
        every mapped concept is selected: explicitly for multi-table queries,
        and via ``*`` for single-table ones. DISTINCT is added to non-aggregate
        multi-table queries to collapse duplicates produced by JOINs.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier
            tables_needed: Set of tables required (for DISTINCT determination)
            primary_table: Primary table name for the query

        Returns:
            SELECT clause

        Raises:
            ValueError: If an aggregated or projected concept has no mapping.
        """
        select_items: List[str] = []

        if query_plan.aggregations:
            for agg in query_plan.aggregations:
                mapping = self._require_mapping(agg.concept, customer_id)
                column_expr = self._get_column_expression(mapping, customer_id, primary_table)
                alias = agg.alias or f"{agg.function.lower()}_{agg.concept}"
                select_items.append(f"{agg.function}({column_expr}) AS {alias}")
            # Include group-by columns so each aggregate row is identifiable.
            for concept_id in query_plan.group_by or []:
                mapping = self.kg.get_mapping(concept_id, customer_id)
                if mapping:
                    column_expr = self._get_column_expression(mapping, customer_id, primary_table)
                    select_items.append(f"{column_expr} AS {concept_id}")
        elif query_plan.projections:
            for concept_id in query_plan.projections:
                mapping = self._require_mapping(concept_id, customer_id)
                column_expr = self._get_column_expression(mapping, customer_id, primary_table)
                select_items.append(f"{column_expr} AS {concept_id}")
        elif tables_needed and len(tables_needed) > 1:
            # Name concepts explicitly so JOIN key columns don't leak in via "*".
            # Plain and transformed mappings are rendered the same way; a concept
            # is included only when its table is already part of the query.
            for concept in self.kg.get_all_concepts():
                mapping = self.kg.get_mapping(concept.concept_id, customer_id)
                if mapping and mapping.table_name in tables_needed:
                    column_expr = self._get_column_expression(mapping, customer_id, primary_table)
                    select_items.append(f"{column_expr} AS {concept.concept_id}")
        else:
            # Single-table query: "*" is safe.
            select_items.append("*")

        is_multi_table = bool(tables_needed) and len(tables_needed) > 1
        distinct = "DISTINCT " if is_multi_table and not query_plan.aggregations else ""
        return f"SELECT {distinct}" + ", ".join(select_items)

    def _get_column_expression(
        self,
        mapping,
        customer_id: str,
        primary_table: Optional[str] = None,
    ) -> str:
        """Get the column expression, applying the mapping's transformation if any.

        Args:
            mapping: ConceptMapping object
            customer_id: Customer identifier
            primary_table: Primary table name for the query (used in transformation references)

        Returns:
            ``alias.column``, or the transformation with ``{column}`` replaced by it.
        """
        column_ref = f"{self._get_table_alias(mapping.table_name)}.{mapping.column_name}"
        if not mapping.transformation:
            return column_ref

        result = mapping.transformation.replace("{column}", column_ref)
        # The customer_b contract_status transformation refers to a bare "id".
        # Qualify it with the primary table's alias so it binds to the outer row.
        if primary_table and "contract_id = id" in result:
            primary_alias = self._get_table_alias(primary_table)
            result = result.replace("contract_id = id", f"contract_id = {primary_alias}.id")
        return result

    def _generate_from(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        tables_needed: Set[str],
        primary_table: Optional[str] = None,
    ) -> str:
        """Generate the FROM clause with JOINs if needed.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier
            tables_needed: Set of tables required
            primary_table: Already-resolved primary table; computed via
                ``_determine_primary_table`` when omitted.

        Returns:
            FROM clause followed by one JOIN line per joinable secondary table,
            in sorted table-name order so the output is deterministic.

        Raises:
            ValueError: If ``tables_needed`` is empty.
        """
        if not tables_needed:
            raise ValueError("No tables identified for query")

        if primary_table is None:
            primary_table = self._determine_primary_table(query_plan, customer_id, tables_needed)

        from_parts = [f"FROM {primary_table} AS {self._get_table_alias(primary_table)}"]
        for table in sorted(tables_needed):
            if table == primary_table:
                continue
            join_clause = self._generate_join(primary_table, table, customer_id)
            # Tables without a known join condition are left out of FROM.
            if join_clause:
                from_parts.append(join_clause)
        return "\n".join(from_parts)

    def _determine_primary_table(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        tables_needed: Set[str],
    ) -> str:
        """Determine which table should be the primary (FROM) table.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier
            tables_needed: Set of tables required

        Returns:
            The only table, or the first projection's table if it is needed,
            or otherwise the alphabetically first table.

        Raises:
            ValueError: If ``tables_needed`` is empty.
        """
        if not tables_needed:
            raise ValueError("No tables identified for query")

        if len(tables_needed) == 1:
            return next(iter(tables_needed))

        if query_plan.projections:
            mapping = self.kg.get_mapping(query_plan.projections[0], customer_id)
            if mapping and mapping.table_name in tables_needed:
                return mapping.table_name

        return sorted(tables_needed)[0]

    def _generate_join(
        self,
        primary_table: str,
        join_table: str,
        customer_id: str,
    ) -> Optional[str]:
        """Generate the JOIN clause for a secondary table.

        Only customer_b has multi-table schemas with known join conditions.

        Args:
            primary_table: Primary table name
            join_table: Table to join
            customer_id: Customer identifier

        Returns:
            JOIN clause, or None if no join condition is known for the pair.
        """
        if customer_id != "customer_b":
            return None

        condition = self._CUSTOMER_B_JOIN_CONDITIONS.get((primary_table, join_table))
        if condition is None:
            # NOTE(review): e.g. renewal_schedule -> contract_status_history has
            # no condition, so that table is silently left out of FROM.
            return None

        primary_alias = self._get_table_alias(primary_table)
        join_alias = self._get_table_alias(join_table)
        on_clause = condition.format(p=primary_alias, j=join_alias)
        return f"JOIN {join_table} AS {join_alias} ON {on_clause}"

    def _get_table_alias(self, table_name: str) -> str:
        """Get a short alias for a table name.

        Args:
            table_name: Full table name

        Returns:
            Alias from ``_TABLE_ALIASES``, else the table name's first letter.
        """
        return self._TABLE_ALIASES.get(table_name, table_name[0])

    def _generate_where(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        primary_table: Optional[str] = None,
    ) -> Optional[str]:
        """Generate the WHERE clause by AND-ing all compiled filters.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier
            primary_table: Primary table name for the query

        Returns:
            WHERE clause or None if no filters
        """
        if not query_plan.filters:
            return None

        conditions = []
        for query_filter in query_plan.filters:
            condition = self._compile_filter(query_filter, customer_id, primary_table)
            if condition:
                conditions.append(condition)

        if conditions:
            return "WHERE " + " AND ".join(conditions)
        return None

    def _compile_filter(
        self,
        filter: QueryFilter,
        customer_id: str,
        primary_table: Optional[str] = None,
    ) -> str:
        """Compile a single filter to an SQL condition.

        Args:
            filter: Query filter (parameter name kept for keyword callers)
            customer_id: Customer identifier
            primary_table: Primary table name for the query

        Returns:
            SQL condition

        Raises:
            ValueError: If the concept has no mapping, the operator is
                unsupported, or a WITHIN_NEXT_DAYS value is not numeric.
        """
        mapping = self._require_mapping(filter.concept, customer_id)
        column_expr = self._get_column_expression(mapping, customer_id, primary_table)
        operator = filter.operator
        value = filter.value

        if operator == QueryOperator.EQUALS:
            return f"{column_expr} = {self._quote_value(value)}"
        elif operator == QueryOperator.NOT_EQUALS:
            return f"{column_expr} != {self._quote_value(value)}"
        elif operator == QueryOperator.GREATER_THAN:
            return f"{column_expr} > {self._format_comparison_value(value)}"
        elif operator == QueryOperator.GREATER_THAN_OR_EQUAL:
            return f"{column_expr} >= {self._format_comparison_value(value)}"
        elif operator == QueryOperator.LESS_THAN:
            return f"{column_expr} < {self._format_comparison_value(value)}"
        elif operator == QueryOperator.LESS_THAN_OR_EQUAL:
            return f"{column_expr} <= {self._format_comparison_value(value)}"
        elif operator == QueryOperator.IN:
            # A scalar (including a plain string) is a one-element list, not
            # an iterable of characters.
            if not isinstance(value, (list, tuple, set, frozenset)):
                value = [value]
            if not value:
                # "x IN ()" is not portable SQL; an empty list matches nothing.
                return "1 = 0"
            values = ", ".join(self._quote_value(v) for v in value)
            return f"{column_expr} IN ({values})"
        elif operator == QueryOperator.CONTAINS:
            return f"{column_expr} LIKE {self._quote_value(f'%{value}%')}"
        elif operator == QueryOperator.WITHIN_NEXT_DAYS:
            days = self._coerce_days(value)
            if mapping.semantic_type == SemanticType.DAYS_REMAINING:
                # The column already holds a day count.
                return f"{column_expr} BETWEEN 0 AND {days}"
            return f"{column_expr} BETWEEN CURRENT_DATE AND DATE(CURRENT_DATE, '+{days} days')"
        elif operator == QueryOperator.DATE_RANGE:
            # Expect value to be a (start, end) pair.
            start, end = value
            return f"{column_expr} BETWEEN {self._quote_value(start)} AND {self._quote_value(end)}"
        else:
            raise ValueError(f"Unsupported operator: {filter.operator}")

    def _format_comparison_value(self, value) -> str:
        """Render the right-hand side of a <, <=, >, >= comparison.

        Numbers, and strings that are plain numeric literals, are emitted bare
        so the comparison stays numeric. Every other value (for example an ISO
        date string) is quoted via ``_quote_value``. Unquoted, ``2024-01-01``
        would be evaluated as arithmetic (2022).
        """
        import re

        if isinstance(value, str):
            candidate = value.strip()
            if re.fullmatch(self._NUMERIC_LITERAL, candidate):
                return candidate
        return self._quote_value(value)

    @staticmethod
    def _coerce_days(value) -> int:
        """Convert a WITHIN_NEXT_DAYS value to an integer day count.

        The result is interpolated into SQL, so anything non-numeric is rejected.

        Raises:
            ValueError: If ``value`` is not a finite number.
        """
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError) as exc:
            raise ValueError(f"Expected a number of days, got {value!r}") from exc

    def _generate_group_by(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        primary_table: Optional[str] = None,
    ) -> Optional[str]:
        """Generate the GROUP BY clause.

        Concepts without a mapping for this customer are skipped.

        Args:
            query_plan: Semantic query plan
            customer_id: Customer identifier
            primary_table: Primary table name for the query

        Returns:
            GROUP BY clause or None
        """
        if not query_plan.group_by:
            return None

        group_by_items = []
        for concept_id in query_plan.group_by:
            mapping = self.kg.get_mapping(concept_id, customer_id)
            if mapping:
                group_by_items.append(
                    self._get_column_expression(mapping, customer_id, primary_table)
                )

        if group_by_items:
            return "GROUP BY " + ", ".join(group_by_items)
        return None

    def _generate_order_by(
        self,
        query_plan: SemanticQueryPlan,
        customer_id: str,
        primary_table: Optional[str] = None,
    ) -> Optional[str]:
        """Generate the ORDER BY clause.

        Concepts without a mapping for this customer are skipped.

        Args:
            query_plan: Semantic query plan; ``order_by`` holds
                (concept_id, direction) pairs.
            customer_id: Customer identifier
            primary_table: Primary table name for the query

        Returns:
            ORDER BY clause or None

        Raises:
            ValueError: If a direction is not ASC or DESC (case-insensitive).
        """
        if not query_plan.order_by:
            return None

        order_items = []
        for concept_id, direction in query_plan.order_by:
            mapping = self.kg.get_mapping(concept_id, customer_id)
            if not mapping:
                continue
            # The direction is spliced into SQL verbatim, so whitelist it.
            sort_direction = direction.upper()
            if sort_direction not in self._SORT_DIRECTIONS:
                raise ValueError(f"Unsupported sort direction: {direction!r}")
            column_expr = self._get_column_expression(mapping, customer_id, primary_table)
            order_items.append(f"{column_expr} {sort_direction}")

        if order_items:
            return "ORDER BY " + ", ".join(order_items)
        return None

    def _generate_limit(self, query_plan: SemanticQueryPlan) -> Optional[str]:
        """Generate the LIMIT clause.

        A missing or falsy limit (None or 0) means "no limit".

        Args:
            query_plan: Semantic query plan

        Returns:
            LIMIT clause or None

        Raises:
            ValueError: If the limit cannot be converted to an integer.
        """
        if not query_plan.limit:
            return None
        # int() keeps anything other than a number out of the SQL text.
        return f"LIMIT {int(query_plan.limit)}"

    def _quote_value(self, value) -> str:
        """Render a Python value as an SQL literal.

        Strings are single-quoted with embedded quotes doubled. Booleans
        become 1/0 and None becomes NULL. ``date``/``datetime`` values are
        quoted in their ``str()`` form ('YYYY-MM-DD[ HH:MM:SS]') so the dashes
        are not read as subtraction. Anything else uses ``str()``.

        Args:
            value: Value to quote

        Returns:
            SQL literal text
        """
        from datetime import date

        if isinstance(value, str):
            escaped = value.replace("'", "''")
            return f"'{escaped}'"
        if isinstance(value, bool):
            return "1" if value else "0"
        if value is None:
            return "NULL"
        if isinstance(value, date):
            return f"'{value}'"
        return str(value)