diff --git a/snap7/client.py b/snap7/client.py index cbc20242..5108ec56 100644 --- a/snap7/client.py +++ b/snap7/client.py @@ -174,8 +174,7 @@ def disconnect(self) -> int: logger.info("disconnecting snap7 client") return self._lib.Cli_Disconnect(self._s7_client) - @error_wrap(context="client") - def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> int: + def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "Client": """Connects a Client Object to a PLC. Args: @@ -185,7 +184,7 @@ def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> in tcp_port: port of the PLC. Returns: - Error code from snap7 library. + The snap7 Logo instance Example: >>> import snap7 @@ -195,7 +194,8 @@ def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> in logger.info(f"connecting to {address}:{tcp_port} rack {rack} slot {slot}") self.set_param(parameter=Parameter.RemotePort, value=tcp_port) - return self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot)) + check_error(self._lib.Cli_ConnectTo(self._s7_client, c_char_p(address.encode()), c_int(rack), c_int(slot))) + return self def db_read(self, db_number: int, start: int, size: int) -> bytearray: """Reads a part of a DB from a PLC @@ -279,11 +279,11 @@ def full_upload(self, block_type: Block, block_num: int) -> Tuple[bytearray, int Returns: Tuple of the buffer and size. """ - _buffer = buffer_type() - size = c_int(sizeof(_buffer)) - result = self._lib.Cli_FullUpload(self._s7_client, block_type.ctype, block_num, byref(_buffer), byref(size)) + buffer = buffer_type() + size = c_int(sizeof(buffer)) + result = self._lib.Cli_FullUpload(self._s7_client, block_type.ctype, block_num, byref(buffer), byref(size)) check_error(result, context="client") - return bytearray(_buffer)[: size.value], size.value + return bytearray(buffer)[: size.value], size.value def upload(self, block_num: int) -> bytearray: """Uploads a block from AG. @@ -298,14 +298,14 @@ def upload(self, block_num: int) -> bytearray: Buffer with the uploaded block. """ logger.debug(f"db_upload block_num: {block_num}") - _buffer = buffer_type() - size = c_int(sizeof(_buffer)) + buffer = buffer_type() + size = c_int(sizeof(buffer)) - result = self._lib.Cli_Upload(self._s7_client, Block.DB.ctype, block_num, byref(_buffer), byref(size)) + result = self._lib.Cli_Upload(self._s7_client, Block.DB.ctype, block_num, byref(buffer), byref(size)) check_error(result, context="client") logger.info(f"received {size} bytes") - return bytearray(_buffer) + return bytearray(buffer) @error_wrap(context="client") def download(self, data: bytearray, block_num: int = -1) -> int: diff --git a/snap7/logo.py b/snap7/logo.py index 55ebb74b..f1f9778d 100644 --- a/snap7/logo.py +++ b/snap7/logo.py @@ -9,7 +9,7 @@ from .type import WordLen, Area, Parameter -from .error import check_error, error_wrap +from .error import check_error from snap7.client import Client logger = logging.getLogger(__name__) @@ -28,8 +28,7 @@ class Logo(Client): For more information see examples for Siemens Logo 7 and 8 """ - @error_wrap(context="client") - def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int = 102) -> int: + def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcp_port: int = 102) -> "Logo": """Connect to a Siemens LOGO server. Notes: @@ -39,17 +38,16 @@ def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int ip_address: IP ip_address of server tsap_snap7: TSAP SNAP7 Client (e.g. 10.00 = 0x1000) tsap_logo: TSAP Logo Server (e.g. 20.00 = 0x2000) + tcp_port: TCP port of server Returns: - Error code from snap7 library. + The snap7 Logo instance """ - logger.info(f"connecting to {ip_address}:{tcpport} tsap_snap7 {tsap_snap7} tsap_logo {tsap_logo}") - # special handling for Siemens Logo - # 1st set connection params - # 2nd connect without any parameters - self.set_param(Parameter.RemotePort, tcpport) + logger.info(f"connecting to {ip_address}:{tcp_port} tsap_snap7 {tsap_snap7} tsap_logo {tsap_logo}") + self.set_param(Parameter.RemotePort, tcp_port) self.set_connection_params(ip_address, tsap_snap7, tsap_logo) - return self._lib.Cli_Connect(self._s7_client) + check_error(self._lib.Cli_Connect(self._s7_client)) + return self def read(self, vm_address: str) -> int: """Reads from VM addresses of Siemens Logo. Examples: read("V40") / read("VW64") / read("V10.2") diff --git a/snap7/server/__init__.py b/snap7/server/__init__.py index 5a8ca1f2..a7d46460 100644 --- a/snap7/server/__init__.py +++ b/snap7/server/__init__.py @@ -397,6 +397,7 @@ def mainloop(tcpport: int = 1102, init_standard_values: bool = False) -> None: server.register_area(SrvArea.CT, 1, CTdata) if init_standard_values: + logger.info("initialising with standard values") ba = _init_standard_values() userdata = WordLen.Byte.ctype * len(ba) server.register_area(SrvArea.DB, 0, userdata.from_buffer(ba)) diff --git a/tests/test_client.py b/tests/test_client.py index e26b09a7..f198a44c 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -184,21 +184,28 @@ def test_read_multi_vars(self) -> None: self.assertEqual(result_values[1], test_values[1]) self.assertEqual(result_values[2], test_values[2]) + @unittest.skip("Not implemented by the snap7 server") def test_upload(self) -> None: """ - this raises an exception due to missing authorization? maybe not - implemented in server emulator + This is not implemented by the server and will always raise a RuntimeError (security error) """ self.assertRaises(RuntimeError, self.client.upload, db_number) + @unittest.skip("Not implemented by the snap7 server") def test_as_upload(self) -> None: + """ + This is not implemented by the server and will always raise a RuntimeError (security error) + """ _buffer = typing_cast(Array[c_int32], buffer_type()) size = sizeof(_buffer) self.client.as_upload(1, _buffer, size) self.assertRaises(RuntimeError, self.client.wait_as_completion, 500) - @unittest.skip("TODO: not yet implemented") + @unittest.skip("Not implemented by the snap7 server") def test_download(self) -> None: + """ + This is not implemented by the server and will always raise a RuntimeError (security error) + """ data = bytearray([0b11111111]) self.client.download(block_num=0, data=data) @@ -435,6 +442,7 @@ def test_as_db_write(self) -> None: self.client.wait_as_completion(500) self.assertEqual(data, result) + @unittest.skip("Not implemented by the snap7 server") def test_as_download(self) -> None: data = bytearray(128) self.client.as_download(block_num=-1, data=data) @@ -479,105 +487,13 @@ def test_db_write_with_byte_literal_does_not_throw(self) -> None: finally: self.client._lib.Cli_DBWrite = original - def test_download_with_byte_literal_does_not_throw(self) -> None: - mock_download = mock.MagicMock() - mock_download.return_value = None - original = self.client._lib.Cli_Download - self.client._lib.Cli_Download = mock_download - data = b"\xde\xad\xbe\xef" - - try: - self.client.download(block_num=db_number, data=bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_Download = original - - def test_write_area_with_byte_literal_does_not_throw(self) -> None: - mock_writearea = mock.MagicMock() - mock_writearea.return_value = None - original = self.client._lib.Cli_WriteArea - self.client._lib.Cli_WriteArea = mock_writearea - - area = Area.DB - dbnumber = 1 - start = 1 - data = b"\xde\xad\xbe\xef" - - try: - self.client.write_area(area, dbnumber, start, bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_WriteArea = original - - def test_ab_write_with_byte_literal_does_not_throw(self) -> None: - mock_write = mock.MagicMock() - mock_write.return_value = None - original = self.client._lib.Cli_ABWrite - self.client._lib.Cli_ABWrite = mock_write - - start = 1 - data = b"\xde\xad\xbe\xef" - - try: - self.client.ab_write(start=start, data=bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_ABWrite = original - - def test_as_ab_write_with_byte_literal_does_not_throw(self) -> None: - mock_write = mock.MagicMock() - mock_write.return_value = None - original = self.client._lib.Cli_AsABWrite - self.client._lib.Cli_AsABWrite = mock_write - - start = 1 - data = b"\xde\xad\xbe\xef" - - try: - self.client.as_ab_write(start=start, data=bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_AsABWrite = original - - def test_as_db_write_with_byte_literal_does_not_throw(self) -> None: - mock_write = mock.MagicMock() - mock_write.return_value = None - original = self.client._lib.Cli_AsDBWrite - self.client._lib.Cli_AsDBWrite = mock_write - data = b"\xde\xad\xbe\xef" - - try: - self.client.db_write(db_number=1, start=0, data=bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_AsDBWrite = original - - def test_as_download_with_byte_literal_does_not_throw(self) -> None: - mock_download = mock.MagicMock() - mock_download.return_value = None - original = self.client._lib.Cli_AsDownload - self.client._lib.Cli_AsDownload = mock_download - data = b"\xde\xad\xbe\xef" - - try: - self.client.as_download(block_num=db_number, data=bytearray(data)) - except TypeError as e: - self.fail(str(e)) - finally: - self.client._lib.Cli_AsDownload = original - def test_get_plc_time(self) -> None: self.assertAlmostEqual(datetime.now().replace(microsecond=0), self.client.get_plc_datetime(), delta=timedelta(seconds=1)) def test_set_plc_datetime(self) -> None: new_dt = datetime(2011, 1, 1, 1, 1, 1, 0) self.client.set_plc_datetime(new_dt) - # Can't actual set datetime in emulated PLC, get_plc_datetime always returns system time. + # Can't actually set datetime in emulated PLC, get_plc_datetime always returns system time. # self.assertEqual(new_dt, self.client.get_plc_datetime()) def test_wait_as_completion_pass(self, timeout: int = 1000) -> None: diff --git a/tests/test_mainloop.py b/tests/test_mainloop.py index 8ca2ac80..ba2bc334 100644 --- a/tests/test_mainloop.py +++ b/tests/test_mainloop.py @@ -47,15 +47,6 @@ def tearDown(self) -> None: self.client.disconnect() self.client.destroy() - def test_read_prefill_db(self) -> None: - buffer = bytearray([0b11111111]) - self.client.db_write(0, 0, buffer) - data = self.client.db_read(0, 0, 7) - boolean = get_bool(data, 0, 0) - self.assertEqual(boolean, True) - integer = get_int(data, 0) - self.assertEqual(integer, -256) - def test_read_booleans(self) -> None: data = self.client.db_read(0, 0, 1) self.assertEqual(False, get_bool(data, 0, 0)) diff --git a/tests/test_partner.py b/tests/test_partner.py index 51401371..f9ef4d4f 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -24,11 +24,9 @@ def tearDown(self) -> None: def test_as_b_send(self) -> None: self.partner.as_b_send() - def test_b_recv(self) -> None: - self.partner.b_recv() - - def test_b_send(self) -> None: + def test_b_send_recv(self) -> None: self.partner.b_send() + self.partner.b_recv() def test_check_as_b_recv_completion(self) -> None: self.partner.check_as_b_recv_completion()