From 2c6e4e14146cdc8ede01e1f3f003c53d476c11d5 Mon Sep 17 00:00:00 2001 From: RexBerry <59031902+RexBerry@users.noreply.github.com> Date: Tue, 12 Nov 2024 19:26:37 -0600 Subject: [PATCH] Write initial version of mapping state #29 --- flight/camera.py | 38 +++++--- state_machine/states/impl/mapping_impl.py | 107 +++++++++++++++++++++- 2 files changed, 128 insertions(+), 17 deletions(-) diff --git a/flight/camera.py b/flight/camera.py index d5bdbe98..2321d42a 100644 --- a/flight/camera.py +++ b/flight/camera.py @@ -42,16 +42,27 @@ class Camera: The default path is the images folder in the current working directory. The file name will be the file format attribute. Returns the file name and the file path. + mapping_move_to( + drone: dronekit.Vehicle, + latitude: float, + longitude: float, + altitude: float, + interval: float, + heading: float, + ) + Move the drone to the specified latitude, longitude, and altitude. + Takes photos along the way. + odlc_move_to( - drone: Drone, + drone: dronekit.Vehicle, latitude: float, longitude: float, altitude: float, - fast_param: float, - take_photos: float + take_photos: bool, + heading: float, ) Move the drone to the specified latitude, longitude, and altitude. - Takes photos along the way if take_photos is True. + Takes a photo at the end if take_photos is True. """ def __init__(self) -> None: @@ -69,9 +80,7 @@ def __init__(self) -> None: logging.info("Camera initialized") - async def capture_photo( - self, path: str = f"{os.getcwd()}/images/" - ) -> tuple[str, str]: + async def capture_photo(self, path: str = f"{os.getcwd()}/images/") -> tuple[str, str]: """ Capture a photo and save it to the specified path. @@ -139,6 +148,7 @@ async def mapping_move_to( The yaw in which the camera should point, in degrees (0 is north, 90 is west). """ + await self.capture_photo(f"{os.getcwd()}/mapping_images/") goto_task: asyncio.Task[None] = asyncio.ensure_future( move_to( drone, @@ -150,9 +160,7 @@ async def mapping_move_to( ) ) - start_pos: dronekit.LocationGlobalRelative = ( - drone.location.global_relative_frame - ) + start_pos: dronekit.LocationGlobalRelative = drone.location.global_relative_frame start_lat: float = start_pos.lat start_lon: float = start_pos.lon @@ -165,9 +173,7 @@ async def mapping_move_to( -drone.attitude.pitch - 90, 0, heading # pitch is relative to the drone ) - position: dronekit.LocationGlobalRelative = ( - drone.location.global_relative_frame - ) + position: dronekit.LocationGlobalRelative = drone.location.global_relative_frame drone_lat: float = position.lat drone_long: float = position.lon @@ -183,6 +189,8 @@ async def mapping_move_to( await asyncio.sleep(0.25) + await self.capture_photo(f"{os.getcwd()}/mapping_images/") + async def odlc_move_to( self, drone: dronekit.Vehicle, @@ -194,8 +202,8 @@ async def odlc_move_to( ) -> None: """ This function takes in a latitude, longitude and altitude and autonomously - moves the drone to that waypoint. It will take photos along the path if passed - true in take_photos and add the point and name of photo to a json. + moves the drone to that waypoint. It will take a photo at the end if passed + True in take_photos and add the point and name of photo to a json. Parameters ---------- diff --git a/state_machine/states/impl/mapping_impl.py b/state_machine/states/impl/mapping_impl.py index abc6431d..c52cc858 100644 --- a/state_machine/states/impl/mapping_impl.py +++ b/state_machine/states/impl/mapping_impl.py @@ -2,6 +2,15 @@ import asyncio import logging +import math +from typing import Final + +import utm + +from flight.camera import Camera +from flight.extract_gps import extract_gps, GPSData +from flight.extract_gps import BoundaryPointUtm +from flight.waypoint.goto import move_to from state_machine.state_tracker import ( update_state, @@ -12,6 +21,11 @@ from state_machine.states.mapping import Mapping from state_machine.states.state import State +# TODO: Determine good values +MAPPING_ALTITUDE: Final[float] = 30 # meters +HORIZONTAL_PHOTO_SPACING: Final[float] = 15 # meters +VERTICAL_PHOTO_SPACING: Final[float] = 15 # meters + async def run(self: Mapping) -> State: """ @@ -31,8 +45,97 @@ async def run(self: Mapping) -> State: logging.info("Mapping") - # TODO: Implement the mapping state - raise NotImplementedError("Mapping state not implemented yet") + gps_dict: GPSData = extract_gps(self.flight_settings.path_data_path) + mapping_boundary_utm: list[BoundaryPointUtm] = gps_dict["mapping_boundary_utm"] + + # The mapping area should be roughly rectangular with 4 vertices + # We are going to travel in line segments parallel to the long edges + + is_long_edge_first: bool = math.hypot( + mapping_boundary_utm[1].easting - mapping_boundary_utm[0].easting, + mapping_boundary_utm[1].northing - mapping_boundary_utm[0].northing, + ) >= math.hypot( + mapping_boundary_utm[2].easting - mapping_boundary_utm[0].easting, + mapping_boundary_utm[2].northing - mapping_boundary_utm[0].northing, + ) + + # Ensure the first edge is long + if not is_long_edge_first: + mapping_boundary_utm[1], mapping_boundary_utm[3] = ( + mapping_boundary_utm[3], + mapping_boundary_utm[1], + ) + + # Take the max because it's better to take too many photos than too few + short_edge_length = max( + math.hypot( + mapping_boundary_utm[3].easting - mapping_boundary_utm[0].easting, + mapping_boundary_utm[3].northing - mapping_boundary_utm[0].northing, + ), + math.hypot( + mapping_boundary_utm[2].easting - mapping_boundary_utm[1].easting, + mapping_boundary_utm[2].northing - mapping_boundary_utm[1].northing, + ), + ) + + # Get average direction of short edges + camera_heading: float = ( + math.degrees( + math.atan2( + (mapping_boundary_utm[3].northing - mapping_boundary_utm[0].northing) + + (mapping_boundary_utm[2].northing - mapping_boundary_utm[1].northing), + (mapping_boundary_utm[3].easting - mapping_boundary_utm[0].easting) + + (mapping_boundary_utm[2].easting - mapping_boundary_utm[1].easting), + ) + ) + - 90.0 + ) % 360.0 + step_count: int = math.ceil(short_edge_length / VERTICAL_PHOTO_SPACING) + + utm_zone_number = mapping_boundary_utm[0].zone_number + utm_zone_letter = mapping_boundary_utm[0].zone_letter + camera: Camera = Camera() + reverse_direction: bool = False + for i in range(step_count + 1): + lerp_t = i / step_count + + # Linearly interpolate along the short edges + start_easting: float = (1 - lerp_t) * mapping_boundary_utm[ + 0 + ].easting + lerp_t * mapping_boundary_utm[3].easting + start_northing: float = (1 - lerp_t) * mapping_boundary_utm[ + 0 + ].northing + lerp_t * mapping_boundary_utm[3].northing + end_easting: float = (1 - lerp_t) * mapping_boundary_utm[ + 1 + ].easting + lerp_t * mapping_boundary_utm[2].easting + end_northing: float = (1 - lerp_t) * mapping_boundary_utm[ + 1 + ].northing + lerp_t * mapping_boundary_utm[2].northing + + # Move in a serpentine pattern + if reverse_direction: + start_easting, end_easting = end_easting, start_easting + start_northing, end_northing = end_northing, start_northing + + lat: float + lon: float + lat, lon = utm.to_latlon( + start_easting, start_northing, utm_zone_number, utm_zone_letter + ) + await move_to(self.drone.vehicle, lat, lon, MAPPING_ALTITUDE) + + lat, lon = utm.to_latlon(end_easting, end_northing, utm_zone_number, utm_zone_letter) + await camera.mapping_move_to( + self.drone.vehicle, + lat, + lon, + MAPPING_ALTITUDE, + HORIZONTAL_PHOTO_SPACING, + camera_heading, + ) + + reverse_direction = not reverse_direction logging.info("Mapping state complete.") except asyncio.CancelledError as ex: