From c3b625401dd9a99b4a3ecaad0c347558f00b2f8a Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Thu, 4 Jul 2024 16:39:43 +0200 Subject: [PATCH] add parameter type --- .github/workflows/windows-test.yml | 2 +- example/boolean.py | 3 +- example/example.py | 122 ++++--------- example/logo_7_8.py | 4 +- example/read_multi.py | 9 +- example/write_multi.py | 2 +- snap7/__init__.py | 15 +- snap7/{client/__init__.py => client.py} | 147 +++++++--------- snap7/common.py | 53 +----- snap7/{util => }/db.py | 221 +++++++++++++++++++----- snap7/error.py | 66 +++++++ snap7/exceptions.py | 4 - snap7/logo.py | 26 ++- snap7/partner.py | 40 ++--- snap7/server/__init__.py | 55 +++--- snap7/types.py | 87 ++++++---- snap7/util/__init__.py | 151 +--------------- snap7/util/getters.py | 63 ++++--- snap7/util/setters.py | 43 +++-- tests/test_client.py | 108 +++++------- tests/test_logo_client.py | 63 +++---- tests/test_mainloop.py | 16 +- tests/test_partner.py | 60 +++---- tests/test_server.py | 15 +- tests/test_util.py | 86 ++++----- tox.ini | 2 +- 26 files changed, 696 insertions(+), 767 deletions(-) rename snap7/{client/__init__.py => client.py} (92%) rename snap7/{util => }/db.py (79%) delete mode 100644 snap7/exceptions.py diff --git a/.github/workflows/windows-test.yml b/.github/workflows/windows-test.yml index 794d8f8d..882c8bb2 100644 --- a/.github/workflows/windows-test.yml +++ b/.github/workflows/windows-test.yml @@ -32,4 +32,4 @@ jobs: run: python3.exe -m pip install .[test] - name: Run tests - run: python3 -m pytest -m "server or util or client or mainloop or partner" + run: python3 -m pytest -m "server or db or client or mainloop or partner" diff --git a/example/boolean.py b/example/boolean.py index ffcce3c1..4200421a 100644 --- a/example/boolean.py +++ b/example/boolean.py @@ -18,7 +18,6 @@ """ import snap7 -import snap7.util.setters plc = snap7.client.Client() plc.connect("192.168.200.24", 0, 3) @@ -37,7 +36,7 @@ # play with these functions. -plc.read_area(area=Area.MK, dbnumber=0, start=20, size=2) +plc.read_area(area=Area.MK, db_number=0, start=20, size=2) data = bytearray() snap7.util.setters.set_int(data, 0, 127) diff --git a/example/example.py b/example/example.py index 3c886c77..9bb16cd7 100644 --- a/example/example.py +++ b/example/example.py @@ -1,29 +1,21 @@ +""" +This is an example of how to use the snap7 library to read and write data to a PLC. +It is used to manipulate a large DB object containing over 450 'rows' which represent valves +""" + import time -import snap7.util.db from db_layouts import rc_if_db_1_layout from db_layouts import tank_rc_if_db_layout -import snap7 -from snap7 import util - -print(""" - -THIS IS EXAMPLE CODE MEANTH TO BE READ. - -It is used to manipulate a large DB object with over -450 'rows' which represent valves - -You don't have a project and PLC like I have which I used -to create the test code with. +from snap7 import Client, Row, DB +from snap7.db import print_row -""") - -client = snap7.client.Client() +client = Client() client.connect("192.168.200.24", 0, 3) -def get_db1(): +def get_db1() -> None: """ Here we read out DB1, all data we is put in the all_data variable and is a bytearray with the raw plc data @@ -34,38 +26,10 @@ def get_db1(): row_size = 130 # size of item index = i * row_size offset = index + row_size # end of row in db - util.print_row(all_data[index:offset]) - + print_row(all_data[index:offset]) -def get_db_row(db, start, size): - """ - Here you see and example of readying out a part of a DB - Args: - db (int): The db to use - start (int): The index of where to start in db data - size (int): The size of the db data to read - """ - type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] - data = client.db_read(db, start, type_, size) - # print_row(data[:60]) - return data - - -def set_db_row(db, start, size, _bytearray): - """ - Here we replace a piece of data in a db block with new data - - Args: - db (int): The db to use - start(int): The start within the db - size(int): The size of the data in bytes - _butearray (enumerable): The data to put in the db - """ - client.db_write(db, start, size, _bytearray) - - -def show_row(x): +def show_row(x: int) -> None: """ print data in DB of row/object X in """ @@ -73,8 +37,8 @@ def show_row(x): row_size = 126 while True: - data = get_db_row(1, 4 + x * row_size, row_size) - row = snap7.util.db.DB_Row(data, rc_if_db_1_layout, layout_offset=4) + data = client.db_read(1, 4 + x * row_size, row_size) + row = Row(data, rc_if_db_1_layout, layout_offset=4) print("name", row["RC_IF_NAME"]) print(row["RC_IF_NAME"]) break @@ -83,24 +47,24 @@ def show_row(x): # do some check action.. -def get_row(x): +def get_row(x: int) -> Row: row_size = 126 - data = get_db_row(1, 4 + x * row_size, row_size) - row = snap7.util.db.DB_Row(data, rc_if_db_1_layout, layout_offset=4) + data = client.db_read(1, 4 + x * row_size, row_size) + row = Row(data, rc_if_db_1_layout, layout_offset=4) return row -def set_row(x, row): +def set_row(x: int, row: Row) -> None: """ We use db 1, use offset 4, we replace row x. To find the correct start_index we mulitpy by row_size by x and we put the byte array representation of row in the PLC """ row_size = 126 - set_db_row(1, 4 + x * row_size, row_size, row._bytearray) + client.db_write(1, 4 + x * row_size, row_size, row._bytearray) -def open_row(row): +def open_row(row: Row) -> None: """ open a valve """ @@ -115,13 +79,8 @@ def open_row(row): row["CloseAut"] = 0 row["OpenAut"] = 1 - # row['StartAut'] = True - # row['StopAut'] = False - # row['RstLi'] = True - # row['StringValue'] = 'test' - -def close_row(row): +def close_row(row: Row) -> None: """ close a valve """ @@ -132,11 +91,7 @@ def close_row(row): row["OpenAut"] = 0 -# show_row(0) -# show_row(1) - - -def open_and_close(): +def open_and_close() -> None: for x in range(450): row = get_row(x) open_row(row) @@ -150,18 +105,17 @@ def open_and_close(): set_row(x, row) -def set_part_db(start, size, _bytearray): +def set_part_db(start: int, size: int, _bytearray: bytearray) -> None: data = _bytearray[start : start + size] - set_db_row(1, start, size, data) + client.db_write(1, start, size, data) -def write_data_db(dbnumber, all_data, size): - area = snap7.types.S7AreaDB - dbnumber = 1 - client.write_area(area, dbnumber, 0, size, all_data) +# def write_data_db(dbnumber, all_data, size): +# area = snap7.types.S7AreaDB +# client.write_area(area, dbnumber, 0, size, all_data) -def open_and_close_db1(): +def open_and_close_db1() -> None: t = time.time() db1 = make_item_db(1) all_data = db1._bytearray @@ -172,7 +126,7 @@ def open_and_close_db1(): # set_part_db(4+x*126, 126, all_data) t = time.time() - write_data_db(1, all_data, 4 + 126 * 450) + client.write_area(1, all_data, 4 + 126 * 450) print(f"opening all valves took: {time.time() - t}") print("sleep...") @@ -184,24 +138,24 @@ def open_and_close_db1(): print(time.time() - t) t = time.time() - write_data_db(1, all_data, 4 + 126 * 450) + client.write_area(1, all_data, 4 + 126 * 450) print(f"closing all valves took: {time.time() - t}") -def read_tank_db(): +def read_tank_db() -> None: db73 = make_tank_db() print(len(db73)) for x, (name, row) in enumerate(db73): print(row) -def make_item_db(db_number): +def make_item_db(db_number: int) -> DB: t = time.time() - all_data = client.db_upload(db_number) + all_data = client.upload(db_number) print(f"getting all data took: {time.time() - t}") - db1 = snap7.util.db.DB( + db1 = DB( db_number, # the db we use all_data, # bytearray from the plc rc_if_db_1_layout, # layout specification @@ -216,18 +170,18 @@ def make_item_db(db_number): return db1 -def make_tank_db(): - tank_data = client.db_upload(73) - db73 = snap7.util.db.DB(73, tank_data, tank_rc_if_db_layout, 238, 2, id_field="RC_IF_NAME") +def make_tank_db() -> DB: + tank_data = client.upload(73) + db73 = DB(73, tank_data, tank_rc_if_db_layout, 238, 2, id_field="RC_IF_NAME") return db73 -def print_tag(): +def print_tag() -> None: db1 = make_item_db(1) print(db1["5V315"]) -def print_open(): +def print_open() -> None: db1 = make_item_db(1) for x, (name, row) in enumerate(db1): if row["BatchName"]: diff --git a/example/logo_7_8.py b/example/logo_7_8.py index f7903f25..4e3fb86b 100644 --- a/example/logo_7_8.py +++ b/example/logo_7_8.py @@ -1,6 +1,6 @@ import logging -import snap7 +from snap7.logo import Logo # for setup the Logo connection please follow this link # https://snap7.sourceforge.net/logo.html @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -plc = snap7.logo.Logo() +plc = Logo() plc.connect("192.168.0.41", 0x1000, 0x2000) if plc.get_connected(): diff --git a/example/read_multi.py b/example/read_multi.py index e783b1d9..ea03bb18 100644 --- a/example/read_multi.py +++ b/example/read_multi.py @@ -6,11 +6,12 @@ import ctypes -import snap7.util.getters -from snap7.common import check_error +from snap7 import Client +from error import check_error from snap7.types import S7DataItem, Area, WordLen +from snap7.util import get_real, get_int -client = snap7.client.Client() +client = Client() client.connect("10.100.5.2", 0, 2) data_items = (S7DataItem * 3)() @@ -53,7 +54,7 @@ result_values = [] # function to cast bytes to match data_types[] above -byte_to_value = [snap7.util.getters.get_real, snap7.util.getters.get_real, snap7.util.getters.get_int] +byte_to_value = [get_real, get_real, get_int] # unpack and test the result of each read for i in range(0, len(data_items)): diff --git a/example/write_multi.py b/example/write_multi.py index 22c15e6c..b5e39a3f 100644 --- a/example/write_multi.py +++ b/example/write_multi.py @@ -1,7 +1,7 @@ import ctypes import snap7 from snap7.types import Area, S7DataItem, WordLen -from snap7.util import set_int, set_real, get_int, get_real, get_s5time +from snap7.db import set_int, set_real, get_int, get_real, get_s5time client = snap7.client.Client() diff --git a/snap7/__init__.py b/snap7/__init__.py index 36c6e0ee..9c62a6e8 100644 --- a/snap7/__init__.py +++ b/snap7/__init__.py @@ -4,15 +4,14 @@ from importlib.metadata import version, PackageNotFoundError -from . import client -from . import common -from . import error -from . import logo -from . import server -from . import types -from . import util +from .client import Client +from .server import Server +from .logo import Logo +from .partner import Partner +from .db import Row, DB +from .types import Area, Block, WordLen, SrvEvent, SrvArea -__all__ = ["client", "common", "error", "logo", "server", "types", "util"] +__all__ = ["Client", "Server", "Logo", "Partner", "Row", "DB", "Area", "Block", "WordLen", "SrvEvent", "SrvArea"] try: __version__ = version("python-snap7") diff --git a/snap7/client/__init__.py b/snap7/client.py similarity index 92% rename from snap7/client/__init__.py rename to snap7/client.py index d6ee7764..d3ab9c23 100644 --- a/snap7/client/__init__.py +++ b/snap7/client.py @@ -4,32 +4,24 @@ import re import logging -from ctypes import CFUNCTYPE, byref, create_string_buffer, sizeof, c_int16 +from ctypes import CFUNCTYPE, byref, create_string_buffer, sizeof from ctypes import Array, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p from datetime import datetime -from typing import Any, Callable, Hashable, List, Optional, Tuple, Union, Type +from typing import Any, Callable, List, Optional, Tuple, Union, Type + +from .error import error_wrap, check_error from types import TracebackType -from ..common import check_error, ipv4, load_library -from ..protocol import Snap7CliProtocol -from ..types import S7SZL, Area, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem, Block -from ..types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen -from ..types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types -from ..types import RemotePort, CDataArrayType +from snap7.common import ipv4, load_library +from snap7.protocol import Snap7CliProtocol +from snap7.types import S7SZL, Area, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem, Block +from snap7.types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen +from snap7.types import S7Object, buffer_size, buffer_type, cpu_statuses +from snap7.types import CDataArrayType, Parameter logger = logging.getLogger(__name__) -def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: - """Parses a s7 error code returned the decorated function.""" - - def f(*args: tuple[Any, ...], **kwargs: dict[Hashable, Any]) -> None: - code = func(*args, **kwargs) - check_error(code, context="client") - - return f - - class Client: """ A snap7 client @@ -84,7 +76,7 @@ def __del__(self) -> None: def create(self) -> None: """Creates a SNAP7 client.""" logger.info("creating snap7 client") - self._lib.Cli_Create.restype = S7Object # type: ignore[attr-defined] + self._lib.Cli_Create.restype = S7Object self._s7_client = S7Object(self._lib.Cli_Create()) def destroy(self) -> Optional[int]: @@ -172,7 +164,7 @@ def get_cpu_info(self) -> S7CpuInfo: check_error(result, context="client") return info - @error_wrap + @error_wrap(context="client") def disconnect(self) -> int: """Disconnect a client. @@ -182,15 +174,15 @@ def disconnect(self) -> int: logger.info("disconnecting snap7 client") return self._lib.Cli_Disconnect(self._s7_client) - @error_wrap - def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int: + @error_wrap(context="client") + def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> int: """Connects a Client Object to a PLC. Args: address: IP address of the PLC. rack: rack number where the PLC is located. slot: slot number where the CPU is located. - tcpport: port of the PLC. + tcp_port: port of the PLC. Returns: Error code from snap7 library. @@ -200,9 +192,9 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int >>> client = snap7.client.Client() >>> client.connect("192.168.0.1", 0, 0) # port is implicit = 102. """ - logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}") + logger.info(f"connecting to {address}:{tcp_port} rack {rack} slot {slot}") - self.set_param(number=RemotePort, value=tcpport) + self.set_param(parameter=Parameter.RemotePort, value=tcp_port) return self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot)) def db_read(self, db_number: int, start: int, size: int) -> bytearray: @@ -235,14 +227,14 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray: check_error(result, context="client") return bytearray(data) - @error_wrap + @error_wrap(context="client") def db_write(self, db_number: int, start: int, data: bytearray) -> int: """Writes a part of a DB into a PLC. Args: - db_number: number of the DB to be read. + db_number: number of the DB to be written. start: byte index to start writing to. - data: buffer to be write. + data: buffer to be writen. Returns: Buffer written. @@ -315,7 +307,7 @@ def upload(self, block_num: int) -> bytearray: logger.info(f"received {size} bytes") return bytearray(_buffer) - @error_wrap + @error_wrap(context="client") def download(self, data: bytearray, block_num: int = -1) -> int: """Download a block into AG. A whole block (including header and footer) must be available into the @@ -340,7 +332,7 @@ def db_get(self, db_number: int) -> bytearray: """Uploads a DB from AG using DBRead. Note: - This method can't be use for 1200/1500 PLCs. + This method can't be used for 1200/1500 PLCs. Args: db_number: db number to be read from. @@ -363,26 +355,23 @@ def db_get(self, db_number: int) -> bytearray: return bytearray(_buffer) def read_area(self, area: Area, db_number: int, start: int, size: int) -> bytearray: - """Reads a data area from a PLC - With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters. + """Read a data area from a PLC + + With this you can read DB, Inputs, Outputs, Merkers, Timers and Counters. Args: area: area to be read from. - db_number: number of the db to be read from. In case of Inputs, Marks or Outputs, this should be equal to 0. + db_number: The DB number, only used when area=Areas.DB start: byte index to start reading. size: number of bytes to read. Returns: Buffer with the data read. - Raises: - :obj:`ValueError`: if the area is not defined in the `Areas` - Example: - >>> import snap7.util.db - >>> import snap7 + >>> from snap7 import Client, Area >>> Client().connect("192.168.0.1", 0, 0) - >>> buffer = Client().read_area(snap7.util.db.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14. + >>> buffer = Client().read_area(Area.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14. >>> buffer bytearray(b'\\x00\\x00') """ @@ -404,7 +393,7 @@ def read_area(self, area: Area, db_number: int, start: int, size: int) -> bytear check_error(result, context="client") return bytearray(data) - @error_wrap + @error_wrap(context="client") def write_area(self, area: Area, db_number: int, start: int, data: bytearray) -> int: """Writes a data area into a PLC. @@ -418,7 +407,7 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray) -> Snap7 error code. Exmaple: - >>> from snap7.util.db import DB + >>> from db import DB >>> import snap7 >>> client = snap7.client.Client() >>> client.connect("192.168.0.1", 0, 0) @@ -540,7 +529,7 @@ def get_block_info(self, block_type: Block, db_number: int) -> TS7BlockInfo: check_error(result, context="client") return data - @error_wrap + @error_wrap(context="client") def set_session_password(self, password: str) -> int: """Send the password to the PLC to meet its security level. @@ -557,7 +546,7 @@ def set_session_password(self, password: str) -> int: raise ValueError("Maximum password length is 8") return self._lib.Cli_SetSessionPassword(self._s7_client, c_char_p(password.encode())) - @error_wrap + @error_wrap(context="client") def clear_session_password(self) -> int: """Clears the password set for the current session (logout). @@ -651,7 +640,7 @@ def ab_write(self, start: int, data: bytearray) -> int: logger.debug(f"ab write: start: {start}: size: {size}: ") return self._lib.Cli_ABWrite(self._s7_client, start, size, byref(cdata)) - def as_ab_read(self, start: int, size: int, data: Union[Array[c_byte], Array[c_int16], Array[c_int32]]) -> int: + def as_ab_read(self, start: int, size: int, data: Union[Array[c_byte], CDataArrayType]) -> int: """Reads a part of IPU area from a PLC asynchronously. Args: @@ -758,7 +747,7 @@ def as_db_fill(self, db_number: int, filler: int) -> int: check_error(result, context="client") return result - def as_db_get(self, db_number: int, _buffer: CDataArrayType, size: int) -> int: + def as_db_get(self, db_number: int, data: CDataArrayType, size: int) -> int: """Uploads a DB from AG using DBRead. Note: @@ -766,13 +755,13 @@ def as_db_get(self, db_number: int, _buffer: CDataArrayType, size: int) -> int: Args: db_number: number of DB to get. - _buffer: buffer where the data read will be place. + data: buffer where the data read will be place. size: amount of bytes to be read. Returns: Snap7 code. """ - result = self._lib.Cli_AsDBGet(self._s7_client, db_number, byref(_buffer), byref(c_int(size))) + result = self._lib.Cli_AsDBGet(self._s7_client, db_number, byref(data), byref(c_int(size))) check_error(result, context="client") return result @@ -803,10 +792,10 @@ def as_db_write(self, db_number: int, start: int, size: int, data: CDataArrayTyp """Writes a part of a DB into a PLC. Args: - db_number: number of DB to be write. + db_number: number of DB to be writen. start: byte index from where start to write to. size: amount of bytes to write. - data: buffer to be write. + data: buffer to be writen. Returns: Snap7 code. @@ -835,7 +824,7 @@ def as_download(self, data: bytearray, block_num: int) -> int: check_error(result) return result - @error_wrap + @error_wrap(context="client") def compress(self, time: int) -> int: """Performs the Compress action. @@ -847,34 +836,32 @@ def compress(self, time: int) -> int: """ return self._lib.Cli_Compress(self._s7_client, time) - @error_wrap - def set_param(self, number: int, value: int) -> int: + @error_wrap(context="client") + def set_param(self, parameter: Parameter, value: int) -> int: """Writes an internal Server Parameter. Args: - number: number of argument to be written. + parameter: the parameter to be written. value: value to be written. Returns: Snap7 code. """ - logger.debug(f"setting param number {number} to {value}") - type_ = param_types[number] - return self._lib.Cli_SetParam(self._s7_client, number, byref(type_(value))) + logger.debug(f"setting param number {parameter} to {value}") + return self._lib.Cli_SetParam(self._s7_client, parameter, byref(parameter.ctype(value))) - def get_param(self, number: int) -> int: + def get_param(self, parameter: Parameter) -> int: """Reads an internal Server parameter. Args: - number: number of argument to be read. + parameter: number of argument to be read. Return: Value of the param read. """ - logger.debug(f"retrieving param number {number}") - type_ = param_types[number] - value = type_() - code = self._lib.Cli_GetParam(self._s7_client, c_int(number), byref(value)) + logger.debug(f"retrieving param number {parameter}") + value = parameter.ctype() + code = self._lib.Cli_GetParam(self._s7_client, c_int(parameter), byref(value)) check_error(code) return value.value @@ -914,7 +901,7 @@ def get_plc_datetime(self) -> datetime: year=buffer[5] + 1900, month=buffer[4] + 1, day=buffer[3], hour=buffer[2], minute=buffer[1], second=buffer[0] ) - @error_wrap + @error_wrap(context="client") def set_plc_datetime(self, dt: datetime) -> int: """Sets the PLC date/time with a given value. @@ -1136,33 +1123,33 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int: check_error(result, context="client") return result - def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size: int) -> int: + def as_read_szl(self, id_: int, index: int, data: S7SZL, size: int) -> int: """Reads a partial list of given ID and Index. Args: - ssl_id: TODO - index: TODO - s7_szl: TODO - size: TODO + id_: The list ID + index: The list index + data: the user buffer + size: buffer size available Returns: Snap7 code. """ - result = self._lib.Cli_AsReadSZL(self._s7_client, ssl_id, index, byref(s7_szl), byref(c_int(size))) + result = self._lib.Cli_AsReadSZL(self._s7_client, id_, index, byref(data), byref(c_int(size))) check_error(result, context="client") return result - def as_read_szl_list(self, szl_list: S7SZLList, items_count: int) -> int: + def as_read_szl_list(self, data: S7SZLList, items_count: int) -> int: """Reads the list of partial lists available in the CPU. Args: - szl_list: TODO - items_count: TODO + data: the user buffer list + items_count: buffer capacity Returns: Snap7 code. """ - result = self._lib.Cli_AsReadSZLList(self._s7_client, byref(szl_list), byref(c_int(items_count))) + result = self._lib.Cli_AsReadSZLList(self._s7_client, byref(data), byref(c_int(items_count))) check_error(result, context="client") return result @@ -1198,7 +1185,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int: check_error(result) return result - def as_upload(self, block_num: int, _buffer: CDataArrayType, size: int) -> int: + def as_upload(self, block_num: int, data: CDataArrayType, size: int) -> int: """Uploads a block from AG. Note: @@ -1206,13 +1193,13 @@ def as_upload(self, block_num: int, _buffer: CDataArrayType, size: int) -> int: Args: block_num: block number to upload. - _buffer: buffer where the data will be place. - size: amount of bytes to uplaod. + data: buffer where the data will be place. + size: amount of bytes to upload. Returns: Snap7 code. """ - result = self._lib.Cli_AsUpload(self._s7_client, Block.DB.ctype, block_num, byref(_buffer), byref(c_int(size))) + result = self._lib.Cli_AsUpload(self._s7_client, Block.DB.ctype, block_num, byref(data), byref(c_int(size))) check_error(result, context="client") return result @@ -1446,11 +1433,11 @@ def mb_write(self, start: int, size: int, data: bytearray) -> int: check_error(result) return result - def read_szl(self, ssl_id: int, index: int = 0x0000) -> S7SZL: + def read_szl(self, id_: int, index: int = 0) -> S7SZL: """Reads a partial list of given ID and Index. Args: - ssl_id: ssl id to be read. + id_: ssl id to be read. index: index to be read. Returns: @@ -1458,7 +1445,7 @@ def read_szl(self, ssl_id: int, index: int = 0x0000) -> S7SZL: """ s7_szl = S7SZL() size = c_int(sizeof(s7_szl)) - result = self._lib.Cli_ReadSZL(self._s7_client, ssl_id, index, byref(s7_szl), byref(size)) + result = self._lib.Cli_ReadSZL(self._s7_client, id_, index, byref(s7_szl), byref(size)) check_error(result, context="client") return s7_szl diff --git a/snap7/common.py b/snap7/common.py index 48493d50..794f5248 100644 --- a/snap7/common.py +++ b/snap7/common.py @@ -3,8 +3,7 @@ import pathlib import platform from pathlib import Path -from ctypes import Array, c_char, c_int, c_int32 -from typing import Callable, Literal, NoReturn, Optional, cast +from typing import NoReturn, Optional, cast from ctypes.util import find_library from functools import cache from .protocol import Snap7CliProtocol @@ -86,53 +85,3 @@ def load_library(lib_location: Optional[str] = None) -> Snap7CliProtocol: _raise_error() return cast(Snap7CliProtocol, cdll.LoadLibrary(lib_location)) - - -Context = Literal["client", "server", "partner"] - - -@cache -def check_error(code: int, context: Context = "client") -> None: - """Check if the error code is set. If so, a Python log message is generated - and an error is raised. - - Args: - code: error code number. - context: context in which is called. - - Raises: - RuntimeError: if the code exists and is different from 1. - """ - if code and code != 1: - error = error_text(code, context) - logger.error(error) - raise RuntimeError(error) - - -def error_text(error: int, context: Context = "client") -> bytes: - """Returns a textual explanation of a given error number - - Args: - error: an error integer - context: context in which is called from, server, client or partner - - Returns: - The error. - - Raises: - TypeError: if the context is not in `["client", "server", "partner"]` - """ - if context not in ("client", "server", "partner"): - raise TypeError(f"Unkown context {context} used, should be either client, server or partner") - logger.debug(f"error text for {hex(error)}") - len_ = 1024 - text_type = c_char * len_ - text = text_type() - library = load_library() - error_text_func: Callable[[c_int32, Array[c_char], c_int], int] = { - "client": library.Cli_ErrorText, - "server": library.Srv_ErrorText, - "partner": library.Par_ErrorText, - }[context] - error_text_func(c_int32(error), text, c_int(len_)) - return text.value diff --git a/snap7/util/db.py b/snap7/db.py similarity index 79% rename from snap7/util/db.py rename to snap7/db.py index 6cc57b56..a1cff1a4 100644 --- a/snap7/util/db.py +++ b/snap7/db.py @@ -1,14 +1,114 @@ +""" +This module contains utility functions for working with PLC DB objects. +There are functions to work with the raw bytearray data snap7 functions return +In order to work with this data you need to make python able to work with the +PLC bytearray data. + +For example code see test_util.py and example.py in the example folder. + + +example:: + + spec/DB layout + + # Byte index Variable name Datatype + layout=\"\"\" + 4 ID INT + 6 NAME STRING[6] + + 12.0 test_bool1 BOOL + 12.1 test_bool2 BOOL + 12.2 test_bool3 BOOL + 12.3 test_bool4 BOOL + 12.4 test_bool5 BOOL + 12.5 test_bool6 BOOL + 12.6 test_bool7 BOOL + 12.7 test_bool8 BOOL + 13 testReal REAL + 17 testDword DWORD + \"\"\" + + client = snap7.client.Client() + client.connect('192.168.200.24', 0, 3) + + # this looks confusing but this means uploading from the PLC to YOU + # so downloading in the PC world :) + + all_data = client.upload(db_number) + + simple: + + from snap7 import DB + db1 = DB( + db_number, # the db we use + all_data, # bytearray from the plc + layout, # layout specification DB variable data + # A DB specification is the specification of a + # DB object in the PLC you can find it using + # the dataview option on a DB object in PCS7 + + 17+2, # size of the specification 17 is start + # of last value + # which is a DWORD which is 2 bytes, + + 1, # number of row's / specifications + + id_field='ID', # field we can use to identify a row. + # default index is used + layout_offset=4, # sometimes specification does not start a 0 + # like in our example + db_offset=0 # At which point in 'all_data' should we start + # reading. This could be that the specification + # does not start at 0 + ) + + Now we can use db1 in python as a dict. if 'ID' contains + the 'test' we can identify the 'test' row in the all_data bytearray + + To test of you layout matches the data from the plc you can + just print db1[0] or db['test'] in the example + + db1['test']['test_bool1'] = 0 + + If we do not specify an id_field this should work to read out the + same data. + + db1[0]['test_bool1'] + + to read and write a single Row from the plc. takes like 5ms! + + db1['test'].write() + + db1['test'].read(client) + + +""" + import re -from collections import OrderedDict -from datetime import datetime, date, timedelta -from typing import Any, Iterator, Optional, Tuple, Union, Dict, Callable from logging import getLogger +from datetime import datetime, date +from typing import Any, Optional, Union, Iterator, Tuple, Dict, Callable -from snap7.client import Client -from snap7.types import Area +from snap7 import Client +from snap7.types import Area, ValueType -from snap7.util import parse_specification -from snap7.util.getters import ( +from snap7.util import ( + set_bool, + set_fstring, + set_string, + set_real, + set_dword, + set_udint, + set_dint, + set_uint, + set_int, + set_word, + set_byte, + set_usint, + set_sint, + set_time, + set_lreal, + set_date, get_bool, get_fstring, get_string, @@ -33,27 +133,60 @@ get_wchar, get_dtl, ) -from snap7.util.setters import ( - set_bool, - set_fstring, - set_string, - set_real, - set_dword, - set_udint, - set_dint, - set_uint, - set_int, - set_word, - set_byte, - set_usint, - set_sint, - set_time, -) -from snap7.util.setters import set_lreal, set_date logger = getLogger(__name__) -ValueType = Union[int, float, str, datetime, bytearray, bytes, date, timedelta] + +def parse_specification(db_specification: str) -> Dict[str, Any]: + """Create a db specification derived from a + dataview of a db in which the byte layout + is specified + + Args: + db_specification: string formatted table with the indexes, aliases and types. + + Returns: + Parsed DB specification. + """ + parsed_db_specification = {} + + for line in db_specification.split("\n"): + if line and not line.lstrip().startswith("#"): + index, var_name, _type = line.lstrip().split("#")[0].split() + parsed_db_specification[var_name] = (index, _type) + + return parsed_db_specification + + +def print_row(data: bytearray) -> None: + """print a single db row in chr and str""" + index_line = "" + pri_line1 = "" + chr_line2 = "" + matcher = re.compile("[a-zA-Z0-9 ]") + + for i, xi in enumerate(data): + # index + if not i % 5: + diff = len(pri_line1) - len(index_line) + index_line += diff * " " + index_line += str(i) + # i = i + (ws - len(i)) * ' ' + ',' + + # byte array line + str_v = str(xi) + pri_line1 += str(xi) + "," + # char line + c = chr(xi) + c = c if matcher.match(c) else " " + # align white space + w = len(str_v) + c = c + (w - 1) * " " + "," + chr_line2 += c + + print(index_line) + print(pri_line1) + print(chr_line2) class DB: @@ -82,7 +215,7 @@ class DB: Examples: >>> db = DB() - >>> db[0]['testbool1'] = "test" + >>> db[0]['test_bool1'] = "test" >>> db.write(Client()) # puts data in plc """ @@ -93,7 +226,7 @@ class DB: layout_offset: int = 0 # at which byte in row specification should db_offset: int = 0 # at which byte in db should we start reading? - # first fields could be be status data. + # first fields could be status data. # and only the last part could be control data # now you can be sure you will never overwrite # critical parts of db @@ -140,7 +273,7 @@ def __init__( self.specification = specification # loop over bytearray. make rowObjects # store index of id_field to row objects - self.index: OrderedDict[str, DB_Row] = OrderedDict() + self.index: Dict[str, Row] = {} self.make_rows() def make_rows(self) -> None: @@ -155,7 +288,7 @@ def make_rows(self) -> None: # calculate where row in bytearray starts db_offset = i * (row_size + row_offset) + self.db_offset # create a row object - row = DB_Row( + row = Row( self, specification, row_size=row_size, @@ -172,7 +305,7 @@ def make_rows(self) -> None: logger.error(msg) self.index[key] = row - def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, "DB_Row"]: + def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, "Row"]: """Access a row of the table through its index. Rows (values) are of type :class:`DB_Row`. @@ -217,8 +350,8 @@ def items(self) -> Iterator[Tuple[str, Any]]: """ yield from self.index.items() - def export(self) -> OrderedDict[str, Any]: - """Export the object to an :class:`OrderedDict`, where each item in the dictionary + def export(self) -> Dict[str, Any]: + """Export the object to a dict, where each item in the dictionary has an index as the key, and the value of the DB row associated with that index as a value, represented itself as a :class:`dict` (as returned by :func:`DB_Row.export`). @@ -228,7 +361,7 @@ def export(self) -> OrderedDict[str, Any]: Notes: This function effectively returns a snapshot of the DB. """ - ret = OrderedDict() + ret = {} for k, v in self.items(): ret[k] = v.export() return ret @@ -268,7 +401,6 @@ def read(self, client: Client) -> None: for i, b in enumerate(bytearray_): self._bytearray[i + self.db_offset] = b - # todo: optimize by only rebuilding the index instead of all the DB_Row objects self.index.clear() self.make_rows() @@ -303,8 +435,11 @@ def write(self, client: Client) -> None: else: client.write_area(self.area, 0, self.db_offset, data) + def get_bytearray(self) -> bytearray: + return self._bytearray + -class DB_Row: +class Row: """ Provide ROW API for DB bytearray @@ -314,7 +449,7 @@ class DB_Row: """ bytearray_: bytearray # data of reference to parent DB - _specification: OrderedDict[str, Any] = OrderedDict() # row specification + _specification: Dict[str, Any] = {} # row specification def __init__( self, @@ -333,7 +468,7 @@ def __init__( _specification: row specification layout. row_size: Amount of bytes of the row. db_offset: at which byte in the db starts reading. - layout_offset: at which byte in the row specificaion we + layout_offset: at which byte in the row specification we start reading the data. row_offset: offset between rows. area: which memory area this row is representing. @@ -344,7 +479,7 @@ def __init__( self.db_offset = db_offset # start point of row data in db self.layout_offset = layout_offset # start point of row data in layout - self.row_size = row_size # lenght of the read + self.row_size = row_size # length of the read self.row_offset = row_offset # start of writable part of row self.area = area @@ -360,7 +495,7 @@ def get_bytearray(self) -> bytearray: Buffer data corresponding to the row. """ if isinstance(self._bytearray, DB): - return self._bytearray._bytearray + return self._bytearray.get_bytearray() return self._bytearray def export(self) -> Dict[str, Union[str, int, float, bool, datetime]]: @@ -395,7 +530,7 @@ def unchanged(self, bytearray_: bytearray) -> bool: bytearray_: buffer of data to check. Returns: - True if the current `bytearray_` is equal to the new one. Otherwise is False. + True if the current `bytearray_` is equal to the new one. Otherwise, this is False. """ return self.get_bytearray() == bytearray_ @@ -421,7 +556,7 @@ def get_value(self, byte_index: Union[str, int], type_: str) -> ValueType: type_: type of data to read. Raises: - :obj:`ValueError`: if reading a `string` when checking the lenght of the string. + :obj:`ValueError`: if reading a `string` when checking the length of the string. :obj:`ValueError`: if the `type_` is not handled. Returns: @@ -607,3 +742,7 @@ def read(self, client: Client) -> None: # replace data in bytearray for i, b in enumerate(bytearray_): data[i + self.db_offset] = b + + +# backwards compatible alias +DBRow = Row diff --git a/snap7/error.py b/snap7/error.py index 24ea573e..fe4884e2 100644 --- a/snap7/error.py +++ b/snap7/error.py @@ -6,6 +6,14 @@ so we are using that now. But maybe we will use this in the future again. """ +from _ctypes import Array +from ctypes import c_char, c_int32, c_int +from functools import cache +from typing import Callable, Any, Hashable + +from .common import logger, load_library +from .types import Context + s7_client_errors = { 0x00100000: "errNegotiatingPDU", 0x00200000: "errCliInvalidParams", @@ -102,3 +110,61 @@ server_errors = s7_server_errors.copy() server_errors.update(isotcp_errors) server_errors.update(tcp_errors) + + +def error_wrap(context: Context) -> Callable[..., Callable[..., None]]: + """Parses a s7 error code returned the decorated function.""" + + def middle(func: Callable[..., int]) -> Any: + def inner(*args: tuple[Any, ...], **kwargs: dict[Hashable, Any]) -> None: + code = func(*args, **kwargs) + check_error(code, context=context) + + return inner + + return middle + + +@cache +def check_error(code: int, context: Context = "client") -> None: + """Check if the error code is set. If so, a Python log message is generated + and an error is raised. + + Args: + code: error code number. + context: context in which is called. + + Raises: + RuntimeError: if the code exists and is different from 1. + """ + if code and code != 1: + error = error_text(code, context) + logger.error(error) + raise RuntimeError(error) + + +def error_text(error: int, context: Context = "client") -> bytes: + """Returns a textual explanation of a given error number + + Args: + error: an error integer + context: context in which is called from, server, client or partner + + Returns: + The error. + + Raises: + TypeError: if the context is not in `["client", "server", "partner"]` + """ + logger.debug(f"error text for {hex(error)}") + len_ = 1024 + text_type = c_char * len_ + text = text_type() + library = load_library() + error_text_func: Callable[[c_int32, Array[c_char], c_int], int] = { + "client": library.Cli_ErrorText, + "server": library.Srv_ErrorText, + "partner": library.Par_ErrorText, + }[context] + error_text_func(c_int32(error), text, c_int(len_)) + return text.value diff --git a/snap7/exceptions.py b/snap7/exceptions.py deleted file mode 100644 index cf024bcb..00000000 --- a/snap7/exceptions.py +++ /dev/null @@ -1,4 +0,0 @@ -class Snap7Exception(Exception): - """ - A Snap7 specific exception. - """ diff --git a/snap7/logo.py b/snap7/logo.py index 2827f112..f860178e 100644 --- a/snap7/logo.py +++ b/snap7/logo.py @@ -7,9 +7,9 @@ import logging from ctypes import byref, c_int, c_int32, c_uint16 -from .types import WordLen, S7Object, param_types -from .types import RemotePort, Area -from .common import ipv4, check_error, load_library +from .types import WordLen, S7Object, Parameter, Area +from .common import ipv4, load_library +from .error import check_error logger = logging.getLogger(__name__) @@ -81,7 +81,7 @@ def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int # special handling for Siemens Logo # 1st set connection params # 2nd connect without any parameters - self.set_param(RemotePort, tcpport) + self.set_param(Parameter.RemotePort, tcpport) self.set_connection_params(ip_address, tsap_snap7, tsap_logo) result = self.library.Cli_Connect(self.pointer) check_error(result, context="client") @@ -303,23 +303,22 @@ def get_connected(self) -> bool: check_error(result, context="client") return bool(connected) - def set_param(self, number: int, value: int) -> int: + def set_param(self, parameter: Parameter, value: int) -> int: """Sets an internal Server object parameter. Args: - number: Parameter type number + parameter: Parameter to be set value: Parameter value Returns: Error code from snap7 library. """ - logger.debug(f"setting param number {number} to {value}") - type_ = param_types[number] - result = self.library.Cli_SetParam(self.pointer, number, byref(type_(value))) + logger.debug(f"setting param number {parameter} to {value}") + result = self.library.Cli_SetParam(self.pointer, parameter, byref(parameter.ctype(value))) check_error(result, context="client") return result - def get_param(self, number: int) -> int: + def get_param(self, parameter: Parameter) -> int: """Reads an internal Logo object parameter. Args: @@ -328,9 +327,8 @@ def get_param(self, number: int) -> int: Returns: Parameter value """ - logger.debug(f"retreiving param number {number}") - type_ = param_types[number] - value = type_() - code = self.library.Cli_GetParam(self.pointer, c_int(number), byref(value)) + logger.debug(f"retreiving param number {parameter}") + value = parameter.ctype() + code = self.library.Cli_GetParam(self.pointer, c_int(parameter), byref(value)) check_error(code) return value.value diff --git a/snap7/partner.py b/snap7/partner.py index d2c197a2..c1c7bf8c 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -11,25 +11,16 @@ import re import logging from ctypes import byref, c_int, c_int32, c_uint32, c_void_p -from typing import Any, Callable, Hashable, Optional, Tuple +from typing import Optional, Tuple -from .common import ipv4, check_error, load_library +from .common import ipv4, load_library +from .error import check_error, error_wrap from .protocol import Snap7CliProtocol -from .types import S7Object, param_types, word +from .types import S7Object, word, Parameter logger = logging.getLogger(__name__) -def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: - """Parses a s7 error code returned the decorated function.""" - - def f(*args: tuple[Any, ...], **kwargs: dict[Hashable, Any]) -> None: - code = func(*args, **kwargs) - check_error(code, context="partner") - - return f - - class Partner: """ A snap7 partner. @@ -121,14 +112,13 @@ def get_last_error(self) -> c_int32: check_error(result, "partner") return error - def get_param(self, number: int) -> int: + def get_param(self, parameter: Parameter) -> int: """ Reads an internal Partner object parameter. """ - logger.debug(f"retreiving param number {number}") - type_ = param_types[number] - value = type_() - code = self._library.Par_GetParam(self._pointer, c_int(number), byref(value)) + logger.debug(f"retreiving param number {parameter}") + value = parameter.ctype() + code = self._library.Par_GetParam(self._pointer, c_int(parameter), byref(value)) check_error(code) return value.value @@ -165,11 +155,11 @@ def get_times(self) -> Tuple[c_int32, c_int32]: check_error(result, "partner") return send_time, recv_time - @error_wrap - def set_param(self, number: int, value: int) -> int: + @error_wrap(context="partner") + def set_param(self, parameter: Parameter, value: int) -> int: """Sets an internal Partner object parameter.""" - logger.debug(f"setting param number {number} to {value}") - return self._library.Par_SetParam(self._pointer, c_int(number), byref(c_int(value))) + logger.debug(f"setting param number {parameter} to {value}") + return self._library.Par_SetParam(self._pointer, c_int(parameter), byref(c_int(value))) def set_recv_callback(self) -> int: """ @@ -185,7 +175,7 @@ def set_send_callback(self) -> int: """ return self._library.Par_SetSendCallback(self._pointer) - @error_wrap + @error_wrap(context="partner") def start(self) -> int: """ Starts the Partner and binds it to the specified IP address and the @@ -193,7 +183,7 @@ def start(self) -> int: """ return self._library.Par_Start(self._pointer) - @error_wrap + @error_wrap(context="partner") def start_to(self, local_ip: str, remote_ip: str, local_tsap: int, remote_tsap: int) -> int: """ Starts the Partner and binds it to the specified IP address and the @@ -220,7 +210,7 @@ def stop(self) -> int: """ return self._library.Par_Stop(self._pointer) - @error_wrap + @error_wrap(context="partner") def wait_as_b_send_completion(self, timeout: int = 0) -> int: """ Waits until the current asynchronous send job is done or the timeout diff --git a/snap7/server/__init__.py b/snap7/server/__init__.py index 480d7948..f0d3794e 100644 --- a/snap7/server/__init__.py +++ b/snap7/server/__init__.py @@ -18,26 +18,17 @@ from _ctypes import CFuncPtr import struct import logging -from typing import Any, Callable, Hashable, Optional, Tuple, cast, Type +from typing import Any, Callable, Optional, Tuple, cast, Type from types import TracebackType -from ..common import ipv4, check_error, load_library +from ..common import ipv4, load_library +from ..error import check_error, error_wrap from ..protocol import Snap7CliProtocol -from ..types import SrvEvent, LocalPort, cpu_statuses, server_statuses, SrvArea, longword, WordLen, S7Object, CDataArrayType +from ..types import SrvEvent, Parameter, cpu_statuses, server_statuses, SrvArea, longword, WordLen, S7Object, CDataArrayType logger = logging.getLogger(__name__) -def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: - """Parses a s7 error code returned the decorated function.""" - - def f(*args: tuple[Any, ...], **kwargs: dict[Hashable, Any]) -> None: - code = func(*args, **kwargs) - check_error(code, context="server") - - return f - - class Server: """ A fake S7 server. @@ -94,7 +85,7 @@ def create(self) -> None: self._lib.Srv_Create.restype = S7Object # type: ignore[attr-defined] self._s7_server = S7Object(self._lib.Srv_Create()) - @error_wrap + @error_wrap(context="server") def register_area(self, area: SrvArea, index: int, userdata: CDataArrayType) -> int: """Shares a memory area with the server. That memory block will be visible by the clients. @@ -111,7 +102,7 @@ def register_area(self, area: SrvArea, index: int, userdata: CDataArrayType) -> logger.info(f"registering area {area}, index {index}, size {size}") return self._lib.Srv_RegisterArea(self._s7_server, area.value, index, byref(userdata), size) - @error_wrap + @error_wrap(context="server") def set_events_callback(self, call_back: Callable[..., Any]) -> int: """Sets the user callback that the Server object has to call when an event is created. @@ -138,7 +129,7 @@ def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int: usrPtr = c_void_p() return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, usrPtr) - @error_wrap + @error_wrap(context="server") def set_read_events_callback(self, call_back: Callable[..., Any]) -> int: """Sets the user callback that the Server object has to call when a Read event is created. @@ -176,7 +167,7 @@ def log_callback(event: SrvEvent) -> None: self.set_events_callback(log_callback) - @error_wrap + @error_wrap(context="server") def start(self, tcpport: int = 102) -> int: """Starts the server. @@ -185,11 +176,11 @@ def start(self, tcpport: int = 102) -> int: """ if tcpport != 102: logger.info(f"setting server TCP port to {tcpport}") - self.set_param(LocalPort, tcpport) + self.set_param(Parameter.LocalPort, tcpport) logger.info(f"starting server on 0.0.0.0:{tcpport}") return self._lib.Srv_Start(self._s7_server) - @error_wrap + @error_wrap(context="server") def stop(self) -> int: """Stop the server.""" logger.info("stopping server") @@ -219,7 +210,7 @@ def get_status(self) -> Tuple[str, str, int]: logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}") return (server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value) - @error_wrap + @error_wrap(context="server") def unregister_area(self, area: SrvArea, index: int) -> int: """'Unshares' a memory area previously shared with Srv_RegisterArea(). @@ -235,7 +226,7 @@ def unregister_area(self, area: SrvArea, index: int) -> int: """ return self._lib.Srv_UnregisterArea(self._s7_server, area.value, index) - @error_wrap + @error_wrap(context="server") def unlock_area(self, area: SrvArea, index: int) -> int: """Unlocks a previously locked shared memory area. @@ -249,7 +240,7 @@ def unlock_area(self, area: SrvArea, index: int) -> int: logger.debug(f"unlocking area code {area} index {index}") return self._lib.Srv_UnlockArea(self._s7_server, area.value, index) - @error_wrap + @error_wrap(context="server") def lock_area(self, area: SrvArea, index: int) -> int: """Locks a shared memory area. @@ -263,7 +254,7 @@ def lock_area(self, area: SrvArea, index: int) -> int: logger.debug(f"locking area code {area} index {index}") return self._lib.Srv_LockArea(self._s7_server, area.value, index) - @error_wrap + @error_wrap(context="server") def start_to(self, ip: str, tcp_port: int = 102) -> int: """Start server on a specific interface. @@ -276,27 +267,27 @@ def start_to(self, ip: str, tcp_port: int = 102) -> int: """ if tcp_port != 102: logger.info(f"setting server TCP port to {tcp_port}") - self.set_param(LocalPort, tcp_port) + self.set_param(Parameter.LocalPort, tcp_port) if not re.match(ipv4, ip): raise ValueError(f"{ip} is invalid ipv4") logger.info(f"starting server to {ip}:102") return self._lib.Srv_StartTo(self._s7_server, ip.encode()) - @error_wrap - def set_param(self, number: int, value: int) -> int: + @error_wrap(context="server") + def set_param(self, parameter: Parameter, value: int) -> int: """Sets an internal Server object parameter. Args: - number: number of the parameter. + parameter: the parameter to set value: value to be set. Returns: Error code from snap7 library. """ - logger.debug(f"setting param number {number} to {value}") - return self._lib.Srv_SetParam(self._s7_server, number, byref(c_int(value))) + logger.debug(f"setting param number {parameter} to {value}") + return self._lib.Srv_SetParam(self._s7_server, parameter, byref(c_int(value))) - @error_wrap + @error_wrap(context="server") def set_mask(self, kind: int, mask: int) -> int: """Writes the specified filter mask. @@ -310,7 +301,7 @@ def set_mask(self, kind: int, mask: int) -> int: logger.debug(f"setting mask kind {kind} to {mask}") return self._lib.Srv_SetMask(self._s7_server, kind, mask) - @error_wrap + @error_wrap(context="server") def set_cpu_status(self, status: int) -> int: """Sets the Virtual CPU status. @@ -375,7 +366,7 @@ def get_mask(self, kind: int) -> c_uint32: check_error(code) return mask - @error_wrap + @error_wrap(context="server") def clear_events(self) -> int: """Empties the Event queue. diff --git a/snap7/types.py b/snap7/types.py index a671ec83..069a4c9c 100755 --- a/snap7/types.py +++ b/snap7/types.py @@ -19,11 +19,17 @@ c_int, c_uint8, ) +from datetime import datetime, date, timedelta from enum import IntEnum -from typing import Dict, Union +from typing import Dict, Union, Literal -CDataArrayType = Union[Array[c_byte], Array[c_int], Array[c_int16], Array[c_int32]] -CDataType = Union[type[c_int8], type[c_int16], type[c_int32]] +CDataArrayType = Union[ + Array[c_byte], Array[c_int], Array[c_int16], Array[c_int32], Array[c_uint8], Array[c_uint16], Array[c_uint32] +] +CDataType = Union[type[c_int8], type[c_int16], type[c_int32], type[c_uint8], type[c_uint16], type[c_uint32]] +ValueType = Union[int, float, str, datetime, bytearray, bytes, date, timedelta] + +Context = Literal["client", "server", "partner"] S7Object = c_void_p buffer_size = 65536 @@ -33,46 +39,51 @@ word = c_uint16 longword = c_uint32 -# // PARAMS LIST -LocalPort = 1 -RemotePort = 2 -PingTimeout = 3 -SendTimeout = 4 -RecvTimeout = 5 -WorkInterval = 6 -SrcRef = 7 -DstRef = 8 -SrcTSap = 9 -PDURequest = 10 -MaxClients = 11 -BSendTimeout = 12 -BRecvTimeout = 13 -RecoveryTime = 14 -KeepAliveTime = 15 - -param_types = { - LocalPort: c_uint16, - RemotePort: c_uint16, - PingTimeout: c_int32, - SendTimeout: c_int32, - RecvTimeout: c_int32, - WorkInterval: c_int32, - SrcRef: c_uint16, - DstRef: c_uint16, - SrcTSap: c_uint16, - PDURequest: c_int32, - MaxClients: c_int32, - BSendTimeout: c_int32, - BRecvTimeout: c_int32, - RecoveryTime: c_uint32, - KeepAliveTime: c_uint32, -} - # mask types mkEvent = 0 mkLog = 1 +class Parameter(IntEnum): + # // PARAMS LIST + LocalPort = 1 + RemotePort = 2 + PingTimeout = 3 + SendTimeout = 4 + RecvTimeout = 5 + WorkInterval = 6 + SrcRef = 7 + DstRef = 8 + SrcTSap = 9 + PDURequest = 10 + MaxClients = 11 + BSendTimeout = 12 + BRecvTimeout = 13 + RecoveryTime = 14 + KeepAliveTime = 15 + + @property + def ctype(self) -> CDataType: + map_: Dict[int, CDataType] = { + self.LocalPort: c_uint16, + self.RemotePort: c_uint16, + self.PingTimeout: c_int32, + self.SendTimeout: c_int32, + self.RecvTimeout: c_int32, + self.WorkInterval: c_int32, + self.SrcRef: c_uint16, + self.DstRef: c_uint16, + self.SrcTSap: c_uint16, + self.PDURequest: c_int32, + self.MaxClients: c_int32, + self.BSendTimeout: c_int32, + self.BRecvTimeout: c_int32, + self.RecoveryTime: c_uint32, + self.KeepAliveTime: c_uint32, + } + return map_[self] + + # Area ID # Word Length class WordLen(IntEnum): diff --git a/snap7/util/__init__.py b/snap7/util/__init__.py index f9dccc37..f1af6e80 100644 --- a/snap7/util/__init__.py +++ b/snap7/util/__init__.py @@ -1,97 +1,3 @@ -""" -This module contains utility functions for working with PLC DB objects. -There are functions to work with the raw bytearray data snap7 functions return -In order to work with this data you need to make python able to work with the -PLC bytearray data. - -For example code see test_util.py and example.py in the example folder. - - -example:: - - spec/DB layout - - # Byte index Variable name Datatype - layout=\"\"\" - 4 ID INT - 6 NAME STRING[6] - - 12.0 testbool1 BOOL - 12.1 testbool2 BOOL - 12.2 testbool3 BOOL - 12.3 testbool4 BOOL - 12.4 testbool5 BOOL - 12.5 testbool6 BOOL - 12.6 testbool7 BOOL - 12.7 testbool8 BOOL - 13 testReal REAL - 17 testDword DWORD - \"\"\" - - client = snap7.client.Client() - client.connect('192.168.200.24', 0, 3) - - # this looks confusing but this means uploading from the PLC to YOU - # so downloading in the PC world :) - - all_data = client.upload(db_number) - - simple: - - db1 = snap7.util.DB( - db_number, # the db we use - all_data, # bytearray from the plc - layout, # layout specification DB variable data - # A DB specification is the specification of a - # DB object in the PLC you can find it using - # the dataview option on a DB object in PCS7 - - 17+2, # size of the specification 17 is start - # of last value - # which is a DWORD which is 2 bytes, - - 1, # number of row's / specifications - - id_field='ID', # field we can use to identify a row. - # default index is used - layout_offset=4, # sometimes specification does not start a 0 - # like in our example - db_offset=0 # At which point in 'all_data' should we start - # reading. if could be that the specification - # does not start at 0 - ) - - Now we can use db1 in python as a dict. if 'ID' contains - the 'test' we can identify the 'test' row in the all_data bytearray - - To test of you layout matches the data from the plc you can - just print db1[0] or db['test'] in the example - - db1['test']['testbool1'] = 0 - - If we do not specify a id_field this should work to read out the - same data. - - db1[0]['testbool1'] - - to read and write a single Row from the plc. takes like 5ms! - - db1['test'].write() - - db1['test'].read(client) - - -""" - -import re -from typing import Any -from collections import OrderedDict - -from .db import ( - DB, - DB_Row, -) - from .setters import ( set_bool, set_fstring, @@ -107,6 +13,8 @@ set_usint, set_sint, set_time, + set_lreal, + set_date, ) from .getters import ( @@ -135,7 +43,6 @@ get_dtl, ) - __all__ = [ "get_bool", "get_real", @@ -162,6 +69,8 @@ "get_wstring", "set_real", "set_dword", + "set_date", + "set_lreal", "set_udint", "set_dint", "set_uint", @@ -175,55 +84,3 @@ "set_fstring", "set_string", ] - - -def parse_specification(db_specification: str) -> OrderedDict[str, Any]: - """Create a db specification derived from a - dataview of a db in which the byte layout - is specified - - Args: - db_specification: string formatted table with the indexes, aliases and types. - - Returns: - Parsed DB specification. - """ - parsed_db_specification = OrderedDict() - - for line in db_specification.split("\n"): - if line and not line.lstrip().startswith("#"): - index, var_name, _type = line.lstrip().split("#")[0].split() - parsed_db_specification[var_name] = (index, _type) - - return parsed_db_specification - - -def print_row(data: bytearray) -> None: - """print a single db row in chr and str""" - index_line = "" - pri_line1 = "" - chr_line2 = "" - asci = re.compile("[a-zA-Z0-9 ]") - - for i, xi in enumerate(data): - # index - if not i % 5: - diff = len(pri_line1) - len(index_line) - index_line += diff * " " - index_line += str(i) - # i = i + (ws - len(i)) * ' ' + ',' - - # byte array line - str_v = str(xi) - pri_line1 += str(xi) + "," - # char line - c = chr(xi) - c = c if asci.match(c) else " " - # align white space - w = len(str_v) - c = c + (w - 1) * " " + "," - chr_line2 += c - - print(index_line) - print(pri_line1) - print(chr_line2) diff --git a/snap7/util/getters.py b/snap7/util/getters.py index 494182b3..949f4d6a 100644 --- a/snap7/util/getters.py +++ b/snap7/util/getters.py @@ -63,7 +63,7 @@ def get_word(bytearray_: bytearray, byte_index: int) -> bytearray: Examples: >>> data = bytearray([0, 100]) # two bytes for a word - >>> snap7.util.get_word(data, 0) + >>> get_word(data, 0) 100 """ data = bytearray_[byte_index : byte_index + 2] @@ -89,7 +89,7 @@ def get_int(bytearray_: bytearray, byte_index: int) -> int: Examples: >>> data = bytearray([0, 255]) - >>> snap7.util.get_int(data, 0) + >>> get_int(data, 0) 255 """ data = bytearray_[byte_index : byte_index + 2] @@ -117,7 +117,7 @@ def get_uint(bytearray_: bytearray, byte_index: int) -> int: Examples: >>> data = bytearray([255, 255]) - >>> snap7.util.get_uint(data, 0) + >>> get_uint(data, 0) 65535 """ data = bytearray_[byte_index : byte_index + 2] @@ -144,7 +144,7 @@ def get_real(bytearray_: bytearray, byte_index: int) -> float: Examples: >>> data = bytearray(b'B\\xf6\\xa4Z') - >>> snap7.util.get_real(data, 0) + >>> get_real(data, 0) 123.32099914550781 """ x = bytearray_[byte_index : byte_index + 4] @@ -169,9 +169,9 @@ def get_fstring(bytearray_: bytearray, byte_index: int, max_length: int, remove_ Examples: >>> data = [ord(letter) for letter in "hello world "] - >>> snap7.util.get_fstring(data, 0, 15) + >>> get_fstring(data, 0, 15) 'hello world' - >>> snap7.util.get_fstring(data, 0, 15, remove_padding=false) + >>> get_fstring(data, 0, 15, remove_padding=False) 'hello world ' """ data = map(chr, bytearray_[byte_index : byte_index + max_length]) @@ -198,8 +198,8 @@ def get_string(bytearray_: bytearray, byte_index: int) -> str: String value. Examples: - >>> data = bytearray([254, len("hello world")] + [ord(letter) for letter in "hello world"]) - >>> snap7.util.get_string(data, 0) + >>> data = bytearray([254, len("hello world")] + [ord(l) for letter in "hello world"]) + >>> get_string(data, 0) 'hello world' """ @@ -234,7 +234,7 @@ def get_dword(bytearray_: bytearray, byte_index: int) -> int: Examples: >>> data = bytearray(8) >>> data[:] = b"\\x12\\x34\\xAB\\xCD" - >>> snap7.util.get_dword(data, 0) + >>> get_dword(data, 0) 4294967295 """ data = bytearray_[byte_index : byte_index + 4] @@ -261,7 +261,7 @@ def get_dint(bytearray_: bytearray, byte_index: int) -> int: >>> import struct >>> data = bytearray(4) >>> data[:] = struct.pack(">i", 2147483647) - >>> snap7.util.get_dint(data, 0) + >>> get_dint(data, 0) 2147483647 """ data = bytearray_[byte_index : byte_index + 4] @@ -288,7 +288,7 @@ def get_udint(bytearray_: bytearray, byte_index: int) -> int: >>> import struct >>> data = bytearray(4) >>> data[:] = struct.pack(">I", 4294967295) - >>> snap7.util.get_udint(data, 0) + >>> get_udint(data, 0) 4294967295 """ data = bytearray_[byte_index : byte_index + 4] @@ -390,7 +390,7 @@ def get_time(bytearray_: bytearray, byte_index: int) -> str: >>> import struct >>> data = bytearray(4) >>> data[:] = struct.pack(">i", 2147483647) - >>> snap7.util.get_time(data, 0) + >>> get_time(data, 0) '24:20:31:23:647' """ data_bytearray = bytearray_[byte_index : byte_index + 4] @@ -432,7 +432,7 @@ def get_usint(bytearray_: bytearray, byte_index: int) -> int: Examples: >>> data = bytearray([255]) - >>> snap7.util.get_usint(data, 0) + >>> get_usint(data, 0) 255 """ data = bytearray_[byte_index] & 0xFF @@ -458,7 +458,7 @@ def get_sint(bytearray_: bytearray, byte_index: int) -> int: Examples: >>> data = bytearray([127]) - >>> snap7.util.get_sint(data, 0) + >>> get_sint(data, 0) 127 """ data = bytearray_[byte_index] @@ -486,8 +486,9 @@ def get_lint(bytearray_: bytearray, byte_index: int) -> NoReturn: Examples: read lint value (here as example 12345) from DB1.10 of a PLC - >>> data = client.db_read(db_number=1, start=10, size=8) - >>> snap7.util.get_lint(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=8) + >>> get_lint(data, 0) 12345 """ @@ -515,8 +516,9 @@ def get_lreal(bytearray_: bytearray, byte_index: int) -> float: Examples: read lreal value (here as example 12345.12345) from DB1.10 of a PLC - >>> data = client.db_read(db_number=1, start=10, size=8) - >>> snap7.util.get_lreal(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=8) + >>> get_lreal(data, 0) 12345.12345 """ return float(struct.unpack_from(">d", bytearray_, offset=byte_index)[0]) @@ -541,8 +543,9 @@ def get_lword(bytearray_: bytearray, byte_index: int) -> bytearray: Examples: read lword value (here as example 0xAB\0xCD) from DB1.10 of a PLC - >>> data = client.db_read(db_number=1, start=10, size=8) - >>> snap7.util.get_lword(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=8) + >>> get_lword(data, 0) bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\xAB\\xCD") """ # data = bytearray_[byte_index:byte_index + 4] @@ -566,8 +569,9 @@ def get_ulint(bytearray_: bytearray, byte_index: int) -> int: Examples: Read 8 Bytes raw from DB1.10, where an ulint value is stored. Return Python compatible value. - >>> data = client.db_read(db_number=1, start=10, size=8) - >>> snap7.util.get_ulint(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=8) + >>> get_ulint(data, 0) 12345 """ raw_ulint = bytearray_[byte_index : byte_index + 8] @@ -639,8 +643,9 @@ def get_char(bytearray_: bytearray, byte_index: int) -> str: Examples: Read 1 Byte raw from DB1.10, where a char value is stored. Return Python compatible value. - >>> data = client.db_read(db_number=1, start=10, size=1) - >>> snap7.util.get_char(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=1) + >>> get_char(data, 0) 'C' """ char = chr(bytearray_[byte_index]) @@ -663,8 +668,9 @@ def get_wchar(bytearray_: bytearray, byte_index: int) -> str: Examples: Read 2 Bytes raw from DB1.10, where a wchar value is stored. Return Python compatible value. - >>> data = client.db_read(db_number=1, start=10, size=2) - >>> snap7.util.get_wchar(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=2) + >>> get_wchar(data, 0) 'C' """ if bytearray_[byte_index] == 0: @@ -689,8 +695,9 @@ def get_wstring(bytearray_: bytearray, byte_index: int) -> str: Examples: Read from DB1.10 22, where the WSTRING is stored, the raw 22 Bytes and convert them to a python string - >>> data = client.db_read(db_number=1, start=10, size=22) - >>> snap7.util.get_wstring(data, 0) + >>> from snap7 import Client + >>> data = Client().db_read(db_number=1, start=10, size=22) + >>> get_wstring(data, 0) 'hello world' """ # Byte 0 + 1 --> total length of wstring, should be bytearray_ - 4 diff --git a/snap7/util/setters.py b/snap7/util/setters.py index 19c633d8..f4ea7d6e 100644 --- a/snap7/util/setters.py +++ b/snap7/util/setters.py @@ -98,7 +98,7 @@ def set_int(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: Examples: >>> data = bytearray(2) - >>> snap7.util.set_int(data, 0, 255) + >>> set_int(data, 0, 255) bytearray(b'\\x00\\xff') """ # make sure were dealing with an int @@ -123,8 +123,9 @@ def set_uint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: Buffer with the written value. Examples: + >>> from snap7.util import set_uint >>> data = bytearray(2) - >>> snap7.util.set_uint(data, 0, 65535) + >>> set_uint(data, 0, 65535) bytearray(b'\\xff\\xff') """ # make sure were dealing with an int @@ -151,7 +152,7 @@ def set_real(bytearray_: bytearray, byte_index: int, real: Union[bool, str, floa Examples: >>> data = bytearray(4) - >>> snap7.util.set_real(data, 0, 123.321) + >>> set_real(data, 0, 123.321) bytearray(b'B\\xf6\\xa4Z') """ real_packed = struct.pack(">f", float(real)) @@ -177,7 +178,7 @@ def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: Examples: >>> data = bytearray(20) - >>> snap7.util.set_fstring(data, 0, "hello world", 15) + >>> set_fstring(data, 0, "hello world", 15) >>> data bytearray(b'hello world \x00\x00\x00\x00\x00') """ @@ -214,8 +215,9 @@ def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int or 'max_size' is greater than 254 or 'value' contains non-ascii characters. Examples: + >>> from snap7.util import set_string >>> data = bytearray(20) - >>> snap7.util.set_string(data, 0, "hello world", 254) + >>> set_string(data, 0, "hello world", 254) >>> data bytearray(b'\\xff\\x0bhello world\\x00\\x00\\x00\\x00\\x00\\x00\\x00') """ @@ -265,7 +267,7 @@ def set_dword(bytearray_: bytearray, byte_index: int, dword: int) -> None: Examples: >>> data = bytearray(4) - >>> snap7.util.set_dword(data,0, 4294967295) + >>> set_dword(data,0, 4294967295) >>> data bytearray(b'\\xff\\xff\\xff\\xff') """ @@ -290,7 +292,7 @@ def set_dint(bytearray_: bytearray, byte_index: int, dint: int) -> None: Examples: >>> data = bytearray(4) - >>> snap7.util.set_dint(data, 0, 2147483647) + >>> set_dint(data, 0, 2147483647) >>> data bytearray(b'\\x7f\\xff\\xff\\xff') """ @@ -315,7 +317,7 @@ def set_udint(bytearray_: bytearray, byte_index: int, udint: int) -> None: Examples: >>> data = bytearray(4) - >>> snap7.util.set_udint(data, 0, 4294967295) + >>> set_udint(data, 0, 4294967295) >>> data bytearray(b'\\xff\\xff\\xff\\xff') """ @@ -341,7 +343,7 @@ def set_time(bytearray_: bytearray, byte_index: int, time_string: str) -> bytear Examples: >>> data = bytearray(4) - >>> snap7.util.set_time(data, 0, '-22:3:57:28.192') + >>> set_time(data, 0, '-22:3:57:28.192') >>> data bytearray(b'\x8d\xda\xaf\x00') @@ -390,7 +392,7 @@ def set_usint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: Examples: >>> data = bytearray(1) - >>> snap7.util.set_usint(data, 0, 255) + >>> set_usint(data, 0, 255) bytearray(b'\\xff') """ _int = int(_int) @@ -417,7 +419,7 @@ def set_sint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: Examples: >>> data = bytearray(1) - >>> snap7.util.set_sint(data, 0, 127) + >>> set_sint(data, 0, 127) bytearray(b'\\x7f') """ _int = int(_int) @@ -445,8 +447,9 @@ def set_lreal(bytearray_: bytearray, byte_index: int, lreal: float) -> bytearray Examples: write lreal value (here as example 12345.12345) to DB1.10 of a PLC - >>> data = snap7.util.set_lreal(data, 12345.12345) - >>> client.db_write(db_number=1, start=10, data=data) + >>> data = set_lreal(data, 12345.12345) + >>> from snap7 import Client + >>> Client().db_write(db_number=1, start=10, data=data) """ lreal = float(lreal) @@ -474,9 +477,10 @@ def set_lword(bytearray_: bytearray, byte_index: int, lword: bytearray) -> bytea Examples: read lword value (here as example 0xAB\0xCD) from DB1.10 of a PLC - >>> data = snap7.util.set_lword(data, 0, bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\xAB\\xCD")) + >>> data = set_lword(data, 0, bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\xAB\\xCD")) bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\xAB\\xCD") - >>> client.db_write(db_number=1, start=10, data=data) + >>> from snap7 import Client + >>> Client().db_write(db_number=1, start=10, data=data) """ # data = bytearray_[byte_index:byte_index + 4] # dword = struct.unpack('8B', struct.pack('>Q', *data))[0] @@ -500,8 +504,9 @@ def set_char(bytearray_: bytearray, byte_index: int, chr_: str) -> Union[ValueEr Examples: Read 1 Byte raw from DB1.10, where a char value is stored. Return Python compatible value. - >>> data = snap7.util.set_char(data, 0, 'C') - >>> client.db_write(db_number=1, start=10, data=data) + >>> data = set_char(data, 0, 'C') + >>> from snap7 import Client + >>> Client().db_write(db_number=1, start=10, data=data) 'bytearray('0x43') """ if chr_.isascii(): @@ -518,10 +523,10 @@ def set_date(bytearray_: bytearray, byte_index: int, date_: date) -> bytearray: Args: bytearray_: buffer to write. byte_index: byte index from where to start writing. - date: date object + date_: date object Examples: >>> data = bytearray(2) - >>> snap7.util.set_date(data, 0, date(2024, 3, 27)) + >>> set_date(data, 0, date(2024, 3, 27)) >>> data bytearray(b'\x30\xd8') """ diff --git a/tests/test_client.py b/tests/test_client.py index 6b1fc18f..4a96cf31 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -2,7 +2,7 @@ import logging import struct import time -from typing import Tuple, Union +from typing import Tuple import pytest import unittest @@ -17,17 +17,14 @@ cast, pointer, Array, - c_byte, - c_int16, ) from datetime import datetime, timedelta, date from multiprocessing import Process from unittest import mock from typing import cast as typing_cast -from snap7.util.getters import get_real, get_int -from snap7.util.setters import set_int -from snap7.common import check_error +from snap7.util import get_real, get_int, set_int +from snap7.error import check_error from snap7.server import mainloop from snap7.client import Client from snap7.types import ( @@ -38,22 +35,9 @@ buffer_size, Area, WordLen, - RemotePort, - LocalPort, - WorkInterval, - MaxClients, - BSendTimeout, - BRecvTimeout, - PingTimeout, - SendTimeout, - RecvTimeout, - SrcRef, - DstRef, - SrcTSap, - PDURequest, - RecoveryTime, - KeepAliveTime, Block, + Parameter, + CDataArrayType, ) logging.basicConfig(level=logging.WARNING) @@ -65,13 +49,13 @@ slot = 1 -def _prepare_as_read_area(area: Area, size: int) -> Tuple[WordLen, Union[Array[c_byte], Array[c_int16], Array[c_int32]]]: +def _prepare_as_read_area(area: Area, size: int) -> Tuple[WordLen, CDataArrayType]: wordlen = area.wordlen() usrdata = (wordlen.ctype * size)() return wordlen, usrdata -def _prepare_as_write_area(area: Area, data: bytearray) -> Tuple[WordLen, Union[Array[c_byte], Array[c_int16], Array[c_int32]]]: +def _prepare_as_write_area(area: Area, data: bytearray) -> Tuple[WordLen, CDataArrayType]: if area not in Area: raise ValueError(f"{area} is not implemented in types") elif area == Area.TM: @@ -225,10 +209,10 @@ def test_as_upload(self) -> None: self.client.as_upload(1, _buffer, size) self.assertRaises(RuntimeError, self.client.wait_as_completion, 500) - @unittest.skip("TODO: invalid block size") + @unittest.skip("TODO: not yet implemented") def test_download(self) -> None: - data = bytearray(1024) - self.client.download(block_num=db_number, data=data) + data = bytearray([0b11111111]) + self.client.download(block_num=0, data=data) def test_read_area(self) -> None: amount = 1 @@ -272,7 +256,7 @@ def test_write_area(self) -> None: area = Area.TM dbnumber = 0 timer = bytearray(b"\x12\x00") - res = self.client.write_area(area, dbnumber, start, timer) + self.client.write_area(area, dbnumber, start, timer) res = self.client.read_area(area, dbnumber, start, 1) self.assertEqual(timer, bytearray(res)) @@ -280,7 +264,7 @@ def test_write_area(self) -> None: area = Area.CT dbnumber = 0 timer = bytearray(b"\x13\x00") - res = self.client.write_area(area, dbnumber, start, timer) + self.client.write_area(area, dbnumber, start, timer) res = self.client.read_area(area, dbnumber, start, 1) self.assertEqual(timer, bytearray(res)) @@ -361,41 +345,41 @@ def test_as_compress(self) -> None: def test_set_param(self) -> None: values = ( - (PingTimeout, 800), - (SendTimeout, 15), - (RecvTimeout, 3500), - (SrcRef, 128), - (DstRef, 128), - (SrcTSap, 128), - (PDURequest, 470), + (Parameter.PingTimeout, 800), + (Parameter.SendTimeout, 15), + (Parameter.RecvTimeout, 3500), + (Parameter.SrcRef, 128), + (Parameter.DstRef, 128), + (Parameter.SrcTSap, 128), + (Parameter.PDURequest, 470), ) for param, value in values: self.client.set_param(param, value) - self.assertRaises(Exception, self.client.set_param, RemotePort, 1) + self.assertRaises(Exception, self.client.set_param, Parameter.RemotePort, 1) def test_get_param(self) -> None: expected = ( - (RemotePort, tcpport), - (PingTimeout, 750), - (SendTimeout, 10), - (RecvTimeout, 3000), - (SrcRef, 256), - (DstRef, 0), - (SrcTSap, 256), - (PDURequest, 480), + (Parameter.RemotePort, tcpport), + (Parameter.PingTimeout, 750), + (Parameter.SendTimeout, 10), + (Parameter.RecvTimeout, 3000), + (Parameter.SrcRef, 256), + (Parameter.DstRef, 0), + (Parameter.SrcTSap, 256), + (Parameter.PDURequest, 480), ) for param, value in expected: self.assertEqual(self.client.get_param(param), value) non_client = ( - LocalPort, - WorkInterval, - MaxClients, - BSendTimeout, - BRecvTimeout, - RecoveryTime, - KeepAliveTime, + Parameter.LocalPort, + Parameter.WorkInterval, + Parameter.MaxClients, + Parameter.BSendTimeout, + Parameter.BRecvTimeout, + Parameter.RecoveryTime, + Parameter.KeepAliveTime, ) # invalid param for client @@ -463,7 +447,6 @@ def test_as_db_write(self) -> None: self.client.wait_as_completion(500) self.assertEqual(data, result) - @unittest.skip("TODO: not yet fully implemented") def test_as_download(self) -> None: data = bytearray(128) self.client.as_download(block_num=-1, data=data) @@ -478,7 +461,7 @@ def test_plc_cold_start(self) -> None: self.client.plc_cold_start() def test_get_pdu_length(self) -> None: - pduRequested = self.client.get_param(10) + pduRequested = self.client.get_param(Parameter.PDURequest) pduSize = self.client.get_pdu_length() self.assertEqual(pduSize, pduRequested) @@ -556,7 +539,6 @@ def test_ab_write_with_byte_literal_does_not_throw(self) -> None: finally: self.client._lib.Cli_ABWrite = original - @unittest.skip("TODO: not yet fully implemented") def test_as_ab_write_with_byte_literal_does_not_throw(self) -> None: mock_write = mock.MagicMock() mock_write.return_value = None @@ -573,7 +555,6 @@ def test_as_ab_write_with_byte_literal_does_not_throw(self) -> None: finally: self.client._lib.Cli_AsABWrite = original - @unittest.skip("TODO: not yet fully implemented") def test_as_db_write_with_byte_literal_does_not_throw(self) -> None: mock_write = mock.MagicMock() mock_write.return_value = None @@ -588,7 +569,6 @@ def test_as_db_write_with_byte_literal_does_not_throw(self) -> None: finally: self.client._lib.Cli_AsDBWrite = original - @unittest.skip("TODO: not yet fully implemented") def test_as_download_with_byte_literal_does_not_throw(self) -> None: mock_download = mock.MagicMock() mock_download.return_value = None @@ -1041,14 +1021,14 @@ def setUp(self) -> None: def test_set_param(self) -> None: values = ( - (RemotePort, 1102), - (PingTimeout, 800), - (SendTimeout, 15), - (RecvTimeout, 3500), - (SrcRef, 128), - (DstRef, 128), - (SrcTSap, 128), - (PDURequest, 470), + (Parameter.RemotePort, 1102), + (Parameter.PingTimeout, 800), + (Parameter.SendTimeout, 15), + (Parameter.RecvTimeout, 3500), + (Parameter.SrcRef, 128), + (Parameter.DstRef, 128), + (Parameter.SrcTSap, 128), + (Parameter.PDURequest, 470), ) for param, value in values: self.client.set_param(param, value) diff --git a/tests/test_logo_client.py b/tests/test_logo_client.py index 54e9cb63..0289bfaa 100644 --- a/tests/test_logo_client.py +++ b/tests/test_logo_client.py @@ -6,6 +6,7 @@ import snap7 from snap7.server import mainloop +from snap7.types import Parameter logging.basicConfig(level=logging.WARNING) @@ -60,41 +61,41 @@ def test_get_connected(self) -> None: def test_set_param(self) -> None: values = ( - (snap7.types.PingTimeout, 800), - (snap7.types.SendTimeout, 15), - (snap7.types.RecvTimeout, 3500), - (snap7.types.SrcRef, 128), - (snap7.types.DstRef, 128), - (snap7.types.SrcTSap, 128), - (snap7.types.PDURequest, 470), + (Parameter.PingTimeout, 800), + (Parameter.SendTimeout, 15), + (Parameter.RecvTimeout, 3500), + (Parameter.SrcRef, 128), + (Parameter.DstRef, 128), + (Parameter.SrcTSap, 128), + (Parameter.PDURequest, 470), ) for param, value in values: self.client.set_param(param, value) - self.assertRaises(Exception, self.client.set_param, snap7.types.RemotePort, 1) + self.assertRaises(Exception, self.client.set_param, Parameter.RemotePort, 1) def test_get_param(self) -> None: expected = ( - (snap7.types.RemotePort, tcpport), - (snap7.types.PingTimeout, 750), - (snap7.types.SendTimeout, 10), - (snap7.types.RecvTimeout, 3000), - (snap7.types.SrcRef, 256), - (snap7.types.DstRef, 0), - (snap7.types.SrcTSap, 4096), - (snap7.types.PDURequest, 480), + (Parameter.RemotePort, tcpport), + (Parameter.PingTimeout, 750), + (Parameter.SendTimeout, 10), + (Parameter.RecvTimeout, 3000), + (Parameter.SrcRef, 256), + (Parameter.DstRef, 0), + (Parameter.SrcTSap, 4096), + (Parameter.PDURequest, 480), ) for param, value in expected: self.assertEqual(self.client.get_param(param), value) non_client = ( - snap7.types.LocalPort, - snap7.types.WorkInterval, - snap7.types.MaxClients, - snap7.types.BSendTimeout, - snap7.types.BRecvTimeout, - snap7.types.RecoveryTime, - snap7.types.KeepAliveTime, + Parameter.LocalPort, + Parameter.WorkInterval, + Parameter.MaxClients, + Parameter.BSendTimeout, + Parameter.BRecvTimeout, + Parameter.RecoveryTime, + Parameter.KeepAliveTime, ) # invalid param for client @@ -113,14 +114,14 @@ def setUp(self) -> None: def test_set_param(self) -> None: values = ( - (snap7.types.RemotePort, 1102), - (snap7.types.PingTimeout, 800), - (snap7.types.SendTimeout, 15), - (snap7.types.RecvTimeout, 3500), - (snap7.types.SrcRef, 128), - (snap7.types.DstRef, 128), - (snap7.types.SrcTSap, 128), - (snap7.types.PDURequest, 470), + (Parameter.RemotePort, 1102), + (Parameter.PingTimeout, 800), + (Parameter.SendTimeout, 15), + (Parameter.RecvTimeout, 3500), + (Parameter.SrcRef, 128), + (Parameter.DstRef, 128), + (Parameter.SrcTSap, 128), + (Parameter.PDURequest, 470), ) for param, value in values: self.client.set_param(param, value) diff --git a/tests/test_mainloop.py b/tests/test_mainloop.py index 52cdfd63..b8f662d4 100644 --- a/tests/test_mainloop.py +++ b/tests/test_mainloop.py @@ -7,9 +7,8 @@ import snap7.error import snap7.server -import snap7.util -import snap7.util.getters -from snap7.util import get_bool, get_dint, get_dword, get_int, get_real, get_sint, get_string, get_usint, get_word +import snap7.db +from snap7.db import get_bool, get_dint, get_dword, get_int, get_real, get_sint, get_string, get_usint, get_word from snap7.client import Client import snap7.types @@ -50,15 +49,14 @@ def tearDown(self) -> None: self.client.disconnect() self.client.destroy() - @unittest.skip("TODO: only first test used") def test_read_prefill_db(self) -> None: + buffer = bytearray([0b11111111]) + self.client.db_write(0, 0, buffer) data = self.client.db_read(0, 0, 7) - boolean = snap7.util.getters.get_bool(data, 0, 0) + boolean = get_bool(data, 0, 0) self.assertEqual(boolean, True) - integer = snap7.util.getters.get_int(data, 1) - self.assertEqual(integer, 128) - real = snap7.util.getters.get_real(data, 3) - self.assertEqual(real, -128) + integer = get_int(data, 0) + self.assertEqual(integer, -256) def test_read_booleans(self) -> None: data = self.client.db_read(0, 0, 1) diff --git a/tests/test_partner.py b/tests/test_partner.py index e9ea5ac5..986e8a17 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -1,9 +1,12 @@ import logging + import pytest import unittest as unittest from unittest import mock +from snap7.error import error_text import snap7.partner +from snap7.types import Parameter logging.basicConfig(level=logging.WARNING) @@ -21,7 +24,6 @@ def tearDown(self) -> None: def test_as_b_send(self) -> None: self.partner.as_b_send() - @unittest.skip("we don't recv something yet") def test_b_recv(self) -> None: self.partner.b_recv() @@ -41,31 +43,31 @@ def test_destroy(self) -> None: self.partner.destroy() def test_error_text(self) -> None: - snap7.common.error_text(0, context="partner") + error_text(0, context="partner") def test_get_last_error(self) -> None: self.partner.get_last_error() def test_get_param(self) -> None: expected = ( - (snap7.types.LocalPort, 0), - (snap7.types.RemotePort, 102), - (snap7.types.PingTimeout, 750), - (snap7.types.SendTimeout, 10), - (snap7.types.RecvTimeout, 3000), - (snap7.types.SrcRef, 256), - (snap7.types.DstRef, 0), - (snap7.types.PDURequest, 480), - (snap7.types.WorkInterval, 100), - (snap7.types.BSendTimeout, 3000), - (snap7.types.BRecvTimeout, 3000), - (snap7.types.RecoveryTime, 500), - (snap7.types.KeepAliveTime, 5000), + (Parameter.LocalPort, 0), + (Parameter.RemotePort, 102), + (Parameter.PingTimeout, 750), + (Parameter.SendTimeout, 10), + (Parameter.RecvTimeout, 3000), + (Parameter.SrcRef, 256), + (Parameter.DstRef, 0), + (Parameter.PDURequest, 480), + (Parameter.WorkInterval, 100), + (Parameter.BSendTimeout, 3000), + (Parameter.BRecvTimeout, 3000), + (Parameter.RecoveryTime, 500), + (Parameter.KeepAliveTime, 5000), ) for param, value in expected: self.assertEqual(self.partner.get_param(param), value) - self.assertRaises(Exception, self.partner.get_param, snap7.types.MaxClients) + self.assertRaises(Exception, self.partner.get_param, Parameter.MaxClients) def test_get_stats(self) -> None: self.partner.get_stats() @@ -78,23 +80,23 @@ def test_get_times(self) -> None: def test_set_param(self) -> None: values = ( - (snap7.types.PingTimeout, 800), - (snap7.types.SendTimeout, 15), - (snap7.types.RecvTimeout, 3500), - (snap7.types.WorkInterval, 50), - (snap7.types.SrcRef, 128), - (snap7.types.DstRef, 128), - (snap7.types.SrcTSap, 128), - (snap7.types.PDURequest, 470), - (snap7.types.BSendTimeout, 2000), - (snap7.types.BRecvTimeout, 2000), - (snap7.types.RecoveryTime, 400), - (snap7.types.KeepAliveTime, 4000), + (Parameter.PingTimeout, 800), + (Parameter.SendTimeout, 15), + (Parameter.RecvTimeout, 3500), + (Parameter.WorkInterval, 50), + (Parameter.SrcRef, 128), + (Parameter.DstRef, 128), + (Parameter.SrcTSap, 128), + (Parameter.PDURequest, 470), + (Parameter.BSendTimeout, 2000), + (Parameter.BRecvTimeout, 2000), + (Parameter.RecoveryTime, 400), + (Parameter.KeepAliveTime, 4000), ) for param, value in values: self.partner.set_param(param, value) - self.assertRaises(Exception, self.partner.set_param, snap7.types.RemotePort, 1) + self.assertRaises(Exception, self.partner.set_param, Parameter.RemotePort, 1) def test_set_recv_callback(self) -> None: self.partner.set_recv_callback() diff --git a/tests/test_server.py b/tests/test_server.py index 32489ba6..84593164 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -7,10 +7,9 @@ from threading import Thread from unittest import mock -from snap7.common import error_text -from snap7.error import server_errors +from snap7.error import server_errors, error_text from snap7.server import Server -from snap7.types import SrvEvent, mkEvent, mkLog, LocalPort, WorkInterval, MaxClients, RemotePort, SrvArea +from snap7.types import SrvEvent, mkEvent, mkLog, SrvArea, Parameter logging.basicConfig(level=logging.WARNING) @@ -123,12 +122,12 @@ def test_start_to(self) -> None: def test_get_param(self) -> None: # check the defaults - self.assertEqual(self.server.get_param(LocalPort), 1102) - self.assertEqual(self.server.get_param(WorkInterval), 100) - self.assertEqual(self.server.get_param(MaxClients), 1024) + self.assertEqual(self.server.get_param(Parameter.LocalPort), 1102) + self.assertEqual(self.server.get_param(Parameter.WorkInterval), 100) + self.assertEqual(self.server.get_param(Parameter.MaxClients), 1024) # invalid param for server - self.assertRaises(Exception, self.server.get_param, RemotePort) + self.assertRaises(Exception, self.server.get_param, Parameter.RemotePort) @pytest.mark.server @@ -141,7 +140,7 @@ def setUp(self) -> None: self.server = Server() def test_set_param(self) -> None: - self.server.set_param(LocalPort, 1102) + self.server.set_param(Parameter.LocalPort, 1102) @pytest.mark.server diff --git a/tests/test_util.py b/tests/test_util.py index 1c9de650..716c8e8e 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -4,9 +4,9 @@ import struct from typing import cast -from snap7.util.db import DB_Row, DB -from snap7.util.getters import get_byte, get_time, get_fstring, get_int -from snap7.util.setters import set_byte, set_time, set_fstring, set_int +from snap7 import DB, Row +from snap7.util import get_byte, get_time, get_fstring, get_int +from snap7.util import set_byte, set_time, set_fstring, set_int from snap7.types import WordLen test_spec = """ @@ -214,19 +214,19 @@ def test_set_byte_new(self) -> None: def test_get_byte(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(50, "BYTE") # get value self.assertEqual(value, 254) def test_set_byte(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testByte"] = 255 self.assertEqual(row["testByte"], 255) def test_set_lreal(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testLreal"] = 123.123 self.assertEqual(row["testLreal"], 123.123) @@ -236,7 +236,7 @@ def test_get_s5time(self) -> None: """ test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertEqual(row["testS5time"], "0:00:00.100000") @@ -246,7 +246,7 @@ def test_get_dt(self) -> None: """ test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertEqual(row["testdateandtime"], "2020-07-12T17:32:02.854000") @@ -303,12 +303,12 @@ def test_get_string(self) -> None: """ test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertEqual(row["NAME"], "test") def test_write_string(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["NAME"] = "abc" self.assertEqual(row["NAME"], "abc") row["NAME"] = "" @@ -331,13 +331,13 @@ def test_get_fstring(self) -> None: def test_get_fstring_name(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row["testFstring"] self.assertEqual(value, "test") def test_get_fstring_index(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(98, "FSTRING[8]") # get value self.assertEqual(value, "test") @@ -348,19 +348,19 @@ def test_set_fstring(self) -> None: def test_set_fstring_name(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testFstring"] = "TSET" self.assertEqual(row["testFstring"], "TSET") def test_set_fstring_index(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row.set_value(98, "FSTRING[8]", "TSET") self.assertEqual(row["testFstring"], "TSET") def test_get_int(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) x = row["ID"] y = row["testint2"] self.assertEqual(x, 0) @@ -368,31 +368,31 @@ def test_get_int(self) -> None: def test_set_int(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["ID"] = 259 self.assertEqual(row["ID"], 259) def test_get_usint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(43, "USINT") # get value self.assertEqual(value, 254) def test_set_usint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testusint0"] = 255 self.assertEqual(row["testusint0"], 255) def test_get_sint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(44, "SINT") # get value self.assertEqual(value, 127) def test_set_sint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testsint0"] = 127 self.assertEqual(row["testsint0"], 127) @@ -406,20 +406,20 @@ def test_set_int_roundtrip(self) -> None: def test_get_int_values(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) for value in (-32768, -16385, -256, -128, -127, 0, 127, 128, 255, 256, 16384, 32767): row["ID"] = value self.assertEqual(row["ID"], value) def test_get_bool(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertEqual(row["testbool1"], 1) self.assertEqual(row["testbool8"], 0) def test_set_bool(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testbool8"] = True row["testbool1"] = False @@ -465,56 +465,56 @@ def test_db_export(self) -> None: def test_get_real(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertTrue(0.01 > (row["testReal"] - 827.3) > -0.1) def test_set_real(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) row["testReal"] = 1337.1337 self.assertTrue(0.01 > (row["testReal"] - 1337.1337) > -0.01) def test_set_dword(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) # The range of numbers is 0 to 4294967295. row["testDword"] = 9999999 self.assertEqual(row["testDword"], 9999999) def test_get_dword(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) self.assertEqual(row["testDword"], 4294967295) def test_set_dint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) # The range of numbers is -2147483648 to 2147483647 + row.set_value(23, "DINT", 2147483647) # set value self.assertEqual(row["testDint"], 2147483647) def test_get_dint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(23, "DINT") # get value self.assertEqual(value, -2147483648) def test_set_word(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) # The range of numbers is 0 to 65535 row.set_value(27, "WORD", 0) # set value self.assertEqual(row["testWord"], 0) def test_get_word(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) value = row.get_value(27, "WORD") # get value self.assertEqual(value, 65535) def test_export(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec, layout_offset=4) + row = Row(test_array, test_spec, layout_offset=4) data = row.export() self.assertIn("testDword", data) self.assertIn("testbool1", data) @@ -522,7 +522,7 @@ def test_export(self) -> None: def test_indented_layout(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) x = row["ID"] y_single_space = row["testbool1"] y_multi_space = row["testbool2"] @@ -546,61 +546,61 @@ def test_indented_layout(self) -> None: def test_get_uint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testUint"] self.assertEqual(val, 12345) def test_get_udint(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testUdint"] self.assertEqual(val, 123456789) def test_get_lreal(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testLreal"] self.assertEqual(val, 123456789.123456789) def test_get_char(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testChar"] self.assertEqual(val, "A") def test_get_wchar(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testWchar"] self.assertEqual(val, "Ω") def test_get_wstring(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testWstring"] self.assertEqual(val, "ΩstÄ") def test_get_date(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testDate"] self.assertEqual(val, datetime.date(day=9, month=3, year=2022)) def test_get_tod(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testTod"] self.assertEqual(val, datetime.timedelta(hours=12, minutes=34, seconds=56)) def test_get_dtl(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) val = row["testDtl"] self.assertEqual(val, datetime.datetime(year=2022, month=3, day=9, hour=12, minute=34, second=45)) def test_set_date(self) -> None: test_array = bytearray(_bytearray) - row = DB_Row(test_array, test_spec_indented, layout_offset=4) + row = Row(test_array, test_spec_indented, layout_offset=4) row["testDate"] = datetime.date(day=28, month=3, year=2024) self.assertEqual(row["testDate"], datetime.date(day=28, month=3, year=2024)) diff --git a/tox.ini b/tox.ini index 833a63cf..d24dc630 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ commands = basepython = python3.10 deps = -r{toxinidir}/requirements-dev.txt skip_install = true -commands = mypy {toxinidir}/snap7 {toxinidir}/tests +commands = mypy {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example [testenv:lint-ruff]