summit/backend/process_dem.py

153 lines
5.5 KiB
Python

import os
import sys
import numpy as np
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling
import mercantile
from PIL import Image
# Configuration
RAW_DEM = "data/raw/synthetic_everest.tif"
RGB_DEM = "data/raw/synthetic_everest_rgb.tif"
TILES_DIR = "backend/upload/tiles"
MIN_ZOOM = 6
MAX_ZOOM = 12 # Limit zoom for synthetic demo to save time
def dem_to_rgb(data, interval=0.1, base_val=-10000):
"""
Manually implement Mapbox Terrain-RGB encoding.
height = -10000 + ((R * 256 * 256 + G * 256 + B) * 0.1)
Inverse:
v = (height + 10000) / 0.1
R = floor(v / 65536)
G = floor((v % 65536) / 256)
B = floor(v % 256)
"""
# 1. Calculate value
v = (data - base_val) / interval
v = np.clip(v, 0, 16777215) # 24-bit max
v = np.round(v).astype(np.uint32)
# 2. Extract channels
r = (v >> 16) & 0xFF
g = (v >> 8) & 0xFF
b = v & 0xFF
return np.stack([r, g, b], axis=0).astype(np.uint8)
def process_dem():
print(f"Processing {RAW_DEM}...")
with rasterio.open(RAW_DEM) as src:
# 1. Reproject to Web Mercator (EPSG:3857) first for correct tiling
dst_crs = 'EPSG:3857'
transform, width, height = calculate_default_transform(
src.crs, dst_crs, src.width, src.height, *src.bounds)
kwargs = src.meta.copy()
kwargs.update({
'crs': dst_crs,
'transform': transform,
'width': width,
'height': height,
'dtype': 'int16' # keep elevation as int/float before RGBify
})
# Read and reproject elevation data
print("Reprojecting to EPSG:3857...")
source_data = src.read(1)
# Create a temporary in-memory array for reprojected data
destination_data = np.zeros((height, width), dtype=np.int16)
reproject(
source=source_data,
destination=destination_data,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=transform,
dst_crs=dst_crs,
resampling=Resampling.bilinear
)
# 2. Convert to RGB
print("Encoding to Terrain-RGB...")
rgb_data = dem_to_rgb(destination_data) # Shape: (3, H, W)
# 3. Generate Tiles
print(f"Generating tiles (Zoom {MIN_ZOOM}-{MAX_ZOOM})...")
# We need the bounds in EPSG:4326 to find tile indices
# But we work with EPSG:3857 data for cropping
# Simplified tiling approach:
# Iterate over all tiles intersecting the bounds
w, s, e, n = rasterio.transform.array_bounds(height, width, transform)
# Convert 3857 bounds back to 4326 for mercantile
# Actually rasterio array_bounds of a 3857 dataset are in meters.
# We need lon/lat bounds for mercantile.tiles
# Let's get lon/lat bounds from the original src (since it's small and covers the same area)
# Or re-transform the 3857 bounds.
# Using original src bounds (WGS84) is safer for mercantile
lon_min, lat_min, lon_max, lat_max = src.bounds
for zoom in range(MIN_ZOOM, MAX_ZOOM + 1):
tiles = list(mercantile.tiles(lon_min, lat_min, lon_max, lat_max, zoom))
print(f"Z{zoom}: {len(tiles)} tiles")
for tile in tiles:
# Calculate tile bounds in EPSG:3857
tile_bounds = mercantile.xy_bounds(tile)
# Window logic
# Convert tile bounds (meters) to pixel window in our reprojected dataset
# transform is (pixel -> meters)
# ~transform is (meters -> pixel)
inv_transform = ~transform
# Top-Left
col_min, row_min = inv_transform * (tile_bounds.left, tile_bounds.top)
# Bottom-Right
col_max, row_max = inv_transform * (tile_bounds.right, tile_bounds.bottom)
# Round and clip
row_min = max(0, int(row_min))
col_min = max(0, int(col_min))
row_max = min(height, int(row_max))
col_max = min(width, int(col_max))
# Check if window is valid
if row_max <= row_min or col_max <= col_min:
continue
# Extract chunk
# Note: row/col might be flipped in your mind, but for numpy it's [y, x] i.e. [row, col]
# RGB data shape is (3, H, W)
chunk = rgb_data[:, row_min:row_max, col_min:col_max]
if chunk.shape[1] == 0 or chunk.shape[2] == 0:
continue
# Resize chunk to 256x256 (standard tile size)
# PIL expects (W, H, C)
# Transpose chunk from (C, H, W) -> (H, W, C)
chunk_img = np.transpose(chunk, (1, 2, 0))
img = Image.fromarray(chunk_img, mode='RGB')
img = img.resize((256, 256), resample=Image.Resampling.BILINEAR)
# Save
tile_dir = os.path.join(TILES_DIR, str(tile.z), str(tile.x))
os.makedirs(tile_dir, exist_ok=True)
tile_path = os.path.join(tile_dir, f"{tile.y}.png")
img.save(tile_path)
print("Processing complete.")
if __name__ == "__main__":
if not os.path.exists(RAW_DEM):
print(f"Error: {RAW_DEM} not found.")
sys.exit(1)
process_dem()