From a12e2b0cf53b6cf13fea49ef046ce6638c3096bb Mon Sep 17 00:00:00 2001 From: Benjamin Pelletier Date: Wed, 24 Jan 2024 17:40:18 +0000 Subject: [PATCH] [uss_qualifier] Add KML visualization to sequence view artifact (#462) * Factor out Volume4D KML operations from make_flight_intent_kml * Generate KML visualizations for scenarios in sequence view artifact * Fix general_flight_auth CI * Improve Python compatibility * `make format` * Add resiliency to render failure * Address comments --- monitoring/monitorlib/geo.py | 9 + monitoring/monitorlib/kml.py | 249 +++++++++++++++++- .../configurations/configuration.py | 3 + monitoring/uss_qualifier/reports/artifacts.py | 4 +- .../reports/sequence_view/__init__.py | 0 .../generate.py} | 220 +++------------- .../reports/sequence_view/kml.py | 246 +++++++++++++++++ .../reports/sequence_view/summary_types.py | 197 ++++++++++++++ .../templates/sequence_view/scenario.html | 5 + .../test_data/make_flight_intent_kml.py | 168 ++---------- .../SequenceViewConfiguration.json | 4 + 11 files changed, 766 insertions(+), 339 deletions(-) create mode 100644 monitoring/uss_qualifier/reports/sequence_view/__init__.py rename monitoring/uss_qualifier/reports/{sequence_view.py => sequence_view/generate.py} (81%) create mode 100644 monitoring/uss_qualifier/reports/sequence_view/kml.py create mode 100644 monitoring/uss_qualifier/reports/sequence_view/summary_types.py diff --git a/monitoring/monitorlib/geo.py b/monitoring/monitorlib/geo.py index 82b19a8036..ac20a86ea6 100644 --- a/monitoring/monitorlib/geo.py +++ b/monitoring/monitorlib/geo.py @@ -89,6 +89,15 @@ def match(self, other: LatLngPoint) -> bool: and abs(self.lng - other.lng) < COORD_TOLERANCE_DEG ) + def offset(self, east_meters: float, north_meters: float) -> LatLngPoint: + dlat = 360 * north_meters / EARTH_CIRCUMFERENCE_M + dlng = ( + 360 + * east_meters + / (EARTH_CIRCUMFERENCE_M * math.cos(math.radians(self.lat))) + ) + return LatLngPoint(lat=self.lat + dlat, lng=self.lng + dlng) + class Radius(ImplicitDict): value: float diff --git a/monitoring/monitorlib/kml.py b/monitoring/monitorlib/kml.py index 7b67eeae06..b4e8ab9441 100644 --- a/monitoring/monitorlib/kml.py +++ b/monitoring/monitorlib/kml.py @@ -1,8 +1,33 @@ -from pykml import parser +import math import re +from typing import List, Optional, Union + +import s2sphere +from pykml import parser +from pykml.factory import KML_ElementMaker as kml +from monitoring.monitorlib.geo import ( + Altitude, + AltitudeDatum, + DistanceUnits, + egm96_geoid_offset, + Radius, + EARTH_CIRCUMFERENCE_M, + LatLngPoint, +) +from monitoring.monitorlib.geotemporal import Volume4D KML_NAMESPACE = {"kml": "http://www.opengis.net/kml/2.2"} +METERS_PER_FOOT = 0.3048 + +# Hexadecimal colors +GREEN = "ff00c000" +YELLOW = "ff00ffff" +RED = "ff0000ff" +CYAN = "ffc0c000" +TRANSLUCENT_GRAY = "80808080" +TRANSLUCENT_GREEN = "8000ff00" +TRANSLUCENT_LIGHT_CYAN = "80ffffaa" def get_kml_root(kml_obj, from_string=False): @@ -111,3 +136,225 @@ def get_kml_content(kml_file, from_string=False): if folder_details: kml_content.update(folder_details) return kml_content + + +def _altitude_mode_of(altitude: Altitude) -> str: + if altitude.reference == AltitudeDatum.W84: + return "absolute" + elif altitude.reference == AltitudeDatum.SFC: + return "relativeToGround" + else: + raise NotImplementedError( + f"Altitude reference {altitude.reference} not yet supported" + ) + + +def _distance_value_of(distance: Union[Altitude, Radius]) -> float: + if distance.units == DistanceUnits.M: + return distance.value + elif distance.units == DistanceUnits.FT: + return distance.value * METERS_PER_FOOT + else: + raise NotImplementedError(f"Distance units {distance.units} not yet supported") + + +def make_placemark_from_volume( + v4: Volume4D, + name: Optional[str] = None, + style_url: Optional[str] = None, + description: Optional[str] = None, +) -> kml.Placemark: + if "outline_polygon" in v4.volume and v4.volume.outline_polygon: + vertices = v4.volume.outline_polygon.vertices + elif "outline_circle" in v4.volume and v4.volume.outline_circle: + center = v4.volume.outline_circle.center + r = _distance_value_of(v4.volume.outline_circle.radius) + N_VERTICES = 32 + vertices = [ + center.offset( + r * math.sin(2 * math.pi * theta / N_VERTICES), + r * math.cos(2 * math.pi * theta / N_VERTICES), + ) + for theta in range(0, N_VERTICES) + ] + else: + raise NotImplementedError("Volume footprint type not supported") + + # Create placemark + args = [] + if name is not None: + args.append(kml.name(name)) + if style_url is not None: + args.append(kml.styleUrl(style_url)) + placemark = kml.Placemark(*args) + if description: + placemark.append(kml.description(description)) + + # Set time range + timespan = None + if "time_start" in v4 and v4.time_start: + timespan = kml.TimeSpan(kml.begin(v4.time_start.datetime.isoformat())) + if "time_end" in v4 and v4.time_end: + if timespan is None: + timespan = kml.TimeSpan() + timespan.append(kml.end(v4.time_end.datetime.isoformat())) + if timespan is not None: + placemark.append(timespan) + + # Create top and bottom of the volume + avg = s2sphere.LatLng.from_degrees( + lat=sum(v.lat for v in vertices) / len(vertices), + lng=sum(v.lng for v in vertices) / len(vertices), + ) + geoid_offset = egm96_geoid_offset(avg) + lower_coords = [] + upper_coords = [] + alt_lo = ( + _distance_value_of(v4.volume.altitude_lower) - geoid_offset + if "altitude_lower" in v4.volume + else 0 + ) + alt_hi = ( + _distance_value_of(v4.volume.altitude_upper) - geoid_offset + if "altitude_upper" in v4.volume + else 0 + ) + for vertex in vertices: + lower_coords.append((vertex.lng, vertex.lat, alt_lo)) + upper_coords.append((vertex.lng, vertex.lat, alt_hi)) + geo = kml.MultiGeometry() + make_sides = True + if "altitude_lower" in v4.volume: + geo.append( + kml.Polygon( + kml.altitudeMode( + _altitude_mode_of( + v4.volume.altitude_lower + if "altitude_lower" in v4.volume + else AltitudeDatum.SFC + ) + ), + kml.outerBoundaryIs( + kml.LinearRing( + kml.coordinates( + " ".join(",".join(str(v) for v in c) for c in lower_coords) + ) + ) + ), + ) + ) + else: + make_sides = False + if "altitude_upper" in v4.volume: + geo.append( + kml.Polygon( + kml.altitudeMode( + _altitude_mode_of( + v4.volume.altitude_upper + if "altitude_upper" in v4.volume + else AltitudeDatum.SFC + ) + ), + kml.outerBoundaryIs( + kml.LinearRing( + kml.coordinates( + " ".join(",".join(str(v) for v in c) for c in upper_coords) + ) + ) + ), + ) + ) + else: + make_sides = False + + # We can only create the sides of the volume if the altitude references are the same + if ( + make_sides + and v4.volume.altitude_lower.reference == v4.volume.altitude_upper.reference + ): + indices = list(range(len(vertices))) + for i1, i2 in zip(indices, indices[1:] + [0]): + coords = [ + (vertices[i1].lng, vertices[i1].lat, alt_lo), + (vertices[i1].lng, vertices[i1].lat, alt_hi), + (vertices[i2].lng, vertices[i2].lat, alt_hi), + (vertices[i2].lng, vertices[i2].lat, alt_lo), + ] + geo.append( + kml.Polygon( + kml.altitudeMode(_altitude_mode_of(v4.volume.altitude_lower)), + kml.outerBoundaryIs( + kml.LinearRing( + kml.coordinates( + " ".join(",".join(str(v) for v in c) for c in coords) + ) + ) + ), + ) + ) + + placemark.append(geo) + return placemark + + +def flight_planning_styles() -> List[kml.Style]: + """Provides KML styles with names in the form {FlightPlanState}_{AirspaceUsageState}.""" + return [ + kml.Style( + kml.LineStyle(kml.color(GREEN), kml.width(3)), + kml.PolyStyle(kml.color(TRANSLUCENT_GRAY)), + id="Planned_Nominal", + ), + kml.Style( + kml.LineStyle(kml.color(GREEN), kml.width(3)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="InUse_Nominal", + ), + kml.Style( + kml.LineStyle(kml.color(YELLOW), kml.width(5)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="InUse_OffNominal", + ), + kml.Style( + kml.LineStyle(kml.color(RED), kml.width(5)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="InUse_Contingent", + ), + ] + + +def query_styles() -> List[kml.Style]: + """Provides KML styles for query areas.""" + return [ + kml.Style( + kml.LineStyle(kml.color(CYAN), kml.width(3)), + kml.PolyStyle(kml.color(TRANSLUCENT_LIGHT_CYAN)), + id="QueryArea", + ), + ] + + +def f3548v21_styles() -> List[kml.Style]: + """Provides KML styles according to F3548-21 operational intent states.""" + return [ + kml.Style( + kml.LineStyle(kml.color(GREEN), kml.width(3)), + kml.PolyStyle(kml.color(TRANSLUCENT_GRAY)), + id="F3548v21Accepted", + ), + kml.Style( + kml.LineStyle(kml.color(GREEN), kml.width(3)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="F3548v21Activated", + ), + kml.Style( + kml.LineStyle(kml.color(YELLOW), kml.width(5)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="F3548v21Nonconforming", + ), + kml.Style( + kml.LineStyle(kml.color(RED), kml.width(5)), + kml.PolyStyle(kml.color(TRANSLUCENT_GREEN)), + id="F3548v21Contingent", + ), + ] diff --git a/monitoring/uss_qualifier/configurations/configuration.py b/monitoring/uss_qualifier/configurations/configuration.py index 9359674fdb..9c0297fe14 100644 --- a/monitoring/uss_qualifier/configurations/configuration.py +++ b/monitoring/uss_qualifier/configurations/configuration.py @@ -170,6 +170,9 @@ class SequenceViewConfiguration(ImplicitDict): redact_access_tokens: bool = True """When True, look for instances of "Authorization" keys in the report with values starting "Bearer " and redact the signature from those access tokens""" + render_kml: bool = True + """When True, visualize geographic data for each scenario as a KML file.""" + class ReportHTMLConfiguration(ImplicitDict): redact_access_tokens: bool = True diff --git a/monitoring/uss_qualifier/reports/artifacts.py b/monitoring/uss_qualifier/reports/artifacts.py index f259fe1f96..5adc120f80 100644 --- a/monitoring/uss_qualifier/reports/artifacts.py +++ b/monitoring/uss_qualifier/reports/artifacts.py @@ -8,7 +8,9 @@ from monitoring.uss_qualifier.configurations.configuration import ArtifactsConfiguration from monitoring.uss_qualifier.reports.documents import make_report_html from monitoring.uss_qualifier.reports.report import TestRunReport, redact_access_tokens -from monitoring.uss_qualifier.reports.sequence_view import generate_sequence_view +from monitoring.uss_qualifier.reports.sequence_view.generate import ( + generate_sequence_view, +) from monitoring.uss_qualifier.reports.templates import render_templates from monitoring.uss_qualifier.reports.tested_requirements import ( generate_tested_requirements, diff --git a/monitoring/uss_qualifier/reports/sequence_view/__init__.py b/monitoring/uss_qualifier/reports/sequence_view/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/monitoring/uss_qualifier/reports/sequence_view.py b/monitoring/uss_qualifier/reports/sequence_view/generate.py similarity index 81% rename from monitoring/uss_qualifier/reports/sequence_view.py rename to monitoring/uss_qualifier/reports/sequence_view/generate.py index eb17220920..a2757fd48f 100644 --- a/monitoring/uss_qualifier/reports/sequence_view.py +++ b/monitoring/uss_qualifier/reports/sequence_view/generate.py @@ -2,21 +2,20 @@ import math import os -from dataclasses import dataclass -from datetime import datetime -from enum import Enum import html -from typing import List, Dict, Optional, Iterator, Union +from typing import List, Dict, Iterator from implicitdict import ImplicitDict +from loguru import logger -from monitoring.monitorlib.fetch import Query +from monitoring.monitorlib.errors import stacktrace_string from monitoring.uss_qualifier.action_generators.action_generator import ( action_generator_type_from_name, ) from monitoring.uss_qualifier.configurations.configuration import ( ParticipantID, SequenceViewConfiguration, + TestConfiguration, ) from monitoring.uss_qualifier.fileio import load_dict_with_references from monitoring.uss_qualifier.reports import jinja_env @@ -24,16 +23,30 @@ TestRunReport, TestSuiteActionReport, TestScenarioReport, - PassedCheck, - FailedCheck, Severity, SkippedActionReport, - ErrorReport, +) +from monitoring.uss_qualifier.reports.sequence_view.kml import make_scenario_kml +from monitoring.uss_qualifier.reports.sequence_view.summary_types import ( + TestedScenario, + Indexer, + Event, + NoteEvent, + Epoch, + TestedParticipant, + ActionNode, + ActionNodeType, + SkippedAction, + OverviewRow, + SuiteCell, + EpochType, + EventType, + TestedStep, + TestedCase, ) from monitoring.uss_qualifier.reports.tested_requirements import ( compute_test_run_information, ) -from monitoring.uss_qualifier.scenarios.definitions import TestScenarioTypeName from monitoring.uss_qualifier.scenarios.documentation.parsing import ( get_documentation_by_name, ) @@ -43,184 +56,6 @@ UNATTRIBUTED_PARTICIPANT = "unattributed" -class NoteEvent(ImplicitDict): - key: str - message: str - timestamp: datetime - - -class EventType(str, Enum): - PassedCheck = "PassedCheck" - FailedCheck = "FailedCheck" - Query = "Query" - Note = "Note" - - -class Event(ImplicitDict): - event_index: int = 0 - passed_check: Optional[PassedCheck] = None - failed_check: Optional[FailedCheck] = None - query_events: Optional[List[Union[Event, str]]] = None - query: Optional[Query] = None - note: Optional[NoteEvent] = None - - @property - def type(self) -> EventType: - if self.passed_check: - return EventType.PassedCheck - elif self.failed_check: - return EventType.FailedCheck - elif self.query: - return EventType.Query - elif self.note: - return EventType.Note - else: - raise ValueError("Invalid Event type") - - @property - def timestamp(self) -> datetime: - if self.passed_check: - return self.passed_check.timestamp.datetime - elif self.failed_check: - return self.failed_check.timestamp.datetime - elif self.query: - return self.query.request.timestamp - elif self.note: - return self.note.timestamp - else: - raise ValueError("Invalid Event type") - - def get_query_links(self) -> str: - links = [] - for e in self.query_events: - if isinstance(e, str): - links.append(e) - else: - links.append(f'{e.event_index}') - return ", ".join(links) - - -class TestedStep(ImplicitDict): - name: str - url: str - events: List[Event] - - @property - def rows(self) -> int: - return len(self.events) - - -class TestedCase(ImplicitDict): - name: str - url: str - steps: List[TestedStep] - - @property - def rows(self) -> int: - return sum(s.rows for s in self.steps) - - -class EpochType(str, Enum): - Case = "Case" - Events = "Events" - - -class Epoch(ImplicitDict): - case: Optional[TestedCase] = None - events: Optional[List[Event]] = None - - @property - def type(self) -> EpochType: - if self.case: - return EpochType.Case - elif self.events: - return EpochType.Events - else: - raise ValueError("Invalid Epoch did not specify case or events") - - @property - def rows(self) -> int: - if self.case: - return self.case.rows - elif self.events: - return len(self.events) - else: - raise ValueError("Invalid Epoch did not specify case or events") - - -@dataclass -class TestedParticipant(object): - has_failures: bool = False - has_infos: bool = False - has_successes: bool = False - has_queries: bool = False - - -@dataclass -class TestedScenario(object): - type: TestScenarioTypeName - name: str - url: str - scenario_index: int - duration: str - epochs: List[Epoch] - participants: Dict[ParticipantID, TestedParticipant] - execution_error: Optional[ErrorReport] - - @property - def rows(self) -> int: - return sum(c.rows for c in self.epochs) - - -@dataclass -class SkippedAction(object): - reason: str - - -class ActionNodeType(str, Enum): - Scenario = "Scenario" - Suite = "Suite" - ActionGenerator = "ActionGenerator" - SkippedAction = "SkippedAction" - - -class ActionNode(ImplicitDict): - name: str - node_type: ActionNodeType - children: List[ActionNode] - scenario: Optional[TestedScenario] = None - skipped_action: Optional[SkippedAction] = None - - @property - def rows(self) -> int: - return sum(c.rows for c in self.children) if self.children else 1 - - @property - def cols(self) -> int: - return 1 + max(c.cols for c in self.children) if self.children else 1 - - -@dataclass -class Indexer(object): - scenario_index: int = 1 - - -@dataclass -class SuiteCell(object): - node: Optional[ActionNode] - first_row: bool - rowspan: int = 1 - colspan: int = 1 - - -@dataclass -class OverviewRow(object): - suite_cells: List[SuiteCell] - scenario_node: Optional[ActionNode] = None - skipped_action_node: Optional[ActionNode] = None - filled: bool = False - - def _compute_tested_scenario( report: TestScenarioReport, indexer: Indexer ) -> TestedScenario: @@ -612,12 +447,14 @@ def _generate_scenario_pages( scenario_file = os.path.join( output_path, f"s{node.scenario.scenario_index}.html" ) + kml_file = f"./s{node.scenario.scenario_index}.kml" template = jinja_env.get_template("sequence_view/scenario.html") with open(scenario_file, "w") as f: f.write( template.render( test_scenario=node.scenario, all_participants=all_participants, + kml_file=kml_file if config.render_kml else None, EpochType=EpochType, EventType=EventType, UNATTRIBUTED_PARTICIPANT=UNATTRIBUTED_PARTICIPANT, @@ -626,6 +463,15 @@ def _generate_scenario_pages( Severity=Severity, ) ) + if config.render_kml: + try: + kml_file = os.path.join( + output_path, f"s{node.scenario.scenario_index}.kml" + ) + with open(kml_file, "w") as f: + f.write(make_scenario_kml(node.scenario)) + except (ValueError, KeyError, NotImplementedError) as e: + logger.error(f"Error generating {kml_file}:\n" + stacktrace_string(e)) else: for child in node.children: _generate_scenario_pages(child, config, output_path) diff --git a/monitoring/uss_qualifier/reports/sequence_view/kml.py b/monitoring/uss_qualifier/reports/sequence_view/kml.py new file mode 100644 index 0000000000..e50519aaa7 --- /dev/null +++ b/monitoring/uss_qualifier/reports/sequence_view/kml.py @@ -0,0 +1,246 @@ +from dataclasses import dataclass +from typing import Dict, List, Type, Optional, get_type_hints, Protocol + +from implicitdict import ImplicitDict +from loguru import logger +from lxml import etree +from pykml.factory import KML_ElementMaker as kml +from pykml.util import format_xml_with_cdata + +from monitoring.monitorlib.scd import priority_of +from monitoring.uss_qualifier.reports.sequence_view.summary_types import TestedScenario +from uas_standards.astm.f3548.v21.api import ( + QueryOperationalIntentReferenceParameters, + QueryOperationalIntentReferenceResponse, + GetOperationalIntentDetailsResponse, +) + +from monitoring.monitorlib.errors import stacktrace_string +from monitoring.monitorlib.fetch import QueryType, Query +from monitoring.monitorlib.geotemporal import Volume4D +from monitoring.monitorlib.kml import ( + make_placemark_from_volume, + query_styles, + f3548v21_styles, + flight_planning_styles, +) +from uas_standards.interuss.automated_testing.flight_planning.v1.api import ( + UpsertFlightPlanRequest, + UpsertFlightPlanResponse, +) + + +class QueryKMLRenderer(Protocol): + def __call__( + self, query: Query, req: ImplicitDict, resp: ImplicitDict + ) -> List[kml.Element]: + """Function that renders the provided query information into KML elements. + + Args: + query: Raw query to render to KML. + req: Query request, parsed into the annotated type. + resp: Query response, parsed into the annotated type. + + Returns: List of KML elements to include the folder for the query. + """ + + +@dataclass +class QueryKMLRenderInfo(object): + renderer: QueryKMLRenderer + include_query: bool + request_type: Optional[Type[ImplicitDict]] + response_type: Optional[Type[ImplicitDict]] + + +_query_kml_renderers: Dict[QueryType, QueryKMLRenderInfo] = {} + + +def query_kml_renderer(query_type: QueryType): + """Decorator to label a function that renders KML for a particular query type. + + Decorated functions should follow the QueryKMLRenderer Protocol, but may omit any of the parameters. + + Args: + query_type: The type of query the decorated function can render KML for. + """ + + def register_renderer(func: QueryKMLRenderer) -> QueryKMLRenderer: + hints = get_type_hints(func) + _query_kml_renderers[query_type] = QueryKMLRenderInfo( + renderer=func, + include_query="query" in hints, + request_type=hints.get("req", None), + response_type=hints.get("resp", None), + ) + return func + + return register_renderer + + +def make_scenario_kml(scenario: TestedScenario) -> str: + """Make KML file visualizing the provided scenario. + + Args: + scenario: Summarized scenario to visualize with KML. + + Returns: KML text that can be written to file. + """ + top_folder = kml.Folder(kml.name(scenario.name)) + for epoch in scenario.epochs: + if not epoch.case: + continue # Only support test cases for now + case_folder = kml.Folder(kml.name(epoch.case.name)) + top_folder.append(case_folder) + for step in epoch.case.steps: + step_folder = kml.Folder(kml.name(step.name)) + case_folder.append(step_folder) + for event in step.events: + if not event.query or "query_type" not in event.query: + continue # Only visualize queries of known types + if event.query.query_type not in _query_kml_renderers: + continue # Only visualize queries with renderers + render_info = _query_kml_renderers[event.query.query_type] + participant = ( + f"{event.query.participant_id} " + if "participant_id" in event.query + else "" + ) + query_folder = kml.Folder( + kml.name( + f"E{event.event_index}: {participant}{event.query.query_type.value}" + ) + ) + step_folder.append(query_folder) + + kwargs = {} + if render_info.include_query: + kwargs["query"] = event.query + if render_info.request_type: + try: + kwargs["req"] = ImplicitDict.parse( + event.query.request.json, + render_info.request_type, + ) + except ValueError as e: + msg = f"Error parsing request into {render_info.request_type.__name__}" + logger.warning(msg) + query_folder.append( + kml.Folder( + kml.name(msg), + kml.description(stacktrace_string(e)), + ) + ) + continue + if ( + render_info.response_type is not type(None) + and render_info.request_type + ): + try: + kwargs["resp"] = ImplicitDict.parse( + event.query.response.json, + render_info.response_type, + ) + except ValueError as e: + msg = f"Error parsing response into {render_info.response_type.__name__}" + logger.warning(msg) + query_folder.append( + kml.Folder( + kml.name(msg), + kml.description(stacktrace_string(e)), + ) + ) + continue + try: + query_folder.extend(render_info.renderer(**kwargs)) + except TypeError as e: + msg = f"Error rendering {render_info.renderer.__name__}" + logger.warning(msg) + query_folder.append( + kml.Folder( + kml.name(msg), + kml.description(stacktrace_string(e)), + ) + ) + doc = kml.kml( + kml.Document( + *query_styles(), *f3548v21_styles(), *flight_planning_styles(), top_folder + ) + ) + return etree.tostring(format_xml_with_cdata(doc), pretty_print=True).decode("utf-8") + + +@query_kml_renderer(QueryType.F3548v21DSSQueryOperationalIntentReferences) +def render_query_op_intent_references( + req: QueryOperationalIntentReferenceParameters, + resp: QueryOperationalIntentReferenceResponse, +): + if "area_of_interest" not in req or not req.area_of_interest: + return [ + kml.Folder(kml.name("Error: area_of_interest not specified in request")) + ] + v4 = Volume4D.from_f3548v21(req.area_of_interest) + items = "".join( + f"
  • {oi.manager}'s {oi.state.value} {oi.id}[{oi.version}]
  • " + for oi in resp.operational_intent_references + ) + description = ( + f"" if items else "(no operational intent references found)" + ) + return [ + make_placemark_from_volume( + v4, name="area_of_interest", style_url="#QueryArea", description=description + ) + ] + + +@query_kml_renderer(QueryType.F3548v21USSGetOperationalIntentDetails) +def render_get_op_intent_details(resp: GetOperationalIntentDetailsResponse): + ref = resp.operational_intent.reference + name = f"{ref.manager}'s P{priority_of(resp.operational_intent.details)} {ref.state.value} {ref.id}[{ref.version}] @ {ref.ovn}" + folder = kml.Folder(kml.name(name)) + if "volumes" in resp.operational_intent.details: + for i, v4_f3548 in enumerate(resp.operational_intent.details.volumes): + v4 = Volume4D.from_f3548v21(v4_f3548) + folder.append( + make_placemark_from_volume( + v4, + name=f"Nominal volume {i}", + style_url=f"#F3548v21{resp.operational_intent.reference.state.value}", + ) + ) + if "off_nominal_volumes" in resp.operational_intent.details: + for i, v4_f3548 in enumerate( + resp.operational_intent.details.off_nominal_volumes + ): + v4 = Volume4D.from_f3548v21(v4_f3548) + folder.append( + make_placemark_from_volume( + v4, + name=f"Off-nominal volume {i}", + style_url=f"#F3548v21{resp.operational_intent.reference.state.value}", + ) + ) + return [folder] + + +@query_kml_renderer(QueryType.InterUSSFlightPlanningV1UpsertFlightPlan) +def render_flight_planning_upsert_flight_plan( + req: UpsertFlightPlanRequest, resp: UpsertFlightPlanResponse +): + folder = kml.Folder( + kml.name( + f"Activity {resp.planning_result.value}, flight {resp.flight_plan_status.value}" + ) + ) + basic_info = req.flight_plan.basic_information + for i, v4_flight_planning in enumerate(basic_info.area): + v4 = Volume4D.from_flight_planning_api(v4_flight_planning) + folder.append( + make_placemark_from_volume( + v4, + name=f"Volume {i}", + style_url=f"#{basic_info.usage_state.value}_{basic_info.uas_state.value}", + ) + ) + return [folder] diff --git a/monitoring/uss_qualifier/reports/sequence_view/summary_types.py b/monitoring/uss_qualifier/reports/sequence_view/summary_types.py new file mode 100644 index 0000000000..85862595cf --- /dev/null +++ b/monitoring/uss_qualifier/reports/sequence_view/summary_types.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import List, Dict, Optional, Union + +from implicitdict import ImplicitDict + +from monitoring.monitorlib.fetch import Query +from monitoring.uss_qualifier.configurations.configuration import ( + ParticipantID, +) +from monitoring.uss_qualifier.reports.report import ( + PassedCheck, + FailedCheck, + ErrorReport, +) +from monitoring.uss_qualifier.scenarios.definitions import TestScenarioTypeName + + +class NoteEvent(ImplicitDict): + key: str + message: str + timestamp: datetime + + +class EventType(str, Enum): + PassedCheck = "PassedCheck" + FailedCheck = "FailedCheck" + Query = "Query" + Note = "Note" + + +class Event(ImplicitDict): + event_index: int = 0 + passed_check: Optional[PassedCheck] = None + failed_check: Optional[FailedCheck] = None + query_events: Optional[List[Union[Event, str]]] = None + query: Optional[Query] = None + note: Optional[NoteEvent] = None + + @property + def type(self) -> EventType: + if self.passed_check: + return EventType.PassedCheck + elif self.failed_check: + return EventType.FailedCheck + elif self.query: + return EventType.Query + elif self.note: + return EventType.Note + else: + raise ValueError("Invalid Event type") + + @property + def timestamp(self) -> datetime: + if self.passed_check: + return self.passed_check.timestamp.datetime + elif self.failed_check: + return self.failed_check.timestamp.datetime + elif self.query: + return self.query.request.timestamp + elif self.note: + return self.note.timestamp + else: + raise ValueError("Invalid Event type") + + def get_query_links(self) -> str: + links = [] + for e in self.query_events: + if isinstance(e, str): + links.append(e) + else: + links.append(f'{e.event_index}') + return ", ".join(links) + + +class TestedStep(ImplicitDict): + name: str + url: str + events: List[Event] + + @property + def rows(self) -> int: + return len(self.events) + + +class TestedCase(ImplicitDict): + name: str + url: str + steps: List[TestedStep] + + @property + def rows(self) -> int: + return sum(s.rows for s in self.steps) + + +class EpochType(str, Enum): + Case = "Case" + Events = "Events" + + +class Epoch(ImplicitDict): + case: Optional[TestedCase] = None + events: Optional[List[Event]] = None + + @property + def type(self) -> EpochType: + if self.case: + return EpochType.Case + elif self.events: + return EpochType.Events + else: + raise ValueError("Invalid Epoch did not specify case or events") + + @property + def rows(self) -> int: + if self.case: + return self.case.rows + elif self.events: + return len(self.events) + else: + raise ValueError("Invalid Epoch did not specify case or events") + + +@dataclass +class TestedParticipant(object): + has_failures: bool = False + has_infos: bool = False + has_successes: bool = False + has_queries: bool = False + + +@dataclass +class TestedScenario(object): + type: TestScenarioTypeName + name: str + url: str + scenario_index: int + duration: str + epochs: List[Epoch] + participants: Dict[ParticipantID, TestedParticipant] + execution_error: Optional[ErrorReport] + + @property + def rows(self) -> int: + return sum(c.rows for c in self.epochs) + + +@dataclass +class SkippedAction(object): + reason: str + + +class ActionNodeType(str, Enum): + Scenario = "Scenario" + Suite = "Suite" + ActionGenerator = "ActionGenerator" + SkippedAction = "SkippedAction" + + +class ActionNode(ImplicitDict): + name: str + node_type: ActionNodeType + children: List[ActionNode] + scenario: Optional[TestedScenario] = None + skipped_action: Optional[SkippedAction] = None + + @property + def rows(self) -> int: + return sum(c.rows for c in self.children) if self.children else 1 + + @property + def cols(self) -> int: + return 1 + max(c.cols for c in self.children) if self.children else 1 + + +@dataclass +class Indexer(object): + scenario_index: int = 1 + + +@dataclass +class SuiteCell(object): + node: Optional[ActionNode] + first_row: bool + rowspan: int = 1 + colspan: int = 1 + + +@dataclass +class OverviewRow(object): + suite_cells: List[SuiteCell] + scenario_node: Optional[ActionNode] = None + skipped_action_node: Optional[ActionNode] = None + filled: bool = False diff --git a/monitoring/uss_qualifier/reports/templates/sequence_view/scenario.html b/monitoring/uss_qualifier/reports/templates/sequence_view/scenario.html index c6a2a03d67..69c75b6e16 100644 --- a/monitoring/uss_qualifier/reports/templates/sequence_view/scenario.html +++ b/monitoring/uss_qualifier/reports/templates/sequence_view/scenario.html @@ -17,6 +17,11 @@

    {{ test_scenario.name }}

    {{ test_scenario.name }}

    {% endif %}

    {{ test_scenario.type }}

    + {% if kml_file %} +

    + KML visualization +

    + {% endif %} diff --git a/monitoring/uss_qualifier/test_data/make_flight_intent_kml.py b/monitoring/uss_qualifier/test_data/make_flight_intent_kml.py index f3b5795cf7..8588758d9a 100644 --- a/monitoring/uss_qualifier/test_data/make_flight_intent_kml.py +++ b/monitoring/uss_qualifier/test_data/make_flight_intent_kml.py @@ -6,14 +6,13 @@ import sys import arrow -from loguru import logger from lxml import etree from pykml.factory import KML_ElementMaker as kml from pykml.util import format_xml_with_cdata import yaml from implicitdict import ImplicitDict -from monitoring.monitorlib.geo import AltitudeDatum, Altitude, DistanceUnits +from monitoring.monitorlib.kml import make_placemark_from_volume, flight_planning_styles from monitoring.monitorlib.temporal import Time, TimeDuringTest from monitoring.uss_qualifier.fileio import load_dict_with_references, resolve_filename from monitoring.uss_qualifier.resources.flight_planning.flight_intent import ( @@ -37,31 +36,9 @@ def parse_args() -> argparse.Namespace: help="When start_of_test_run should be. Defaults to now.", ) - parser.add_argument( - "--geoid_offset", - default=None, - help="Height of the EGM96 geoid above the WGS84 ellipsoid in the area (meters). Can be obtained as 'EGM96' at https://geographiclib.sourceforge.io/cgi-bin/GeoidEval", - ) - return parser.parse_args() -def _altitude_mode_of(altitude: Altitude) -> str: - if altitude.reference == AltitudeDatum.W84: - return "absolute" - else: - raise NotImplementedError( - f"Altitude reference {altitude.reference} not yet supported" - ) - - -def _altitude_value_of(altitude: Altitude) -> float: - if altitude.units == DistanceUnits.M: - return altitude.value - else: - raise NotImplementedError(f"Altitude units {altitude.units} not yet supported") - - def main() -> int: args = parse_args() @@ -74,13 +51,6 @@ def main() -> int: TimeDuringTest.StartOfScenario: start_of_test_run, TimeDuringTest.TimeOfEvaluation: start_of_test_run, } - if args.geoid_offset is None: - logger.warning( - "geoid_offset was not provided. Assuming 0 offset, and this may cause altitude errors of up to tens of meters." - ) - geoid_offset = 0 - else: - geoid_offset = float(args.geoid_offset) raw = load_dict_with_references(path) collection: FlightIntentCollection = ImplicitDict.parse(raw, FlightIntentCollection) @@ -89,135 +59,33 @@ def main() -> int: folders = [] for name, template in flight_intents.items(): flight_intent = template.resolve(times) + folder = kml.Folder(kml.name(name)) + non_basic_info = json.loads( json.dumps( {k: v for k, v in flight_intent.items() if k != "basic_information"} ) ) - description = yaml.dump(non_basic_info) if non_basic_info else None - folder = kml.Folder(kml.name(name)) - basic_info = flight_intent.basic_information - for i, v4 in enumerate(basic_info.area): - if "outline_polygon" in v4.volume and v4.volume.outline_polygon: - # Create placemark - placemark = kml.Placemark( - kml.name(f"{name}: volume {i}"), - kml.styleUrl( - f"#{basic_info.usage_state.value}_{basic_info.uas_state.value}" - ), - ) - if description: - placemark.append(kml.description("
    " + description + "
    ")) - - # Set time range - timespan = None - if "time_start" in v4 and v4.time_start: - timespan = kml.TimeSpan( - kml.begin(v4.time_start.datetime.isoformat()) - ) - if "time_end" in v4 and v4.time_end: - if timespan is None: - timespan = kml.TimeSpan() - timespan.append(kml.end(v4.time_end.datetime.isoformat())) - if timespan is not None: - placemark.append(timespan) - - # Create top and bottom of the volume - vertices = v4.volume.outline_polygon.vertices - lower_coords = [] - upper_coords = [] - alt_lo = _altitude_value_of(v4.volume.altitude_lower) - geoid_offset - alt_hi = _altitude_value_of(v4.volume.altitude_upper) - geoid_offset - for vertex in vertices: - lower_coords.append((vertex.lng, vertex.lat, alt_lo)) - upper_coords.append((vertex.lng, vertex.lat, alt_hi)) - geo = kml.MultiGeometry( - kml.Polygon( - kml.altitudeMode(_altitude_mode_of(v4.volume.altitude_lower)), - kml.outerBoundaryIs( - kml.LinearRing( - kml.coordinates( - " ".join( - ",".join(str(v) for v in c) - for c in lower_coords - ) - ) - ) - ), - ), - kml.Polygon( - kml.altitudeMode(_altitude_mode_of(v4.volume.altitude_upper)), - kml.outerBoundaryIs( - kml.LinearRing( - kml.coordinates( - " ".join( - ",".join(str(v) for v in c) - for c in upper_coords - ) - ) - ) - ), - ), + if non_basic_info: + folder.append( + kml.Folder( + kml.name("Flight intent information"), + kml.description("
    " + yaml.dump(non_basic_info) + "
    "), ) + ) - # We can only create the sides of the volume if the altitude references are the same - if ( - v4.volume.altitude_lower.reference - == v4.volume.altitude_upper.reference - ): - indices = list(range(len(vertices))) - for i1, i2 in zip(indices, indices[1:] + [0]): - coords = [ - (vertices[i1].lng, vertices[i1].lat, alt_lo), - (vertices[i1].lng, vertices[i1].lat, alt_hi), - (vertices[i2].lng, vertices[i2].lat, alt_hi), - (vertices[i2].lng, vertices[i2].lat, alt_lo), - ] - geo.append( - kml.Polygon( - kml.altitudeMode( - _altitude_mode_of(v4.volume.altitude_lower) - ), - kml.outerBoundaryIs( - kml.LinearRing( - kml.coordinates( - " ".join( - ",".join(str(v) for v in c) - for c in coords - ) - ) - ) - ), - ) - ) - - placemark.append(geo) - folder.append(placemark) - else: - raise NotImplementedError("Volume footprint type not supported") + basic_info = flight_intent.basic_information + for i, v4 in enumerate(basic_info.area): + placemark = make_placemark_from_volume( + v4, + name=f"{name}: volume {i}", + style_url=f"#{basic_info.usage_state.value}_{basic_info.uas_state.value}", + ) + folder.append(placemark) folders.append(folder) doc = kml.kml( kml.Document( - kml.Style( - kml.LineStyle(kml.color("ff00c000"), kml.width(3)), - kml.PolyStyle(kml.color("80808080")), - id="Planned_Nominal", - ), - kml.Style( - kml.LineStyle(kml.color("ff00c000"), kml.width(3)), - kml.PolyStyle(kml.color("8000ff00")), - id="InUse_Nominal", - ), - kml.Style( - kml.LineStyle(kml.color("ff00ffff"), kml.width(5)), - kml.PolyStyle(kml.color("8000ff00")), - id="InUse_OffNominal", - ), - kml.Style( - kml.LineStyle(kml.color("ff0000ff"), kml.width(5)), - kml.PolyStyle(kml.color("8000ff00")), - id="InUse_Contingent", - ), + *flight_planning_styles(), *folders, ) ) diff --git a/schemas/monitoring/uss_qualifier/configurations/configuration/SequenceViewConfiguration.json b/schemas/monitoring/uss_qualifier/configurations/configuration/SequenceViewConfiguration.json index 844cf2a771..3919267511 100644 --- a/schemas/monitoring/uss_qualifier/configurations/configuration/SequenceViewConfiguration.json +++ b/schemas/monitoring/uss_qualifier/configurations/configuration/SequenceViewConfiguration.json @@ -10,6 +10,10 @@ "redact_access_tokens": { "description": "When True, look for instances of \"Authorization\" keys in the report with values starting \"Bearer \" and redact the signature from those access tokens", "type": "boolean" + }, + "render_kml": { + "description": "When True, visualize geographic data for each scenario as a KML file.", + "type": "boolean" } }, "type": "object"
    Case