Skip to content

Commit

Permalink
refactor: simplify serial test functions by removing unused parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
dido18 committed Jan 10, 2025
1 parent a16b5f5 commit ea54ce9
Showing 1 changed file with 19 additions and 74 deletions.
93 changes: 19 additions & 74 deletions tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,77 +49,28 @@ def test_list(socketio, message):
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_default(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "default")


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timed(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "timed")


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timedraw(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "timedraw")

def test_open_serial(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message)

# NOTE run the following tests with a board connected to the PC and with the sketch found in tests/testdata/SerialEcho.ino on it be sure to change serial_address in conftest.py
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_default(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "default")


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_timed(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "timed")
def test_send_serial(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message)


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_timedraw(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "timedraw")
def test_send_emoji_serial(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message)


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_default(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "default")


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_timed(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "timed")


@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_timedraw(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "timedraw")


def general_open_serial(socketio, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
def general_open_serial(socketio, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# test the closing of the serial port, we are gonna use close_port for the other tests
socketio.emit('command', 'close ' + serial_port)
time.sleep(.2)
Expand All @@ -128,42 +79,36 @@ def general_open_serial(socketio, serial_port, baudrate, message, buffertype):
assert any("\"IsOpen\": false," in i for i in message)


def general_send_serial(socketio, close_port, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
def general_send_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# send the string "ciao" using the serial connection
socketio.emit('command', 'send ' + serial_port + ' ciao')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " ciao" in i for i in message)
#check if message has been sent back by the connected board
if buffertype == "timedraw":
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
output = extract_serial_data(message)
assert "ciao" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure


def general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
def general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# send a lot of emoji: they can be messed up
socketio.emit('command', 'send ' + serial_port + ' /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
if buffertype == "timedraw":
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
output = extract_serial_data(message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure


def open_serial_port(socketio, serial_port, baudrate, message, buffertype):
#open a new serial connection with the specified buffertype
socketio.emit('command', 'open ' + serial_port + ' ' + baudrate + ' ' + buffertype)
def open_serial_port(socketio, serial_port, baudrate, message):
#open a new serial connection
socketio.emit('command', 'open ' + serial_port + ' ' + baudrate)
# give time to the message var to be filled
time.sleep(.5)
print(message)
Expand All @@ -176,7 +121,7 @@ def open_serial_port(socketio, serial_port, baudrate, message, buffertype):
reason="VMs have no serial ports",
)
def test_sendraw_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message, "timedraw")
open_serial_port(socketio, serial_port, baudrate, message)
#test with bytes
integers = [1, 2, 3, 4, 5]
bytes_array=bytearray(integers)
Expand All @@ -202,7 +147,7 @@ def extract_serial_data(msg):
serial_data+=json.loads(i)["D"]
print("serialdata:"+serial_data)
return serial_data

def decode_output(raw_output):
# print(raw_output)
base64_bytes = raw_output.encode('ascii') #encode rawoutput message into a bytes-like object
Expand Down

0 comments on commit ea54ce9

Please sign in to comment.