Spaces:
Paused
Paused
| import os | |
| import tempfile | |
| import torch | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| import cv2 | |
| from diffusers import DiffusionPipeline | |
| import cupy as cp | |
| from cupyx.scipy.ndimage import label as cp_label | |
| from cupyx.scipy.ndimage import binary_dilation | |
| from sklearn.cluster import DBSCAN | |
| import trimesh | |
class GPUSatelliteModelGenerator:
    """Convert a satellite image into a segmented map, a height map and a mesh.

    Array work is done with CuPy, colour-space conversion with OpenCV and the
    final textured mesh is built with trimesh.
    """

    def __init__(self, building_height=0.05):
        """Set up reference colours, output colours, kernels and tuning values.

        Args:
            building_height: Multiplier applied to the height map when the
                mesh vertices are built in ``generate_mesh_gpu``.
        """
        self.building_height = building_height

        # Grass and tree reference colours
        self.grass_colors = cp.array([
            [47, 70, 69],  # Light green grass
            [40, 60, 55],
            [45, 65, 60],
            [50, 75, 65],
        ])
        self.tree_colors = cp.array([
            [19, 25, 16],  # Dark green trees
            [26, 33, 23],
            [22, 30, 20],
            [24, 35, 25],
        ])

        # Expanded water colours
        self.water_colors = cp.array([
            [40, 18, 4],  # Dark blue water
            [39, 25, 6],
            [167, 225, 217],
            [67, 101, 97],
            [53, 83, 84],
            [47, 94, 100],
            [73, 131, 135],
        ])

        # Shadow and road reference colours (C-contiguous)
        self.shadow_colors = cp.asarray([
            [31, 42, 76],
            [58, 64, 92],
            [15, 27, 56],
            [21, 22, 50],
            [76, 81, 99],
        ], order='C')
        self.road_colors = cp.asarray([
            [187, 182, 175],
            [138, 138, 138],
            [142, 142, 129],
            [202, 199, 189],
        ], order='C')

        # Output colours written into the segmented image (BGR for OpenCV)
        self.colors = {
            'black': cp.asarray([0, 0, 0], order='C'),  # Shadows
            'blue': cp.asarray([255, 0, 0], order='C'),  # Water
            'dark_green': cp.asarray([0, 100, 0], order='C'),  # Trees
            'light_green': cp.asarray([0, 255, 0], order='C'),  # Grass
            'gray': cp.asarray([128, 128, 128], order='C'),  # Roads
            'brown': cp.asarray([0, 140, 255], order='C'),  # Terrain
            'white': cp.asarray([255, 255, 255], order='C'),  # Buildings
            'salmon': cp.asarray([128, 128, 255], order='C'),  # Roofs
        }

        # Convert all reference colour arrays to HSV once, up front
        self.initialize_hsv_colors()

        # Kernels for morphological operations. The cleanup kernel is the
        # 8-neighbourhood: a 3x3 block with the centre switched off.
        self.cleanup_kernel = cp.ones((3, 3), dtype=bool)
        self.cleanup_kernel[1, 1] = False
        self.tree_kernel = cp.ones((5, 5), dtype=bool)

        # Tuning values. NOTE(review): none of the methods in this class read
        # tree_kernel, min_area, eps or min_samples; kept for compatibility.
        self.min_area = 1000
        self.eps = 0.3
        self.min_samples = 5

    def initialize_hsv_colors(self):
        """Build ``self.hsv_colors`` and ``self.tolerances`` for every category.

        Each reference colour array is converted to HSV with OpenCV and then
        rescaled so that hue is in degrees (0-360) and saturation/value are
        in 0-1, the same units ``segment_image_gpu`` uses for image pixels.
        """
        color_arrays = {
            'grass': self.grass_colors,
            'tree': self.tree_colors,
            'water': self.water_colors,
            'shadow': self.shadow_colors,
            'road': self.road_colors,
        }
        self.hsv_colors = {}
        self.tolerances = {
            'grass': {'hue': 15, 'sat': 0.2, 'val': 0.15},
            'tree': {'hue': 12, 'sat': 0.25, 'val': 0.15},
            'water': {'hue': 25, 'sat': 0.2, 'val': 0.25},
            'shadow': {'hue': 15, 'sat': 0.15, 'val': 0.12},
            'road': {'hue': 10, 'sat': 0.12, 'val': 0.15},
        }
        for name, colors in color_arrays.items():
            # NOTE(review): the references are converted as RGB here while the
            # image in segment_image_gpu is converted as BGR - confirm which
            # channel order the reference triples were sampled in.
            hsv = cv2.cvtColor(
                colors.get().reshape(-1, 1, 3).astype(np.uint8),
                cv2.COLOR_RGB2HSV,
            )
            # Promote to float BEFORE rescaling: doing the arithmetic in place
            # on the uint8 result wraps hue values above 255 and truncates the
            # normalised S/V fractions to 0 or 1.
            hsv_gpu = cp.asarray(hsv.reshape(-1, 3), dtype=cp.float32)
            hsv_gpu[:, 0] *= 2  # Scale hue to 0-360
            hsv_gpu[:, 1:] /= 255  # Normalize S and V
            self.hsv_colors[name] = hsv_gpu

    @staticmethod
    def gpu_color_distance_hsv(pixel_hsv, reference_hsv, hue_tolerance,
                               sat_tolerance, val_tolerance):
        """Return a boolean mask of pixels within tolerance of one HSV colour.

        Args:
            pixel_hsv: Indexable as ``[0]``, ``[1]``, ``[2]`` giving the hue,
                saturation and value arrays of the pixels.
            reference_hsv: Indexable the same way, giving one reference colour.
            hue_tolerance: Maximum hue difference (hue is treated as circular
                with period 360).
            sat_tolerance: Maximum absolute saturation difference.
            val_tolerance: Maximum absolute value difference.

        Returns:
            Boolean array that is True where all three differences are within
            their tolerances.
        """
        hue_gap = cp.abs(pixel_hsv[0] - reference_hsv[0])
        # Hue wraps around, so take the shorter way round the circle.
        h_diff = cp.minimum(hue_gap, 360 - hue_gap)
        s_diff = cp.abs(pixel_hsv[1] - reference_hsv[1])
        v_diff = cp.abs(pixel_hsv[2] - reference_hsv[2])
        return ((h_diff <= hue_tolerance)
                & (s_diff <= sat_tolerance)
                & (v_diff <= val_tolerance))

    def generate_tree_vertices(self, tree_mask, base_vertices):
        """Raise the vertices under ``tree_mask`` by a random height.

        Args:
            tree_mask: 2D boolean array; its flattened (row-major) index is
                used as the row index into ``base_vertices``.
            base_vertices: ``(N, 3)`` vertex array; column 1 is the height.

        Returns:
            ``base_vertices`` itself when the mask is empty, otherwise a copy
            with a uniform random offset in [0.15, 0.25) added to column 1 of
            every masked vertex.
        """
        # Row-major flat indices of the True cells.
        tree_indices = cp.flatnonzero(tree_mask)
        num_trees = int(tree_indices.size)
        # Return original vertices if no trees detected
        if num_trees == 0:
            return base_vertices
        # Random height variation for trees
        tree_heights = cp.random.uniform(0.15, 0.25, num_trees)
        tree_vertices = base_vertices.copy()
        tree_vertices[tree_indices, 1] += tree_heights
        return tree_vertices

    def segment_image_gpu(self, img):
        """Classify every pixel of a BGR image into a fixed colour palette.

        Args:
            img: ``(H, W, 3)`` image array that ``cv2.cvtColor`` accepts with
                ``COLOR_BGR2HSV``.

        Returns:
            CuPy array shaped like ``img`` whose pixels are entries of
            ``self.colors``, after ``apply_morphological_cleanup``.
        """
        gpu_img = cp.asarray(img, order='C')
        # Float copy of the HSV image: OpenCV stores 8-bit hue as 0-179, and
        # doubling that in uint8 would wrap for hue >= 128.
        gpu_hsv = cp.asarray(cv2.cvtColor(img, cv2.COLOR_BGR2HSV),
                             dtype=cp.float32)
        height, width = img.shape[:2]
        output = cp.zeros_like(gpu_img, order='C')

        # Per-pixel HSV channels in the same units as self.hsv_colors
        hsv_pixels = gpu_hsv.reshape(-1, 3)
        h = hsv_pixels[:, 0] * 2  # Convert to 0-360 range
        s = hsv_pixels[:, 1] / 255
        v = hsv_pixels[:, 2] / 255
        pixel_hsv = cp.stack([h, s, v])  # built once, reused for every match

        # One flat boolean mask per reference-colour category
        masks = {
            category: cp.zeros(height * width, dtype=bool)
            for category in ('shadow', 'road', 'water', 'grass', 'tree')
        }
        for category, hsv_refs in self.hsv_colors.items():
            tolerance = self.tolerances[category]
            for ref_hsv in hsv_refs:
                masks[category] |= self.gpu_color_distance_hsv(
                    pixel_hsv,
                    ref_hsv,
                    tolerance['hue'],
                    tolerance['sat'],
                    tolerance['val'],
                )

        # Hue/saturation range rules for vegetation and terrain; anything not
        # claimed by a mask or a rule is treated as a building.
        vegetation_mask = (h >= 40) & (h <= 150) & (s >= 0.15)
        terrain_mask = (h >= 15) & (h <= 35) & (s >= 0.15) & (s <= 0.6)
        building_mask = ~(masks['shadow'] | masks['water'] | masks['road']
                          | masks['grass'] | masks['tree'] | vegetation_mask
                          | terrain_mask)

        # Paint the palette. Later assignments overwrite earlier ones where
        # masks overlap, so the order below is significant.
        # NOTE(review): pixels that are only in vegetation_mask are never
        # painted and keep the initial zeros, i.e. the 'black' shadow colour.
        output_flat = output.reshape(-1, 3)
        for category, color_name in [
            ('shadow', 'black'),
            ('water', 'blue'),
            ('grass', 'light_green'),
            ('tree', 'dark_green'),
            ('road', 'gray'),
        ]:
            output_flat[masks[category]] = self.colors[color_name]
        output_flat[terrain_mask] = self.colors['brown']
        output_flat[building_mask] = self.colors['white']

        # Reshape and clean up
        segmented = output.reshape(height, width, 3)
        segmented = self.apply_morphological_cleanup(segmented)
        return segmented

    def apply_morphological_cleanup(self, segmented):
        """Recolour building pixels that touch another category, in place.

        For each palette colour other than 'white' and 'dark_green', building
        ('white') pixels that are 8-neighbours of that colour are repainted
        with it. Two passes are made.

        Args:
            segmented: ``(H, W, 3)`` CuPy array of palette colours; modified
                in place.

        Returns:
            The same ``segmented`` array.
        """
        for _ in range(2):  # Two passes for better results
            for color_name, color_value in self.colors.items():
                if color_name in ['white', 'dark_green']:  # Skip buildings and trees
                    continue
                color_mask = cp.all(segmented == color_value, axis=2)
                dilated = binary_dilation(color_mask, structure=self.cleanup_kernel)
                # Recomputed every iteration because earlier iterations may
                # already have repainted some building pixels.
                building_pixels = cp.all(segmented == self.colors['white'], axis=2)
                # Total number of pixels in the dilated mask for the whole
                # image (not a per-pixel neighbour count).
                neighbor_count = cp.sum(dilated)
                if neighbor_count > 5:
                    segmented[building_pixels & dilated] = color_value
        return segmented

    def estimate_heights_gpu(self, img, segmented):
        """Estimate a per-pixel height for every building/roof region.

        Each connected region of 'white' or 'salmon' pixels gets one height:
        a size factor (0.3-1.0 by area relative to the largest region, times
        0.8 if the region contains 'salmon') multiplied by a shadow factor
        (0.2-1.0 by the share of 'black' pixels in the 5x5-dilated region).

        Args:
            img: Unused by this method.
            segmented: ``(H, W, 3)`` palette image as produced by
                ``segment_image_gpu``.

        Returns:
            NumPy float32 array of shape ``(H, W)``: the height map times
            0.25, zero outside building regions.
        """
        gpu_segmented = cp.asarray(segmented)
        buildings_mask = cp.logical_or(
            cp.all(gpu_segmented == self.colors['white'], axis=2),
            cp.all(gpu_segmented == self.colors['salmon'], axis=2),
        )
        shadows_mask = cp.all(gpu_segmented == self.colors['black'], axis=2)

        # Connected components labeling on GPU
        labeled_array, num_features = cp_label(buildings_mask)

        # Pixel count per label; index 0 (background) is dropped
        areas = cp.bincount(labeled_array.ravel())[1:]
        max_area = cp.max(areas) if areas.size > 0 else 1
        height_map = cp.zeros_like(labeled_array, dtype=cp.float32)

        # Allocated once instead of once per building.
        shadow_kernel = cp.ones((5, 5), dtype=bool)

        # Process each building/roof
        for label in range(1, num_features + 1):
            building_mask = (labeled_array == label)
            if not cp.any(building_mask):
                continue
            area = areas[label - 1]
            size_factor = 0.3 + 0.7 * (area / max_area)

            # Regions containing roof (salmon) pixels are made slightly lower
            is_roof = cp.any(
                cp.all(gpu_segmented[building_mask] == self.colors['salmon'], axis=1)
            )
            if is_roof:
                size_factor = size_factor * 0.8

            # Share of shadow pixels in the neighbourhood of the region
            dilated = binary_dilation(building_mask, structure=shadow_kernel)
            shadow_ratio = cp.sum(dilated & shadows_mask) / cp.sum(dilated)
            shadow_factor = 0.2 + 0.8 * shadow_ratio

            height_map[building_mask] = size_factor * shadow_factor
        return height_map.get() * 0.25

    def generate_mesh_gpu(self, height_map, texture_img, segmented_img=None):
        """Build a textured grid mesh from a height map.

        One vertex is created per height-map cell. Heights are scaled by
        ``self.building_height``, tree cells are raised by
        ``generate_tree_vertices``, and x/z are normalised by the longer image
        side so the grid is centred on the origin.

        Args:
            height_map: 2D array of heights, shape ``(H, W)``.
            texture_img: Image used as the mesh texture.
            segmented_img: Optional ``(H, W, 3)`` palette image used to find
                tree cells (pixels equal to the 'dark_green' output colour).
                When omitted, ``texture_img`` is searched instead, which is
                the historical behaviour; an unsegmented photograph will
                rarely contain that exact colour, so pass the segmentation to
                get tree geometry.

        Returns:
            The mesh returned by ``create_textured_mesh``.
        """
        height_map_gpu = cp.asarray(height_map)
        height, width = height_map.shape

        # Base vertices: (x, scaled height, z) per grid cell, row-major.
        # Forced to float so the in-place normalisation below cannot truncate.
        x, z = cp.meshgrid(cp.arange(width), cp.arange(height))
        vertices = cp.stack(
            [x, height_map_gpu * self.building_height, z], axis=-1
        ).reshape(-1, 3).astype(cp.float64, copy=False)

        # Detect tree areas and generate tree geometry
        tree_source = cp.asarray(
            texture_img if segmented_img is None else segmented_img
        )
        tree_mask = cp.all(tree_source == self.colors['dark_green'], axis=2)
        vertices = self.generate_tree_vertices(tree_mask, vertices)

        # Normalize coordinates
        scale = max(width, height)
        vertices[:, 0] = vertices[:, 0] / scale * 2 - (width / scale)
        vertices[:, 2] = vertices[:, 2] / scale * 2 - (height / scale)
        vertices[:, 1] = vertices[:, 1] * 2 - 1

        faces = self.generate_faces_gpu(height, width)
        uvs = self.generate_uvs_gpu(vertices, width, height)

        # The texture is the image passed in, not the segmentation
        return self.create_textured_mesh(vertices, faces, uvs, texture_img)

    @staticmethod
    def generate_faces_gpu(height, width):
        """Return triangle indices for a ``height`` x ``width`` vertex grid.

        Vertices are assumed to be numbered row-major (``row * width + col``).
        Each grid cell contributes two triangles.

        Returns:
            ``(2 * (height - 1) * (width - 1), 3)`` integer CuPy array.
        """
        i, j = cp.meshgrid(cp.arange(height - 1), cp.arange(width - 1), indexing='ij')
        v0 = (i * width + j).flatten()  # top-left corner of each cell
        v1 = v0 + 1                     # top-right
        v2 = ((i + 1) * width + j).flatten()  # bottom-left
        v3 = v2 + 1                     # bottom-right
        return cp.vstack((
            cp.column_stack((v0, v2, v1)),
            cp.column_stack((v1, v2, v3)),
        ))

    @staticmethod
    def generate_uvs_gpu(vertices, width, height):
        """Return texture coordinates for normalised grid vertices.

        The x/z normalisation applied in ``generate_mesh_gpu`` is inverted to
        recover each vertex's column and row, which are then mapped to
        ``u = col / (width - 1)`` and ``v = 1 - row / (height - 1)``.

        Args:
            vertices: ``(N, 3)`` array whose columns 0 and 2 were normalised
                as ``coord / scale * 2 - size / scale`` with
                ``scale = max(width, height)``.
            width: Number of grid columns.
            height: Number of grid rows.

        Returns:
            ``(N, 2)`` CuPy array of UVs, clipped to [0, 1].
        """
        scale = max(width, height)
        cols = (vertices[:, 0] * scale + width) / 2
        rows = (vertices[:, 2] * scale + height) / 2
        uvs = cp.zeros((vertices.shape[0], 2), order='C')
        # max(..., 1) guards the degenerate single-row/column grid.
        uvs[:, 0] = cols / max(width - 1, 1)
        uvs[:, 1] = 1 - rows / max(height - 1, 1)  # flip: image row 0 is the top
        # Clip away floating-point spill so edge vertices never wrap around.
        return cp.clip(uvs, 0.0, 1.0)

    @staticmethod
    def create_textured_mesh(vertices, faces, uvs, texture_img):
        """Create a trimesh mesh textured with ``texture_img``.

        Args:
            vertices: Vertex array (CuPy or host array).
            faces: Face index array (CuPy or host array).
            uvs: Per-vertex UV array (CuPy or host array).
            texture_img: Texture image (CuPy or host array). Three-dimensional
                images are converted to RGB, treating 4-channel input as BGRA
                and anything else as BGR.

        Returns:
            ``trimesh.Trimesh`` with ``TextureVisuals`` attached.
        """
        # Bring the texture back to host memory if it lives on the GPU
        if isinstance(texture_img, cp.ndarray):
            texture_img = texture_img.get()

        # Convert texture image to RGB format for PIL
        if len(texture_img.shape) == 3:
            if texture_img.shape[2] == 4:  # BGRA
                texture_img = cv2.cvtColor(texture_img, cv2.COLOR_BGRA2RGB)
            else:  # BGR
                texture_img = cv2.cvtColor(texture_img, cv2.COLOR_BGR2RGB)
        texture_pil = Image.fromarray(texture_img)

        # trimesh works on host arrays, so download anything still on the GPU
        mesh = trimesh.Trimesh(
            vertices=vertices.get() if isinstance(vertices, cp.ndarray) else vertices,
            faces=faces.get() if isinstance(faces, cp.ndarray) else faces,
            visual=trimesh.visual.TextureVisuals(
                uv=uvs.get() if isinstance(uvs, cp.ndarray) else uvs,
                image=texture_pil,
            ),
        )
        return mesh
