Skip to content

Commit

Permalink
feat: work in progress for geometry merging functionality (hull / union)
Browse files Browse the repository at this point in the history
  • Loading branch information
spwoodcock committed Jan 7, 2025
1 parent a634df1 commit 9b68f3b
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 88 deletions.
211 changes: 125 additions & 86 deletions geojson_aoi/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import warnings
from itertools import chain
from pathlib import Path
from typing import Any

Expand All @@ -24,6 +25,9 @@
log = logging.getLogger(__name__)


### Normalise Funcs ###


def _normalize_featcol(featcol: FeatureCollection) -> FeatureCollection:
"""Normalize a FeatureCollection into a standardised format.
Expand Down Expand Up @@ -65,14 +69,13 @@ def _normalize_featcol(featcol: FeatureCollection) -> FeatureCollection:


def _remove_z_dimension(coords: Any) -> Any:
"""Remove the Z dimension from coordinates."""
if isinstance(coords, list):
if all(isinstance(coord, (list, tuple)) and len(coord) > 2 for coord in coords):
# Only process if three elements [x, y, z]
return [coord[:2] for coord in coords]
# Recursively process nested lists
return [_remove_z_dimension(coord) for coord in coords]
return coords
"""Recursively remove the Z dimension from coordinates."""
if isinstance(coords[0], (list, tuple)):
# If the first element is a list, recurse into each sub-list
return [_remove_z_dimension(sub_coord) for sub_coord in coords]
else:
# If the first element is not a list, it's a coordinate pair (x, y, z)
return coords[:2] # Return only [x, y]


def _multigeom_to_singlegeom(featcol: FeatureCollection) -> FeatureCollection:
Expand All @@ -90,20 +93,35 @@ def split_multigeom(
geom: dict[str, Any], properties: dict[str, Any]
) -> list[Feature]:
"""Splits multi-geometries into individual geometries."""
geom_type = geom["type"]
coordinates = geom["coordinates"]

# Handle MultiPolygon correctly
if geom_type == "MultiPolygon":
return [
{
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": polygon},
"properties": properties,
}
for polygon in coordinates
]

# Handle other MultiXXX types
return [
{
"type": "Feature",
"geometry": {"type": geom["type"][5:], "coordinates": coord},
"geometry": {"type": geom_type[5:], "coordinates": coord},
"properties": properties,
}
for coord in geom["coordinates"]
for coord in coordinates
]

final_features = []

for feature in featcol.get("features", []):
properties = feature["properties"]
geom = feature["geometry"]
properties = feature.get("properties", {})
geom = feature.get("geometry")
if not geom or "type" not in geom:
continue

Expand All @@ -119,6 +137,7 @@ def split_multigeom(

### CRS Funcs ###


def _check_crs(featcol: FeatureCollection) -> None:
"""Warn the user if an invalid CRS is detected.
Expand Down Expand Up @@ -147,7 +166,7 @@ def is_valid_coordinate(coord):
"GeoJSON file in WGS84(EPSG 4326) standard."
)
log.warning(warning_msg)
warnings.warn(UserWarning(warning_msg))
warnings.warn(UserWarning(warning_msg), stacklevel=2)

features = featcol.get("features", [])
coordinates = (
Expand All @@ -166,7 +185,10 @@ def is_valid_coordinate(coord):
"Is the file empty?"
)
log.warning(warning_msg)
warnings.warn(UserWarning(warning_msg))
warnings.warn(UserWarning(warning_msg), stacklevel=2)


### Geom Merging Funcs ###


