Skip to content

Commit

Permalink
sync log code with client code
Browse files Browse the repository at this point in the history
  • Loading branch information
gijzelaerr committed Jul 4, 2024
1 parent e312086 commit b93b2ef
Show file tree
Hide file tree
Showing 16 changed files with 433 additions and 240 deletions.
2 changes: 1 addition & 1 deletion example/boolean.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# then you can specify an area to read from:
# https://github.com/gijzelaerr/python-snap7/blob/master/snap7/types.py

from snap7.types import Area # noqa: E402
from snap7.type import Area # noqa: E402


# play with these functions.
Expand Down
2 changes: 1 addition & 1 deletion example/read_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from snap7 import Client
from error import check_error
from snap7.types import S7DataItem, Area, WordLen
from snap7.type import S7DataItem, Area, WordLen
from snap7.util import get_real, get_int

client = Client()
Expand Down
2 changes: 1 addition & 1 deletion example/write_multi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import ctypes
import snap7
from snap7.types import Area, S7DataItem, WordLen
from snap7.type import Area, S7DataItem, WordLen
from snap7.util import set_int, set_real, get_int, get_real, get_s5time


Expand Down
2 changes: 1 addition & 1 deletion snap7/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .logo import Logo
from .partner import Partner
from .util.db import Row, DB
from .types import Area, Block, WordLen, SrvEvent, SrvArea
from .type import Area, Block, WordLen, SrvEvent, SrvArea

__all__ = ["Client", "Server", "Logo", "Partner", "Row", "DB", "Area", "Block", "WordLen", "SrvEvent", "SrvArea"]

Expand Down
83 changes: 42 additions & 41 deletions snap7/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

from snap7.common import ipv4, load_library
from snap7.protocol import Snap7CliProtocol
from snap7.types import S7SZL, Area, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem, Block
from snap7.types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from snap7.types import S7Object, buffer_size, buffer_type, cpu_statuses
from snap7.types import CDataArrayType, Parameter
from snap7.type import S7SZL, Area, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem, Block
from snap7.type import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from snap7.type import S7Object, buffer_size, buffer_type, cpu_statuses
from snap7.type import CDataArrayType, Parameter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, lib_location: Optional[str] = None):
Examples:
>>> import snap7
>>> client = snap7.client.Client() # If the `snap7.dll` file is in the path location
>>> client2 = snap7.client.Client(lib_location="/path/to/snap7.dll") # If the `snap7.dll` file is in another location
>>> client2 = snap7.client.Client(lib_location="/path/to/snap7.dll") # If the dll is in another location
<snap7.client.Client object at 0x0000028B257128E0>
"""

Expand Down Expand Up @@ -154,10 +154,10 @@ def get_cpu_info(self) -> S7CpuInfo:
Examples:
>>> cpu_info = Client().get_cpu_info()
>>> print(cpu_info)
"<S7CpuInfo ModuleTypeName: b'CPU 315-2 PN/DP'
<S7CpuInfo ModuleTypeName: b'CPU 315-2 PN/DP'
SerialNumber: b'S C-C2UR28922012'
ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment'
ModuleName: b'CPU 315-2 PN/DP'>
ModuleName: 'CPU 315-2 PN/DP' >
"""
info = S7CpuInfo()
result = self._lib.Cli_GetCpuInfo(self._s7_client, byref(info))
Expand Down Expand Up @@ -234,7 +234,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int:
Args:
db_number: number of the DB to be written.
start: byte index to start writing to.
data: buffer to be writen.
data: buffer to be written.
Returns:
Buffer written.
Expand Down Expand Up @@ -398,10 +398,10 @@ def write_area(self, area: Area, db_number: int, start: int, data: bytearray) ->
"""Writes a data area into a PLC.
Args:
area: area to be writen.
db_number: number of the db to be writen to. In case of Inputs, Marks or Outputs, this should be equal to 0.
area: area to be written.
db_number: number of the db to be written to. In case of Inputs, Marks or Outputs, this should be equal to 0
start: byte index to start writting.
data: buffer to be writen.
data: buffer to be written.
Returns:
Snap7 error code.
Expand Down Expand Up @@ -450,16 +450,15 @@ def list_blocks(self) -> BlocksList:
Block list structure object.
Examples:
>>> block_list = Client().list_blocks()
>>> print(block_list)
>>> print(Client().list_blocks())
<block list count OB: 0 FB: 0 FC: 0 SFB: 0 SFC: 0x0 DB: 1 SDB: 0>
"""
logger.debug("listing blocks")
blocksList = BlocksList()
result = self._lib.Cli_ListBlocks(self._s7_client, byref(blocksList))
block_list = BlocksList()
result = self._lib.Cli_ListBlocks(self._s7_client, byref(block_list))
check_error(result, context="client")
logger.debug(f"blocks: {blocksList}")
return blocksList
logger.debug(f"blocks: {block_list}")
return block_list