def generate_and_process_map(prompt: str) -> tuple[str | None, str | None]:
    """Generate a satellite image from ``prompt`` and convert it to a 3D model.

    Relies on the module-level ``flux_pipe`` and ``device`` being set before
    it is called.

    Args:
        prompt: Text appended to the fixed satellite-style prompt prefix.

    Returns:
        ``(glb_path, segmented_png_path)`` - file paths inside a fresh
        temporary directory - or ``(None, None)`` if any step raised; the
        error and traceback are printed in that case.
    """
    try:
        # Set dimensions
        width = height = 1024

        # Draw one random seed and apply it to torch and NumPy
        seed = np.random.randint(0, np.iinfo(np.int32).max)
        torch.manual_seed(seed)
        np.random.seed(seed)

        # Generate satellite image using FLUX
        torch_generator = torch.Generator(device=device).manual_seed(seed)
        generated_image = flux_pipe(
            prompt=f"satellite view in the style of TOK, {prompt}",
            width=width,
            height=height,
            num_inference_steps=25,
            generator=torch_generator,
            guidance_scale=7.5
        ).images[0]

        # Convert PIL Image to OpenCV format
        cv_image = cv2.cvtColor(np.array(generated_image), cv2.COLOR_RGB2BGR)

        # Separate name from the torch generator above so neither shadows
        # the other.
        model_generator = GPUSatelliteModelGenerator(building_height=0.09)

        print("Segmenting image using GPU...")
        segmented_img = model_generator.segment_image_gpu(cv_image)
        print("Estimating heights using GPU...")
        height_map = model_generator.estimate_heights_gpu(cv_image, segmented_img)
        print("Generating mesh using GPU...")
        mesh = model_generator.generate_mesh_gpu(height_map, cv_image)

        # Export to GLB. The directory is not removed here because the
        # returned paths are read after this function returns.
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, 'output.glb')
        mesh.export(output_path)

        # Save segmented image to a temporary file
        segmented_path = os.path.join(temp_dir, 'segmented.png')
        cv2.imwrite(segmented_path, segmented_img.get())
        return output_path, segmented_path
    except Exception as e:
        # Top-level UI boundary: report the failure and hand back empty
        # outputs instead of propagating.
        print(f"Error during generation: {str(e)}")
        import traceback
        traceback.print_exc()
        return None, None
