Skip to content

Commit

Permalink
Develop (#80)
Browse files Browse the repository at this point in the history
* delete device_info version

* add 8100Bx upgrade worker

* upgrade sdk_Bx

* add option to select config file

* cli app add para_path option

* fix 5.8.16 BA upgrade issue

* add test case to test ans-devices

* Update rtk_provider_base.py

Co-authored-by: yiweisong <48662698+yiweisong@users.noreply.github.com>
  • Loading branch information
daichenghe and yiweisong committed Mar 10, 2022
1 parent 8d2a9cd commit 9aa2aab
Show file tree
Hide file tree
Showing 13 changed files with 1,085 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/aceinna/bootstrap/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CommandLine:
def __init__(self, **kwargs):
self._build_options(**kwargs)
APP_CONTEXT.mode = APP_TYPE.CLI

APP_CONTEXT.para_path = kwargs['para_path']
# self.communication = 'uart'
# self.device_provider = None
# self.communicator = None
Expand Down
3 changes: 2 additions & 1 deletion src/aceinna/bootstrap/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class Default:
def __init__(self, **kwargs):
self._build_options(**kwargs)
APP_CONTEXT.mode = APP_TYPE.DEFAULT

APP_CONTEXT.para_path = kwargs['para_path']

def listen(self):
'''
Prepare components, initialize the application
Expand Down
9 changes: 6 additions & 3 deletions src/aceinna/devices/base/rtk_provider_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,12 @@ def load_properties(self):
return

# Load the openimu.json based on its app
app_file_path = os.path.join(
self.setting_folder_path, product_name, app_name, self.config_file_name)

if APP_CONTEXT.para_path == None:
app_file_path = os.path.join(
self.setting_folder_path, product_name, app_name, self.config_file_name)
else:
app_file_path = os.path.join(
self.setting_folder_path, product_name, app_name, APP_CONTEXT.para_path)
if not self.is_app_matched:
print_yellow(
'Failed to extract app version information from unit.' +
Expand Down
6 changes: 3 additions & 3 deletions src/aceinna/devices/upgrade_workers/sdk_8100Bx_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,9 @@ def send_sync(self):

sync = [0xfd, 0xc6, 0x49, 0x28]
self._uart.write(sync)
self._uart.write(sync)
self._uart.write(sync)
self._uart.write(sync)
time.sleep(0.2)

return self.read_until([0x3A, 0x54, 0x2C, 0xA6], 100)
Expand Down Expand Up @@ -996,9 +999,6 @@ def work(self):
fs_len = len(self._file_content)
bin_info_list = self.get_bin_info_list(fs_len, self._file_content)

# if not self.connect_serail_port():
# return self._raise_error('Connect serial Port failed')

if not self.send_sync():
return self._raise_error('Sync failed')

Expand Down
7 changes: 7 additions & 0 deletions src/aceinna/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,12 @@ def mode(self):
def mode(self, value):
self._mode = value

@property
def para_path(self):
return self._para_path

@para_path.setter
def para_path(self, value):
self._para_path = value

APP_CONTEXT = AppContext()
2 changes: 2 additions & 0 deletions src/aceinna/framework/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def _build_args():
help="Contains internal data log (OpenIMU only)", default=False)
parser.add_argument("-s", "--set-user-para", dest='set_user_para', action='store_true',
help="Set user parameters (OpenRTK only)", default=False)
parser.add_argument("--para-path", dest="para_path", type=str,
metavar='')
parser.add_argument("--cli", dest='use_cli', action='store_true',
help="start as cli mode", default=False)

Expand Down
3 changes: 2 additions & 1 deletion src/aceinna/models/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class WebserverArgs(KeyValuesArgumentBase):
'console_log': False,
'set_user_para': False,
'ntrip_client': False,
'force_bootloader': False
'force_bootloader': False,
'para_path': None
}


Expand Down
81 changes: 81 additions & 0 deletions tests/test_ans_devices/StringStream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
class StringStream:
"""A simple class to hold text so that when passed
between functions, the object is passed by reference
and memory does not need to be repeatedly allocated for the string.
This class was written here to avoid adding a dependency
to the project.
"""

def __init__(self, raw_text, debug=False):
self.raw_text = raw_text
self.index = 0
self.len = len(raw_text)

def read(self, count):
"""Read count characters starting at self.index,
and return those characters as a string
"""
new_index = self.index + count
if new_index > self.len:
buf = self.raw_text[self.index :] # return to the end, don't fail
else:
buf = self.raw_text[self.index : new_index]
self.index = new_index

return buf

def seek(self, offset):
"""Advance the index of this StringStream by offset characters"""
self.index = self.index + offset

def advance_past_chars(self, chars):
"""Advance the index past specific chars
Args chars (list): list of characters to advance past
Return substring that was advanced past
"""
start_index = self.index
while True:
current_char = self.raw_text[self.index]
self.index += 1
if current_char in chars:
break

elif self.index == self.len:
break

return self.raw_text[start_index : self.index - 1]

def advance_past_string_with_gdb_escapes(self, chars_to_remove_gdb_escape=None):
"""characters that gdb escapes that should not be
escaped by this parser
"""

if chars_to_remove_gdb_escape is None:
chars_to_remove_gdb_escape = ['"']

buf = ""
while True:
c = self.raw_text[self.index]
self.index += 1

if c == "\\":
# We are on a backslash and there is another character after the backslash
# to parse. Handle this case specially since gdb escaped it for us

# Get the next char that is being escaped
c2 = self.raw_text[self.index]
self.index += 1
# only store the escaped character in the buffer; don't store the backslash
# (don't leave it escaped)
buf += c2

elif c == '"':
# Quote is closed. Exit (and don't include the end quote).
break

else:
# capture this character, and keep capturing
buf += c
return buf
11 changes: 11 additions & 0 deletions tests/test_ans_devices/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

DEFAULT_PROCESS_TIMEOUT_SEC = 1
DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC = 0.2
USING_WINDOWS = os.name == "nt"


class ProcessTimeoutError(ValueError):
"""Raised when no response is recieved from python driver after the timeout has been triggered"""

pass
192 changes: 192 additions & 0 deletions tests/test_ans_devices/driver_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
"""This module defines the `ProcessController` class
which runs app as a subprocess and can write to it and read from it to get
structured output.
"""

import logging
import subprocess
from distutils.spawn import find_executable
from typing import Union, List, Optional
from io_manager import IoManager
from constants import (
DEFAULT_PROCESS_TIMEOUT_SEC,
DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
)
import time
import process_parser
import sys

DEFAULT_PROCESS_LAUNCH_COMMAND = ["./ans-devices.exe", "--cli"]
logger = logging.getLogger(__name__)


class ProcessController:
def __init__(
self,
command,
time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
):
"""
Run a subprocess. Send commands and receive structured output.
Create new object, along with subprocess
Args:
command: Command to run in shell to spawn new subprocess
time_to_check_for_additional_output_sec: When parsing responses, wait this amout of time before exiting (exits before timeout is reached to save time). If <= 0, full timeout time is used.
Returns:
New ProcessController object
"""

if command is None:
command = DEFAULT_PROCESS_LAUNCH_COMMAND

# if not any([("--interpreter=mi" in c) for c in command]):
# logger.warning(
# "warning. "
# )
self.abs_app_path = None # abs path to executable
self.command = command # type: List[str]
self.time_to_check_for_additional_output_sec = (
time_to_check_for_additional_output_sec
)
self.app_process = None
self._allow_overwrite_timeout_times = (
self.time_to_check_for_additional_output_sec > 0
)
app_path = command.split(' ')[0]

if not app_path:
raise ValueError("a valid path to app must be specified")

else:
abs_app_path = find_executable(app_path)
if abs_app_path is None:
raise ValueError(
'executable could not be resolved from "%s"' % app_path
)

else:
self.abs_app_path = abs_app_path
self.spawn_new_subprocess()

def spawn_new_subprocess(self):
"""Spawn a new subprocess with the arguments supplied to the object
during initialization. If subprocess already exists, terminate it before
spanwing a new one.
Return int: process id
"""
if self.app_process:
logger.debug(
"Killing current subprocess (pid %d)" % self.app_process.pid
)
self.exit()

logger.debug(f'Launching app: {" ".join(self.command)}')
# print('xxxxxxxxxxxxxxxxxxxxx', self.command)
# Use pipes to the standard streams
self.app_process = subprocess.Popen(
self.command,
shell=False,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)

self.io_manager = IoManager(
self.app_process.stdin,
self.app_process.stdout,
self.app_process.stderr,
self.time_to_check_for_additional_output_sec,
)
return self.app_process.pid

def get_process_response(
self, timeout_sec: float = DEFAULT_PROCESS_TIMEOUT_SEC, raise_error_on_timeout=True
):
"""Get process response. See IoManager.get_process_response() for details"""
return self.io_manager.get_process_response(timeout_sec, raise_error_on_timeout)

def write(
self,
mi_cmd_to_write: Union[str, List[str]],
timeout_sec=DEFAULT_PROCESS_TIMEOUT_SEC,
raise_error_on_timeout: bool = True,
read_response: bool = True,
):
print('cmd: ', mi_cmd_to_write)
"""Write command to process. See IoManager.write() for details"""
return self.io_manager.write(
mi_cmd_to_write, timeout_sec, raise_error_on_timeout, read_response
)

def exit(self) -> None:
"""Terminate process"""
if self.app_process:
self.app_process.terminate()
self.app_process.communicate()
self.app_process = None
return None

if __name__ == '__main__':
if len(sys.argv) < 2:
print('input upgrade file')
exit(-1)
upgrade_file = sys.argv[1]
fs_log = open('./log.txt', 'w')
driver_cmd = './ans-devices.exe --cli'
app_handle = ProcessController(driver_cmd)
suc_count = 0
fail_count = 0
data = ''
process_ret = ''
while True:
while True:
try:
response = app_handle.get_process_response()
try:
process_ret += process_parser.get_gdb_response_str(response)
except Exception as e:
print(e)
if 'Connected' in process_ret:
print('python drivder connected...')
fs_log.write(process_ret)
process_ret = ''
break
except Exception as e:
time.sleep(1)
print('wait to connect')
time.sleep(2)
while True:
response = app_handle.write('upgrade {0}'.format(upgrade_file), 1, read_response = False)
time_used = 0
while True:

try:
response = app_handle.get_process_response()
try:
process_ret += process_parser.get_gdb_response_str(response)
except Exception as e:
print(e)
if 'RTK_INS App' in process_ret:
print('upgrade suc...')
suc_count+= 1
break
elif 'failed' in process_ret:
print('upgrade fail...')
fail_count+= 1
break
except Exception as e:
time.sleep(1)
time_used+= 2
print("\rtime used: %ds" %(time_used), end="")
if time_used > 600:
print('time out')
time_used = 0
fail_count+= 1
break

print('suc_count = {0}, fail_count = {1}'.format(suc_count, fail_count))
fs_log.write(process_ret)
time.sleep(5)
process_ret = ''
Loading

0 comments on commit 9aa2aab

Please sign in to comment.