-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmissionmode.py
133 lines (112 loc) · 4.98 KB
/
missionmode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
# Justin Reini
import asyncio
from mavsdk import System
from mavsdk.mission import (MissionItem, MissionPlan)
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
print("Drone discovered!")
break
print_mission_progress_task = asyncio.ensure_future(
print_mission_progress(drone))
running_tasks = [print_mission_progress_task]
termination_task = asyncio.ensure_future(
observe_is_in_air(drone, running_tasks))
mission_items = []
mission_items.append(MissionItem(37.9485145,
-91.7840395,
60,
10,
False,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan'),
float('nan'),
float('nan')))
mission_items.append(MissionItem(37.9485145,
-91.7840395,
80,
10,
False,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan'),
float('nan'),
float('nan')))
mission_items.append(MissionItem(37.9485145,
-91.7840395,
80,
10,
False,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan'),
float('nan'),
float('nan')))
mission_items.append(MissionItem(37.9485145,
-91.7840395,
80,
5,
False,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan'),
float('nan'),
float('nan')))
mission_items.append(MissionItem(37.9485145,
-91.7840395,
60,
10,
False,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan'),
float('nan'),
float('nan')))
mission_plan = MissionPlan(mission_items)
await drone.mission.set_return_to_launch_after_mission(True)
print("-- Uploading mission")
await drone.mission.upload_mission(mission_plan)
print("-- Arming")
await drone.action.arm()
print("-- Starting mission")
await drone.mission.start_mission()
await termination_task
async def print_mission_progress(drone):
async for mission_progress in drone.mission.mission_progress():
print(f"Mission progress: "
f"{mission_progress.current}/"
f"{mission_progress.total}")
async def observe_is_in_air(drone, running_tasks):
""" Monitors whether the drone is flying or not and
returns after landing """
was_in_air = False
async for is_in_air in drone.telemetry.in_air():
if is_in_air:
was_in_air = is_in_air
if was_in_air and not is_in_air:
for task in running_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await asyncio.get_event_loop().shutdown_asyncgens()
return
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())