def list_blocks_of_type(self, block_type: Block, size: int) -> Union[int, Array[c_uint16]]:
"""This function returns the AG list of a specified block type.
Expand Down Expand Up @@ -569,7 +568,7 @@ def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int)
Raises:
:obj:`ValueError`: if the `address` is not a valid IPV4.
:obj:`ValueError`: if the result of setting the connection params is
different than 0.
different from 0.
"""
if not re.match(ipv4, address):
raise ValueError(f"{address} is invalid ipv4")
Expand All @@ -578,14 +577,14 @@ def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int)
raise ValueError("The parameter was invalid")

def set_connection_type(self, connection_type: int) -> None:
"""Sets the connection resource type, i.e the way in which the Clients connects to a PLC.
"""Sets the connection resource type, i.e. the way in which the Clients connect to a PLC.
Args:
connection_type: 1 for PG, 2 for OP, 3 to 10 for S7 Basic
Raises:
:obj:`ValueError`: if the result of setting the connection type is
different than 0.
different from 0.
"""
result = self._lib.Cli_SetConnectionType(self._s7_client, c_uint16(connection_type))
if result != 0:
Expand Down Expand Up @@ -692,7 +691,7 @@ def as_copy_ram_to_rom(self, timeout: int = 1) -> int:
"""Performs the Copy Ram to Rom action asynchronously.
Args:
timeout: time to wait unly fail.
timeout: time to wait until fail.
Returns:
Snap7 code.
Expand Down Expand Up @@ -722,7 +721,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
Args:
start: byte index to start to write from.
amount: amount of bytes to write.
data: buffer to be write.
data: buffer to write.
Returns:
Snap7 code.
Expand Down Expand Up @@ -779,9 +778,8 @@ def as_db_read(self, db_number: int, start: int, size: int, data: CDataArrayType
Examples:
>>> import ctypes
>>> data = (ctypes.c_uint8 * size)() # In this ctypes array data will be stored.
>>> result = Client().as_db_read(1, 0, size, data)
>>> result # 0 = success
>>> content = (ctypes.c_uint8 * size)() # In this ctypes array data will be stored.
>>> Client().as_db_read(1, 0, size, content)
0
"""
result = self._lib.Cli_AsDBRead(self._s7_client, db_number, start, size, byref(data))
Expand All @@ -792,10 +790,10 @@ def as_db_write(self, db_number: int, start: int, size: int, data: CDataArrayTyp
"""Writes a part of a DB into a PLC.
Args:
db_number: number of DB to be writen.
db_number: number of DB to be written.
start: byte index from where start to write to.
size: amount of bytes to write.
data: buffer to be writen.
data: buffer to be written.
Returns:
Snap7 code.
Expand Down Expand Up @@ -923,7 +921,9 @@ def set_plc_datetime(self, dt: datetime) -> int:
return self._lib.Cli_SetPlcDateTime(self._s7_client, byref(buffer))

def check_as_completion(self, p_value: c_int) -> int:
"""Method to check Status of an async request. Result contains if the check was successful, not the data value itself
"""Method to check Status of an async request.
Result contains if the check was successful, not the data value itself
Args:
p_value: Pointer where result of this check shall be written.
Expand All @@ -937,17 +937,17 @@ def check_as_completion(self, p_value: c_int) -> int:

def set_as_callback(self, call_back: Callable[..., Any]) -> int:
"""
Sets the user callback that is called when a asynchronous data sent is complete.
Sets the user callback that is called when an asynchronous data sent is complete.
"""
logger.info("setting event callback")
callback_wrap: Callable[..., Any] = CFUNCTYPE(None, c_void_p, c_int, c_int)

def wrapper(usrptr: Optional[c_void_p], op_code: int, op_result: int) -> int:
def wrapper(_: None, op_code: int, op_result: int) -> int:
"""Wraps python function into a ctypes function
Args:
usrptr: not used
_: not used
op_code:
op_result:
Expand All @@ -959,9 +959,8 @@ def wrapper(usrptr: Optional[c_void_p], op_code: int, op_result: int) -> int:
return 0

self._callback = callback_wrap(wrapper)
usrPtr = c_void_p()

result = self._lib.Cli_SetAsCallback(self._s7_client, self._callback, usrPtr)
data = c_void_p()
result = self._lib.Cli_SetAsCallback(self._s7_client, self._callback, data)
check_error(result, context="client")
return result

Expand All @@ -981,7 +980,7 @@ def wait_as_completion(self, timeout: int) -> int:

def as_read_area(self, area: Area, db_number: int, start: int, size: int, word_len: WordLen, data: CDataArrayType) -> int:
"""Reads a data area from a PLC asynchronously.
With this you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
With this you can read DB, Inputs, Outputs, Markers, Timers and Counters.
Args:
area: memory area to be read from.
Expand Down Expand Up @@ -1018,7 +1017,9 @@ def as_write_area(self, area: Area, db_number: int, start: int, size: int, word_
"""
type_ = WordLen.Byte.ctype
logger.debug(
f"writing area: {area.name} db_number: {db_number} start: {start}: size {size}: " f"word_len {word_len} type: {type_}"
f"writing area: {area.name} db_number: {db_number} "
f"start: {start}: size {size}: "
f"word_len {word_len} type: {type_}"
)
cdata = (type_ * len(data)).from_buffer_copy(data)
res = self._lib.Cli_AsWriteArea(self._s7_client, area, db_number, start, size, word_len.value, byref(cdata))
Expand Down Expand Up @@ -1092,7 +1093,7 @@ def as_list_blocks_of_type(self, block_type: Block, data: CDataArrayType, count:
return result

def as_mb_read(self, start: int, size: int, data: CDataArrayType) -> int:
"""Reads a part of Merkers area from a PLC.
"""Reads a part of Markers area from a PLC.
Args:
start: byte index from where to start to read from.
Expand All @@ -1107,7 +1108,7 @@ def as_mb_read(self, start: int, size: int, data: CDataArrayType) -> int:
return result

def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Merkers area into a PLC.
"""Writes a part of Markers area into a PLC.
Args:
start: byte index from where to start to write to.
Expand Down Expand Up @@ -1401,7 +1402,7 @@ def iso_exchange_buffer(self, data: bytearray) -> bytearray:
return result

def mb_read(self, start: int, size: int) -> bytearray:
"""Reads a part of Merkers area from a PLC.
"""Reads a part of Markers area from a PLC.
Args:
start: byte index to be read from.
Expand All @@ -1417,7 +1418,7 @@ def mb_read(self, start: int, size: int) -> bytearray:
return bytearray(data)

def mb_write(self, start: int, size: int, data: bytearray) -> int:
"""Writes a part of Merkers area into a PLC.
"""Writes a part of Markers area into a PLC.
Args:
start: byte index to be written.
Expand Down Expand Up @@ -1494,7 +1495,7 @@ def tm_write(self, start: int, amount: int, data: bytearray) -> int:
Args:
start: byte index from where is start to write to.
amount: amount of byte to be written.
data: data to be writen.
data: data to be written.
Returns:
Snap7 code.
Expand Down
2 changes: 1 addition & 1 deletion snap7/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import Callable, Any, Hashable

from .common import logger, load_library
from .types import Context
from .type import Context

s7_client_errors = {
0x00100000: "errNegotiatingPDU",
Expand Down
Loading

0 comments on commit b93b2ef

Please sign in to comment.