diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b6b4a8c5..196b23f1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,9 +22,11 @@ repos: - id: mypy additional_dependencies: [types-setuptools] files: ^snap7 + args: [--strict] - repo: https://github.com/astral-sh/ruff-pre-commit rev: 'v0.4.2' hooks: - id: ruff - id: ruff-format +exclude: "snap7/protocol.py" diff --git a/.venv3.11/pyvenv.cfg b/.venv3.11/pyvenv.cfg new file mode 100644 index 00000000..87098a2e --- /dev/null +++ b/.venv3.11/pyvenv.cfg @@ -0,0 +1,5 @@ +home = /usr/bin +include-system-site-packages = false +version = 3.11.9 +executable = /usr/bin/python3.11 +command = /usr/bin/python3.11 -m venv /home/user/Documents/python-snap7/.venv3.11 diff --git a/snap7/client/__init__.py b/snap7/client/__init__.py index e21ca87f..25eaa022 100644 --- a/snap7/client/__init__.py +++ b/snap7/client/__init__.py @@ -5,11 +5,13 @@ import re import logging from ctypes import CFUNCTYPE, byref, create_string_buffer, sizeof -from ctypes import Array, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p +from ctypes import Array, _SimpleCData, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p from datetime import datetime -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union, Type +from types import TracebackType from ..common import check_error, ipv4, load_library +from ..protocol import Snap7CliProtocol from ..types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem from ..types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen from ..types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types @@ -18,11 +20,11 @@ logger = logging.getLogger(__name__) -def error_wrap(func): +def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) + def f(*args: Any, **kwargs: Any) -> None: + code = func(*args, **kwargs) check_error(code, context="client") return f @@ -47,10 +49,10 @@ class Client: >>> client.db_write(1, 0, data) """ - _lib: Any # since this is dynamically loaded from a DLL we don't have the type signature. + _lib: Snap7CliProtocol _read_callback = None _callback = None - _s7_client: Optional[S7Object] = None + _s7_client: S7Object def __init__(self, lib_location: Optional[str] = None): """Creates a new `Client` instance. @@ -66,22 +68,24 @@ def __init__(self, lib_location: Optional[str] = None): """ - self._lib = load_library(lib_location) + self._lib: Snap7CliProtocol = load_library(lib_location) self.create() - def __enter__(self): + def __enter__(self) -> "Client": return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: self.destroy() - def __del__(self): + def __del__(self) -> None: self.destroy() - def create(self): + def create(self) -> None: """Creates a SNAP7 client.""" logger.info("creating snap7 client") - self._lib.Cli_Create.restype = S7Object + self._lib.Cli_Create.restype = S7Object # type: ignore[attr-defined] self._s7_client = S7Object(self._lib.Cli_Create()) def destroy(self) -> Optional[int]: @@ -97,7 +101,7 @@ def destroy(self) -> Optional[int]: logger.info("destroying snap7 client") if self._lib and self._s7_client is not None: return self._lib.Cli_Destroy(byref(self._s7_client)) - self._s7_client = None + self._s7_client = None # type: ignore[assignment] return None def plc_stop(self) -> int: @@ -199,7 +203,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int """ logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}") - self.set_param(RemotePort, tcpport) + self.set_param(number=RemotePort, value=tcpport) return self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot)) def db_read(self, db_number: int, start: int, size: int) -> bytearray: @@ -441,7 +445,7 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) -> cdata = (type_ * len(data)).from_buffer_copy(data) return self._lib.Cli_WriteArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, byref(cdata)) - def read_multi_vars(self, items) -> Tuple[int, S7DataItem]: + def read_multi_vars(self, items: Array[S7DataItem]) -> Tuple[int, Array[S7DataItem]]: """Reads different kind of variables from a PLC simultaneously. Args: @@ -472,7 +476,7 @@ def list_blocks(self) -> BlocksList: logger.debug(f"blocks: {blocksList}") return blocksList - def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]: + def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array[c_uint16]]: """This function returns the AG list of a specified block type. Args: @@ -592,11 +596,11 @@ def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int) """ if not re.match(ipv4, address): raise ValueError(f"{address} is invalid ipv4") - result = self._lib.Cli_SetConnectionParams(self._s7_client, address, c_uint16(local_tsap), c_uint16(remote_tsap)) + result = self._lib.Cli_SetConnectionParams(self._s7_client, address.encode(), c_uint16(local_tsap), c_uint16(remote_tsap)) if result != 0: raise ValueError("The parameter was invalid") - def set_connection_type(self, connection_type: int): + def set_connection_type(self, connection_type: int) -> None: """Sets the connection resource type, i.e the way in which the Clients connects to a PLC. Args: @@ -659,7 +663,7 @@ def ab_write(self, start: int, data: bytearray) -> int: logger.debug(f"ab write: start: {start}: size: {size}: ") return self._lib.Cli_ABWrite(self._s7_client, start, size, byref(cdata)) - def as_ab_read(self, start: int, size: int, data) -> int: + def as_ab_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of IPU area from a PLC asynchronously. Args: @@ -720,7 +724,7 @@ def as_copy_ram_to_rom(self, timeout: int = 1) -> int: check_error(result, context="client") return result - def as_ct_read(self, start: int, amount: int, data) -> int: + def as_ct_read(self, start: int, amount: int, data: "_SimpleCData[Any]") -> int: """Reads counters from a PLC asynchronously. Args: @@ -752,7 +756,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int: check_error(result, context="client") return result - def as_db_fill(self, db_number: int, filler) -> int: + def as_db_fill(self, db_number: int, filler: int) -> int: """Fills a DB in AG with a given byte. Args: @@ -766,7 +770,7 @@ def as_db_fill(self, db_number: int, filler) -> int: check_error(result, context="client") return result - def as_db_get(self, db_number: int, _buffer, size) -> bytearray: + def as_db_get(self, db_number: int, _buffer: "_SimpleCData[Any]", size: "_SimpleCData[Any]") -> int: """Uploads a DB from AG using DBRead. Note: @@ -784,7 +788,7 @@ def as_db_get(self, db_number: int, _buffer, size) -> bytearray: check_error(result, context="client") return result - def as_db_read(self, db_number: int, start: int, size: int, data) -> Array: + def as_db_read(self, db_number: int, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of a DB from a PLC. Args: @@ -807,7 +811,7 @@ def as_db_read(self, db_number: int, start: int, size: int, data) -> Array: check_error(result, context="client") return result - def as_db_write(self, db_number: int, start: int, size: int, data) -> int: + def as_db_write(self, db_number: int, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Writes a part of a DB into a PLC. Args: @@ -943,7 +947,7 @@ def set_plc_datetime(self, dt: datetime) -> int: return self._lib.Cli_SetPlcDateTime(self._s7_client, byref(buffer)) - def check_as_completion(self, p_value) -> int: + def check_as_completion(self, p_value: c_int) -> int: """Method to check Status of an async request. Result contains if the check was successful, not the data value itself Args: @@ -1000,7 +1004,7 @@ def wait_as_completion(self, timeout: int) -> int: check_error(result, context="client") return result - def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]: + def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, "Array[_SimpleCData[int]]"]: if area not in Areas: raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: @@ -1013,7 +1017,9 @@ def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array] usrdata = (type_ * size)() return wordlen, usrdata - def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int: + def as_read_area( + self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata: "Array[_SimpleCData[int]]" + ) -> int: """Reads a data area from a PLC asynchronously. With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters. @@ -1032,11 +1038,11 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle f"reading area: {area.name} dbnumber: {dbnumber} start: {start} amount: {size} " f"wordlen: {wordlen.name}={wordlen.value}" ) - result = self._lib.Cli_AsReadArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, pusrdata) + result = self._lib.Cli_AsReadArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, byref(pusrdata)) check_error(result, context="client") return result - def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array]: + def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array[Any]]: if area not in Areas: raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: @@ -1049,7 +1055,7 @@ def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, cdata = (type_ * len(data)).from_buffer_copy(data) return wordlen, cdata - def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int: + def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata: bytearray) -> int: """Writes a data area into a PLC asynchronously. Args: @@ -1072,7 +1078,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl check_error(res, context="client") return res - def as_eb_read(self, start: int, size: int, data) -> int: + def as_eb_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of IPI area from a PLC asynchronously. Args: @@ -1124,7 +1130,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int: check_error(result, context="client") return result - def as_list_blocks_of_type(self, blocktype: str, data, count) -> int: + def as_list_blocks_of_type(self, blocktype: str, data: "_SimpleCData[Any]", count: "_SimpleCData[Any]") -> int: """Returns the AG blocks list of a given type. Args: @@ -1145,7 +1151,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int: check_error(result, context="client") return result - def as_mb_read(self, start: int, size: int, data) -> int: + def as_mb_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of Merkers area from a PLC. Args: @@ -1177,7 +1183,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int: check_error(result, context="client") return result - def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int: + def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size: "_SimpleCData[Any]") -> int: """Reads a partial list of given ID and Index. Args: @@ -1193,7 +1199,7 @@ def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int: check_error(result, context="client") return result - def as_read_szl_list(self, szl_list, items_count) -> int: + def as_read_szl_list(self, szl_list: "_SimpleCData[Any]", items_count: "_SimpleCData[Any]") -> int: """Reads the list of partial lists available in the CPU. Args: @@ -1207,7 +1213,7 @@ def as_read_szl_list(self, szl_list, items_count) -> int: check_error(result, context="client") return result - def as_tm_read(self, start: int, amount: int, data) -> bytearray: + def as_tm_read(self, start: int, amount: int, data: "_SimpleCData[Any]") -> int: """Reads timers from a PLC. Args: @@ -1239,7 +1245,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int: check_error(result) return result - def as_upload(self, block_num: int, _buffer, size) -> int: + def as_upload(self, block_num: int, _buffer: "_SimpleCData[Any]", size: "_SimpleCData[Any]") -> int: """Uploads a block from AG. Note: @@ -1363,7 +1369,7 @@ def error_text(self, error: int) -> str: text_length = c_int(256) error_code = c_int32(error) text = create_string_buffer(buffer_size) - response = self._lib.Cli_ErrorText(error_code, byref(text), text_length) + response = self._lib.Cli_ErrorText(error_code, text, text_length) check_error(response) result = bytearray(text)[: text_length.value].decode().strip("\x00") return result diff --git a/snap7/client/client.pyi b/snap7/client/client.pyi new file mode 100644 index 00000000..78517283 --- /dev/null +++ b/snap7/client/client.pyi @@ -0,0 +1,103 @@ +from ctypes import _CArgObject, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p +from _ctypes import CFuncPtr + +class Snap7CliProtocol: + def Cli_Create(self) -> c_void_p: ... + def Cli_Destroy(self, pointer: _CArgObject) -> int: ... + def Cli_PlcStop(self, pointer: c_void_p) -> int: ... + def Cli_PlcColdStart(self, pointer: c_void_p) -> int: ... + def Cli_PlcHotStart(self, pointer: c_void_p) -> int: ... + def Cli_GetPlcStatus(self, pointer: c_void_p, state: _CArgObject) -> int: ... + def Cli_GetCpuInfo(self, pointer: c_void_p, info: _CArgObject) -> int: ... + def Cli_Disconnect(self, pointer: c_void_p) -> int: ... + def Cli_Connect(self, pointer: c_void_p) -> int: ... + def Cli_ConnectTo(self, pointer: c_void_p, address: c_char_p, rack: c_int, slot: c_int) -> int: ... + def Cli_DBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_DBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_Delete(self, pointer: c_void_p, blocktype: c_int, block_num: int) -> int: ... + def Cli_FullUpload( + self, pointer: c_void_p, blocktype: c_int, block_num: int, data: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_Upload(self, pointer: c_void_p, block_type: c_int, block_num: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_Download(self, pointer: c_void_p, block_num: int, data: _CArgObject, size: int) -> int: ... + def Cli_DBGet(self, pointer: c_void_p, db_number: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_WriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_ReadMultiVars(self, pointer: c_void_p, items: _CArgObject, items_count: c_int32) -> int: ... + def Cli_ListBlocks(self, pointer: c_void_p, blocksList: _CArgObject) -> int: ... + def Cli_ListBlocksOfType(self, pointer: c_void_p, blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_GetAgBlockInfo(self, pointer: c_void_p, blocktype: c_int, db_number: int, data: _CArgObject) -> int: ... + def Cli_SetSessionPassword(self, pointer: c_void_p, password: c_char_p) -> int: ... + def Cli_ClearSessionPassword(self, pointer: c_void_p) -> int: ... + def Cli_SetConnectionParams(self, pointer: c_void_p, address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16) -> int: ... + def Cli_SetConnectionType(self, pointer: c_void_p, connection_type: c_uint16) -> int: ... + def Cli_GetConnected(self, pointer: c_void_p, connected: _CArgObject) -> int: ... + def Cli_ABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_ABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsCompress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCopyRamToRom(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsCTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_AsDBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_AsDBGet(self, pointer: c_void_p, db_number: int, _buffer: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsDBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDownload(self, pointer: c_void_p, block_num: int, cdata: _CArgObject, size: int) -> int: ... + def Cli_Compress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Cli_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Cli_GetPduLength(self, pointer: c_void_p, requested_: _CArgObject, negotiated_: _CArgObject) -> int: ... + def Cli_GetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetAsCallback(self, pointer: c_void_p, pfn_clicompletion: CFuncPtr, p_usr: c_void_p) -> int: ... + def Cli_WaitAsCompletion(self, pointer: c_void_p, timeout: c_ulong) -> int: ... + def Cli_AsReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsWriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsEBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsEBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsFullUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_AsListBlocksOfType(self, pointer: c_void_p, _blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_AsMBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsMBWrite(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_AsTMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsTMWrite(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_CopyRamToRom(self, pointer: c_void_p, timeout: int) -> int: ... + def Cli_CTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_CTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_DBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_EBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_EBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ErrorText(self, error_code: c_int32, text: _CArgObject, text_length: c_int) -> int: ... + def Cli_GetCpInfo(self, pointer: c_void_p, cp_info: _CArgObject) -> int: ... + def Cli_GetExecTime(self, pointer: c_void_p, time: _CArgObject) -> int: ... + def Cli_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Cli_GetOrderCode(self, pointer: c_void_p, order_code: _CArgObject) -> int: ... + def Cli_GetPgBlockInfo(self, pointer: c_void_p, buffer: _CArgObject, block_info: _CArgObject, size: c_int) -> int: ... + def Cli_GetProtection(self, pointer: c_void_p, s7_protection: _CArgObject) -> int: ... + def Cli_IsoExchangeBuffer(self, pointer: c_void_p, cdata: _CArgObject, size: _CArgObject) -> int: ... + def Cli_MBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_MBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_SetPlcSystemDateTime(self, pointer: c_void_p) -> int: ... + def Cli_TMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_TMWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_WriteMultiVars(self, pointer: c_void_p, cdata: _CArgObject, items_count: c_int32) -> int: ... + def Cli_CheckAsCompletion(self, pointer: c_void_p, p_value: c_int) -> int: ... diff --git a/snap7/common.py b/snap7/common.py index cb979693..48493d50 100644 --- a/snap7/common.py +++ b/snap7/common.py @@ -3,10 +3,12 @@ import pathlib import platform from pathlib import Path -from ctypes import c_char -from typing import Any, Literal, Optional +from ctypes import Array, c_char, c_int, c_int32 +from typing import Callable, Literal, NoReturn, Optional, cast from ctypes.util import find_library from functools import cache +from .protocol import Snap7CliProtocol + if platform.system() == "Windows": from ctypes import windll as cdll # type: ignore @@ -19,7 +21,7 @@ ipv4 = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$" -def _raise_error(): +def _raise_error() -> NoReturn: error = f"""can't find snap7 shared library. This probably means you are installing python-snap7 from source. When no binary wheel is found for you architecture, pip @@ -72,7 +74,7 @@ def _find_in_package() -> Optional[str]: @cache -def load_library(lib_location: Optional[str] = None) -> Any: +def load_library(lib_location: Optional[str] = None) -> Snap7CliProtocol: """Loads the `snap7.dll` library. Returns: cdll: a ctypes cdll object with the snap7 shared library loaded. @@ -83,7 +85,7 @@ def load_library(lib_location: Optional[str] = None) -> Any: if not lib_location: _raise_error() - return cdll.LoadLibrary(lib_location) + return cast(Snap7CliProtocol, cdll.LoadLibrary(lib_location)) Context = Literal["client", "server", "partner"] @@ -107,7 +109,7 @@ def check_error(code: int, context: Context = "client") -> None: raise RuntimeError(error) -def error_text(error, context: Context = "client") -> bytes: +def error_text(error: int, context: Context = "client") -> bytes: """Returns a textual explanation of a given error number Args: @@ -127,6 +129,10 @@ def error_text(error, context: Context = "client") -> bytes: text_type = c_char * len_ text = text_type() library = load_library() - map_ = {"client": library.Cli_ErrorText, "server": library.Srv_ErrorText, "partner": library.Par_ErrorText} - map_[context](error, text, len_) + error_text_func: Callable[[c_int32, Array[c_char], c_int], int] = { + "client": library.Cli_ErrorText, + "server": library.Srv_ErrorText, + "partner": library.Par_ErrorText, + }[context] + error_text_func(c_int32(error), text, c_int(len_)) return text.value diff --git a/snap7/logo.py b/snap7/logo.py index ff348cf4..b295e10a 100644 --- a/snap7/logo.py +++ b/snap7/logo.py @@ -5,7 +5,7 @@ import re import struct import logging -from ctypes import byref, c_int, c_int32, c_uint16, c_void_p +from ctypes import byref, c_int, c_int32, c_uint16 from .types import WordLen, S7Object, param_types from .types import RemotePort, Areas, wordlen_to_ctypes @@ -27,19 +27,19 @@ class Logo: For more information see examples for Siemens Logo 7 and 8 """ - def __init__(self): + def __init__(self) -> None: """Creates a new instance of :obj:`Logo`""" - self.pointer = None + self.pointer: S7Object self.library = load_library() self.create() - def __del__(self): + def __del__(self) -> None: self.destroy() - def create(self): + def create(self) -> None: """Create a SNAP7 client.""" logger.info("creating snap7 client") - self.library.Cli_Create.restype = c_void_p + self.library.Cli_Create.restype = S7Object # type: ignore[attr-defined] self.pointer = S7Object(self.library.Cli_Create()) def destroy(self) -> int: @@ -87,7 +87,7 @@ def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int check_error(result, context="client") return result - def read(self, vm_address: str): + def read(self, vm_address: str) -> int: """Reads from VM addresses of Siemens Logo. Examples: read("V40") / read("VW64") / read("V10.2") Args: @@ -139,13 +139,14 @@ def read(self, vm_address: str): check_error(result, context="client") # transform result to int value if wordlen == WordLen.Bit: - return data[0] + result = int(data[0]) if wordlen == WordLen.Byte: - return struct.unpack_from(">B", data)[0] + result = struct.unpack_from(">B", data)[0] if wordlen == WordLen.Word: - return struct.unpack_from(">h", data)[0] + result = struct.unpack_from(">h", data)[0] if wordlen == WordLen.DWord: - return struct.unpack_from(">l", data)[0] + result = struct.unpack_from(">l", data)[0] + return result def write(self, vm_address: str, value: int) -> int: """Writes to VM addresses of Siemens Logo. @@ -251,7 +252,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int: check_error(result, context="client") return result - def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int): + def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int) -> None: """Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates. Notes: @@ -274,7 +275,7 @@ def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int if result != 0: raise ValueError("The parameter was invalid") - def set_connection_type(self, connection_type: int): + def set_connection_type(self, connection_type: int) -> None: """Sets the connection resource type, i.e the way in which the Clients connects to a PLC. @@ -303,7 +304,7 @@ def get_connected(self) -> bool: check_error(result, context="client") return bool(connected) - def set_param(self, number: int, value): + def set_param(self, number: int, value: int) -> int: """Sets an internal Server object parameter. Args: @@ -319,7 +320,7 @@ def set_param(self, number: int, value): check_error(result, context="client") return result - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """Reads an internal Logo object parameter. Args: diff --git a/snap7/partner.py b/snap7/partner.py index df05481b..01289121 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -11,19 +11,20 @@ import re import logging from ctypes import byref, c_int, c_int32, c_uint32, c_void_p -from typing import Tuple, Optional +from typing import Any, Callable, Optional, Tuple from .common import ipv4, check_error, load_library +from .protocol import Snap7CliProtocol from .types import S7Object, param_types, word logger = logging.getLogger(__name__) -def error_wrap(func): +def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) + def f(*args: Any, **kwargs: Any) -> None: + code = func(*args, **kwargs) check_error(code, context="partner") return f @@ -34,14 +35,13 @@ class Partner: A snap7 partner. """ - _pointer: Optional[c_void_p] + _pointer: c_void_p def __init__(self, active: bool = False): - self._library = load_library() - self._pointer = None + self._library: Snap7CliProtocol = load_library() self.create(active) - def __del__(self): + def __del__(self) -> None: self.destroy() def as_b_send(self) -> int: @@ -91,7 +91,7 @@ def check_as_b_send_completion(self) -> Tuple[str, c_int32]: return return_values[result], op_result - def create(self, active: bool = False): + def create(self, active: bool = False) -> None: """ Creates a Partner and returns its handle, which is the reference that you have to use every time you refer to that Partner. @@ -99,10 +99,10 @@ def create(self, active: bool = False): :param active: 0 :returns: a pointer to the partner object """ - self._library.Par_Create.restype = S7Object + self._library.Par_Create.restype = S7Object # type: ignore[attr-defined] self._pointer = S7Object(self._library.Par_Create(int(active))) - def destroy(self): + def destroy(self) -> Optional[int]: """ Destroy a Partner of given handle. Before destruction the Partner is stopped, all clients disconnected and @@ -121,7 +121,7 @@ def get_last_error(self) -> c_int32: check_error(result, "partner") return error - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """ Reads an internal Partner object parameter. """ @@ -166,10 +166,10 @@ def get_times(self) -> Tuple[c_int32, c_int32]: return send_time, recv_time @error_wrap - def set_param(self, number: int, value) -> int: + def set_param(self, number: int, value: int) -> int: """Sets an internal Partner object parameter.""" logger.debug(f"setting param number {number} to {value}") - return self._library.Par_SetParam(self._pointer, number, byref(c_int(value))) + return self._library.Par_SetParam(self._pointer, c_int(number), byref(c_int(value))) def set_recv_callback(self) -> int: """ diff --git a/snap7/protocol.py b/snap7/protocol.py new file mode 100644 index 00000000..7c9c9e74 --- /dev/null +++ b/snap7/protocol.py @@ -0,0 +1,140 @@ +from typing import Protocol + + +class Snap7CliProtocol(Protocol): + # Client + def Cli_Create(self): ... + def Cli_Destroy(self, pointer): ... + def Cli_PlcStop(self, pointer): ... + def Cli_PlcColdStart(self, pointer): ... + def Cli_PlcHotStart(self, pointer): ... + def Cli_GetPlcStatus(self, pointer, state): ... + def Cli_GetCpuInfo(self, pointer, info): ... + def Cli_Disconnect(self, pointer): ... + def Cli_Connect(self, pointer): ... + def Cli_ConnectTo(self, pointer, address, rack, slot): ... + def Cli_DBRead(self, pointer, db_number, start, size, data): ... + def Cli_DBWrite(self, pointer, db_number, start, size, data): ... + def Cli_Delete(self, pointer, blocktype, block_num): ... + def Cli_FullUpload(self, pointer, blocktype, block_num, data, size): ... + def Cli_Upload(self, pointer, block_type, block_num, data, size): ... + def Cli_Download(self, pointer, block_num, data, size): ... + def Cli_DBGet(self, pointer, db_number, data, size): ... + def Cli_ReadArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_WriteArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_ReadMultiVars(self, pointer, items, items_count32): ... + def Cli_ListBlocks(self, pointer, blocksList): ... + def Cli_ListBlocksOfType(self, pointer, blocktype, data, count): ... + def Cli_GetAgBlockInfo(self, pointer, blocktype, db_number, data): ... + def Cli_SetSessionPassword(self, pointer, password): ... + def Cli_ClearSessionPassword(self, pointer): ... + def Cli_SetConnectionParams(self, pointer, address, local_tsap, remote_tsap): ... + def Cli_SetConnectionType(self, pointer, connection_type): ... + def Cli_GetConnected(self, pointer, connected): ... + def Cli_ABRead(self, pointer, start, size, data): ... + def Cli_ABWrite(self, pointer, start, size, cdata): ... + def Cli_AsABRead(self, pointer, start, size, data): ... + def Cli_AsABWrite(self, pointer, start, size, cdata): ... + def Cli_AsCompress(self, pointer, time): ... + def Cli_AsCopyRamToRom(self, pointer, time): ... + def Cli_AsCTRead(self, pointer, start, amount, data): ... + def Cli_AsCTWrite(self, pointer, start, amount, cdata): ... + def Cli_AsDBFill(self, pointer, db_number, filler): ... + def Cli_AsDBGet(self, pointer, db_number, _buffer, size): ... + def Cli_AsDBRead(self, pointer, db_number, start, size, data): ... + def Cli_AsDBWrite(self, pointer, db_number, start, size, data): ... + def Cli_AsDownload(self, pointer, block_num, cdata, size): ... + def Cli_Compress(self, pointer, time): ... + def Cli_SetParam(self, pointer, number, value): ... + def Cli_GetParam(self, pointer, number, value): ... + def Cli_GetPduLength(self, pointer, requested_, negotiated_): ... + def Cli_GetPlcDateTime(self, pointer, buffer): ... + def Cli_SetPlcDateTime(self, pointer, buffer): ... + def Cli_SetAsCallback(self, pointer, pfn_clicompletion, p_usr): ... + def Cli_WaitAsCompletion(self, pointer, timeout): ... + def Cli_AsReadArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_AsWriteArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_AsEBRead(self, pointer, start, size, data): ... + def Cli_AsEBWrite(self, pointer, start, size, cdata): ... + def Cli_AsFullUpload(self, pointer, block_type, block_num, _buffer, size): ... + def Cli_AsListBlocksOfType(self, pointer, _blocktype, data, count): ... + def Cli_AsMBRead(self, pointer, start, size, data): ... + def Cli_AsMBWrite(self, pointer, start, size, data): ... + def Cli_AsReadSZL(self, pointer, ssl_id, index, s7_szl, size): ... + def Cli_AsReadSZLList(self, pointer, szl_list, items_count): ... + def Cli_AsTMRead(self, pointer, start, amount, data): ... + def Cli_AsTMWrite(self, pointer, start, amount, data): ... + def Cli_AsUpload(self, pointer, block_type, block_num, _buffer, size): ... + def Cli_CopyRamToRom(self, pointer, timeout): ... + def Cli_CTRead(self, pointer, start, amount, data): ... + def Cli_CTWrite(self, pointer, start, amount, cdata): ... + def Cli_DBFill(self, pointer, db_number, filler): ... + def Cli_EBRead(self, pointer, start, size, data): ... + def Cli_EBWrite(self, pointer, start, size, cdata): ... + def Cli_ErrorText(self, error_code32, text, text_length): ... + def Cli_GetCpInfo(self, pointer, cp_info): ... + def Cli_GetExecTime(self, pointer, time): ... + def Cli_GetLastError(self, pointer, last_error): ... + def Cli_GetOrderCode(self, pointer, order_code): ... + def Cli_GetPgBlockInfo(self, pointer, buffer, block_info, size): ... + def Cli_GetProtection(self, pointer, s7_protection): ... + def Cli_IsoExchangeBuffer(self, pointer, cdata, size): ... + def Cli_MBRead(self, pointer, start, size, data): ... + def Cli_MBWrite(self, pointer, start, size, cdata): ... + def Cli_ReadSZL(self, pointer, ssl_id, index, s7_szl, size): ... + def Cli_ReadSZLList(self, pointer, szl_list, items_count): ... + def Cli_SetPlcSystemDateTime(self, pointer): ... + def Cli_TMRead(self, pointer, start, amount, data): ... + def Cli_TMWrite(self, pointer, start, amount, cdata): ... + def Cli_WriteMultiVars(self, pointer, cdata, items_count32): ... + def Cli_CheckAsCompletion(self, pointer, p_value): ... + # Server + def Srv_Create(self): ... + def Srv_Start(self, pointer): ... + def Srv_Stop(self, pointer): ... + def Srv_Destroy(self, pointer): ... + def Srv_EventText(self, event, text, len_): ... + def Srv_RegisterArea(self, pointer, area_code, index, userdata, size): ... + def Srv_SetEventsCallback(self, pointer, callback, usrPtr): ... + def Srv_SetReadEventsCallback(self, pointer, read_callback): ... + def Srv_GetStatus(self, pointer, server_status, cpu_status, clients_count): ... + def Srv_UnregisterArea(self, pointer, area_code, index): ... + def Srv_UnlockArea(self, pointer, code, index): ... + def Srv_LockArea(self, pointer, code, index): ... + def Srv_StartTo(self, pointer, ip): ... + def Srv_SetParam(self, pointer, number, value): ... + def Srv_SetMask(self, pointer, kind, mask): ... + def Srv_SetCpuStatus(self, pointer, status): ... + def Srv_PickEvent(self, pointer, event, ready): ... + def Srv_GetParam(self, pointer, number, value): ... + def Srv_GetMask(self, pointer, kind, mask): ... + def Srv_ClearEvents(self, pointer): ... + def Srv_ErrorText(self, error_code32, text, text_length): ... + # Partner + def Par_Create(self, active): ... + def Par_AsBSend(self, pointer): ... + def Par_BRecv(self, pointer): ... + def Par_BSend(self, pointer): ... + def Par_CheckAsBRecvCompletion(self, pointer): ... + def Par_CheckAsBSendCompletion(self, pointer, result): ... + def Par_Destroy(self, pointer): ... + def Par_GetLastError(self, pointer, last_error): ... + def Par_GetStats( + self, + pointer, + bytes_sent, + bytes_recv, + send_errors, + recv_errors, + ): ... + def Par_GetStatus(self, pointer, status): ... + def Par_SetParam(self, pointer, number, value): ... + def Par_GetParam(self, pointer, number, value): ... + def Par_SetRecvCallback(self, pointer): ... + def Par_SetSendCallback(self, pointer): ... + def Par_Start(self, pointer): ... + def Par_StartTo(self, pointer, local_address, remote_address, local_tsap, remote_tsap): ... + def Par_Stop(self, pointer): ... + def Par_WaitAsBSendCompletion(self, pointer, timeout): ... + def Par_ErrorText(self, error_code32, text, text_length): ... + def Par_GetTimes(self, pointer, send_time, recv_time): ... diff --git a/snap7/protocol.pyi b/snap7/protocol.pyi new file mode 100644 index 00000000..53f89d5a --- /dev/null +++ b/snap7/protocol.pyi @@ -0,0 +1,160 @@ +from typing import Type + +from ctypes import Array, c_char, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p +from _ctypes import CFuncPtr, _CArgObject + +class Snap7CliProtocol: + # Client + def Cli_Create(self) -> int: ... + def Cli_Destroy(self, pointer: _CArgObject) -> int: ... + def Cli_PlcStop(self, pointer: c_void_p) -> int: ... + def Cli_PlcColdStart(self, pointer: c_void_p) -> int: ... + def Cli_PlcHotStart(self, pointer: c_void_p) -> int: ... + def Cli_GetPlcStatus(self, pointer: c_void_p, state: _CArgObject) -> int: ... + def Cli_GetCpuInfo(self, pointer: c_void_p, info: _CArgObject) -> int: ... + def Cli_Disconnect(self, pointer: c_void_p) -> int: ... + def Cli_Connect(self, pointer: c_void_p) -> int: ... + def Cli_ConnectTo(self, pointer: c_void_p, address: c_char_p, rack: c_int, slot: c_int) -> int: ... + def Cli_DBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_DBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_Delete(self, pointer: c_void_p, blocktype: c_int, block_num: int) -> int: ... + def Cli_FullUpload( + self, pointer: c_void_p, blocktype: c_int, block_num: int, data: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_Upload(self, pointer: c_void_p, block_type: c_int, block_num: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_Download(self, pointer: c_void_p, block_num: int, data: _CArgObject, size: int) -> int: ... + def Cli_DBGet(self, pointer: c_void_p, db_number: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_WriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_ReadMultiVars(self, pointer: c_void_p, items: _CArgObject, items_count: c_int32) -> int: ... + def Cli_ListBlocks(self, pointer: c_void_p, blocksList: _CArgObject) -> int: ... + def Cli_ListBlocksOfType(self, pointer: c_void_p, blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_GetAgBlockInfo(self, pointer: c_void_p, blocktype: c_int, db_number: int, data: _CArgObject) -> int: ... + def Cli_SetSessionPassword(self, pointer: c_void_p, password: c_char_p) -> int: ... + def Cli_ClearSessionPassword(self, pointer: c_void_p) -> int: ... + def Cli_SetConnectionParams(self, pointer: c_void_p, address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16) -> int: ... + def Cli_SetConnectionType(self, pointer: c_void_p, connection_type: c_uint16) -> int: ... + def Cli_GetConnected(self, pointer: c_void_p, connected: _CArgObject) -> int: ... + def Cli_ABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_ABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsCompress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCopyRamToRom(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsCTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_AsDBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_AsDBGet(self, pointer: c_void_p, db_number: int, _buffer: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsDBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDownload(self, pointer: c_void_p, block_num: int, cdata: _CArgObject, size: int) -> int: ... + def Cli_Compress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Cli_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Cli_GetPduLength(self, pointer: c_void_p, requested_: _CArgObject, negotiated_: _CArgObject) -> int: ... + def Cli_GetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetAsCallback(self, pointer: c_void_p, pfn_clicompletion: CFuncPtr, p_usr: c_void_p) -> int: ... + def Cli_WaitAsCompletion(self, pointer: c_void_p, timeout: c_ulong) -> int: ... + def Cli_AsReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsWriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsEBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsEBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsFullUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_AsListBlocksOfType(self, pointer: c_void_p, _blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_AsMBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsMBWrite(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_AsTMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsTMWrite(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_CopyRamToRom(self, pointer: c_void_p, timeout: int) -> int: ... + def Cli_CTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_CTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_DBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_EBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_EBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + def Cli_GetCpInfo(self, pointer: c_void_p, cp_info: _CArgObject) -> int: ... + def Cli_GetExecTime(self, pointer: c_void_p, time: _CArgObject) -> int: ... + def Cli_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Cli_GetOrderCode(self, pointer: c_void_p, order_code: _CArgObject) -> int: ... + def Cli_GetPgBlockInfo(self, pointer: c_void_p, buffer: _CArgObject, block_info: _CArgObject, size: c_int) -> int: ... + def Cli_GetProtection(self, pointer: c_void_p, s7_protection: _CArgObject) -> int: ... + def Cli_IsoExchangeBuffer(self, pointer: c_void_p, cdata: _CArgObject, size: _CArgObject) -> int: ... + def Cli_MBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_MBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_SetPlcSystemDateTime(self, pointer: c_void_p) -> int: ... + def Cli_TMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_TMWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_WriteMultiVars(self, pointer: c_void_p, cdata: _CArgObject, items_count: c_int32) -> int: ... + def Cli_CheckAsCompletion(self, pointer: c_void_p, p_value: c_int) -> int: ... + # Server + def Srv_Create(self) -> int: ... + def Srv_Start(self, pointer: c_void_p) -> int: ... + def Srv_Stop(self, pointer: c_void_p) -> int: ... + def Srv_Destroy(self, pointer: _CArgObject) -> None: ... + def Srv_EventText(self, event: _CArgObject, text: _CArgObject, len_: int) -> int: ... + def Srv_RegisterArea(self, pointer: c_void_p, area_code: int, index: int, userdata: _CArgObject, size: int) -> int: ... + def Srv_SetEventsCallback(self, pointer: c_void_p, callback: Type[CFuncPtr], usrPtr: c_void_p) -> int: ... + def Srv_SetReadEventsCallback(self, pointer: c_void_p, read_callback: CFuncPtr) -> int: ... + def Srv_GetStatus( + self, pointer: c_void_p, server_status: _CArgObject, cpu_status: _CArgObject, clients_count: _CArgObject + ) -> int: ... + def Srv_UnregisterArea(self, pointer: c_void_p, area_code: int, index: int) -> int: ... + def Srv_UnlockArea(self, pointer: c_void_p, code: int, index: int) -> int: ... + def Srv_LockArea(self, pointer: c_void_p, code: int, index: int) -> int: ... + def Srv_StartTo(self, pointer: c_void_p, ip: bytes) -> int: ... + def Srv_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Srv_SetMask(self, pointer: c_void_p, kind: int, mask: int) -> int: ... + def Srv_SetCpuStatus(self, pointer: c_void_p, status: int) -> int: ... + def Srv_PickEvent(self, pointer: c_void_p, event: _CArgObject, ready: _CArgObject) -> int: ... + def Srv_GetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Srv_GetMask(self, pointer: c_void_p, kind: int, mask: _CArgObject) -> int: ... + def Srv_ClearEvents(self, pointer: c_void_p) -> int: ... + def Srv_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + # Partner + def Par_Create(self, active: int) -> int: ... + def Par_AsBSend(self, pointer: c_void_p) -> int: ... + def Par_BRecv(self, pointer: c_void_p) -> int: ... + def Par_BSend(self, pointer: c_void_p) -> int: ... + def Par_CheckAsBRecvCompletion(self, pointer: c_void_p) -> int: ... + def Par_CheckAsBSendCompletion(self, pointer: c_void_p, result: _CArgObject) -> int: ... + def Par_Destroy(self, pointer: _CArgObject) -> int: ... + def Par_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Par_GetStats( + self, + pointer: c_void_p, + bytes_sent: _CArgObject, + bytes_recv: _CArgObject, + send_errors: _CArgObject, + recv_errors: _CArgObject, + ) -> int: ... + def Par_GetStatus(self, pointer: c_void_p, status: _CArgObject) -> int: ... + def Par_SetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Par_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Par_SetRecvCallback(self, pointer: c_void_p) -> int: ... + def Par_SetSendCallback(self, pointer: c_void_p) -> int: ... + def Par_Start(self, pointer: c_void_p) -> int: ... + def Par_StartTo( + self, pointer: c_void_p, local_address: bytes, remote_address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16 + ) -> int: ... + def Par_Stop(self, pointer: c_void_p) -> int: ... + def Par_WaitAsBSendCompletion(self, pointer: c_void_p, timeout: int) -> int: ... + def Par_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + def Par_GetTimes(self, pointer: c_void_p, send_time: _CArgObject, recv_time: _CArgObject) -> int: ... diff --git a/snap7/server/__init__.py b/snap7/server/__init__.py index b132b7eb..c4f54675 100644 --- a/snap7/server/__init__.py +++ b/snap7/server/__init__.py @@ -4,12 +4,27 @@ import re import time -from ctypes import c_char, byref, sizeof, c_int, c_int32, c_uint32, c_void_p, CFUNCTYPE, POINTER, Array, c_int8 +from ctypes import ( + c_char, + byref, + sizeof, + c_int, + c_int32, + c_uint32, + c_void_p, + CFUNCTYPE, + POINTER, + Array, + _SimpleCData, +) +from _ctypes import CFuncPtr import struct import logging -from typing import Any, Tuple, Callable, Optional +from typing import Any, Callable, Optional, Tuple, cast, Type +from types import TracebackType from ..common import ipv4, check_error, load_library +from ..protocol import Snap7CliProtocol from ..types import SrvEvent, LocalPort, cpu_statuses, server_statuses from ..types import longword, wordlen_to_ctypes, WordLen, S7Object from ..types import srvAreaDB, srvAreaPA, srvAreaTM, srvAreaCT @@ -17,11 +32,11 @@ logger = logging.getLogger(__name__) -def error_wrap(func): +def error_wrap(func: Callable[..., Any]) -> Callable[..., Any]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) + def f(*args: Any, **kwargs: Any) -> None: + code = func(*args, **kwargs) check_error(code, context="server") return f @@ -32,8 +47,8 @@ class Server: A fake S7 server. """ - _lib: Any # since this is dynamically loaded from a DLL we don't have the type signature. - _s7_server: Optional[S7Object] = None + _lib: Snap7CliProtocol + _s7_server: S7Object _read_callback = None _callback: Optional[Callable[..., Any]] = None @@ -44,18 +59,20 @@ def __init__(self, log: bool = True): Args: log: `True` for enabling the event logging. Optinoal. """ - self._lib = load_library() + self._lib: Snap7CliProtocol = load_library() self.create() if log: self._set_log_callback() - def __enter__(self): + def __enter__(self) -> "Server": return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: self.destroy() - def __del__(self): + def __del__(self) -> None: self.destroy() def event_text(self, event: SrvEvent) -> str: @@ -75,14 +92,14 @@ def event_text(self, event: SrvEvent) -> str: check_error(error) return text.value.decode("ascii") - def create(self): + def create(self) -> None: """Create the server.""" logger.info("creating server") - self._lib.Srv_Create.restype = S7Object + self._lib.Srv_Create.restype = S7Object # type: ignore[attr-defined] self._s7_server = S7Object(self._lib.Srv_Create()) @error_wrap - def register_area(self, area_code: int, index: int, userdata: Array[c_int8]): + def register_area(self, area_code: int, index: int, userdata: "Array[_SimpleCData[int]]") -> int: """Shares a memory area with the server. That memory block will be visible by the clients. @@ -121,12 +138,12 @@ def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int: call_back(pevent.contents) return 0 - self._callback = callback_wrap(wrapper) + self._callback = cast(type[CFuncPtr], callback_wrap(wrapper)) usrPtr = c_void_p() return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, usrPtr) @error_wrap - def set_read_events_callback(self, call_back: Callable[..., Any]): + def set_read_events_callback(self, call_back: Callable[..., Any]) -> int: """Sets the user callback that the Server object has to call when a Read event is created. @@ -154,17 +171,17 @@ def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int: self._read_callback = callback_wrapper(wrapper) return self._lib.Srv_SetReadEventsCallback(self._s7_server, self._read_callback) - def _set_log_callback(self): + def _set_log_callback(self) -> None: """Sets a callback that logs the events""" logger.debug("setting up event logger") - def log_callback(event): + def log_callback(event: SrvEvent) -> None: logger.info(f"callback event: {self.event_text(event)}") self.set_events_callback(log_callback) @error_wrap - def start(self, tcpport: int = 102): + def start(self, tcpport: int = 102) -> int: """Starts the server. Args: @@ -177,17 +194,17 @@ def start(self, tcpport: int = 102): return self._lib.Srv_Start(self._s7_server) @error_wrap - def stop(self): + def stop(self) -> int: """Stop the server.""" logger.info("stopping server") return self._lib.Srv_Stop(self._s7_server) - def destroy(self) -> Optional[int]: + def destroy(self) -> None: """Destroy the server.""" logger.info("destroying server") if self._lib and self._s7_server is not None: return self._lib.Srv_Destroy(byref(self._s7_server)) - self._s7_server = None + self._s7_server = None # type: ignore[assignment] return None def get_status(self) -> Tuple[str, str, int]: @@ -207,7 +224,7 @@ def get_status(self) -> Tuple[str, str, int]: return (server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value) @error_wrap - def unregister_area(self, area_code: int, index: int): + def unregister_area(self, area_code: int, index: int) -> int: """'Unshares' a memory area previously shared with Srv_RegisterArea(). Notes: @@ -223,7 +240,7 @@ def unregister_area(self, area_code: int, index: int): return self._lib.Srv_UnregisterArea(self._s7_server, area_code, index) @error_wrap - def unlock_area(self, code: int, index: int): + def unlock_area(self, code: int, index: int) -> int: """Unlocks a previously locked shared memory area. Args: @@ -237,7 +254,7 @@ def unlock_area(self, code: int, index: int): return self._lib.Srv_UnlockArea(self._s7_server, code, index) @error_wrap - def lock_area(self, code: int, index: int): + def lock_area(self, code: int, index: int) -> int: """Locks a shared memory area. Args: @@ -251,7 +268,7 @@ def lock_area(self, code: int, index: int): return self._lib.Srv_LockArea(self._s7_server, code, index) @error_wrap - def start_to(self, ip: str, tcpport: int = 102): + def start_to(self, ip: str, tcpport: int = 102) -> int: """Start server on a specific interface. Args: @@ -270,7 +287,7 @@ def start_to(self, ip: str, tcpport: int = 102): return self._lib.Srv_StartTo(self._s7_server, ip.encode()) @error_wrap - def set_param(self, number: int, value: int): + def set_param(self, number: int, value: int) -> int: """Sets an internal Server object parameter. Args: @@ -284,7 +301,7 @@ def set_param(self, number: int, value: int): return self._lib.Srv_SetParam(self._s7_server, number, byref(c_int(value))) @error_wrap - def set_mask(self, kind: int, mask: int): + def set_mask(self, kind: int, mask: int) -> int: """Writes the specified filter mask. Args: @@ -298,7 +315,7 @@ def set_mask(self, kind: int, mask: int): return self._lib.Srv_SetMask(self._s7_server, kind, mask) @error_wrap - def set_cpu_status(self, status: int): + def set_cpu_status(self, status: int) -> int: """Sets the Virtual CPU status. Args: @@ -332,7 +349,7 @@ def pick_event(self) -> Optional[SrvEvent]: logger.debug("no events ready") return None - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """Reads an internal Server object parameter. Args: @@ -373,7 +390,7 @@ def clear_events(self) -> int: return self._lib.Srv_ClearEvents(self._s7_server) -def mainloop(tcpport: int = 1102, init_standard_values: bool = False): +def mainloop(tcpport: int = 1102, init_standard_values: bool = False) -> None: """Init a fake Snap7 server with some default values. Args: @@ -383,10 +400,10 @@ def mainloop(tcpport: int = 1102, init_standard_values: bool = False): server = Server() size = 100 - DBdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - PAdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - TMdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - CTdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + DBdata: "Array[_SimpleCData[int]]" = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + PAdata: "Array[_SimpleCData[int]]" = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + TMdata: "Array[_SimpleCData[int]]" = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + CTdata: "Array[_SimpleCData[int]]" = (wordlen_to_ctypes[WordLen.Byte.value] * size)() server.register_area(srvAreaDB, 1, DBdata) server.register_area(srvAreaPA, 1, PAdata) server.register_area(srvAreaTM, 1, TMdata) diff --git a/snap7/server/__main__.py b/snap7/server/__main__.py index 015bddc1..d6fb4476 100644 --- a/snap7/server/__main__.py +++ b/snap7/server/__main__.py @@ -6,6 +6,7 @@ """ import logging +from ctypes import CDLL try: import click @@ -20,18 +21,18 @@ logger = logging.getLogger("Snap7.Server") -@click.command() -@click.option("-p", "--port", default=1102, help="Port the server will listen on.") -@click.option( +@click.command() # type: ignore[misc] +@click.option("-p", "--port", default=1102, help="Port the server will listen on.") # type: ignore[misc] +@click.option( # type: ignore[misc] "--dll", hidden=True, type=click.Path(exists=True, file_okay=True, dir_okay=False, resolve_path=True), help="Path to the snap7 DLL (for emergencies if it can't be put on PATH).", ) -@click.option("-v", "--verbose", is_flag=True, help="Also print debug-output.") -@click.version_option(__version__) -@click.help_option("-h", "--help") -def main(port, dll, verbose): +@click.option("-v", "--verbose", is_flag=True, help="Also print debug-output.") # type: ignore[misc] +@click.version_option(__version__) # type: ignore[misc] +@click.help_option("-h", "--help") # type: ignore[misc] +def main(port: int, dll: CDLL, verbose: bool) -> None: """Start a S7 dummy server with some default values.""" # setup logging diff --git a/snap7/types.py b/snap7/types.py index 406e6d16..9d8d14ca 100755 --- a/snap7/types.py +++ b/snap7/types.py @@ -274,7 +274,7 @@ class S7CpuInfo(ctypes.Structure): ("ModuleName", ctypes.c_char * 25), ] - def __str__(self): + def __str__(self) -> str: return ( f"" diff --git a/snap7/util/__init__.py b/snap7/util/__init__.py index f6776aed..dde717f0 100644 --- a/snap7/util/__init__.py +++ b/snap7/util/__init__.py @@ -85,7 +85,7 @@ import re import time -from typing import Union +from typing import Any, Union from datetime import date, datetime from collections import OrderedDict @@ -147,7 +147,7 @@ def utc2local(utc: Union[date, datetime]) -> Union[datetime, date]: return utc + offset -def parse_specification(db_specification: str) -> OrderedDict: +def parse_specification(db_specification: str) -> OrderedDict[str, Any]: """Create a db specification derived from a dataview of a db in which the byte layout is specified @@ -168,7 +168,7 @@ def parse_specification(db_specification: str) -> OrderedDict: return parsed_db_specification -def print_row(data): +def print_row(data: bytearray) -> None: """print a single db row in chr and str""" index_line = "" pri_line1 = "" @@ -179,9 +179,8 @@ def print_row(data): # index if not i % 5: diff = len(pri_line1) - len(index_line) - i = str(i) index_line += diff * " " - index_line += i + index_line += str(i) # i = i + (ws - len(i)) * ' ' + ',' # byte array line diff --git a/snap7/util/db.py b/snap7/util/db.py index 102f94cf..e62e3cbb 100644 --- a/snap7/util/db.py +++ b/snap7/util/db.py @@ -1,7 +1,7 @@ import re from collections import OrderedDict -from datetime import datetime, date -from typing import Optional, Union, Dict, Callable +from datetime import datetime, date, timedelta +from typing import Any, Iterator, Optional, Tuple, Union, Dict, Callable from logging import getLogger from snap7.client import Client @@ -84,7 +84,7 @@ class DB: """ bytearray_: Optional[bytearray] = None # data from plc - specification: Optional[str] = None # layout of db rows + specification: str # layout of db rows id_field: Optional[str] = None # ID field of the rows row_size: int = 0 # bytes size of a db row layout_offset: int = 0 # at which byte in row specification should @@ -137,10 +137,10 @@ def __init__( self.specification = specification # loop over bytearray. make rowObjects # store index of id_field to row objects - self.index: OrderedDict = OrderedDict() + self.index: OrderedDict[str, DB_Row] = OrderedDict() self.make_rows() - def make_rows(self): + def make_rows(self) -> None: """Make each row for the DB.""" id_field = self.id_field row_size = self.row_size @@ -169,7 +169,7 @@ def make_rows(self): logger.error(msg) self.index[key] = row - def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, int, float, str, bool, datetime]: + def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, "DB_Row"]: """Access a row of the table through its index. Rows (values) are of type :class:`DB_Row`. @@ -179,7 +179,7 @@ def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, i """ return self.index.get(key, default) - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: """Iterate over the items contained in the table, in the physical order they are contained in memory. @@ -190,7 +190,7 @@ def __iter__(self): """ yield from self.index.items() - def __len__(self): + def __len__(self) -> int: """Return the number of rows contained in the DB. Notes: @@ -198,23 +198,23 @@ def __len__(self): """ return len(self.index) - def __contains__(self, key): + def __contains__(self, key: str) -> bool: """Return whether the given key is the index of a row in the DB.""" return key in self.index - def keys(self): + def keys(self) -> Iterator[str]: """Return a *view object* of the keys that are used as indices for the rows in the DB. """ yield from self.index.keys() - def items(self): + def items(self) -> Iterator[Tuple[str, Any]]: """Return a *view object* of the items (``(index, row)`` pairs) that are used as indices for the rows in the DB. """ yield from self.index.items() - def export(self): + def export(self) -> OrderedDict[str, Any]: """Export the object to an :class:`OrderedDict`, where each item in the dictionary has an index as the key, and the value of the DB row associated with that index as a value, represented itself as a :class:`dict` (as returned by :func:`DB_Row.export`). @@ -230,7 +230,7 @@ def export(self): ret[k] = v.export() return ret - def set_data(self, bytearray_: bytearray): + def set_data(self, bytearray_: bytearray) -> None: """Set the new buffer data from the PLC to the current instance. Args: @@ -243,7 +243,7 @@ def set_data(self, bytearray_: bytearray): raise TypeError(f"Value bytearray_: {bytearray_} is not from type bytearray") self._bytearray = bytearray_ - def read(self, client: Client): + def read(self, client: Client) -> None: """Reads all the rows from the PLC to the :obj:`bytearray` of this instance. Args: @@ -269,7 +269,7 @@ def read(self, client: Client): self.index.clear() self.make_rows() - def write(self, client): + def write(self, client: Client) -> None: """Writes all the rows from the :obj:`bytearray` of this instance to the PLC Notes: @@ -311,17 +311,17 @@ class DB_Row: """ bytearray_: bytearray # data of reference to parent DB - _specification: OrderedDict = OrderedDict() # row specification + _specification: OrderedDict[str, Any] = OrderedDict() # row specification def __init__( self, - bytearray_: bytearray, + bytearray_: Union[bytearray, "DB"], _specification: str, - row_size: Optional[int] = 0, + row_size: int = 0, db_offset: int = 0, layout_offset: int = 0, row_offset: Optional[int] = 0, - area: Optional[Areas] = Areas.DB, + area: Areas = Areas.DB, ): """Creates a new instance of the `DB_Row` class. @@ -368,21 +368,21 @@ def export(self) -> Dict[str, Union[str, int, float, bool, datetime]]: """ return {key: self[key] for key in self._specification} - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: """ Get a specific db field """ index, _type = self._specification[key] return self.get_value(index, _type) - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any) -> None: index, _type = self._specification[key] self.set_value(index, _type, value) - def __repr__(self): + def __repr__(self) -> str: string = "" for var_name, (index, _type) in self._specification.items(): - string = f"{string}\n{var_name:<20} {self.get_value(index, _type):<10}" + string = f"{string}\n{var_name:<20} {self.get_value(index, _type)!r:<10}" return string def unchanged(self, bytearray_: bytearray) -> bool: @@ -410,7 +410,9 @@ def get_offset(self, byte_index: Union[str, int]) -> int: # the variable address with decimal point(like 0.0 or 4.0) return int(float(byte_index)) - self.layout_offset + self.db_offset - def get_value(self, byte_index: Union[str, int], type_: str) -> Union[ValueError, int, float, str, datetime]: + def get_value( + self, byte_index: Union[str, int], type_: str + ) -> Union[int, float, str, datetime, bytearray, bytes, date, timedelta]: """Gets the value for a specific type. Args: @@ -453,7 +455,9 @@ def get_value(self, byte_index: Union[str, int], type_: str) -> Union[ValueError raise ValueError("Max size could not be determinate. re.search() returned None") return get_wstring(bytearray_, byte_index) else: - type_to_func: Dict[str, Callable] = { + type_to_func: Dict[ + str, Callable[[bytearray, int], Union[int, float, str, datetime, bytearray, bytes, date, timedelta]] + ] = { "REAL": get_real, "DWORD": get_dword, "UDINT": get_udint, @@ -509,7 +513,8 @@ def set_value(self, byte_index: Union[str, int], type_: str, value: Union[bool, raise ValueError("Max size could not be determinate. re.search() returned None") max_size_grouped = max_size.group(0) max_size_int = int(max_size_grouped) - return set_fstring(bytearray_, byte_index, value, max_size_int) + set_fstring(bytearray_, byte_index, value, max_size_int) + return None if type_.startswith("STRING") and isinstance(value, str): max_size = re.search(r"\d+", type_) @@ -517,7 +522,8 @@ def set_value(self, byte_index: Union[str, int], type_: str, value: Union[bool, raise ValueError("Max size could not be determinate. re.search() returned None") max_size_grouped = max_size.group(0) max_size_int = int(max_size_grouped) - return set_string(bytearray_, byte_index, value, max_size_int) + set_string(bytearray_, byte_index, value, max_size_int) + return None if type_ == "REAL": return set_real(bytearray_, byte_index, value) diff --git a/snap7/util/getters.py b/snap7/util/getters.py index f49cec17..494182b3 100644 --- a/snap7/util/getters.py +++ b/snap7/util/getters.py @@ -1,6 +1,6 @@ import struct from datetime import timedelta, datetime, date -from typing import Union, List +from typing import NoReturn from logging import getLogger logger = getLogger(__name__) @@ -44,7 +44,7 @@ def get_byte(bytearray_: bytearray, byte_index: int) -> bytes: data = bytearray_[byte_index : byte_index + 1] data[0] = data[0] & 0xFF packed = struct.pack("B", *data) - value = struct.unpack("B", packed)[0] + value: bytes = struct.unpack("B", packed)[0] return value @@ -70,7 +70,7 @@ def get_word(bytearray_: bytearray, byte_index: int) -> bytearray: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">H", packed)[0] + value: bytearray = struct.unpack(">H", packed)[0] return value @@ -96,7 +96,7 @@ def get_int(bytearray_: bytearray, byte_index: int) -> int: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">h", packed)[0] + value: int = struct.unpack(">h", packed)[0] return value @@ -124,7 +124,7 @@ def get_uint(bytearray_: bytearray, byte_index: int) -> int: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">H", packed)[0] + value: int = struct.unpack(">H", packed)[0] return value @@ -148,7 +148,7 @@ def get_real(bytearray_: bytearray, byte_index: int) -> float: 123.32099914550781 """ x = bytearray_[byte_index : byte_index + 4] - real = struct.unpack(">f", struct.pack("4B", *x))[0] + real: float = struct.unpack(">f", struct.pack("4B", *x))[0] return real @@ -238,7 +238,7 @@ def get_dword(bytearray_: bytearray, byte_index: int) -> int: 4294967295 """ data = bytearray_[byte_index : byte_index + 4] - dword = struct.unpack(">I", struct.pack("4B", *data))[0] + dword: int = struct.unpack(">I", struct.pack("4B", *data))[0] return dword @@ -265,7 +265,7 @@ def get_dint(bytearray_: bytearray, byte_index: int) -> int: 2147483647 """ data = bytearray_[byte_index : byte_index + 4] - dint = struct.unpack(">i", struct.pack("4B", *data))[0] + dint: int = struct.unpack(">i", struct.pack("4B", *data))[0] return dint @@ -292,7 +292,7 @@ def get_udint(bytearray_: bytearray, byte_index: int) -> int: 4294967295 """ data = bytearray_[byte_index : byte_index + 4] - dint = struct.unpack(">I", struct.pack("4B", *data))[0] + dint: int = struct.unpack(">I", struct.pack("4B", *data))[0] return dint @@ -437,7 +437,7 @@ def get_usint(bytearray_: bytearray, byte_index: int) -> int: """ data = bytearray_[byte_index] & 0xFF packed = struct.pack("B", data) - value = struct.unpack(">B", packed)[0] + value: int = struct.unpack(">B", packed)[0] return value @@ -463,11 +463,11 @@ def get_sint(bytearray_: bytearray, byte_index: int) -> int: """ data = bytearray_[byte_index] packed = struct.pack("B", data) - value = struct.unpack(">b", packed)[0] + value: int = struct.unpack(">b", packed)[0] return value -def get_lint(bytearray_: bytearray, byte_index: int): +def get_lint(bytearray_: bytearray, byte_index: int) -> NoReturn: """Get the long int THIS VALUE IS NEITHER TESTED NOR VERIFIED BY A REAL PLC AT THE MOMENT @@ -494,7 +494,7 @@ def get_lint(bytearray_: bytearray, byte_index: int): # raw_lint = bytearray_[byte_index:byte_index + 8] # lint = struct.unpack('>q', struct.pack('8B', *raw_lint))[0] # return lint - return NotImplementedError + raise NotImplementedError def get_lreal(bytearray_: bytearray, byte_index: int) -> float: @@ -519,7 +519,7 @@ def get_lreal(bytearray_: bytearray, byte_index: int) -> float: >>> snap7.util.get_lreal(data, 0) 12345.12345 """ - return struct.unpack_from(">d", bytearray_, offset=byte_index)[0] + return float(struct.unpack_from(">d", bytearray_, offset=byte_index)[0]) def get_lword(bytearray_: bytearray, byte_index: int) -> bytearray: @@ -571,7 +571,7 @@ def get_ulint(bytearray_: bytearray, byte_index: int) -> int: 12345 """ raw_ulint = bytearray_[byte_index : byte_index + 8] - lint = struct.unpack(">Q", struct.pack("8B", *raw_ulint))[0] + lint: int = struct.unpack(">Q", struct.pack("8B", *raw_ulint))[0] return lint @@ -647,7 +647,7 @@ def get_char(bytearray_: bytearray, byte_index: int) -> str: return char -def get_wchar(bytearray_: bytearray, byte_index: int) -> Union[ValueError, str]: +def get_wchar(bytearray_: bytearray, byte_index: int) -> str: """Get wchar value from bytearray. Notes: @@ -715,5 +715,5 @@ def get_wstring(bytearray_: bytearray, byte_index: int) -> str: return bytearray_[wstring_start : wstring_start + wstr_symbols_amount].decode("utf-16-be") -def get_array(bytearray_: bytearray, byte_index: int) -> List: +def get_array(bytearray_: bytearray, byte_index: int) -> NoReturn: raise NotImplementedError diff --git a/snap7/util/setters.py b/snap7/util/setters.py index 83272ea5..19c633d8 100644 --- a/snap7/util/setters.py +++ b/snap7/util/setters.py @@ -62,7 +62,7 @@ def set_byte(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: return bytearray_ -def set_word(bytearray_: bytearray, byte_index: int, _int: int): +def set_word(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to word Notes: @@ -82,7 +82,7 @@ def set_word(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_int(bytearray_: bytearray, byte_index: int, _int: int): +def set_int(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to int Notes: @@ -108,7 +108,7 @@ def set_int(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_uint(bytearray_: bytearray, byte_index: int, _int: int): +def set_uint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to unsigned int Notes: @@ -134,7 +134,7 @@ def set_uint(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_real(bytearray_: bytearray, byte_index: int, real) -> bytearray: +def set_real(bytearray_: bytearray, byte_index: int, real: Union[bool, str, float, int]) -> bytearray: """Set Real value Notes: @@ -154,15 +154,14 @@ def set_real(bytearray_: bytearray, byte_index: int, real) -> bytearray: >>> snap7.util.set_real(data, 0, 123.321) bytearray(b'B\\xf6\\xa4Z') """ - real = float(real) - real = struct.pack(">f", real) - _bytes = struct.unpack("4B", real) + real_packed = struct.pack(">f", float(real)) + _bytes = struct.unpack("4B", real_packed) for i, b in enumerate(_bytes): bytearray_[byte_index + i] = b return bytearray_ -def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: int): +def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: int) -> None: """Set space-padded fixed-length string value Args: @@ -200,7 +199,7 @@ def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: bytearray_[byte_index + r] = ord(" ") -def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int = 254): +def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int = 254) -> None: """Set string value Args: @@ -252,7 +251,7 @@ def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int bytearray_[byte_index + 2 + r] = ord(" ") -def set_dword(bytearray_: bytearray, byte_index: int, dword: int): +def set_dword(bytearray_: bytearray, byte_index: int, dword: int) -> None: """Set a DWORD to the buffer. Notes: @@ -276,7 +275,7 @@ def set_dword(bytearray_: bytearray, byte_index: int, dword: int): bytearray_[byte_index + i] = b -def set_dint(bytearray_: bytearray, byte_index: int, dint: int): +def set_dint(bytearray_: bytearray, byte_index: int, dint: int) -> None: """Set value in bytearray to dint Notes: @@ -301,7 +300,7 @@ def set_dint(bytearray_: bytearray, byte_index: int, dint: int): bytearray_[byte_index + i] = b -def set_udint(bytearray_: bytearray, byte_index: int, udint: int): +def set_udint(bytearray_: bytearray, byte_index: int, udint: int) -> None: """Set value in bytearray to unsigned dint Notes: @@ -400,7 +399,7 @@ def set_usint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: return bytearray_ -def set_sint(bytearray_: bytearray, byte_index: int, _int) -> bytearray: +def set_sint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set small int to the buffer. Notes: diff --git a/tests/test_client.py b/tests/test_client.py index f57c0abc..114d9b13 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -595,8 +595,7 @@ def test_wait_as_completion_pass(self, timeout=1000): self.client.write_area(area, dbnumber, start, data) # start as_request and test wordlen, usrdata = self.client._prepare_as_read_area(area, size) - pusrdata = ctypes.byref(usrdata) - self.client.as_read_area(area, dbnumber, start, size, wordlen, pusrdata) + self.client.as_read_area(area, dbnumber, start, size, wordlen, usrdata) self.client.wait_as_completion(timeout) self.assertEqual(bytearray(usrdata), data) @@ -609,11 +608,10 @@ def test_wait_as_completion_timeouted(self, timeout=0, tries=500): start = 1 data = bytearray(size) wordlen, data = self.client._prepare_as_read_area(area, size) - pdata = ctypes.byref(data) self.client.write_area(area, dbnumber, start, bytearray(data)) # start as_request and wait for zero seconds to try trigger timeout for i in range(tries): - self.client.as_read_area(area, dbnumber, start, size, wordlen, pdata) + self.client.as_read_area(area, dbnumber, start, size, wordlen, data) res = None try: res = self.client.wait_as_completion(timeout) @@ -646,7 +644,7 @@ def test_check_as_completion(self, timeout=5): # start as_request and test wordlen, cdata = self.client._prepare_as_read_area(area, size) - pcdata = ctypes.byref(cdata) + pcdata = cdata self.client.as_read_area(area, db, start, size, wordlen, pcdata) for _ in range(10): self.client.check_as_completion(ctypes.byref(check_status)) @@ -670,8 +668,7 @@ def test_as_read_area(self): data = bytearray(b"\x11") self.client.write_area(area, dbnumber, start, data) wordlen, usrdata = self.client._prepare_as_read_area(area, amount) - pusrdata = ctypes.byref(usrdata) - self.client.as_read_area(area, dbnumber, start, amount, wordlen, pusrdata) + self.client.as_read_area(area, dbnumber, start, amount, wordlen, usrdata) self.client.wait_as_completion(1000) self.assertEqual(bytearray(usrdata), data) @@ -681,8 +678,7 @@ def test_as_read_area(self): data = bytearray(b"\x12\x34") self.client.write_area(area, dbnumber, start, data) wordlen, usrdata = self.client._prepare_as_read_area(area, amount) - pusrdata = ctypes.byref(usrdata) - self.client.as_read_area(area, dbnumber, start, amount, wordlen, pusrdata) + self.client.as_read_area(area, dbnumber, start, amount, wordlen, usrdata) self.client.wait_as_completion(1000) self.assertEqual(bytearray(usrdata), data) @@ -692,7 +688,7 @@ def test_as_read_area(self): data = bytearray(b"\x13\x35") self.client.write_area(area, dbnumber, start, data) wordlen, usrdata = self.client._prepare_as_read_area(area, amount) - pusrdata = ctypes.byref(usrdata) + pusrdata = usrdata self.client.as_read_area(area, dbnumber, start, amount, wordlen, pusrdata) self.client.wait_as_completion(1000) self.assertEqual(bytearray(usrdata), data)