alessandro trinca tornidor
feat: make /thesaurus-inflated-phrase agnostic, not bounded to synonyms - /thesaurus-custom fixed
a707261
| from collections import defaultdict | |
| from typing import Any, Dict, Set | |
class JSONPathStructureAnalyzer:
    """
    Analyze JSON structure using JSONPath expressions.

    The analyzer walks a JSON-like value (dicts, lists and scalars) and records,
    for every location it visits, a JSONPath-style string such as
    ``$.data.users[*].id`` together with the type names seen there, a few
    stringified sample values and, for arrays, the array length.

    State accumulates on the instance: calling ``extract_all_paths`` more than
    once merges the results of all calls.

    Example usage:
        analyzer = JSONPathStructureAnalyzer()
        analyzer.extract_all_paths({"success": True, "data": {"users": [{"id": 1}]}})
        print(analyzer.get_structure_report())
    """

    def __init__(self):
        # Every JSONPath expression discovered so far.
        self.paths: Set[str] = set()
        # path -> names of the Python types observed at that path
        # (array element paths ``...[*]`` carry the marker type "array").
        self.types: Dict[str, Set[str]] = defaultdict(set)
        # path -> up to ``max_samples`` stringified scalar values.
        self.samples: Dict[str, list] = defaultdict(list)
        # array path (``...[*]``) -> length of the last array visited there.
        self.array_lengths: Dict[str, int] = {}

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------
    def extract_all_paths(self, data: dict, max_samples: int = 3) -> Set[str]:
        """
        Extract all possible JSONPath expressions from data.

        Dict keys produce ``<parent>.<key>`` paths; lists produce a single
        ``<parent>[*]`` path shared by all their items. A top-level list is
        traversed as well and yields paths rooted at ``$[*]``.

        Args:
            data: JSON data to analyze
            max_samples: Maximum number of sample values to collect per path

        Returns:
            Set of JSONPath expressions found in the data (the analyzer's own
            ``paths`` set, which also contains paths from earlier calls)
        """

        def _walk(node: Any, prefix: str) -> None:
            if isinstance(node, dict):
                for key, value in node.items():
                    child_path = f"{prefix}.{key}"
                    self.paths.add(child_path)
                    self.types[child_path].add(type(value).__name__)
                    # Only scalars are sampled; containers are described by
                    # their children instead.
                    if not isinstance(value, (dict, list)):
                        bucket = self.samples[child_path]
                        if len(bucket) < max_samples:
                            bucket.append(str(value))
                    _walk(value, child_path)
            elif isinstance(node, list):
                array_path = f"{prefix}[*]"
                self.paths.add(array_path)
                self.types[array_path].add("array")
                # When several arrays map to the same path the last one
                # visited determines the recorded length.
                self.array_lengths[array_path] = len(node)
                # Visit every item so that heterogeneous elements all
                # contribute their structure.
                for item in node:
                    _walk(item, array_path)

        _walk(data, "$")
        return self.paths

    # ------------------------------------------------------------------
    # Private helpers shared by the plain and the filtered reports
    # ------------------------------------------------------------------
    @staticmethod
    def _path_has_key(path: str, key: str) -> bool:
        """Return True when ``key`` occurs in ``path`` as a complete segment."""
        needle = f".{key}"
        start = path.find(needle)
        while start != -1:
            end = start + len(needle)
            # A real segment is followed by the end of the path, by a child
            # separator or by an array marker -- never by more key characters.
            if end == len(path) or path[end] in ".[":
                return True
            start = path.find(needle, start + 1)
        return False

    def _primary_type(self, path: str) -> str:
        """Collapse the set of types seen at ``path`` into one label."""
        types = self.types.get(path, ())
        if "array" in types:
            return "array"
        if len(types) == 1:
            return next(iter(types))
        if types:
            return f"mixed({', '.join(sorted(types))})"
        return "unknown"

    def _describe_path(self, path: str) -> str:
        """Build the single report line for ``path``."""
        # Sorted so the output does not depend on set iteration order.
        types = sorted(self.types.get(path, ()))
        if "array" in types:
            return f"{path} -- array[{self.array_lengths.get(path, 0)}]"

        samples = self.samples.get(path, [])
        if not samples:
            return f"{path} -- {'/'.join(types)}"

        unique_count = len(set(samples))
        if unique_count > 1:
            summary = f"{samples[0]} .. {samples[-1]} ({unique_count} unique values)"
        else:
            summary = samples[0]
        return f"{path} -- {summary}"

    def _structure_report_for(self, paths: Set[str]) -> str:
        """Render one line per path, ordered by path."""
        return "\n".join(self._describe_path(path) for path in sorted(paths))

    def _paths_with_types_for(self, paths: Set[str]) -> Dict[str, str]:
        """Map each of ``paths`` to its primary type label."""
        return {path: self._primary_type(path) for path in paths}

    def _detailed_report_for(self, paths: Set[str], include_samples: bool) -> Dict[str, Dict[str, Any]]:
        """Build the detailed per-path dictionaries, ordered by path."""
        report: Dict[str, Dict[str, Any]] = {}
        for path in sorted(paths):
            types = sorted(self.types.get(path, ()))
            is_array = "array" in types
            info: Dict[str, Any] = {
                "types": types,
                "primary_type": self._primary_type(path),
                "is_array": is_array,
            }
            if include_samples:
                # Copy so callers cannot mutate the analyzer's internal list.
                samples = list(self.samples.get(path, []))
                info["samples"] = samples
                info["sample_count"] = len(samples)
            if is_array:
                info["array_length"] = self.array_lengths.get(path, 0)
            report[path] = info
        return report

    # ------------------------------------------------------------------
    # Unfiltered reports
    # ------------------------------------------------------------------
    def get_structure_report(self) -> str:
        """
        Generate a structure report using JSONPath notation.

        Returns:
            Formatted string with one ``<path> -- <description>`` line per
            path, sorted by path. Arrays show their length, sampled paths show
            their sample value(s), everything else shows its type name(s).
        """
        return self._structure_report_for(self.paths)

    def get_paths_with_types(self) -> Dict[str, str]:
        """
        Get all paths with their associated value types.

        Returns:
            Dictionary mapping JSONPath expressions to ``"array"``, a single
            type name, ``"mixed(a, b, ...)"`` or ``"unknown"``
        """
        return self._paths_with_types_for(self.paths)

    def get_array_lengths(self) -> Dict[str, int]:
        """
        Get array lengths for all array paths.

        Returns:
            A copy of the dictionary mapping array paths to their lengths
        """
        return self.array_lengths.copy()

    def get_detailed_type_report(self, get_samples: bool = True) -> Dict[str, Dict[str, Any]]:
        """
        Get detailed type information for each path.

        Args:
            get_samples: When True, include ``samples`` and ``sample_count``

        Returns:
            Dictionary keyed by path; each value holds ``types`` (sorted),
            ``primary_type``, ``is_array``, optionally the samples, and
            ``array_length`` for array paths
        """
        return self._detailed_report_for(self.paths, get_samples)

    # ------------------------------------------------------------------
    # Comparison
    # ------------------------------------------------------------------
    def compare_json_structures(self, other_data: dict) -> Dict[str, Any]:
        """
        Compare this analyzer's data with another JSON structure.

        The data already extracted into this analyzer is treated as the "old"
        side; ``other_data`` is analyzed with a fresh analyzer and treated as
        the "new" side.

        Args:
            other_data: JSON data to compare against

        Returns:
            Dictionary containing added/removed/common paths, type changes,
            value differences (first sample of each side), array size changes,
            the array lengths of both sides and a numeric summary
        """
        other = JSONPathStructureAnalyzer()
        other.extract_all_paths(other_data)

        old_types = self.get_paths_with_types()
        new_types = other.get_paths_with_types()
        old_lengths = self.get_array_lengths()
        new_lengths = other.get_array_lengths()

        # Sorted so that the result dictionaries have a stable key order.
        removed = sorted(old_types.keys() - new_types.keys())
        added = sorted(new_types.keys() - old_types.keys())
        common = sorted(old_types.keys() & new_types.keys())

        type_changes: Dict[str, Dict[str, str]] = {}
        value_differences: Dict[str, Dict[str, Any]] = {}
        array_size_changes: Dict[str, Dict[str, int]] = {}

        for path in common:
            old_type = old_types[path]
            new_type = new_types[path]

            if old_type != new_type:
                type_changes[path] = {"old_type": old_type, "new_type": new_type}

            if old_type == "array" and new_type == "array":
                old_size = old_lengths.get(path, 0)
                new_size = new_lengths.get(path, 0)
                if old_size != new_size:
                    array_size_changes[path] = {
                        "old_size": old_size,
                        "new_size": new_size,
                        "size_change": new_size - old_size,
                    }
            elif old_type != "array" and new_type != "array":
                old_samples = self.samples.get(path, [])
                new_samples = other.samples.get(path, [])
                # Only the first sample of each side is compared.
                if old_samples and new_samples and old_samples[0] != new_samples[0]:
                    value_differences[path] = {
                        "old_value": old_samples[0],
                        "new_value": new_samples[0],
                        "old_samples": list(old_samples),
                        "new_samples": list(new_samples),
                    }

        return {
            "added_paths": {path: new_types[path] for path in added},
            "removed_paths": {path: old_types[path] for path in removed},
            "common_paths": {path: old_types[path] for path in common},
            "type_changes": type_changes,
            "value_differences": value_differences,
            "array_size_changes": array_size_changes,
            "array_lengths_old": {path: size for path, size in old_lengths.items() if path in old_types},
            "array_lengths_new": {path: size for path, size in new_lengths.items() if path in new_types},
            "summary": {
                "total_paths_old": len(old_types),
                "total_paths_new": len(new_types),
                "paths_added": len(added),
                "paths_removed": len(removed),
                "paths_common": len(common),
                "type_changes_count": len(type_changes),
                "value_changes_count": len(value_differences),
                "array_size_changes_count": len(array_size_changes),
            },
        }

    # ------------------------------------------------------------------
    # Filtered reports
    # ------------------------------------------------------------------
    def filter_paths_excluding_keys(self, exclude_keys: set[str]) -> set[str]:
        """
        Filter existing paths to exclude those containing specific keys.

        A key matches only as a whole path segment: excluding ``"id"`` removes
        ``$.id``, ``$.id[*]`` and ``$.id.child`` but keeps ``$.identifier``.

        Args:
            exclude_keys: set of keys to exclude (None is treated as empty)

        Returns:
            Filtered set of paths
        """
        keys = exclude_keys or ()
        return {
            path
            for path in self.paths
            if not any(self._path_has_key(path, key) for key in keys)
        }

    def get_filtered_structure_report(self, exclude_keys: set[str] = None) -> str:
        """
        Generate structure report excluding specific keys.

        Args:
            exclude_keys: set of keys to exclude from report; None excludes nothing

        Returns:
            Filtered structure report, formatted like ``get_structure_report``
        """
        if exclude_keys is None:
            exclude_keys = set()
        return self._structure_report_for(self.filter_paths_excluding_keys(exclude_keys))

    def get_filtered_paths_with_types(self, exclude_keys: set[str] = None) -> dict[str, str]:
        """
        Get paths with types excluding specific keys.

        Args:
            exclude_keys: set of keys to exclude; None excludes nothing

        Returns:
            Dictionary mapping filtered JSONPath expressions to their value types
        """
        if exclude_keys is None:
            exclude_keys = set()
        return self._paths_with_types_for(self.filter_paths_excluding_keys(exclude_keys))

    def get_filtered_detailed_type_report(
        self, exclude_keys: set[str] = None, get_samples: bool = True
    ) -> dict[str, dict[str, Any]]:
        """
        Get detailed type information excluding specific keys.

        Args:
            exclude_keys: set of keys to exclude; None excludes nothing
            get_samples: When True (the default), include ``samples`` and
                ``sample_count`` in every entry

        Returns:
            Dictionary with detailed type information for filtered paths
        """
        if exclude_keys is None:
            exclude_keys = set()
        return self._detailed_report_for(self.filter_paths_excluding_keys(exclude_keys), get_samples)
