Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Windows/Mac compatibility, implement the UBX-RXM-RAWX message, fix timeout bug. #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@
author='SparkFun Electronics',
author_email='info@sparkfun.com',

install_requires=[
],
install_requires=["pyserial", ],

# Choose your license
license='MIT',
Expand Down
17 changes: 17 additions & 0 deletions ublox_gps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
"""The core structure definitions"""

import struct
import time
from collections import namedtuple
from serial import SerialException
from typing import List, Iterator, Union

from serial.serialutil import SerialException

__all__ = ['PadByte', 'Field', 'Flag', 'BitField', 'RepeatedBlock', 'Message', 'Cls', 'Parser', ]


Expand Down Expand Up @@ -435,6 +439,15 @@ def _read_until(stream, terminator: bytes, size=None):
"""
term_len = len(terminator)
line = bytearray()

def check_timeout(start_time) -> bool:
"""Check to see if the timeout has been reached."""
if not hasattr(stream, "timeout"):
return False

return (time.time() - start_time) > stream.timeout

_read_start_time = time.time()
while True:
c = stream.read(1)
if c:
Expand All @@ -443,6 +456,10 @@ def _read_until(stream, terminator: bytes, size=None):
break
if size is not None and len(line) >= size:
break
elif check_timeout(_read_start_time):
raise SerialException(
"Failed to find terminator before timeout."
)
else:
break

Expand Down
47 changes: 46 additions & 1 deletion ublox_gps/sparkfun_predefines.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from . import core

__all__ = ['ACK_CLS', 'CFG_CLS', 'ESF_CLS', 'INF_CLS', 'MGA_CLS', 'MON_CLS',
'NAV_CLS', 'TIM_CLS', ]
'NAV_CLS', 'TIM_CLS', 'RXM_CLS', ]

ACK_CLS = core.Cls(0x05, 'ACK', [
core.Message(0x01, 'ACK', [
Expand Down Expand Up @@ -1130,3 +1130,48 @@
]),
]),
])

# From the u-blox M8 protocol spec
# https://www.u-blox.com/sites/default/files/products/documents/u-blox8-M8_ReceiverDescrProtSpec_%28UBX-13003221%29.pdf
RXM_CLS = core.Cls(0x02, "RXM", [ # 32.18 UBX-RXM (0x02)
core.Message(0x15, "RAWX", [ # 32.18.4 UBX-RXM-RAWX (0x02 0x15)
core.Field("rcvTow", "R8"),
core.Field("week", "U2"),
core.Field("leapS", "I1"),
core.Field("numMeas", "U1"),
core.BitField("recStat", "X1", [
core.Flag("leapSec", 0, 1),
core.Flag("clkReset", 1, 2),
]),
core.Field("version", "U1"),
core.Field("reserved1a", "U1"), # Try U2?
core.Field("reserved1b", "U1"),
core.RepeatedBlock("RB", [
core.Field("prMeas", "R8"),
core.Field("cpMeas", "R8"),
core.Field("doMeas", "R4"),
core.Field("gnssId", "U1"),
core.Field("svId", "U1"),
core.Field("sigId", "U1"),
core.Field("freqId", "U1"),
core.Field("locktime", "U2"),
core.Field("cno", "U1"),
core.BitField("prStdev", "X1", [
core.Flag("prStd", 0, 4),
]),
core.BitField("cpStdev", "X1", [
core.Flag("cpStd", 0, 4),
]),
core.BitField("doStdev", "X1", [
core.Flag("doStd", 0, 4),
]),
core.BitField("trkStat", "X1", [
core.Flag("prValid", 0, 1),
core.Flag("coValid", 1, 2),
core.Flag("halfCyc", 2, 3),
core.Flag("subHalfCyc", 3, 4),
]),
core.Field("reserved2", "U1"),
]),
]),
])
165 changes: 120 additions & 45 deletions ublox_gps/ublox_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,23 @@

import struct
import serial
import spidev

from . import sparkfun_predefines as sp
from . import core

try:
import spidev
SPI_AVAILABLE = True
except ModuleNotFoundError as err:
import sys

# If the platform is MacOS or Windows
if sys.platform in ["darwin", "win32", ]:
print("spidev only available for linux")
SPI_AVAILABLE = False
else:
raise err

class UbloxGps(object):
"""
UbloxGps
Expand All @@ -58,7 +70,7 @@ class UbloxGps(object):

:param hard_port: The port to use to communicate with the module, this
can be a serial or SPI port. If no port is given, then the library
assumes serial at a 38400 baud rate.
assumes serial0 at a 38400 baud rate.

:return: The UbloxGps object.
:rtype: Object
Expand All @@ -67,12 +79,17 @@ class UbloxGps(object):
def __init__(self, hard_port = None):
if hard_port is None:
self.hard_port = serial.Serial("/dev/serial0/", 38400, timeout=1)
elif type(hard_port) == spidev.SpiDev:
sfeSpi = sfeSpiWrapper(hard_port)
self.hard_port = sfeSpi
else:
elif isinstance(hard_port, serial.Serial):
self.hard_port = hard_port

if SPI_AVAILABLE:
if type(hard_port) == spidev.SpiDev:
sfeSpi = sfeSpiWrapper(hard_port)
self.hard_port = sfeSpi

if not hasattr(self, "hard_port"):
raise IOError("Unable to connect to port: {}".format(hard_port))