def _ensure_right_hand_rule(
Expand All @@ -175,6 +197,7 @@ def _ensure_right_hand_rule(
"""Ensure the outer ring follows the right-hand rule (clockwise)."""

def is_clockwise(ring: list[PointGeom]) -> bool:
"""Check coords are in clockwise direction."""
return (
sum(
(ring[i][0] - ring[i - 1][0]) * (ring[i][1] + ring[i - 1][1])
Expand All @@ -183,21 +206,49 @@ def is_clockwise(ring: list[PointGeom]) -> bool:
> 0
)

# Validate input
if not isinstance(coordinates[0], list) or not all(
isinstance(pt, list) and len(pt) == 2 for pt in coordinates[0]
):
raise ValueError(
"Invalid input: coordinates[0] must be a list "
f"of [x, y] points. Got: {coordinates[0]}"
)

# Ensure the first ring is the exterior ring and follows clockwise direction
if not is_clockwise(coordinates[0]):
coordinates[0] = coordinates[0][::-1]

for i in range(1, len(coordinates)): # Reverse holes to counter-clockwise
# Ensure any holes follow counter-clockwise direction
for i in range(1, len(coordinates)):
if is_clockwise(coordinates[i]):
coordinates[i] = coordinates[i][::-1]

return coordinates


def _create_convex_hull(polygons: PolygonGeom) -> list[PointGeom]:
"""Create a convex hull from a list of polygons."""
from itertools import chain
def _remove_holes(polygon: list) -> list:
"""Remove holes from a polygon by keeping only the exterior ring.
Args:
polygon: A list of coordinate rings, where the first is the exterior
and subsequent ones are interior holes.
Returns:
list: A list containing only the exterior ring coordinates.
"""
if not polygon:
return [] # Return an empty list if the polygon is empty
return polygon[0] # Only return the exterior ring


def _create_convex_hull(points: list[PointGeom]) -> list[PointGeom]:
"""Create a convex hull from a list of polygons.
points = list(chain.from_iterable(chain.from_iterable(polygons)))
This essentially draws a boundary around the outside of the polygons.
Most appropriate when the boundaries are not touching (disjoint).
"""

def cross(o: PointGeom, a: PointGeom, b: PointGeom) -> float:
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
Expand All @@ -207,12 +258,10 @@ def cross(o: PointGeom, a: PointGeom, b: PointGeom) -> float:
return points

lower, upper = [], []

for p in points:
while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
lower.pop()
lower.append(p)

for p in reversed(points):
while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
upper.pop()
Expand All @@ -221,111 +270,101 @@ def cross(o: PointGeom, a: PointGeom, b: PointGeom) -> float:
return lower[:-1] + upper[:-1]


# def _polygons_disjoint(poly1: list[list[float]], poly2: list[list[float]]) -> bool:
# """Check if two polygons are disjoint.

# Test bounding boxes and edge intersections.
# """

# def bounding_box(polygon: list[list[float]]) -> tuple:
# """Compute the bounding box of a polygon."""
# xs, ys = zip(*polygon, strict=False)
# return min(xs), min(ys), max(xs), max(ys)
def _polygons_disjoint(poly1: list[list[float]], poly2: list[list[float]]) -> bool:
"""Check if two polygons are disjoint.
# def bounding_boxes_overlap(bb1: tuple, bb2: tuple) -> bool:
# """Check if two bounding boxes overlap."""
# return not (
# bb1[2] < bb2[0] or bb2[2] < bb1[0] or bb1[3] < bb2[1] or bb2[3] < bb1[1]
# )
Test bounding boxes and edge intersections.
"""

# def line_segments_intersect(p1, p2, q1, q2) -> bool:
# """Check if two line segments (p1->p2 and q1->q2) intersect."""
def bounding_box(polygon: list[list[float]]) -> tuple:
xs, ys = zip(*polygon, strict=False)
return min(xs), min(ys), max(xs), max(ys)

# def ccw(a, b, c):
# return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])
def bounding_boxes_overlap(bb1: tuple, bb2: tuple) -> bool:
return not (
bb1[2] < bb2[0] or bb2[2] < bb1[0] or bb1[3] < bb2[1] or bb2[3] < bb1[1]
)

# return (
# ccw(p1, q1, q2) != ccw(p2, q1, q2)
# and
# ccw(p1, p2, q1) != ccw(p1, p2, q2)
# )
bb1, bb2 = bounding_box(poly1), bounding_box(poly2)
if not bounding_boxes_overlap(bb1, bb2):
return True

# # Check bounding boxes
# bb1, bb2 = bounding_box(poly1), bounding_box(poly2)
# if not bounding_boxes_overlap(bb1, bb2):
# return True
def line_segments_intersect(p1, p2, q1, q2) -> bool:
def ccw(a, b, c):
return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])

# # Check for edge intersections
# for i in range(len(poly1)):
# p1, p2 = poly1[i], poly1[(i + 1) % len(poly1)]
# for j in range(len(poly2)):
# q1, q2 = poly2[j], poly2[(j + 1) % len(poly2)]
# if line_segments_intersect(p1, p2, q1, q2):
# return False
return ccw(p1, q1, q2) != ccw(p2, q1, q2) and ccw(p1, p2, q1) != ccw(p1, p2, q2)

# return True
for i in range(len(poly1)):
p1, p2 = poly1[i], poly1[(i + 1) % len(poly1)]
for j in range(len(poly2)):
q1, q2 = poly2[j], poly2[(j + 1) % len(poly2)]
if line_segments_intersect(p1, p2, q1, q2):
return False

return True

def _remove_holes(polygon: list) -> list:
"""Remove holes from a polygon by keeping only the exterior ring.

Args:
polygon: A list of coordinate rings, where the first is the exterior
and subsequent ones are interior holes.
def _create_unary_union(polygons: list[list[list[float]]]) -> list[list[list[float]]]:
"""Create a unary union from a list of polygons.
Returns:
list: A list containing only the exterior ring coordinates.
This merges the polygons by their boundaries exactly.
Most appropriate when the boundaries are touching (not disjoint).
"""
if not polygon:
return [] # Return an empty list if the polygon is empty
return [polygon[0]] # Only keep the exterior ring
# Pure Python union implementation is non-trivial, so this is simplified:
# Merge all coordinates into a single outer ring.
all_points = chain.from_iterable(polygon[0] for polygon in polygons)
return [list(set(all_points))]


def merge_polygons(
featcol: FeatureCollection, dissolve_polygon: bool = False
) -> FeatureCollection:
def merge_polygons(featcol: FeatureCollection) -> FeatureCollection:
"""Merge multiple Polygons or MultiPolygons into a single Polygon.
It is used to create a single polygon boundary.
Automatically determine whether to use union (for overlapping polygons)
or convex hull (for disjoint polygons).
As a result of the processing, any Feature properties will be lost.
Args:
featcol (FeatureCollection): a FeatureCollection containing geometries.
dissolve_polygon (bool): True to dissolve polygons to single polygon.
Returns:
FeatureCollection: a FeatureCollection of a single Polygon.
"""
merged_coordinates = []
if not featcol.get("features"):
raise ValueError("FeatureCollection must contain at least one feature")

polygons = []
for feature in featcol.get("features", []):
geom = feature["geometry"]
if geom["type"] == "Polygon":
# Remove holes from the polygon
polygon_without_holes = _remove_holes(geom["coordinates"])
merged_coordinates.append(_ensure_right_hand_rule(polygon_without_holes))
polygons.append([_remove_holes(geom["coordinates"])])
elif geom["type"] == "MultiPolygon":
# Remove holes from each polygon in the MultiPolygon
for polygon in geom["coordinates"]:
polygon_without_holes = _remove_holes(polygon)
merged_coordinates.append(
_ensure_right_hand_rule(polygon_without_holes)
)
polygons.append([_remove_holes(polygon)])

polygons = [_ensure_right_hand_rule(polygon[0]) for polygon in polygons]

if dissolve_polygon:
# Combine all disjoint (i.e. non-overlapping) polygons into
# a single convex hull-like structure
# TODO ideally do this automatically using function like _polygons_disjoint
merged_coordinates = _create_convex_hull(merged_coordinates)
if all(
_polygons_disjoint(p1[0], p2[0])
for i, p1 in enumerate(polygons)
for p2 in polygons[i + 1 :]
):
merged_coordinates = _create_convex_hull(
list(chain.from_iterable(chain.from_iterable(polygons)))
)
else:
merged_coordinates = _create_unary_union(polygons)

return {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": merged_coordinates},
"geometry": {"type": "Polygon", "coordinates": [merged_coordinates]},
"properties": {},
}
],
}
Expand Down
Loading

0 comments on commit 9b68f3b

Please sign in to comment.