def analyze_with_jsonpath(data: dict) -> str:
    """
    Produce a textual JSONPath structure report for ``data``.

    A fresh ``JSONPathStructureAnalyzer`` is created for the call, so nothing
    is shared between invocations.

    Args:
        data: Dictionary containing JSON data to analyze

    Returns:
        Formatted structure report string
    """
    structure_analyzer = JSONPathStructureAnalyzer()
    structure_analyzer.extract_all_paths(data)
    report = structure_analyzer.get_structure_report()
    return report
def analyze_with_jsonpath_types(data: dict) -> Dict[str, str]:
    """
    Map every JSONPath found in ``data`` to its value type.

    A fresh ``JSONPathStructureAnalyzer`` is created for the call.

    Args:
        data: Dictionary containing JSON data to analyze

    Returns:
        Dictionary mapping JSONPath expressions to their value types
    """
    structure_analyzer = JSONPathStructureAnalyzer()
    structure_analyzer.extract_all_paths(data)
    type_by_path = structure_analyzer.get_paths_with_types()
    return type_by_path
def analyze_with_jsonpath_detailed(data: dict) -> Dict[str, Dict[str, Any]]:
    """
    Build the detailed per-path type report for ``data``.

    A fresh ``JSONPathStructureAnalyzer`` is created for the call and its
    detailed report is requested with the default arguments.

    Args:
        data: Dictionary containing JSON data to analyze

    Returns:
        Dictionary with detailed type information for each path
    """
    structure_analyzer = JSONPathStructureAnalyzer()
    structure_analyzer.extract_all_paths(data)
    details_by_path = structure_analyzer.get_detailed_type_report()
    return details_by_path
