Skip to content

Commit

Permalink
network: add tcp test
Browse files Browse the repository at this point in the history
JIRA: CI-343
  • Loading branch information
adamdebek committed Feb 21, 2024
1 parent 42f07d1 commit 9616f35
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 0 deletions.
5 changes: 5 additions & 0 deletions network/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
NAME := test-network
LOCAL_SRCS := test-network.c
DEP_LIBS := unity

include $(binary.mk)
167 changes: 167 additions & 0 deletions network/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import random
import time
import re
import socket
import subprocess

from typing import Optional
import psh.tools.psh as psh
from trunner.ctx import TestContext
from trunner.dut import Dut
from trunner.types import TestResult, Status


def host_interface():
out = subprocess.run(
"ifconfig",
encoding="ascii",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
)

eth_device = re.findall("eth[0-9]:.+\n.+[0-9]+", out.stdout)
inet = re.findall("inet [0-9.]+", str(eth_device))
netmask = re.findall("netmask [0-9.]+", str(eth_device))

host_setup = inet + netmask
return host_setup


def en_setup(
p,
eth="en1",
up: Optional[bool] = True,
wan: Optional[bool] = False,
ip: Optional[str] = None,
dynamic: Optional[bool] = False,
):
"""
This setup is used to create links with the ethernet interface using ifconfig and route.
It is strictly connected with the host IP because the setup will take it as a new IP for the
target with light modification.
up: It indicates that you want this interface to be active.
wan: It sets the default gateway using host parameters
ip: It is determinate custom set ip for interface
dynamic: If True it will set ifconfig with dynamic flag, without netmask and ip
"""

host_ip, host_netmask = host_interface()

if ip is None:
ip_split = re.findall(r"([0-9]{1,3})", host_ip)
if int(ip_split[3]) >= 100:
ip_split[3] = "150"
else:
ip_split[3] = "1" + ip_split[3]

desired_target_ip = ".".join(ip_split)
ip_split[3] = "1"
default_gw = ".".join(ip_split)

else:
desired_target_ip = ip
ip_split = re.findall(r"([0-9]{1,3})", ip)
ip_split[3] = "1"
default_gw = ".".join(ip_split)

ifconfig_setup_cmd = f"ifconfig {eth} {desired_target_ip} {host_netmask} "

if wan:
route_setup_cmd = f"route add default gw {default_gw} {eth}"
else:
route_setup_cmd = f"route add -net {desired_target_ip} {eth}"

Check warning on line 73 in network/network.py

View workflow job for this annotation

GitHub Actions / build (3.9)

F841 local variable 'route_setup_cmd' is assigned to but never used

Check warning on line 73 in network/network.py

View workflow job for this annotation

GitHub Actions / build (3.10)

F841 local variable 'route_setup_cmd' is assigned to but never used

Check warning on line 73 in network/network.py

View workflow job for this annotation

GitHub Actions / build (3.11)

F841 local variable 'route_setup_cmd' is assigned to but never used

if up:
ifconfig_setup_cmd += "up"

if not dynamic:
# psh._send(p, ifconfig_setup_cmd + '\n')
psh.assert_prompt_after_cmd(p, ifconfig_setup_cmd, "success")
psh.assert_prompt_after_cmd(p, 'ls', "success")
else:
psh.assert_prompt_after_cmd(p, "ifconfig en1 dynamic", "success")

time.sleep(1)


def con_setup(dut: Dut, host_ip, host_port):
print("con setup")
host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

if host_socket.bind((host_ip, host_port)) is not None:
print('bind failed')

host_socket.listen()

dut.sendline('/bin/test-network ' + host_ip + '\n')

peer_socket, peer_address = host_socket.accept()

host_socket.close()

return peer_socket, peer_address


def harness(dut: Dut, ctx: TestContext, result: TestResult):
assert_re = r"ASSERTION (?P<path>[\S]+):(?P<line>\d+):(?P<status>FAIL|INFO|IGNORE): (?P<msg>.*?)\r"
result_re = r"TEST\((?P<group>\w+), (?P<name>\w+)\) (?P<status>PASS|IGNORE)"
# Fail need to have its own regex due to greedy matching
result_fail_re = r"TEST\((?P<group>\w+), (?P<name>\w+)\) (?P<status>FAIL) at (?P<path>.*?):(?P<line>\d+)\r"
final_re = r"(?P<total>\d+) Tests (?P<fail>\d+) Failures (?P<ignore>\d+) Ignored \r+\n(?P<result>OK|FAIL)"

