-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflight_manager.py
112 lines (93 loc) · 4.03 KB
/
flight_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Reusable controller for starting flight processes"""
import argparse
import logging
from typing import Tuple
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager
from logger import init_logger, worker_configurer
from communication import Communication
from flight.flight import flight
from flight.state_settings import StateSettings
class FlightManager:
def __init__(self, state_settings: StateSettings) -> None:
"""
Constructor for the flight manager
Parameters
----------
state_settings: StateSettings
Settings for flight state machine
"""
self.state_settings = state_settings
def main(self) -> None:
"""
Central function to run all threads of state machine
"""
parser: argparse.ArgumentParser = argparse.ArgumentParser()
parser.add_argument("-s", "--simulation", help="using the simulator", action="store_true")
args = parser.parse_args()
logging.debug("Simulation flag %s", "enabled" if args.simulation else "disabled")
self.run_threads(args.simulation)
def init_flight(self, flight_args: Tuple[Communication, bool, Queue, worker_configurer, StateSettings]) -> Process:
"""
Initializes the flight state machine process
Parameters
----------
flight_args: Tuple[Communication, bool, Queue, worker_configurer, StateSettings]
Arguments necessary for flight logging and state machine settings
Returns
-------
Process
Multiprocessing object for flight state machine
"""
return Process(target=flight, name="flight", args=flight_args)
def run_threads(self, sim: bool) -> None:
"""
Runs the various threads present in the state machine
Parameters
----------
sim: bool
Decides if the simulator is active
"""
# Register Communication object to Base Manager
BaseManager.register("Communication", Communication)
# Create manager object
manager: BaseManager = BaseManager()
# Start manager
manager.start()
# Create Communication object from manager
comm_obj = manager.Communication()
log_queue: Queue = Queue(-1)
logging_process = init_logger(log_queue)
logging_process.start()
worker_configurer(log_queue)
# Create new processes
logging.info("Spawning Processes")
flight_args = (comm_obj, sim, log_queue, worker_configurer, self.state_settings)
flight_process: Process = self.init_flight(flight_args)
# Start flight function
flight_process.start()
logging.debug("Flight process with id %d started", flight_process.pid)
logging.debug(f"Title: {self.state_settings.run_title}")
logging.debug(f"Description: {self.state_settings.run_description}")
try:
while comm_obj.get_state() != "Final State":
# If the process is no longer alive,
# (i.e. error has been raised in this case)
# then create a new instance and start the new process
# (i.e. restart the process)
if flight_process.is_alive() is not True:
logging.error("Flight process terminated, restarting")
flight_process: Process = self.init_flight(flight_args)
flight_process.start()
except KeyboardInterrupt:
# Ctrl-C was pressed
# TODO send a message to the flight process to land instead of
# basically overwriting the process
logging.info("Ctrl-C Pressed, forcing drone to land")
comm_obj.set_state("land")
flight_process: Process = self.init_flight(flight_args)
flight_process.start()
# Join flight process before exiting function
flight_process.join()
logging.info("All processes ended, Goodbye!")
logging_process.stop()