# Gradio UI: a prompt box, a generate button and two side-by-side outputs
with gr.Blocks() as demo:
    gr.Markdown("# Text to Map")
    gr.Markdown("Generate a 3D map from text!")

    # Prompt entry
    with gr.Row():
        prompt_input = gr.Text(
            label="Enter your prompt",
            placeholder="classic american town",
        )

    # Trigger
    with gr.Row():
        generate_btn = gr.Button("Generate", variant="primary")

    # Outputs: 3D model on the left, segmented image on the right
    with gr.Row():
        with gr.Column():
            model_output = gr.Model3D(
                label="Generated 3D Map",
                clear_color=[0.0, 0.0, 0.0, 0.0],
            )
        with gr.Column():
            segmented_output = gr.Image(
                label="Segmented Map",
                type="filepath",
            )

    # Wire the button to the generator; the two returned paths feed the
    # two output components in order.
    generate_btn.click(
        fn=generate_and_process_map,
        inputs=[prompt_input],
        outputs=[model_output, segmented_output],
        api_name="generate",
    )
if __name__ == "__main__":
    # Model identifiers: base FLUX checkpoint plus the satellite LoRA adapter
    repo_id = "black-forest-labs/FLUX.1-dev"
    adapter_id = "jbilcke-hf/flux-satellite"

    # Run on CUDA when available, otherwise on the CPU
    device = "cuda" if torch.cuda.is_available() else "cpu"
    dtype = torch.bfloat16

    # Load the pipeline, attach the adapter and move everything to the device.
    # `flux_pipe` and `device` are module globals read by
    # generate_and_process_map.
    flux_pipe = DiffusionPipeline.from_pretrained(repo_id, torch_dtype=dtype)
    flux_pipe.load_lora_weights(adapter_id)
    flux_pipe = flux_pipe.to(device)

    # Launch Gradio app with request queueing enabled
    demo.queue().launch()