# Class message values
self.ack_ms= {
'ACK':0x01, 'NAK':0x00
Expand Down Expand Up @@ -113,6 +130,9 @@ def __init__(self, hard_port = None):
self.time_ms= {
'TM2':0x03, 'TP':0x01, 'VRFY':0x06
}
self.rxm_ms= {
"RAWX":0x15,
}

def send_message(self, ubx_class, ubx_id, ubx_payload = None):
"""
Expand Down Expand Up @@ -356,6 +376,21 @@ def esf_status(self):
cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port)
return payload

def rawx_measurements(self):
"""
Sends a poll request for the RXM class with the RAWX measurements and
parses ublox messages for the response. The payload is extracted from
the response which is then passed to the user.

:return: The payload of the RXM Class and RAWX Message ID
:rtype: namedtuple
"""
self.send_message(sp.RXM_CLS, self.rxm_ms.get('RAWX'))
parse_tool = core.Parser([sp.RXM_CLS])
cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port)
s_payload = self.scale_RXM_RAWX(payload)
return s_payload

def port_settings(self):
"""
Sends a poll request for the MON class with the COMMS Message ID and
Expand Down Expand Up @@ -511,6 +546,23 @@ def module_software_version(self):
msg = parse_tool.receive_from(self.hard_port)
return msg

def get_ubx_rxm_rawx(self):
"""
Sends a poll request for the RXM class that contains the raw
pseudorange, carrier phase, and doppler information for each
satellite, among other information. This payload is extracted from the
response, scaled, and then passed to the user.

:return: The payload of the RXM Class and RAWX measurements
:rtype: namedtuples
"""
self.send_message(sp.RXM_CLS, self.rxm_ms.get("RAWX"))
parse_tool = core.Parser([sp.RXM_CLS])
cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port)
s_payload = self.scale_RXM_RAWX(payload)
return s_payload


def scale_NAV_ATT(self, nav_payload):
"""
This takes the UBX-NAV-ATT payload and scales the relevant fields
Expand Down Expand Up @@ -715,60 +767,83 @@ def scale_NAV_VALNED(self, nav_payload):
nav_payload = nav_payload._replace(heading= att_head * (10**-5))

return nav_payload

def scale_RXM_RAWX(self, nav_payload):
"""
This takes the UBX-RXM-RAWX payload and scales the relevant fields as
they're described in the datasheet.

:return: Scaled version of the given payload.
:rtype: namedtyple
"""
for _i, _rb in enumerate(nav_payload.RB):
_pr_std = _rb.prStdev
_pr_std = _pr_std._replace(prStd=_pr_std.prStd * 0.01 * (2 ** 4))

_cp_std = _rb.cpStdev
_cp_std = _cp_std._replace(cpStd=_cp_std.cpStd * 0.004)

_do_std = _rb.doStdev
_do_std = _do_std._replace(doStd=_do_std.doStd * 0.002 * (2 ** 4))

class sfeSpiWrapper(object):
"""
sfeSpiWrapper

Initialize the library with the given port.
nav_payload.RB[_i] = nav_payload.RB[_i]._replace(prStdev=_pr_std)
nav_payload.RB[_i] = nav_payload.RB[_i]._replace(cpStdev=_cp_std)
nav_payload.RB[_i] = nav_payload.RB[_i]._replace(doStdev=_do_std)

:param spi_port: This library simply provides some ducktyping for spi so
that the ubxtranslator library doesn't complain. It
takes a spi port and then sets it to the ublox module's
specifications.
return nav_payload

:return: The sfeSpiWrapper object.
:rtype: Object
"""

def __init__(self, spi_port = None):
if SPI_AVAILABLE:
class sfeSpiWrapper(object):
"""
sfeSpiWrapper

if spi_port is None:
self.spi_port = spidev.SpiDev()
else:
self.spi_port = spi_port
Initialize the library with the given port.

self.spi_port.open(0,0)
self.spi_port.max_speed_hz = 5500 #Hz
self.spi_port.mode = 0b00
:param spi_port: This library simply provides some ducktyping for spi so
that the ubxtranslator library doesn't complain. It
takes a spi port and then sets it to the ublox module's
specifications.

def read(self, read_data = 1):
:return: The sfeSpiWrapper object.
:rtype: Object
"""
Reads a byte or bytes of data from the SPI port. The bytes are
converted to a bytes object before being returned.

:return: The requested bytes
:rtype: bytes
"""
def __init__(self, spi_port = None):

data = self.spi_port.readbytes(read_data)
byte_data = bytes([])
for d in data:
byte_data = byte_data + bytes([d])
return byte_data
if spi_port is None:
self.spi_port = spidev.SpiDev()
else:
self.spi_port = spi_port

def write(self, data):
"""
Writes a byte or bytes of data to the SPI port.
self.spi_port.open(0,0)
self.spi_port.max_speed_hz = 5500 #Hz
self.spi_port.mode = 0b00

:return: True on completion
:rtype: boolean
"""
self.spi_port.xfer2(list(data))
def read(self, read_data = 1):
"""
Reads a byte or bytes of data from the SPI port. The bytes are
converted to a bytes object before being returned.

return True
:return: The requested bytes
:rtype: bytes
"""

data = self.spi_port.readbytes(read_data)
byte_data = bytes([])
for d in data:
byte_data = byte_data + bytes([d])
return byte_data

def write(self, data):
"""
Writes a byte or bytes of data to the SPI port.

:return: True on completion
:rtype: boolean
"""
self.spi_port.xfer2(list(data))

return True