def compare_json_with_jsonpath_structures(old_data: dict, new_data: dict, print_report: bool = True) -> Dict[str, Any]:
    """
    Diff two JSON structures by way of their JSONPath analyses.

    ``old_data`` is loaded into a new analyzer which is then asked to compare
    itself against ``new_data``.

    Args:
        old_data: Original JSON structure
        new_data: New JSON structure to compare against
        print_report: Whether to print the comparison report

    Returns:
        Dictionary containing detailed comparison results
    """
    baseline = JSONPathStructureAnalyzer()
    baseline.extract_all_paths(old_data)
    result = baseline.compare_json_structures(new_data)

    # Quiet mode: hand back the raw comparison without printing anything.
    if not print_report:
        return result

    print_comparison_report(result)
    return result
def print_comparison_report(comparison: Dict[str, Any]) -> None:
    """
    Print a formatted comparison report to standard output.

    The summary is always printed; each of the remaining sections (added
    paths, removed paths, type changes, array size changes, value changes) is
    printed only when it has entries, with its entries sorted by path.

    Args:
        comparison: Result from compare_json_structures method. The keys
            ``summary``, ``added_paths``, ``removed_paths``, ``type_changes``,
            ``array_size_changes`` and ``value_differences`` are read.

    Raises:
        KeyError: if one of the keys listed above is missing
    """
    summary = comparison["summary"]

    # The whole report is assembled first and written with a single print
    # call; the text is identical to printing it line by line.
    lines = [
        "=== JSON STRUCTURE COMPARISON REPORT ===\n",
        "📊 SUMMARY:",
        f" Old structure: {summary['total_paths_old']} paths",
        f" New structure: {summary['total_paths_new']} paths",
        f" Added: {summary['paths_added']} paths",
        f" Removed: {summary['paths_removed']} paths",
        f" Common: {summary['paths_common']} paths",
        f" Type changes: {summary['type_changes_count']}",
        f" Value changes: {summary['value_changes_count']}",
        f" Array size changes: {summary['array_size_changes_count']}",
        "",
    ]

    added_paths = comparison["added_paths"]
    if added_paths:
        lines.append("➕ ADDED PATHS:")
        lines.extend(f" {path} ({type_info})" for path, type_info in sorted(added_paths.items()))
        lines.append("")

    removed_paths = comparison["removed_paths"]
    if removed_paths:
        lines.append("➖ REMOVED PATHS:")
        lines.extend(f" {path} ({type_info})" for path, type_info in sorted(removed_paths.items()))
        lines.append("")

    type_changes = comparison["type_changes"]
    if type_changes:
        lines.append("🔄 TYPE CHANGES:")
        lines.extend(
            f" {path}: {change['old_type']} → {change['new_type']}"
            for path, change in sorted(type_changes.items())
        )
        lines.append("")

    array_size_changes = comparison["array_size_changes"]
    if array_size_changes:
        lines.append("📏 ARRAY SIZE CHANGES:")
        for path, change in sorted(array_size_changes.items()):
            size_change = change["size_change"]
            # Growth gets the rising chart, anything else the falling one.
            direction = "📈" if size_change > 0 else "📉"
            lines.append(
                f" {direction} {path}: {change['old_size']} → {change['new_size']} (Δ{size_change:+d})"
            )
        lines.append("")

    value_differences = comparison["value_differences"]
    if value_differences:
        lines.append("💱 VALUE CHANGES:")
        lines.extend(
            f" {path}: '{change['old_value']}' → '{change['new_value']}'"
            for path, change in sorted(value_differences.items())
        )
        lines.append("")

    print("\n".join(lines))