last_assertion = {}
stats = {"FAIL": 0, "IGNORE": 0, "PASS": 0}
results = []
timeout_val = 30
if ctx.nightly:
timeout_val = 60

ipv4_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'

host_ip = re.findall(ipv4_pattern, host_interface()[0])[0]
en_setup(dut)
peer_socket, peer_address = con_setup(dut, host_ip, 1025)
random_bytes = bytes([random.randint(0, 255) for _ in range(128)])
peer_socket.send(random_bytes)

while True:
idx = dut.expect([assert_re, result_re, result_fail_re, final_re], timeout=timeout_val)
parsed = dut.match.groupdict()

if idx == 0:
if parsed["status"] in ["FAIL", "IGNORE"]:
last_assertion = parsed
elif idx in (1, 2):
if last_assertion:
parsed["msg"] = last_assertion["msg"]
last_assertion = {}

status = Status.from_str(parsed["status"])
subname = f"{parsed['group']}.{parsed['name']}"
if "path" in parsed and "line" in parsed:
parsed["msg"] = f"[{parsed['path']}:{parsed['line']}] " + parsed["msg"]
result.add_subresult(subname, status, parsed.get("msg", ""))

stats[parsed["status"]] += 1
results.append(parsed)
elif idx == 3:
for k, v in parsed.items():
if k != "result":
parsed[k] = int(v)

assert (
parsed["total"] == sum(stats.values())
and parsed["fail"] == stats["FAIL"]
and parsed["ignore"] == stats["IGNORE"]
), "".join(("There is a mismatch between the number of parsed tests and overall results!\n",
"Parsed results from the final Unity message (total, failed, ignored): ",
f"{parsed['total']}, {parsed['fail']}, {parsed['ignore']}\n",
"Found test summary lines (total, failed, ignored): ",
f"{sum(stats.values())}, {stats['FAIL']}, {stats['IGNORE']}"))

break

status = Status.FAIL if stats["FAIL"] != 0 else Status.OK
return TestResult(status=status)
110 changes: 110 additions & 0 deletions network/test-network.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Phoenix-RTOS
*
* test-network-client
*
* network tests
*
* Copyright 2024 Phoenix Systems
* Author: Adam Debek
*
* This file is part of Phoenix-RTOS.
*
* %LICENSE%
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <unity_fixture.h>

int8_t send_data[128];
int8_t recv_data[128];
char *peer_ip;


TEST_GROUP(test_network);


TEST_SETUP(test_network)
{
}


TEST_TEAR_DOWN(test_network)
{
}


TEST(test_network, basic)
{
int fd;
size_t n, r, i;
struct sockaddr_in si;

memset(&si, 0, sizeof si);
si.sin_family = AF_INET;
si.sin_addr.s_addr = inet_addr(peer_ip);
si.sin_port = htons(1025);

fd = socket(AF_INET, SOCK_STREAM, 0);
TEST_ASSERT(fd > 0);

if (connect(fd, (struct sockaddr *)&si, sizeof(si)) < 0) {
perror("connect");
FAIL("connect");
}

r = read(fd, recv_data, sizeof(recv_data));
if (r < 0 && errno == EPIPE) {
FAIL("peer closed connection");
}
TEST_ASSERT(r == sizeof(recv_data));

srandom(time(NULL));

for (i = 0; i < sizeof(send_data); i++) {
send_data[i] = recv_data[i] + 1;
}

n = send(fd, send_data, sizeof(send_data), MSG_NOSIGNAL);
if (n < 0 && errno == EPIPE) {
FAIL("peer closed connection");
}
TEST_ASSERT(n == sizeof(send_data));

close(fd);
}


TEST_GROUP_RUNNER(test_network)
{
RUN_TEST_CASE(test_network, basic);
}

void runner(void)
{
RUN_TEST_GROUP(test_network);
}


int main(int argc, char *argv[])
{
if (argc == 2 && argv[1] != NULL) {
peer_ip = argv[1];
}
else {
fprintf(stderr, "Usage: %s <ip>\n", argv[0]);
exit(EXIT_FAILURE);
}

UnityMain(argc, (const char **)argv, runner);

return 0;
}
8 changes: 8 additions & 0 deletions network/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
test:
type: harness
harness: network.py
reboot: True
targets:
value: [armv7a7-imx6ull-evk]
tests:
- name: test-network

0 comments on commit 9616f35

Please sign in to comment.