-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhoffman_obs.py
215 lines (172 loc) · 6.16 KB
/
hoffman_obs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Main driver code for moving drone to each waypoint
"""
import asyncio
import json
from mavsdk import System
import mavsdk as sdk
import logging
import math
import typing
from typing import Dict, List
from avoidance import rrt_flight_test
def boundary_parsing(filename: str) -> List[Dict[str, float]]:
"""
Parses the json file for all mission-critical waypoints
Args:
filename (str): name of the json file
Returns:
Waypoint_Locs ([Dict[str,float]]): List of dictionaries containing
a string identifier and float for lattitude, longitude and altitude
"""
f = open(filename, )
data_set = json.load(f)
# print(data_set)
f.close()
boundary_Locs: List[Dict[str, float]] = []
for i in range(0, len(data_set["boundaryPoints"])):
boundary_Locs.append(data_set["boundaryPoints"][i])
return boundary_Locs
def waypoint_parsing(filename: str) -> List[Dict[str, float]]:
"""
Parses the json file for all mission-critical waypoints
Args:
filename (str): name of the json file
Returns:
Waypoint_Locs ([Dict[str,float]]): List of dictionaries containing
a string identifier and float for lattitude, longitude and altitude
"""
f = open(filename, )
data_set = json.load(f)
# print(data_set)
f.close()
waypoint_Locs: List[Dict[str, float]] = []
for i in range(0, len(data_set["waypoints"])):
waypoint_Locs.append(data_set["waypoints"][i])
return waypoint_Locs
def stationary_obstacle_parsing(filename: str) -> List[Dict[str, float]]:
"""
Opens passed JSON file and extracts the Stationary obstacle attributes
Parameters
----------
filename: str
String of JSON file name and file type
Returns
-------
List[Dict[str, float]]
list of dictionaries containing latitude, longitude, radius, and height of obstacles
"""
with open(filename) as f:
try:
data_set: Dict[str, List] = json.load(f)
except:
f.close()
f.close()
stationary_obs: List[Dict[str, float]] = [obs for obs in data_set["stationaryObstacles"]]
return stationary_obs
async def move_to(drone: System, latitude: float, longitude: float, altitude: float) -> None:
"""
This function takes in a latitude, longitude and altitude and autonomously
moves the drone to that waypoint. This function will also auto convert the altitude
from feet to meters.
Parameters
----------
drone: System
a drone object that has all offboard data needed for computation
latitude: float
a float containing the requested latittude to move to
longitude: float
a float containing the requested longitude to move to
altitude: float
a float contatining the requested altitude to go to (in feet)
Returns
-------
None
"""
# converts feet into meters
altitude = altitude * .3048
# get current altitude
async for terrain_info in drone.telemetry.home():
absolute_altitude: float = terrain_info.absolute_altitude_m
break
await drone.action.goto_location(latitude, longitude, altitude + absolute_altitude, 0)
location_reached: bool = False
# Loops until the waypoint is reached
while (not location_reached):
print("Going to waypoint")
async for position in drone.telemetry.position():
# continuously checks current latitude, longitude and altitude of the drone
drone_lat: float = position.latitude_deg
drone_long: float = position.longitude_deg
drone_alt: float = position.relative_altitude_m
# checks if location is reached and moves on if so
if ((round(drone_lat, 4) == round(latitude, 4)) and
(round(drone_long, 4) == round(longitude, 4)) and
(round(drone_alt, 1) == round(altitude, 1))):
location_reached = True
print("arrived")
break
await asyncio.sleep(1)
return
async def run() -> None:
"""
This function is just a driver to test the goto function.
Parameters
----------
None
Returns
-------
None
"""
# Put all latitudes, longitudes and altitudes into seperate arrays
lats: List[float] = []
longs: List[float] = []
altitudes: List[float] = []
waypoints: List[Dict[str, float]] = waypoint_parsing("test_data.json")
stationary_obs: List[Dict[str, float]] = stationary_obstacle_parsing("test_data.json")
boundary: List[Dict[str, float]] = boundary_parsing("test_data.json")
new_path = rrt_flight_test.rrt_flight_test(stationary_obs, waypoints, boundary)
for i in waypoints:
for key, val in i.items():
if (key == "latitude"):
lats.append(val)
if (key == "longitude"):
longs.append(val)
if (key == "altitude"):
altitudes.append(val)
# create a drone object
drone: System = System()
await drone.connect(system_address="udp://:14540")
# connect to the drone
logging.info("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
logging.info("Drone discovered!")
break
print("Waiting for drone to have a global position estimate...")
async for health in drone.telemetry.health():
if health.is_global_position_ok:
logging.info("Global position estimate ok")
break
print("-- Arming")
await drone.action.arm()
print("-- Taking off")
await drone.action.takeoff()
# wait for drone to take off
await asyncio.sleep(10)
# move to each waypoint in mission
i=0
for point in new_path:
await move_to(drone, point[0], point[1], 100)
i=i+1
# return home
print("Last waypoint reached")
print("Returning to home")
await drone.action.return_to_launch()
print("Staying connected, press Ctrl-C to exit")
# infinite loop till forced disconnect
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())