From 60881b16724f5876c50536ceabed0b90cbb8372d Mon Sep 17 00:00:00 2001 From: nikteliy Date: Sun, 12 May 2024 19:26:32 +0600 Subject: [PATCH] Improve typing --- .pre-commit-config.yaml | 2 + .venv3.11/pyvenv.cfg | 5 ++ snap7/client/__init__.py | 89 ++++++++++++---------- snap7/client/client.pyi | 103 +++++++++++++++++++++++++ snap7/common.py | 22 ++++-- snap7/logo.py | 33 ++++---- snap7/partner.py | 35 +++++---- snap7/protocol.py | 140 ++++++++++++++++++++++++++++++++++ snap7/protocol.pyi | 160 +++++++++++++++++++++++++++++++++++++++ snap7/server/__init__.py | 94 ++++++++++++++--------- snap7/server/__main__.py | 15 ++-- snap7/types.py | 2 +- snap7/util/__init__.py | 9 +-- snap7/util/getters.py | 32 ++++---- snap7/util/setters.py | 25 +++--- 15 files changed, 607 insertions(+), 159 deletions(-) create mode 100644 .venv3.11/pyvenv.cfg create mode 100644 snap7/client/client.pyi create mode 100644 snap7/protocol.py create mode 100644 snap7/protocol.pyi diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b6b4a8c5..fa04d5b3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,9 +22,11 @@ repos: - id: mypy additional_dependencies: [types-setuptools] files: ^snap7 + args: [--strict, --exclude=snap7/util/db.py] - repo: https://github.com/astral-sh/ruff-pre-commit rev: 'v0.4.2' hooks: - id: ruff - id: ruff-format +exclude: "snap7/protocol.py" diff --git a/.venv3.11/pyvenv.cfg b/.venv3.11/pyvenv.cfg new file mode 100644 index 00000000..87098a2e --- /dev/null +++ b/.venv3.11/pyvenv.cfg @@ -0,0 +1,5 @@ +home = /usr/bin +include-system-site-packages = false +version = 3.11.9 +executable = /usr/bin/python3.11 +command = /usr/bin/python3.11 -m venv /home/user/Documents/python-snap7/.venv3.11 diff --git a/snap7/client/__init__.py b/snap7/client/__init__.py index e21ca87f..6a48e832 100644 --- a/snap7/client/__init__.py +++ b/snap7/client/__init__.py @@ -5,11 +5,13 @@ import re import logging from ctypes import CFUNCTYPE, byref, create_string_buffer, sizeof -from ctypes import Array, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p +from ctypes import Array, _SimpleCData, c_byte, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p from datetime import datetime -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union, ParamSpec, TypeVar, Type +from types import TracebackType from ..common import check_error, ipv4, load_library +from ..protocol import Snap7CliProtocol from ..types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem from ..types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen from ..types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types @@ -17,12 +19,15 @@ logger = logging.getLogger(__name__) +Param = ParamSpec("Param") +RetType = TypeVar("RetType") -def error_wrap(func): + +def error_wrap(func: Callable[Param, RetType]) -> Callable[Param, None]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) + def f(*args: Param.args, **kwargs: Param.kwargs) -> None: + code = func(*args, **kwargs) check_error(code, context="client") return f @@ -47,10 +52,10 @@ class Client: >>> client.db_write(1, 0, data) """ - _lib: Any # since this is dynamically loaded from a DLL we don't have the type signature. + _lib: Snap7CliProtocol _read_callback = None _callback = None - _s7_client: Optional[S7Object] = None + _s7_client: S7Object def __init__(self, lib_location: Optional[str] = None): """Creates a new `Client` instance. @@ -66,23 +71,25 @@ def __init__(self, lib_location: Optional[str] = None): """ - self._lib = load_library(lib_location) + self._lib: Snap7CliProtocol = load_library(lib_location) self.create() - def __enter__(self): + def __enter__(self) -> "Client": return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: self.destroy() - def __del__(self): + def __del__(self) -> None: self.destroy() - def create(self): + def create(self) -> None: """Creates a SNAP7 client.""" logger.info("creating snap7 client") - self._lib.Cli_Create.restype = S7Object - self._s7_client = S7Object(self._lib.Cli_Create()) + self._lib.Cli_Create.restype = S7Object # type: ignore[attr-defined] + self._s7_client = self._lib.Cli_Create() def destroy(self) -> Optional[int]: """Destroys the Client object. @@ -97,7 +104,7 @@ def destroy(self) -> Optional[int]: logger.info("destroying snap7 client") if self._lib and self._s7_client is not None: return self._lib.Cli_Destroy(byref(self._s7_client)) - self._s7_client = None + self._s7_client = None # type: ignore[assignment] return None def plc_stop(self) -> int: @@ -199,7 +206,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int """ logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}") - self.set_param(RemotePort, tcpport) + self.set_param(number=RemotePort, value=tcpport) return self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot)) def db_read(self, db_number: int, start: int, size: int) -> bytearray: @@ -441,7 +448,7 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) -> cdata = (type_ * len(data)).from_buffer_copy(data) return self._lib.Cli_WriteArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, byref(cdata)) - def read_multi_vars(self, items) -> Tuple[int, S7DataItem]: + def read_multi_vars(self, items: Array[S7DataItem]) -> Tuple[int, Array[S7DataItem]]: """Reads different kind of variables from a PLC simultaneously. Args: @@ -472,7 +479,7 @@ def list_blocks(self) -> BlocksList: logger.debug(f"blocks: {blocksList}") return blocksList - def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]: + def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array[c_uint16]]: """This function returns the AG list of a specified block type. Args: @@ -592,11 +599,11 @@ def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int) """ if not re.match(ipv4, address): raise ValueError(f"{address} is invalid ipv4") - result = self._lib.Cli_SetConnectionParams(self._s7_client, address, c_uint16(local_tsap), c_uint16(remote_tsap)) + result = self._lib.Cli_SetConnectionParams(self._s7_client, address.encode(), c_uint16(local_tsap), c_uint16(remote_tsap)) if result != 0: raise ValueError("The parameter was invalid") - def set_connection_type(self, connection_type: int): + def set_connection_type(self, connection_type: int) -> None: """Sets the connection resource type, i.e the way in which the Clients connects to a PLC. Args: @@ -659,7 +666,7 @@ def ab_write(self, start: int, data: bytearray) -> int: logger.debug(f"ab write: start: {start}: size: {size}: ") return self._lib.Cli_ABWrite(self._s7_client, start, size, byref(cdata)) - def as_ab_read(self, start: int, size: int, data) -> int: + def as_ab_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of IPU area from a PLC asynchronously. Args: @@ -720,7 +727,7 @@ def as_copy_ram_to_rom(self, timeout: int = 1) -> int: check_error(result, context="client") return result - def as_ct_read(self, start: int, amount: int, data) -> int: + def as_ct_read(self, start: int, amount: int, data: "_SimpleCData[Any]") -> int: """Reads counters from a PLC asynchronously. Args: @@ -752,7 +759,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int: check_error(result, context="client") return result - def as_db_fill(self, db_number: int, filler) -> int: + def as_db_fill(self, db_number: int, filler: int) -> int: """Fills a DB in AG with a given byte. Args: @@ -766,7 +773,7 @@ def as_db_fill(self, db_number: int, filler) -> int: check_error(result, context="client") return result - def as_db_get(self, db_number: int, _buffer, size) -> bytearray: + def as_db_get(self, db_number: int, _buffer: "_SimpleCData[Any]", size: "_SimpleCData[Any]") -> int: """Uploads a DB from AG using DBRead. Note: @@ -784,7 +791,7 @@ def as_db_get(self, db_number: int, _buffer, size) -> bytearray: check_error(result, context="client") return result - def as_db_read(self, db_number: int, start: int, size: int, data) -> Array: + def as_db_read(self, db_number: int, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of a DB from a PLC. Args: @@ -807,7 +814,7 @@ def as_db_read(self, db_number: int, start: int, size: int, data) -> Array: check_error(result, context="client") return result - def as_db_write(self, db_number: int, start: int, size: int, data) -> int: + def as_db_write(self, db_number: int, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Writes a part of a DB into a PLC. Args: @@ -943,7 +950,7 @@ def set_plc_datetime(self, dt: datetime) -> int: return self._lib.Cli_SetPlcDateTime(self._s7_client, byref(buffer)) - def check_as_completion(self, p_value) -> int: + def check_as_completion(self, p_value: c_int) -> int: """Method to check Status of an async request. Result contains if the check was successful, not the data value itself Args: @@ -1000,7 +1007,7 @@ def wait_as_completion(self, timeout: int) -> int: check_error(result, context="client") return result - def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]: + def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array[_SimpleCData[int]]]: if area not in Areas: raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: @@ -1013,7 +1020,9 @@ def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array] usrdata = (type_ * size)() return wordlen, usrdata - def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int: + def as_read_area( + self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata: Array[_SimpleCData[int]] + ) -> int: """Reads a data area from a PLC asynchronously. With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters. @@ -1032,11 +1041,11 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle f"reading area: {area.name} dbnumber: {dbnumber} start: {start} amount: {size} " f"wordlen: {wordlen.name}={wordlen.value}" ) - result = self._lib.Cli_AsReadArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, pusrdata) + result = self._lib.Cli_AsReadArea(self._s7_client, area.value, dbnumber, start, size, wordlen.value, byref(pusrdata)) check_error(result, context="client") return result - def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array]: + def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array[Any]]: if area not in Areas: raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: @@ -1049,7 +1058,7 @@ def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, cdata = (type_ * len(data)).from_buffer_copy(data) return wordlen, cdata - def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata) -> int: + def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordlen: WordLen, pusrdata: bytearray) -> int: """Writes a data area into a PLC asynchronously. Args: @@ -1072,7 +1081,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl check_error(res, context="client") return res - def as_eb_read(self, start: int, size: int, data) -> int: + def as_eb_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of IPI area from a PLC asynchronously. Args: @@ -1124,7 +1133,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int: check_error(result, context="client") return result - def as_list_blocks_of_type(self, blocktype: str, data, count) -> int: + def as_list_blocks_of_type(self, blocktype: str, data: "_SimpleCData[Any]", count: "_SimpleCData[Any]") -> int: """Returns the AG blocks list of a given type. Args: @@ -1145,7 +1154,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int: check_error(result, context="client") return result - def as_mb_read(self, start: int, size: int, data) -> int: + def as_mb_read(self, start: int, size: int, data: "_SimpleCData[Any]") -> int: """Reads a part of Merkers area from a PLC. Args: @@ -1177,7 +1186,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int: check_error(result, context="client") return result - def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int: + def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size: "_SimpleCData[Any]") -> int: """Reads a partial list of given ID and Index. Args: @@ -1193,7 +1202,7 @@ def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int: check_error(result, context="client") return result - def as_read_szl_list(self, szl_list, items_count) -> int: + def as_read_szl_list(self, szl_list: "_SimpleCData[Any]", items_count: "_SimpleCData[Any]") -> int: """Reads the list of partial lists available in the CPU. Args: @@ -1207,7 +1216,7 @@ def as_read_szl_list(self, szl_list, items_count) -> int: check_error(result, context="client") return result - def as_tm_read(self, start: int, amount: int, data) -> bytearray: + def as_tm_read(self, start: int, amount: int, data: "_SimpleCData[Any]") -> int: """Reads timers from a PLC. Args: @@ -1239,7 +1248,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int: check_error(result) return result - def as_upload(self, block_num: int, _buffer, size) -> int: + def as_upload(self, block_num: int, _buffer: "_SimpleCData[Any]", size: "_SimpleCData[Any]") -> int: """Uploads a block from AG. Note: @@ -1363,7 +1372,7 @@ def error_text(self, error: int) -> str: text_length = c_int(256) error_code = c_int32(error) text = create_string_buffer(buffer_size) - response = self._lib.Cli_ErrorText(error_code, byref(text), text_length) + response = self._lib.Cli_ErrorText(error_code, text, text_length) check_error(response) result = bytearray(text)[: text_length.value].decode().strip("\x00") return result diff --git a/snap7/client/client.pyi b/snap7/client/client.pyi new file mode 100644 index 00000000..78517283 --- /dev/null +++ b/snap7/client/client.pyi @@ -0,0 +1,103 @@ +from ctypes import _CArgObject, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p +from _ctypes import CFuncPtr + +class Snap7CliProtocol: + def Cli_Create(self) -> c_void_p: ... + def Cli_Destroy(self, pointer: _CArgObject) -> int: ... + def Cli_PlcStop(self, pointer: c_void_p) -> int: ... + def Cli_PlcColdStart(self, pointer: c_void_p) -> int: ... + def Cli_PlcHotStart(self, pointer: c_void_p) -> int: ... + def Cli_GetPlcStatus(self, pointer: c_void_p, state: _CArgObject) -> int: ... + def Cli_GetCpuInfo(self, pointer: c_void_p, info: _CArgObject) -> int: ... + def Cli_Disconnect(self, pointer: c_void_p) -> int: ... + def Cli_Connect(self, pointer: c_void_p) -> int: ... + def Cli_ConnectTo(self, pointer: c_void_p, address: c_char_p, rack: c_int, slot: c_int) -> int: ... + def Cli_DBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_DBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_Delete(self, pointer: c_void_p, blocktype: c_int, block_num: int) -> int: ... + def Cli_FullUpload( + self, pointer: c_void_p, blocktype: c_int, block_num: int, data: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_Upload(self, pointer: c_void_p, block_type: c_int, block_num: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_Download(self, pointer: c_void_p, block_num: int, data: _CArgObject, size: int) -> int: ... + def Cli_DBGet(self, pointer: c_void_p, db_number: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_WriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_ReadMultiVars(self, pointer: c_void_p, items: _CArgObject, items_count: c_int32) -> int: ... + def Cli_ListBlocks(self, pointer: c_void_p, blocksList: _CArgObject) -> int: ... + def Cli_ListBlocksOfType(self, pointer: c_void_p, blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_GetAgBlockInfo(self, pointer: c_void_p, blocktype: c_int, db_number: int, data: _CArgObject) -> int: ... + def Cli_SetSessionPassword(self, pointer: c_void_p, password: c_char_p) -> int: ... + def Cli_ClearSessionPassword(self, pointer: c_void_p) -> int: ... + def Cli_SetConnectionParams(self, pointer: c_void_p, address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16) -> int: ... + def Cli_SetConnectionType(self, pointer: c_void_p, connection_type: c_uint16) -> int: ... + def Cli_GetConnected(self, pointer: c_void_p, connected: _CArgObject) -> int: ... + def Cli_ABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_ABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsCompress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCopyRamToRom(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsCTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_AsDBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_AsDBGet(self, pointer: c_void_p, db_number: int, _buffer: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsDBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDownload(self, pointer: c_void_p, block_num: int, cdata: _CArgObject, size: int) -> int: ... + def Cli_Compress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Cli_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Cli_GetPduLength(self, pointer: c_void_p, requested_: _CArgObject, negotiated_: _CArgObject) -> int: ... + def Cli_GetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetAsCallback(self, pointer: c_void_p, pfn_clicompletion: CFuncPtr, p_usr: c_void_p) -> int: ... + def Cli_WaitAsCompletion(self, pointer: c_void_p, timeout: c_ulong) -> int: ... + def Cli_AsReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsWriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsEBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsEBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsFullUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_AsListBlocksOfType(self, pointer: c_void_p, _blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_AsMBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsMBWrite(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_AsTMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsTMWrite(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_CopyRamToRom(self, pointer: c_void_p, timeout: int) -> int: ... + def Cli_CTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_CTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_DBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_EBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_EBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ErrorText(self, error_code: c_int32, text: _CArgObject, text_length: c_int) -> int: ... + def Cli_GetCpInfo(self, pointer: c_void_p, cp_info: _CArgObject) -> int: ... + def Cli_GetExecTime(self, pointer: c_void_p, time: _CArgObject) -> int: ... + def Cli_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Cli_GetOrderCode(self, pointer: c_void_p, order_code: _CArgObject) -> int: ... + def Cli_GetPgBlockInfo(self, pointer: c_void_p, buffer: _CArgObject, block_info: _CArgObject, size: c_int) -> int: ... + def Cli_GetProtection(self, pointer: c_void_p, s7_protection: _CArgObject) -> int: ... + def Cli_IsoExchangeBuffer(self, pointer: c_void_p, cdata: _CArgObject, size: _CArgObject) -> int: ... + def Cli_MBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_MBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_SetPlcSystemDateTime(self, pointer: c_void_p) -> int: ... + def Cli_TMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_TMWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_WriteMultiVars(self, pointer: c_void_p, cdata: _CArgObject, items_count: c_int32) -> int: ... + def Cli_CheckAsCompletion(self, pointer: c_void_p, p_value: c_int) -> int: ... diff --git a/snap7/common.py b/snap7/common.py index cb979693..48493d50 100644 --- a/snap7/common.py +++ b/snap7/common.py @@ -3,10 +3,12 @@ import pathlib import platform from pathlib import Path -from ctypes import c_char -from typing import Any, Literal, Optional +from ctypes import Array, c_char, c_int, c_int32 +from typing import Callable, Literal, NoReturn, Optional, cast from ctypes.util import find_library from functools import cache +from .protocol import Snap7CliProtocol + if platform.system() == "Windows": from ctypes import windll as cdll # type: ignore @@ -19,7 +21,7 @@ ipv4 = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$" -def _raise_error(): +def _raise_error() -> NoReturn: error = f"""can't find snap7 shared library. This probably means you are installing python-snap7 from source. When no binary wheel is found for you architecture, pip @@ -72,7 +74,7 @@ def _find_in_package() -> Optional[str]: @cache -def load_library(lib_location: Optional[str] = None) -> Any: +def load_library(lib_location: Optional[str] = None) -> Snap7CliProtocol: """Loads the `snap7.dll` library. Returns: cdll: a ctypes cdll object with the snap7 shared library loaded. @@ -83,7 +85,7 @@ def load_library(lib_location: Optional[str] = None) -> Any: if not lib_location: _raise_error() - return cdll.LoadLibrary(lib_location) + return cast(Snap7CliProtocol, cdll.LoadLibrary(lib_location)) Context = Literal["client", "server", "partner"] @@ -107,7 +109,7 @@ def check_error(code: int, context: Context = "client") -> None: raise RuntimeError(error) -def error_text(error, context: Context = "client") -> bytes: +def error_text(error: int, context: Context = "client") -> bytes: """Returns a textual explanation of a given error number Args: @@ -127,6 +129,10 @@ def error_text(error, context: Context = "client") -> bytes: text_type = c_char * len_ text = text_type() library = load_library() - map_ = {"client": library.Cli_ErrorText, "server": library.Srv_ErrorText, "partner": library.Par_ErrorText} - map_[context](error, text, len_) + error_text_func: Callable[[c_int32, Array[c_char], c_int], int] = { + "client": library.Cli_ErrorText, + "server": library.Srv_ErrorText, + "partner": library.Par_ErrorText, + }[context] + error_text_func(c_int32(error), text, c_int(len_)) return text.value diff --git a/snap7/logo.py b/snap7/logo.py index ff348cf4..e83ca0e9 100644 --- a/snap7/logo.py +++ b/snap7/logo.py @@ -5,7 +5,7 @@ import re import struct import logging -from ctypes import byref, c_int, c_int32, c_uint16, c_void_p +from ctypes import byref, c_int, c_int32, c_uint16 from .types import WordLen, S7Object, param_types from .types import RemotePort, Areas, wordlen_to_ctypes @@ -27,20 +27,20 @@ class Logo: For more information see examples for Siemens Logo 7 and 8 """ - def __init__(self): + def __init__(self) -> None: """Creates a new instance of :obj:`Logo`""" - self.pointer = None + self.pointer: S7Object self.library = load_library() self.create() - def __del__(self): + def __del__(self) -> None: self.destroy() - def create(self): + def create(self) -> None: """Create a SNAP7 client.""" logger.info("creating snap7 client") - self.library.Cli_Create.restype = c_void_p - self.pointer = S7Object(self.library.Cli_Create()) + self.library.Cli_Create.restype = S7Object # type: ignore[attr-defined] + self.pointer = self.library.Cli_Create() def destroy(self) -> int: """Destroy a client. @@ -87,7 +87,7 @@ def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int check_error(result, context="client") return result - def read(self, vm_address: str): + def read(self, vm_address: str) -> int: """Reads from VM addresses of Siemens Logo. Examples: read("V40") / read("VW64") / read("V10.2") Args: @@ -139,13 +139,14 @@ def read(self, vm_address: str): check_error(result, context="client") # transform result to int value if wordlen == WordLen.Bit: - return data[0] + result = int(data[0]) if wordlen == WordLen.Byte: - return struct.unpack_from(">B", data)[0] + result = struct.unpack_from(">B", data)[0] if wordlen == WordLen.Word: - return struct.unpack_from(">h", data)[0] + result = struct.unpack_from(">h", data)[0] if wordlen == WordLen.DWord: - return struct.unpack_from(">l", data)[0] + result = struct.unpack_from(">l", data)[0] + return result def write(self, vm_address: str, value: int) -> int: """Writes to VM addresses of Siemens Logo. @@ -251,7 +252,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int: check_error(result, context="client") return result - def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int): + def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int) -> None: """Sets internally (IP, LocalTSAP, RemoteTSAP) Coordinates. Notes: @@ -274,7 +275,7 @@ def set_connection_params(self, ip_address: str, tsap_snap7: int, tsap_logo: int if result != 0: raise ValueError("The parameter was invalid") - def set_connection_type(self, connection_type: int): + def set_connection_type(self, connection_type: int) -> None: """Sets the connection resource type, i.e the way in which the Clients connects to a PLC. @@ -303,7 +304,7 @@ def get_connected(self) -> bool: check_error(result, context="client") return bool(connected) - def set_param(self, number: int, value): + def set_param(self, number: int, value: int) -> int: """Sets an internal Server object parameter. Args: @@ -319,7 +320,7 @@ def set_param(self, number: int, value): check_error(result, context="client") return result - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """Reads an internal Logo object parameter. Args: diff --git a/snap7/partner.py b/snap7/partner.py index df05481b..c867957e 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -11,20 +11,24 @@ import re import logging from ctypes import byref, c_int, c_int32, c_uint32, c_void_p -from typing import Tuple, Optional +from typing import Callable, Optional, Tuple, ParamSpec, TypeVar from .common import ipv4, check_error, load_library +from .protocol import Snap7CliProtocol from .types import S7Object, param_types, word logger = logging.getLogger(__name__) +Param = ParamSpec("Param") +RetType = TypeVar("RetType") -def error_wrap(func): + +def error_wrap(func: Callable[Param, RetType]) -> Callable[Param, None]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) - check_error(code, context="partner") + def f(*args: Param.args, **kwargs: Param.kwargs) -> None: + code = func(*args, **kwargs) + check_error(code, context="client") return f @@ -34,14 +38,13 @@ class Partner: A snap7 partner. """ - _pointer: Optional[c_void_p] + _pointer: c_void_p def __init__(self, active: bool = False): - self._library = load_library() - self._pointer = None + self._library: Snap7CliProtocol = load_library() self.create(active) - def __del__(self): + def __del__(self) -> None: self.destroy() def as_b_send(self) -> int: @@ -91,7 +94,7 @@ def check_as_b_send_completion(self) -> Tuple[str, c_int32]: return return_values[result], op_result - def create(self, active: bool = False): + def create(self, active: bool = False) -> None: """ Creates a Partner and returns its handle, which is the reference that you have to use every time you refer to that Partner. @@ -99,10 +102,10 @@ def create(self, active: bool = False): :param active: 0 :returns: a pointer to the partner object """ - self._library.Par_Create.restype = S7Object - self._pointer = S7Object(self._library.Par_Create(int(active))) + self._library.Par_Create.restype = S7Object # type: ignore[attr-defined] + self._pointer = self._library.Par_Create(int(active)) - def destroy(self): + def destroy(self) -> Optional[int]: """ Destroy a Partner of given handle. Before destruction the Partner is stopped, all clients disconnected and @@ -121,7 +124,7 @@ def get_last_error(self) -> c_int32: check_error(result, "partner") return error - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """ Reads an internal Partner object parameter. """ @@ -166,10 +169,10 @@ def get_times(self) -> Tuple[c_int32, c_int32]: return send_time, recv_time @error_wrap - def set_param(self, number: int, value) -> int: + def set_param(self, number: int, value: int) -> int: """Sets an internal Partner object parameter.""" logger.debug(f"setting param number {number} to {value}") - return self._library.Par_SetParam(self._pointer, number, byref(c_int(value))) + return self._library.Par_SetParam(self._pointer, c_int(number), byref(c_int(value))) def set_recv_callback(self) -> int: """ diff --git a/snap7/protocol.py b/snap7/protocol.py new file mode 100644 index 00000000..99b819a0 --- /dev/null +++ b/snap7/protocol.py @@ -0,0 +1,140 @@ +from typing import Protocol + + +class Snap7CliProtocol(Protocol): + # Client + def Cli_Create(self): ... + def Cli_Destroy(self, pointer): ... + def Cli_PlcStop(self, pointer): ... + def Cli_PlcColdStart(self, pointer): ... + def Cli_PlcHotStart(self, pointer): ... + def Cli_GetPlcStatus(self, pointer, state): ... + def Cli_GetCpuInfo(self, pointer, info): ... + def Cli_Disconnect(self, pointer): ... + def Cli_Connect(self, pointer): ... + def Cli_ConnectTo(self, pointer, address, rack, slot): ... + def Cli_DBRead(self, pointer, db_number, start, size, data): ... + def Cli_DBWrite(self, pointer, db_number, start, size, data): ... + def Cli_Delete(self, pointer, blocktype, block_num): ... + def Cli_FullUpload(self, pointer, blocktype, block_num, data, size): ... + def Cli_Upload(self, pointer, block_type, block_num, data, size): ... + def Cli_Download(self, pointer, block_num, data, size): ... + def Cli_DBGet(self, pointer, db_number, data, size): ... + def Cli_ReadArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_WriteArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_ReadMultiVars(self, pointer, items, items_count32): ... + def Cli_ListBlocks(self, pointer, blocksList): ... + def Cli_ListBlocksOfType(self, pointer, blocktype, data, count): ... + def Cli_GetAgBlockInfo(self, pointer, blocktype, db_number, data): ... + def Cli_SetSessionPassword(self, pointer, password): ... + def Cli_ClearSessionPassword(self, pointer): ... + def Cli_SetConnectionParams(self, pointer, address, local_tsap, remote_tsap): ... + def Cli_SetConnectionType(self, pointer, connection_type): ... + def Cli_GetConnected(self, pointer, connected): ... + def Cli_ABRead(self, pointer, start, size, data): ... + def Cli_ABWrite(self, pointer, start, size, cdata): ... + def Cli_AsABRead(self, pointer, start, size, data): ... + def Cli_AsABWrite(self, pointer, start, size, cdata): ... + def Cli_AsCompress(self, pointer, time): ... + def Cli_AsCopyRamToRom(self, pointer, time): ... + def Cli_AsCTRead(self, pointer, start, amount, data): ... + def Cli_AsCTWrite(self, pointer, start, amount, cdata): ... + def Cli_AsDBFill(self, pointer, db_number, filler): ... + def Cli_AsDBGet(self, pointer, db_number, _buffer, size): ... + def Cli_AsDBRead(self, pointer, db_number, start, size, data): ... + def Cli_AsDBWrite(self, pointer, db_number, start, size, data): ... + def Cli_AsDownload(self, pointer, block_num, cdata, size): ... + def Cli_Compress(self, pointer, time): ... + def Cli_SetParam(self, pointer, number, value): ... + def Cli_GetParam(self, pointer, number, value): ... + def Cli_GetPduLength(self, pointer, requested_, negotiated_): ... + def Cli_GetPlcDateTime(self, pointer, buffer): ... + def Cli_SetPlcDateTime(self, pointer, buffer): ... + def Cli_SetAsCallback(self, pointer, pfn_clicompletion, p_usr): ... + def Cli_WaitAsCompletion(self, pointer, timeout): ... + def Cli_AsReadArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_AsWriteArea(self, pointer, area, dbnumber, start, size, wordlen, data): ... + def Cli_AsEBRead(self, pointer, start, size, data): ... + def Cli_AsEBWrite(self, pointer, start, size, cdata): ... + def Cli_AsFullUpload(self, pointer, block_type, block_num, _buffer, size): ... + def Cli_AsListBlocksOfType(self, pointer, _blocktype, data, count): ... + def Cli_AsMBRead(self, pointer, start, size, data): ... + def Cli_AsMBWrite(self, pointer, start, size, data): ... + def Cli_AsReadSZL(self, pointer, ssl_id, index, s7_szl, size): ... + def Cli_AsReadSZLList(self, pointer, szl_list, items_count): ... + def Cli_AsTMRead(self, pointer, start, amount, data): ... + def Cli_AsTMWrite(self, pointer, start, amount, data): ... + def Cli_AsUpload(self, pointer, block_type, block_num, _buffer, size): ... + def Cli_CopyRamToRom(self, pointer, timeout): ... + def Cli_CTRead(self, pointer, start, amount, data): ... + def Cli_CTWrite(self, pointer, start, amount, cdata): ... + def Cli_DBFill(self, pointer, db_number, filler): ... + def Cli_EBRead(self, pointer, start, size, data): ... + def Cli_EBWrite(self, pointer, start, size, cdata): ... + def Cli_ErrorText(self, error_code32, text, text_length): ... + def Cli_GetCpInfo(self, pointer, cp_info): ... + def Cli_GetExecTime(self, pointer, time): ... + def Cli_GetLastError(self, pointer, last_error): ... + def Cli_GetOrderCode(self, pointer, order_code): ... + def Cli_GetPgBlockInfo(self, pointer, buffer, block_info, size): ... + def Cli_GetProtection(self, pointer, s7_protection): ... + def Cli_IsoExchangeBuffer(self, pointer, cdata, size): ... + def Cli_MBRead(self, pointer, start, size, data): ... + def Cli_MBWrite(self, pointer, start, size, cdata): ... + def Cli_ReadSZL(self, pointer, ssl_id, index, s7_szl, size): ... + def Cli_ReadSZLList(self, pointer, szl_list, items_count): ... + def Cli_SetPlcSystemDateTime(self, pointer): ... + def Cli_TMRead(self, pointer, start, amount, data): ... + def Cli_TMWrite(self, pointer, start, amount, cdata): ... + def Cli_WriteMultiVars(self, pointer, cdata, items_count32): ... + def Cli_CheckAsCompletion(self, pointer, p_value): ... + # Server + def Srv_Create(self): ... + def Srv_Start(self, pointer): ... + def Srv_Stop(self, pointer): ... + def Srv_Destroy(self, pointer) -> None: ... + def Srv_EventText(self, event, text, len_): ... + def Srv_RegisterArea(self, pointer, area_code, index, userdata, size): ... + def Srv_SetEventsCallback(self, pointer, callback, usrPtr): ... + def Srv_SetReadEventsCallback(self, pointer, read_callback): ... + def Srv_GetStatus(self, pointer, server_status, cpu_status, clients_count): ... + def Srv_UnregisterArea(self, pointer, area_code, index): ... + def Srv_UnlockArea(self, pointer, code, index): ... + def Srv_LockArea(self, pointer, code, index): ... + def Srv_StartTo(self, pointer, ip): ... + def Srv_SetParam(self, pointer, number, value): ... + def Srv_SetMask(self, pointer, kind, mask): ... + def Srv_SetCpuStatus(self, pointer, status): ... + def Srv_PickEvent(self, pointer, event, ready): ... + def Srv_GetParam(self, pointer, number, value): ... + def Srv_GetMask(self, pointer, kind, mask): ... + def Srv_ClearEvents(self, pointer): ... + def Srv_ErrorText(self, error_code32, text, text_length): ... + # Partner + def Par_Create(self, active): ... + def Par_AsBSend(self, pointer): ... + def Par_BRecv(self, pointer): ... + def Par_BSend(self, pointer): ... + def Par_CheckAsBRecvCompletion(self, pointer): ... + def Par_CheckAsBSendCompletion(self, pointer, result): ... + def Par_Destroy(self, pointer): ... + def Par_GetLastError(self, pointer, last_error): ... + def Par_GetStats( + self, + pointer, + bytes_sent, + bytes_recv, + send_errors, + recv_errors, + ): ... + def Par_GetStatus(self, pointer, status): ... + def Par_SetParam(self, pointer, number, value): ... + def Par_GetParam(self, pointer, number, value): ... + def Par_SetRecvCallback(self, pointer): ... + def Par_SetSendCallback(self, pointer): ... + def Par_Start(self, pointer): ... + def Par_StartTo(self, pointer, local_address, remote_address, local_tsap, remote_tsap): ... + def Par_Stop(self, pointer): ... + def Par_WaitAsBSendCompletion(self, pointer, timeout): ... + def Par_ErrorText(self, error_code32, text, text_length): ... + def Par_GetTimes(self, pointer, send_time, recv_time): ... diff --git a/snap7/protocol.pyi b/snap7/protocol.pyi new file mode 100644 index 00000000..cbec0bcf --- /dev/null +++ b/snap7/protocol.pyi @@ -0,0 +1,160 @@ +from typing import Type + +from ctypes import Array, c_char, c_char_p, c_int, c_int32, c_uint16, c_ulong, c_void_p, _FuncPointer +from _ctypes import CFuncPtr, _CArgObject + +class Snap7CliProtocol: + # Client + def Cli_Create(self) -> c_void_p: ... + def Cli_Destroy(self, pointer: _CArgObject) -> int: ... + def Cli_PlcStop(self, pointer: c_void_p) -> int: ... + def Cli_PlcColdStart(self, pointer: c_void_p) -> int: ... + def Cli_PlcHotStart(self, pointer: c_void_p) -> int: ... + def Cli_GetPlcStatus(self, pointer: c_void_p, state: _CArgObject) -> int: ... + def Cli_GetCpuInfo(self, pointer: c_void_p, info: _CArgObject) -> int: ... + def Cli_Disconnect(self, pointer: c_void_p) -> int: ... + def Cli_Connect(self, pointer: c_void_p) -> int: ... + def Cli_ConnectTo(self, pointer: c_void_p, address: c_char_p, rack: c_int, slot: c_int) -> int: ... + def Cli_DBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_DBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_Delete(self, pointer: c_void_p, blocktype: c_int, block_num: int) -> int: ... + def Cli_FullUpload( + self, pointer: c_void_p, blocktype: c_int, block_num: int, data: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_Upload(self, pointer: c_void_p, block_type: c_int, block_num: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_Download(self, pointer: c_void_p, block_num: int, data: _CArgObject, size: int) -> int: ... + def Cli_DBGet(self, pointer: c_void_p, db_number: int, data: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_WriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_ReadMultiVars(self, pointer: c_void_p, items: _CArgObject, items_count: c_int32) -> int: ... + def Cli_ListBlocks(self, pointer: c_void_p, blocksList: _CArgObject) -> int: ... + def Cli_ListBlocksOfType(self, pointer: c_void_p, blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_GetAgBlockInfo(self, pointer: c_void_p, blocktype: c_int, db_number: int, data: _CArgObject) -> int: ... + def Cli_SetSessionPassword(self, pointer: c_void_p, password: c_char_p) -> int: ... + def Cli_ClearSessionPassword(self, pointer: c_void_p) -> int: ... + def Cli_SetConnectionParams(self, pointer: c_void_p, address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16) -> int: ... + def Cli_SetConnectionType(self, pointer: c_void_p, connection_type: c_uint16) -> int: ... + def Cli_GetConnected(self, pointer: c_void_p, connected: _CArgObject) -> int: ... + def Cli_ABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_ABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsABRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsABWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsCompress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCopyRamToRom(self, pointer: c_void_p, time: int) -> int: ... + def Cli_AsCTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsCTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_AsDBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_AsDBGet(self, pointer: c_void_p, db_number: int, _buffer: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsDBRead(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDBWrite(self, pointer: c_void_p, db_number: int, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsDownload(self, pointer: c_void_p, block_num: int, cdata: _CArgObject, size: int) -> int: ... + def Cli_Compress(self, pointer: c_void_p, time: int) -> int: ... + def Cli_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Cli_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Cli_GetPduLength(self, pointer: c_void_p, requested_: _CArgObject, negotiated_: _CArgObject) -> int: ... + def Cli_GetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetPlcDateTime(self, pointer: c_void_p, buffer: _CArgObject) -> int: ... + def Cli_SetAsCallback(self, pointer: c_void_p, pfn_clicompletion: CFuncPtr, p_usr: c_void_p) -> int: ... + def Cli_WaitAsCompletion(self, pointer: c_void_p, timeout: c_ulong) -> int: ... + def Cli_AsReadArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsWriteArea( + self, pointer: c_void_p, area: int, dbnumber: int, start: int, size: int, wordlen: int, data: _CArgObject + ) -> int: ... + def Cli_AsEBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsEBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_AsFullUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_AsListBlocksOfType(self, pointer: c_void_p, _blocktype: c_int, data: _CArgObject, count: _CArgObject) -> int: ... + def Cli_AsMBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsMBWrite(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_AsReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_AsReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_AsTMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsTMWrite(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_AsUpload( + self, pointer: c_void_p, block_type: c_int, block_num: int, _buffer: _CArgObject, size: _CArgObject + ) -> int: ... + def Cli_CopyRamToRom(self, pointer: c_void_p, timeout: int) -> int: ... + def Cli_CTRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_CTWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_DBFill(self, pointer: c_void_p, db_number: int, filler: int) -> int: ... + def Cli_EBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_EBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + def Cli_GetCpInfo(self, pointer: c_void_p, cp_info: _CArgObject) -> int: ... + def Cli_GetExecTime(self, pointer: c_void_p, time: _CArgObject) -> int: ... + def Cli_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Cli_GetOrderCode(self, pointer: c_void_p, order_code: _CArgObject) -> int: ... + def Cli_GetPgBlockInfo(self, pointer: c_void_p, buffer: _CArgObject, block_info: _CArgObject, size: c_int) -> int: ... + def Cli_GetProtection(self, pointer: c_void_p, s7_protection: _CArgObject) -> int: ... + def Cli_IsoExchangeBuffer(self, pointer: c_void_p, cdata: _CArgObject, size: _CArgObject) -> int: ... + def Cli_MBRead(self, pointer: c_void_p, start: int, size: int, data: _CArgObject) -> int: ... + def Cli_MBWrite(self, pointer: c_void_p, start: int, size: int, cdata: _CArgObject) -> int: ... + def Cli_ReadSZL(self, pointer: c_void_p, ssl_id: int, index: int, s7_szl: _CArgObject, size: _CArgObject) -> int: ... + def Cli_ReadSZLList(self, pointer: c_void_p, szl_list: _CArgObject, items_count: _CArgObject) -> int: ... + def Cli_SetPlcSystemDateTime(self, pointer: c_void_p) -> int: ... + def Cli_TMRead(self, pointer: c_void_p, start: int, amount: int, data: _CArgObject) -> int: ... + def Cli_TMWrite(self, pointer: c_void_p, start: int, amount: int, cdata: _CArgObject) -> int: ... + def Cli_WriteMultiVars(self, pointer: c_void_p, cdata: _CArgObject, items_count: c_int32) -> int: ... + def Cli_CheckAsCompletion(self, pointer: c_void_p, p_value: c_int) -> int: ... + # Server + def Srv_Create(self) -> c_void_p: ... + def Srv_Start(self, pointer: c_void_p) -> int: ... + def Srv_Stop(self, pointer: c_void_p) -> int: ... + def Srv_Destroy(self, pointer: _CArgObject) -> None: ... + def Srv_EventText(self, event: _CArgObject, text: _CArgObject, len_: int) -> int: ... + def Srv_RegisterArea(self, pointer: c_void_p, area_code: int, index: int, userdata: _CArgObject, size: int) -> int: ... + def Srv_SetEventsCallback(self, pointer: c_void_p, callback: Type[_FuncPointer], usrPtr: c_void_p) -> int: ... + def Srv_SetReadEventsCallback(self, pointer: c_void_p, read_callback: CFuncPtr) -> int: ... + def Srv_GetStatus( + self, pointer: c_void_p, server_status: _CArgObject, cpu_status: _CArgObject, clients_count: _CArgObject + ) -> int: ... + def Srv_UnregisterArea(self, pointer: c_void_p, area_code: int, index: int) -> int: ... + def Srv_UnlockArea(self, pointer: c_void_p, code: int, index: int) -> int: ... + def Srv_LockArea(self, pointer: c_void_p, code: int, index: int) -> int: ... + def Srv_StartTo(self, pointer: c_void_p, ip: bytes) -> int: ... + def Srv_SetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Srv_SetMask(self, pointer: c_void_p, kind: int, mask: int) -> int: ... + def Srv_SetCpuStatus(self, pointer: c_void_p, status: int) -> int: ... + def Srv_PickEvent(self, pointer: c_void_p, event: _CArgObject, ready: _CArgObject) -> int: ... + def Srv_GetParam(self, pointer: c_void_p, number: int, value: _CArgObject) -> int: ... + def Srv_GetMask(self, pointer: c_void_p, kind: int, mask: _CArgObject) -> int: ... + def Srv_ClearEvents(self, pointer: c_void_p) -> int: ... + def Srv_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + # Partner + def Par_Create(self, active: int) -> c_void_p: ... + def Par_AsBSend(self, pointer: c_void_p) -> int: ... + def Par_BRecv(self, pointer: c_void_p) -> int: ... + def Par_BSend(self, pointer: c_void_p) -> int: ... + def Par_CheckAsBRecvCompletion(self, pointer: c_void_p) -> int: ... + def Par_CheckAsBSendCompletion(self, pointer: c_void_p, result: _CArgObject) -> int: ... + def Par_Destroy(self, pointer: _CArgObject) -> int: ... + def Par_GetLastError(self, pointer: c_void_p, last_error: _CArgObject) -> int: ... + def Par_GetStats( + self, + pointer: c_void_p, + bytes_sent: _CArgObject, + bytes_recv: _CArgObject, + send_errors: _CArgObject, + recv_errors: _CArgObject, + ) -> int: ... + def Par_GetStatus(self, pointer: c_void_p, status: _CArgObject) -> int: ... + def Par_SetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Par_GetParam(self, pointer: c_void_p, number: c_int, value: _CArgObject) -> int: ... + def Par_SetRecvCallback(self, pointer: c_void_p) -> int: ... + def Par_SetSendCallback(self, pointer: c_void_p) -> int: ... + def Par_Start(self, pointer: c_void_p) -> int: ... + def Par_StartTo( + self, pointer: c_void_p, local_address: bytes, remote_address: bytes, local_tsap: c_uint16, remote_tsap: c_uint16 + ) -> int: ... + def Par_Stop(self, pointer: c_void_p) -> int: ... + def Par_WaitAsBSendCompletion(self, pointer: c_void_p, timeout: int) -> int: ... + def Par_ErrorText(self, error_code: c_int32, text: Array[c_char], text_length: c_int) -> int: ... + def Par_GetTimes(self, pointer: c_void_p, send_time: _CArgObject, recv_time: _CArgObject) -> int: ... diff --git a/snap7/server/__init__.py b/snap7/server/__init__.py index b132b7eb..ea5681b2 100644 --- a/snap7/server/__init__.py +++ b/snap7/server/__init__.py @@ -4,25 +4,43 @@ import re import time -from ctypes import c_char, byref, sizeof, c_int, c_int32, c_uint32, c_void_p, CFUNCTYPE, POINTER, Array, c_int8 +from ctypes import ( + c_char, + byref, + sizeof, + c_int, + c_int32, + c_uint32, + c_void_p, + CFUNCTYPE, + POINTER, + Array, + _SimpleCData, + _FuncPointer, +) import struct import logging -from typing import Any, Tuple, Callable, Optional +from typing import Any, Callable, Optional, Tuple, ParamSpec, TypeVar, cast, Type +from types import TracebackType from ..common import ipv4, check_error, load_library +from ..protocol import Snap7CliProtocol from ..types import SrvEvent, LocalPort, cpu_statuses, server_statuses from ..types import longword, wordlen_to_ctypes, WordLen, S7Object from ..types import srvAreaDB, srvAreaPA, srvAreaTM, srvAreaCT logger = logging.getLogger(__name__) +Param = ParamSpec("Param") +RetType = TypeVar("RetType") -def error_wrap(func): + +def error_wrap(func: Callable[Param, RetType]) -> Callable[Param, None]: """Parses a s7 error code returned the decorated function.""" - def f(*args, **kw): - code = func(*args, **kw) - check_error(code, context="server") + def f(*args: Param.args, **kwargs: Param.kwargs) -> None: + code = func(*args, **kwargs) + check_error(code, context="client") return f @@ -32,8 +50,8 @@ class Server: A fake S7 server. """ - _lib: Any # since this is dynamically loaded from a DLL we don't have the type signature. - _s7_server: Optional[S7Object] = None + _lib: Snap7CliProtocol + _s7_server: S7Object _read_callback = None _callback: Optional[Callable[..., Any]] = None @@ -44,18 +62,20 @@ def __init__(self, log: bool = True): Args: log: `True` for enabling the event logging. Optinoal. """ - self._lib = load_library() + self._lib: Snap7CliProtocol = load_library() self.create() if log: self._set_log_callback() - def __enter__(self): + def __enter__(self) -> "Server": return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: self.destroy() - def __del__(self): + def __del__(self) -> None: self.destroy() def event_text(self, event: SrvEvent) -> str: @@ -75,14 +95,14 @@ def event_text(self, event: SrvEvent) -> str: check_error(error) return text.value.decode("ascii") - def create(self): + def create(self) -> None: """Create the server.""" logger.info("creating server") - self._lib.Srv_Create.restype = S7Object - self._s7_server = S7Object(self._lib.Srv_Create()) + self._lib.Srv_Create.restype = S7Object # type: ignore[attr-defined] + self._s7_server = self._lib.Srv_Create() @error_wrap - def register_area(self, area_code: int, index: int, userdata: Array[c_int8]): + def register_area(self, area_code: int, index: int, userdata: Array[_SimpleCData[int]]) -> int: """Shares a memory area with the server. That memory block will be visible by the clients. @@ -121,12 +141,12 @@ def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int: call_back(pevent.contents) return 0 - self._callback = callback_wrap(wrapper) + self._callback = cast(type[_FuncPointer], callback_wrap(wrapper)) usrPtr = c_void_p() return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, usrPtr) @error_wrap - def set_read_events_callback(self, call_back: Callable[..., Any]): + def set_read_events_callback(self, call_back: Callable[..., Any]) -> int: """Sets the user callback that the Server object has to call when a Read event is created. @@ -154,17 +174,17 @@ def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int: self._read_callback = callback_wrapper(wrapper) return self._lib.Srv_SetReadEventsCallback(self._s7_server, self._read_callback) - def _set_log_callback(self): + def _set_log_callback(self) -> None: """Sets a callback that logs the events""" logger.debug("setting up event logger") - def log_callback(event): + def log_callback(event: SrvEvent) -> None: logger.info(f"callback event: {self.event_text(event)}") self.set_events_callback(log_callback) @error_wrap - def start(self, tcpport: int = 102): + def start(self, tcpport: int = 102) -> int: """Starts the server. Args: @@ -177,17 +197,17 @@ def start(self, tcpport: int = 102): return self._lib.Srv_Start(self._s7_server) @error_wrap - def stop(self): + def stop(self) -> int: """Stop the server.""" logger.info("stopping server") return self._lib.Srv_Stop(self._s7_server) - def destroy(self) -> Optional[int]: + def destroy(self) -> None: """Destroy the server.""" logger.info("destroying server") if self._lib and self._s7_server is not None: return self._lib.Srv_Destroy(byref(self._s7_server)) - self._s7_server = None + self._s7_server = None # type: ignore[assignment] return None def get_status(self) -> Tuple[str, str, int]: @@ -207,7 +227,7 @@ def get_status(self) -> Tuple[str, str, int]: return (server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value) @error_wrap - def unregister_area(self, area_code: int, index: int): + def unregister_area(self, area_code: int, index: int) -> int: """'Unshares' a memory area previously shared with Srv_RegisterArea(). Notes: @@ -223,7 +243,7 @@ def unregister_area(self, area_code: int, index: int): return self._lib.Srv_UnregisterArea(self._s7_server, area_code, index) @error_wrap - def unlock_area(self, code: int, index: int): + def unlock_area(self, code: int, index: int) -> int: """Unlocks a previously locked shared memory area. Args: @@ -237,7 +257,7 @@ def unlock_area(self, code: int, index: int): return self._lib.Srv_UnlockArea(self._s7_server, code, index) @error_wrap - def lock_area(self, code: int, index: int): + def lock_area(self, code: int, index: int) -> int: """Locks a shared memory area. Args: @@ -251,7 +271,7 @@ def lock_area(self, code: int, index: int): return self._lib.Srv_LockArea(self._s7_server, code, index) @error_wrap - def start_to(self, ip: str, tcpport: int = 102): + def start_to(self, ip: str, tcpport: int = 102) -> int: """Start server on a specific interface. Args: @@ -270,7 +290,7 @@ def start_to(self, ip: str, tcpport: int = 102): return self._lib.Srv_StartTo(self._s7_server, ip.encode()) @error_wrap - def set_param(self, number: int, value: int): + def set_param(self, number: int, value: int) -> int: """Sets an internal Server object parameter. Args: @@ -284,7 +304,7 @@ def set_param(self, number: int, value: int): return self._lib.Srv_SetParam(self._s7_server, number, byref(c_int(value))) @error_wrap - def set_mask(self, kind: int, mask: int): + def set_mask(self, kind: int, mask: int) -> int: """Writes the specified filter mask. Args: @@ -298,7 +318,7 @@ def set_mask(self, kind: int, mask: int): return self._lib.Srv_SetMask(self._s7_server, kind, mask) @error_wrap - def set_cpu_status(self, status: int): + def set_cpu_status(self, status: int) -> int: """Sets the Virtual CPU status. Args: @@ -332,7 +352,7 @@ def pick_event(self) -> Optional[SrvEvent]: logger.debug("no events ready") return None - def get_param(self, number) -> int: + def get_param(self, number: int) -> int: """Reads an internal Server object parameter. Args: @@ -373,7 +393,7 @@ def clear_events(self) -> int: return self._lib.Srv_ClearEvents(self._s7_server) -def mainloop(tcpport: int = 1102, init_standard_values: bool = False): +def mainloop(tcpport: int = 1102, init_standard_values: bool = False) -> None: """Init a fake Snap7 server with some default values. Args: @@ -383,10 +403,10 @@ def mainloop(tcpport: int = 1102, init_standard_values: bool = False): server = Server() size = 100 - DBdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - PAdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - TMdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() - CTdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + DBdata: Array[_SimpleCData[int]] = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + PAdata: Array[_SimpleCData[int]] = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + TMdata: Array[_SimpleCData[int]] = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + CTdata: Array[_SimpleCData[int]] = (wordlen_to_ctypes[WordLen.Byte.value] * size)() server.register_area(srvAreaDB, 1, DBdata) server.register_area(srvAreaPA, 1, PAdata) server.register_area(srvAreaTM, 1, TMdata) diff --git a/snap7/server/__main__.py b/snap7/server/__main__.py index 015bddc1..d6fb4476 100644 --- a/snap7/server/__main__.py +++ b/snap7/server/__main__.py @@ -6,6 +6,7 @@ """ import logging +from ctypes import CDLL try: import click @@ -20,18 +21,18 @@ logger = logging.getLogger("Snap7.Server") -@click.command() -@click.option("-p", "--port", default=1102, help="Port the server will listen on.") -@click.option( +@click.command() # type: ignore[misc] +@click.option("-p", "--port", default=1102, help="Port the server will listen on.") # type: ignore[misc] +@click.option( # type: ignore[misc] "--dll", hidden=True, type=click.Path(exists=True, file_okay=True, dir_okay=False, resolve_path=True), help="Path to the snap7 DLL (for emergencies if it can't be put on PATH).", ) -@click.option("-v", "--verbose", is_flag=True, help="Also print debug-output.") -@click.version_option(__version__) -@click.help_option("-h", "--help") -def main(port, dll, verbose): +@click.option("-v", "--verbose", is_flag=True, help="Also print debug-output.") # type: ignore[misc] +@click.version_option(__version__) # type: ignore[misc] +@click.help_option("-h", "--help") # type: ignore[misc] +def main(port: int, dll: CDLL, verbose: bool) -> None: """Start a S7 dummy server with some default values.""" # setup logging diff --git a/snap7/types.py b/snap7/types.py index 406e6d16..9d8d14ca 100755 --- a/snap7/types.py +++ b/snap7/types.py @@ -274,7 +274,7 @@ class S7CpuInfo(ctypes.Structure): ("ModuleName", ctypes.c_char * 25), ] - def __str__(self): + def __str__(self) -> str: return ( f"" diff --git a/snap7/util/__init__.py b/snap7/util/__init__.py index f6776aed..dde717f0 100644 --- a/snap7/util/__init__.py +++ b/snap7/util/__init__.py @@ -85,7 +85,7 @@ import re import time -from typing import Union +from typing import Any, Union from datetime import date, datetime from collections import OrderedDict @@ -147,7 +147,7 @@ def utc2local(utc: Union[date, datetime]) -> Union[datetime, date]: return utc + offset -def parse_specification(db_specification: str) -> OrderedDict: +def parse_specification(db_specification: str) -> OrderedDict[str, Any]: """Create a db specification derived from a dataview of a db in which the byte layout is specified @@ -168,7 +168,7 @@ def parse_specification(db_specification: str) -> OrderedDict: return parsed_db_specification -def print_row(data): +def print_row(data: bytearray) -> None: """print a single db row in chr and str""" index_line = "" pri_line1 = "" @@ -179,9 +179,8 @@ def print_row(data): # index if not i % 5: diff = len(pri_line1) - len(index_line) - i = str(i) index_line += diff * " " - index_line += i + index_line += str(i) # i = i + (ws - len(i)) * ' ' + ',' # byte array line diff --git a/snap7/util/getters.py b/snap7/util/getters.py index f49cec17..aafc599a 100644 --- a/snap7/util/getters.py +++ b/snap7/util/getters.py @@ -1,6 +1,6 @@ import struct from datetime import timedelta, datetime, date -from typing import Union, List +from typing import Union, NoReturn from logging import getLogger logger = getLogger(__name__) @@ -44,7 +44,7 @@ def get_byte(bytearray_: bytearray, byte_index: int) -> bytes: data = bytearray_[byte_index : byte_index + 1] data[0] = data[0] & 0xFF packed = struct.pack("B", *data) - value = struct.unpack("B", packed)[0] + value: bytes = struct.unpack("B", packed)[0] return value @@ -70,7 +70,7 @@ def get_word(bytearray_: bytearray, byte_index: int) -> bytearray: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">H", packed)[0] + value: bytearray = struct.unpack(">H", packed)[0] return value @@ -96,7 +96,7 @@ def get_int(bytearray_: bytearray, byte_index: int) -> int: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">h", packed)[0] + value: int = struct.unpack(">h", packed)[0] return value @@ -124,7 +124,7 @@ def get_uint(bytearray_: bytearray, byte_index: int) -> int: data[1] = data[1] & 0xFF data[0] = data[0] & 0xFF packed = struct.pack("2B", *data) - value = struct.unpack(">H", packed)[0] + value: int = struct.unpack(">H", packed)[0] return value @@ -148,7 +148,7 @@ def get_real(bytearray_: bytearray, byte_index: int) -> float: 123.32099914550781 """ x = bytearray_[byte_index : byte_index + 4] - real = struct.unpack(">f", struct.pack("4B", *x))[0] + real: float = struct.unpack(">f", struct.pack("4B", *x))[0] return real @@ -238,7 +238,7 @@ def get_dword(bytearray_: bytearray, byte_index: int) -> int: 4294967295 """ data = bytearray_[byte_index : byte_index + 4] - dword = struct.unpack(">I", struct.pack("4B", *data))[0] + dword: int = struct.unpack(">I", struct.pack("4B", *data))[0] return dword @@ -265,7 +265,7 @@ def get_dint(bytearray_: bytearray, byte_index: int) -> int: 2147483647 """ data = bytearray_[byte_index : byte_index + 4] - dint = struct.unpack(">i", struct.pack("4B", *data))[0] + dint: int = struct.unpack(">i", struct.pack("4B", *data))[0] return dint @@ -292,7 +292,7 @@ def get_udint(bytearray_: bytearray, byte_index: int) -> int: 4294967295 """ data = bytearray_[byte_index : byte_index + 4] - dint = struct.unpack(">I", struct.pack("4B", *data))[0] + dint: int = struct.unpack(">I", struct.pack("4B", *data))[0] return dint @@ -437,7 +437,7 @@ def get_usint(bytearray_: bytearray, byte_index: int) -> int: """ data = bytearray_[byte_index] & 0xFF packed = struct.pack("B", data) - value = struct.unpack(">B", packed)[0] + value: int = struct.unpack(">B", packed)[0] return value @@ -463,11 +463,11 @@ def get_sint(bytearray_: bytearray, byte_index: int) -> int: """ data = bytearray_[byte_index] packed = struct.pack("B", data) - value = struct.unpack(">b", packed)[0] + value: int = struct.unpack(">b", packed)[0] return value -def get_lint(bytearray_: bytearray, byte_index: int): +def get_lint(bytearray_: bytearray, byte_index: int) -> NoReturn: """Get the long int THIS VALUE IS NEITHER TESTED NOR VERIFIED BY A REAL PLC AT THE MOMENT @@ -494,7 +494,7 @@ def get_lint(bytearray_: bytearray, byte_index: int): # raw_lint = bytearray_[byte_index:byte_index + 8] # lint = struct.unpack('>q', struct.pack('8B', *raw_lint))[0] # return lint - return NotImplementedError + raise NotImplementedError def get_lreal(bytearray_: bytearray, byte_index: int) -> float: @@ -519,7 +519,7 @@ def get_lreal(bytearray_: bytearray, byte_index: int) -> float: >>> snap7.util.get_lreal(data, 0) 12345.12345 """ - return struct.unpack_from(">d", bytearray_, offset=byte_index)[0] + return float(struct.unpack_from(">d", bytearray_, offset=byte_index)[0]) def get_lword(bytearray_: bytearray, byte_index: int) -> bytearray: @@ -571,7 +571,7 @@ def get_ulint(bytearray_: bytearray, byte_index: int) -> int: 12345 """ raw_ulint = bytearray_[byte_index : byte_index + 8] - lint = struct.unpack(">Q", struct.pack("8B", *raw_ulint))[0] + lint: int = struct.unpack(">Q", struct.pack("8B", *raw_ulint))[0] return lint @@ -715,5 +715,5 @@ def get_wstring(bytearray_: bytearray, byte_index: int) -> str: return bytearray_[wstring_start : wstring_start + wstr_symbols_amount].decode("utf-16-be") -def get_array(bytearray_: bytearray, byte_index: int) -> List: +def get_array(bytearray_: bytearray, byte_index: int) -> NoReturn: raise NotImplementedError diff --git a/snap7/util/setters.py b/snap7/util/setters.py index 83272ea5..19c633d8 100644 --- a/snap7/util/setters.py +++ b/snap7/util/setters.py @@ -62,7 +62,7 @@ def set_byte(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: return bytearray_ -def set_word(bytearray_: bytearray, byte_index: int, _int: int): +def set_word(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to word Notes: @@ -82,7 +82,7 @@ def set_word(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_int(bytearray_: bytearray, byte_index: int, _int: int): +def set_int(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to int Notes: @@ -108,7 +108,7 @@ def set_int(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_uint(bytearray_: bytearray, byte_index: int, _int: int): +def set_uint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set value in bytearray to unsigned int Notes: @@ -134,7 +134,7 @@ def set_uint(bytearray_: bytearray, byte_index: int, _int: int): return bytearray_ -def set_real(bytearray_: bytearray, byte_index: int, real) -> bytearray: +def set_real(bytearray_: bytearray, byte_index: int, real: Union[bool, str, float, int]) -> bytearray: """Set Real value Notes: @@ -154,15 +154,14 @@ def set_real(bytearray_: bytearray, byte_index: int, real) -> bytearray: >>> snap7.util.set_real(data, 0, 123.321) bytearray(b'B\\xf6\\xa4Z') """ - real = float(real) - real = struct.pack(">f", real) - _bytes = struct.unpack("4B", real) + real_packed = struct.pack(">f", float(real)) + _bytes = struct.unpack("4B", real_packed) for i, b in enumerate(_bytes): bytearray_[byte_index + i] = b return bytearray_ -def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: int): +def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: int) -> None: """Set space-padded fixed-length string value Args: @@ -200,7 +199,7 @@ def set_fstring(bytearray_: bytearray, byte_index: int, value: str, max_length: bytearray_[byte_index + r] = ord(" ") -def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int = 254): +def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int = 254) -> None: """Set string value Args: @@ -252,7 +251,7 @@ def set_string(bytearray_: bytearray, byte_index: int, value: str, max_size: int bytearray_[byte_index + 2 + r] = ord(" ") -def set_dword(bytearray_: bytearray, byte_index: int, dword: int): +def set_dword(bytearray_: bytearray, byte_index: int, dword: int) -> None: """Set a DWORD to the buffer. Notes: @@ -276,7 +275,7 @@ def set_dword(bytearray_: bytearray, byte_index: int, dword: int): bytearray_[byte_index + i] = b -def set_dint(bytearray_: bytearray, byte_index: int, dint: int): +def set_dint(bytearray_: bytearray, byte_index: int, dint: int) -> None: """Set value in bytearray to dint Notes: @@ -301,7 +300,7 @@ def set_dint(bytearray_: bytearray, byte_index: int, dint: int): bytearray_[byte_index + i] = b -def set_udint(bytearray_: bytearray, byte_index: int, udint: int): +def set_udint(bytearray_: bytearray, byte_index: int, udint: int) -> None: """Set value in bytearray to unsigned dint Notes: @@ -400,7 +399,7 @@ def set_usint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: return bytearray_ -def set_sint(bytearray_: bytearray, byte_index: int, _int) -> bytearray: +def set_sint(bytearray_: bytearray, byte_index: int, _int: int) -> bytearray: """Set small int to the buffer. Notes: