Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add a PacketCollection class to work with multiple packets #110

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion space_packet_parser/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import lxml.etree as ElementTree
# Local
from space_packet_parser.exceptions import ElementNotFoundError, InvalidParameterTypeError
from space_packet_parser import comparisons, parameters, packets
from space_packet_parser import comparisons, encodings, parameters, packets

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,6 +353,63 @@ def _get_container_base_container(
restrictions = []
return self._find_container(base_container_element.attrib['containerRef']), restrictions

def _get_minimum_numpy_datatype(self, name: str, *, raw_value: bool = False) -> str | None:
    """
    Get the minimum numpy datatype string for a given variable.

    Parameters
    ----------
    name : str
        The variable name.
    raw_value : bool, default False
        Whether or not the raw value from the XTCE definition should be used.

    Returns
    -------
    datatype : str or None
        The minimum datatype, or None when numpy should infer it.
    """
    encoding = self.named_parameters[name].parameter_type.encoding

    if isinstance(encoding, encodings.NumericDataEncoding):
        has_calibrator = (encoding.context_calibrators is not None
                          or encoding.default_calibrator is not None)
        if has_calibrator and not raw_value:
            # If there are calibrators, we need to default to None and
            # let numpy infer the datatype
            return None

        if isinstance(encoding, encodings.IntegerDataEncoding):
            base = "uint" if encoding.encoding == "unsigned" else "int"
        else:  # FloatDataEncoding
            base = "float"

        # Round the encoded bit size up to the nearest standard numpy width
        nbits = encoding.size_in_bits
        for width in (8, 16, 32):
            if nbits <= width:
                return f"{base}{width}"
        return f"{base}64"

    if isinstance(encoding, encodings.BinaryDataEncoding):
        # TODO: Use the new StringDType instead
        # or try to use frombuffer and create an array of uint8 values for each byte
        return "object"

    if isinstance(encoding, encodings.StringDataEncoding):
        # TODO: Use the new StringDType instead?
        return "str"

    raise ValueError(f"Unsupported data encoding: {encoding}")

@staticmethod
def _parse_header(packet_data: bytes) -> dict:
"""Parses the CCSDS standard header.
Expand Down
176 changes: 175 additions & 1 deletion space_packet_parser/packets.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@

"""Packet containers and parsing utilities for space packets."""

from collections import defaultdict, Counter
from dataclasses import dataclass, field
from typing import List, Optional, Protocol, Union
from pathlib import Path
from typing import Iterable, List, Optional, Protocol, Union

# Check whether the optional third-party libraries are available.
# to_numpy()/to_xarray() consult these flags and raise a helpful
# ImportError (with install instructions) when the library is missing.
try:
    import numpy as np
    _NP_AVAILABLE = True
except ImportError:
    _NP_AVAILABLE = False

try:
    import xarray as xr
    _XR_AVAILABLE = True
except ImportError:
    _XR_AVAILABLE = False

# Builtin types that a parsed parameter value may take on
BuiltinDataTypes = Union[bytes, float, int, str]

Expand Down Expand Up @@ -157,6 +172,165 @@ def user_data(self) -> dict:
return dict(list(self.items())[7:])


class PacketCollection(list):
"""Stores a list of packets."""
def __init__(
self,
packets: Iterable[CCSDSPacket],
*,
# TODO: Figure out typing with imports from definitions causing circular imports
# definitions.XtcePacketDefinition | None
packet_definition=None,
):
"""
Create a PacketCollection.

Parameters
----------
apid_dict : dict
Mapping of APID to a list of packets with that apid.
packet_definition : XtcePacketDefinition
The packet definition to use for this collection.
"""
super().__init__(packets)
self.packet_definition = packet_definition

def __str__(self):
apids = Counter(packet["PKT_APID"] for packet in self)
return (f"<PacketCollection>: {len(self)} packets\n"
+ "Packets per apid (apid: npackets)\n"
+ "\n".join(f" {apid}: {count}" for apid, count in apids.items()))

@classmethod
def from_packet_file(
cls,
packet_file: str | Path,
# TODO: Figure out typing with imports from definitions causing circular imports
# str | Path | definitions.XtcePacketDefinition | None
packet_definition=None,
) -> "PacketCollection":
"""
Create a PacketCollection from a packet file.

Parameters
----------
packet_file : str
Path to a file containing CCSDS packets.
packet_definition : str or Path or XtcePacketDefinition, optional
XTCE packet definition, or the path to the XTCE packet definition file.

Returns
-------
packet_collection : PacketCollection
A list of packets grouped together.
"""
# TODO: Bring this import to the top of the file once circular dependencies are resolved
from space_packet_parser import definitions
if packet_definition is not None and not isinstance(packet_definition, definitions.XtcePacketDefinition):
# We got the path to a packet definition, so read it in
packet_definition = definitions.XtcePacketDefinition(packet_definition)

with open(packet_file, "rb") as binary_data:
# packet_generator = packets.packet_generator(binary_data, definition=packet_definition)
packet_generator = packet_definition.packet_generator(binary_data)
return cls(packet_generator, packet_definition=packet_definition)

def to_numpy(self, variable, raw_value=False):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this to output a recarray/multi-dimensional array based on variables in the packets similar to the xarray dataset (i.e. shape: (npackets, nvariables))? Right now, I made it just do one specific variable a user requests.

"""Turn the requested variable into a numpy array.

Parameters
----------
raw_value : bool, default False
Whether or not to use the raw value from the packet.

Returns
-------
data : numpy.ndarray
A numpy array of values for the requested variable.
"""
if not _NP_AVAILABLE:
raise ImportError("Numpy is required to use this function, you can install it with `pip install numpy`.")
data = [packet[variable].raw_value if raw_value else packet[variable]
for packet in self
if variable in packet]
if self.packet_definition is not None:
min_dtype = self.packet_definition._get_minimum_numpy_datatype(variable, raw_value=raw_value)
else:
min_dtype = None
return np.array(data, dtype=min_dtype)

def to_xarray(self, *, apid=None, raw_value=False, ignore_header=False):
"""Turn this collection into an xarray dataset.

The collection must have a single apid to be turned into a dataset, or
the desired apid must be specified. The collection must have a consistent
structure across all packets with that apid (i.e. it cannot be a nested
packet structure).

Parameters
----------
apid : int, optional
Turn this specific apid into a dataset, by default None
raw_value : bool, optional
_description_, by default False
ignore_header : bool, optional
_description_, by default False
"""
if not _XR_AVAILABLE:
raise ImportError("Xarray is required to use this function, you can install it with `pip install xarray`.")
if len(self) == 0:
return xr.Dataset()

# Create a mapping of {variables: [values]}}
variable_dict = defaultdict(list)
# Keep track of the packet number for the coordinate
# useful if we have interspersed packets with different APIDs
packet_number = []

if apid is None:
apid = self[0]["PKT_APID"]
if any(packet["PKT_APID"] != apid for packet in self):
raise ValueError("All packets must have the same APID to convert to an xarray dataset.")

for i, packet in enumerate(self):
if packet["PKT_APID"] != apid:
continue
packet_number.append(i)

if ignore_header:
packet_content = packet.user_data
else:
packet_content = packet

if len(variable_dict):
# TODO: Can we relax this requirement and combine the variables together somehow?
if variable_dict.keys() != packet_content.keys():
raise ValueError("All packets must have the same variables to convert to an xarray dataset. "
"This likely means that the packet definition has a nested packet structure "
"with variables spread across multiple packets.")

for key, value in packet_content.items():
if raw_value:
value = value.raw_value
variable_dict[key].append(value)

ds = xr.Dataset(
{
variable: (
"packet",
np.asarray(list_of_values, dtype=self.packet_definition._get_minimum_numpy_datatype(
variable, raw_value=raw_value)),
)
for variable, list_of_values in variable_dict.items()
},
# Default to packet number as the coordinate
# TODO: Allow a user to specify this as a keyword argument?
# Or give an example of how to change this after the fact
coords={"packet": packet_number},
)
return ds


class Parseable(Protocol):
"""Defines an object that can be parsed from packet data."""
def parse(self, packet: CCSDSPacket, **parse_value_kwargs) -> None:
Expand Down
Loading