def analyze_dict_list_simple(dict_list: list[dict], exclude_keys: set[str] = None) -> list[dict[str, Any]]:
    """
    Analyze each dict separately and return list of results.

    Every dictionary gets its own ``JSONPathStructureAnalyzer``; nothing is
    merged across list entries.

    Args:
        dict_list: list of dictionaries to analyze
        exclude_keys: set of keys to exclude from analysis; None excludes nothing

    Returns:
        list of individual analysis results, one per input dict and in input
        order. Each result holds ``index``, ``paths_with_types``,
        ``detailed_report``, ``array_lengths`` and ``structure_report``, all
        restricted to the paths that survive the key filter.
    """
    excluded = set() if exclude_keys is None else exclude_keys

    results = []
    for index, data_dict in enumerate(dict_list):
        analyzer = JSONPathStructureAnalyzer()
        analyzer.extract_all_paths(data_dict)

        # Compute the surviving paths once per dict instead of re-filtering
        # the whole path set for every array entry.
        kept_paths = analyzer.filter_paths_excluding_keys(excluded)

        results.append(
            {
                "index": index,
                "paths_with_types": analyzer.get_filtered_paths_with_types(excluded),
                "detailed_report": analyzer.get_filtered_detailed_type_report(excluded),
                "array_lengths": {
                    path: length
                    for path, length in analyzer.get_array_lengths().items()
                    if path in kept_paths
                },
                "structure_report": analyzer.get_filtered_structure_report(excluded),
            }
        )
    return results