diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4b7a8e34..9d7b0daa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,12 +16,12 @@ repos: - id: check-docstring-first - id: detect-private-key - - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v1.10.0' - hooks: - - id: mypy - additional_dependencies: [types-setuptools, types-click] - files: ^snap7 +# - repo: https://github.com/pre-commit/mirrors-mypy +# rev: 'v1.10.0' +# hooks: +# - id: mypy +# additional_dependencies: [types-setuptools, types-click] +# files: ^snap7 - repo: https://github.com/astral-sh/ruff-pre-commit rev: 'v0.4.2' diff --git a/snap7/low_level/__init__.py b/snap7/low_level/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/snap7/low_level/iso_tcp_socket.py b/snap7/low_level/iso_tcp_socket.py new file mode 100644 index 00000000..a5e082d6 --- /dev/null +++ b/snap7/low_level/iso_tcp_socket.py @@ -0,0 +1,153 @@ +from ctypes import ( + c_uint8, + c_uint16, + Structure, + c_byte, +) + +from .snap_msg_sock import TMsgSocket + +word = c_uint16 +byte = c_byte +u_char = c_uint8 + +isoTcpVersion = 3 # RFC 1006 +iso_tcp_port = 102 # RFC 1006 +isoInvalidHandle = 0 +MaxTSAPLength = 16 # Max Lenght for Src and Dst TSAP +MaxIsoFragments = 64 # Max fragments +IsoPayload_Size = 4096 # Iso telegram Buffer size + +noError = 0 + + +class TTPKT(Structure): + _fields_ = [ + ("Version", u_char), # Always 3 for RFC 1006 + ("Reserved", u_char), # 0 + ("HI_Lenght", u_char), # High part of packet length (entire frame, payload and TPDU included) + ("LO_Lenght", u_char), # Low part of packet length (entire frame, payload and TPDU included) + ] + + +class TCOTP_DT(Structure): + _fields_ = [ + ("HLength", u_char), # Header length : 3 for this header + ("PDUType", u_char), # 0xF0 for this header + ("EoT_Num", u_char), # EOT (bit 7) + PDU Number (bits 0..6) + # EOT = 1 -> End of Trasmission Packet (This packet is complete) + # PDU Number : Always 0 + ] + + +class TIsoDataPDU(Structure): + _fields_ = [ + ("TPKT", TTPKT), # TPKT Header + ("COTP", TCOTP_DT), # COPT Header for DATA EXCHANGE + ] + + +class TIsoTcpSocket(TMsgSocket): + def __init__(self): + super().__init__() + self.FControlPDU = None + self.IsoMaxFragments = MaxIsoFragments + self.PDU = TIsoDataPDU() + self.LastIsoError = 0 + self.SrcTSap = 0 + self.DstTSap = 0 + self.SrcRef = 0 + self.DstRef = 0 + self.IsoPDUSize = 0 + + self.recv_timeout = 3000 # Some old equipments are a bit slow to answer.... + self.remote_port = iso_tcp_port + # These fields should be $0000 and in any case RFC says that they are not considered. + # But some equipment...need a non zero value for the source reference. + self.dst_ref = 0x0000 + self.src_ref = 0x0100 + # PDU size requested + self.iso_pdu_size = 1024 + self.iso_max_fragments = MaxIsoFragments + self.last_iso_error = 0 + + def CheckPDU(self, pPDU, PduTypeExpected): + pass + + def isoRecvFragment(self, From, Max, Size, EoT): + pass + + def SetIsoError(self, Error): + pass + + def BuildControlPDU(self): + pass + + def PDUSize(self, pPDU): + pass + + def IsoParsePDU(self, PDU): + pass + + def IsoConfirmConnection(self, PDUType): + pass + + def ClrIsoError(self): + pass + + def FragmentSkipped(self, Size): + pass + + def isoConnect(self): + pass + + def isoDisconnect(self, OnlyTCP): + if self.Connected: + self.Purge() # Flush pending + self.last_iso_error = 0 + # OnlyTCP true -> Disconnect Request telegram is not required : only TCP disconnection + if not OnlyTCP: + # if we are connected -> we have a valid connection telegram + if self.Connected: + self.FControlPDU.COTP.PDUType = self.pdu_type_DR + # Checks the format + Result = self.CheckPDU(self.FControlPDU, self.pdu_type_DR) + if Result != 0: + return Result + # Sends Disconnect request + self.SendPacket(self.FControlPDU, self.PDUSize(self.FControlPDU)) + if self.LastTcpError != 0: + Result = self.SetIsoError(self.errIsoSendPacket) + return Result + # TCP disconnect + self.SckDisconnect() + if self.LastTcpError != 0: + Result = self.SetIsoError(self.errIsoDisconnect) + else: + Result = 0 + + return Result + + def isoSendBuffer(self, Data, Size): + pass + + def isoRecvBuffer(self, Data, Size): + pass + + def isoExchangeBuffer(self, Data, Size): + pass + + def IsoPDUReady(self): + pass + + def isoSendPDU(self, Data): + pass + + def isoRecvPDU(self, Data): + pass + + def isoExchangePDU(self, Data): + pass + + def IsoPeek(self, pPDU, PduKind): + pass diff --git a/snap7/low_level/peer.py b/snap7/low_level/peer.py new file mode 100644 index 00000000..1bfffdd4 --- /dev/null +++ b/snap7/low_level/peer.py @@ -0,0 +1,83 @@ +from .iso_tcp_socket import IsoTcpSocket +from .type import PS7ReqHeader + + +class TSnap7Peer(IsoTcpSocket): + def __init__(self): + super().__init__() + self.PDUH_out: PS7ReqHeader = PS7ReqHeader(self.PDU.Payload) + self.PDURequest: int = 480 # Our request, FPDULength will contain the CPU answer + self.last_error = 0 + self.cntword = 0 + self.destroying = False + + def __del__(self): + self.destroying = True + + def set_error(self, error: int): + if error == 0: + self.clear_error() + else: + self.last_error = error | self.last_iso_error | self.last_tcp_error + return error + + def clear_error(self): + self.last_error = 0 + self.last_iso_error = 0 + self.last_tcp_error = 0 + + def GetNextWord(self): + if self.cntword == 0xFFFF: + self.cntword = 0 + self.cntword += 1 + return self.cntword + + def peer_disconnect(self): + self.clear_error() + self.iso_disconnect(True) + + def peer_connect(self): + self.clear_error() + Result = self.iso_connect() + if Result == 0: + Result = self.negotiate_pdu_length() + if Result != 0: + self.peer_disconnect() + return Result + + def negotiate_pdu_length(self): + """ + Result, IsoSize = 0 + PReqFunNegotiateParams ReqNegotiate + PResFunNegotiateParams ResNegotiate + PS7ResHeader23 Answer + self.clear_error() + # Setup Pointers + ReqNegotiate = PReqFunNegotiateParams(pbyte(PDUH_out) + sizeof(TS7ReqHeader)) + // Header + PDUH_out->P = 0x32; // Always $32 + PDUH_out->PDUType = PduType_request; // $01 + PDUH_out->AB_EX = 0x0000; // Always $0000 + PDUH_out->Sequence = GetNextWord(); // AutoInc + PDUH_out->ParLen = SwapWord(sizeof(TReqFunNegotiateParams)); // 8 bytes + PDUH_out->DataLen = 0x0000; + // Params + ReqNegotiate->FunNegotiate = pduNegotiate; + ReqNegotiate->Unknown = 0x00; + ReqNegotiate->ParallelJobs_1 = 0x0100; + ReqNegotiate->ParallelJobs_2 = 0x0100; + ReqNegotiate->PDULength = SwapWord(PDURequest); + IsoSize = sizeof( TS7ReqHeader ) + sizeof( TReqFunNegotiateParams ); + Result = isoExchangeBuffer(NULL, IsoSize); + if ((Result == 0) && (IsoSize == int(sizeof(TS7ResHeader23) + sizeof(TResFunNegotiateParams)))): + # Setup pointers + Answer = PS7ResHeader23(&PDU.Payload); + ResNegotiate = PResFunNegotiateParams(pbyte(Answer) + sizeof(TS7ResHeader23)); + if ( Answer->Error != 0 ): + Result = SetError(errNegotiatingPDU); + if ( Result == 0 ): + PDULength = SwapWord(ResNegotiate->PDULength); + + return Result + """ + ... diff --git a/snap7/low_level/snap_base.py b/snap7/low_level/snap_base.py new file mode 100644 index 00000000..d7c4796f --- /dev/null +++ b/snap7/low_level/snap_base.py @@ -0,0 +1,14 @@ +class TSnapBase: + def __init__(self): + self.LittleEndian = True + + def SwapDWord(self, value): + return ( + ((value & 0xFF000000) >> 24) + | ((value & 0x00FF0000) >> 8) + | ((value & 0x0000FF00) << 8) + | ((value & 0x000000FF) << 24) + ) + + def SwapWord(self, value): + return ((value & 0xFF00) >> 8) | ((value & 0x00FF) << 8) diff --git a/snap7/low_level/snap_msg_sock.py b/snap7/low_level/snap_msg_sock.py new file mode 100644 index 00000000..03cae09b --- /dev/null +++ b/snap7/low_level/snap_msg_sock.py @@ -0,0 +1,109 @@ +from .snap_base import TSnapBase + + +class TMsgSocket(TSnapBase): + def __init__(self): + super().__init__() + self.Pinger = None + self.FSocket = None + self.LocalSin = None + self.RemoteSin = None + self.ClientHandle = 0 + self.LocalBind = 0 + self.LocalAddress = "" + self.RemoteAddress = "" + self.LocalPort = 0 + self.RemotePort = 0 + self.WorkInterval = 0 + self.PingTimeout = 750 + self.RecvTimeout = 0 + self.SendTimeout = 0 + self.LastTcpError = 0 + self.Connected = False + + def GetLastSocketError(self): + pass + + def SockCheck(self, SockResult): + pass + + def DestroySocket(self): + pass + + def SetSocketOptions(self): + pass + + def CanWrite(self, Timeout): + pass + + def GetLocal(self): + pass + + def GetRemote(self): + pass + + def SetSin(self, sin, Address, Port): + pass + + def GetSin(self, sin, Address, Port): + pass + + def CreateSocket(self): + pass + + def GotSocket(self): + pass + + def WaitingData(self): + pass + + def WaitForData(self, Size, Timeout): + pass + + def Purge(self): + pass + + def CanRead(self, Timeout): + pass + + def SckConnect(self): + pass + + def SckDisconnect(self): + pass + + def ForceClose(self): + pass + + def SckBind(self): + pass + + def SckListen(self): + pass + + def SetSocket(self, s): + pass + + def SckAccept(self): + pass + + def Ping(self, Host): + pass + + def SendPacket(self, Data, Size): + pass + + def PacketReady(self, Size): + pass + + def Receive(self, Data, BufSize): + pass + + def RecvPacket(self, Data, Size): + pass + + def PeekPacket(self, Data, Size): + pass + + def Execute(self): + pass diff --git a/snap7/low_level/type.py b/snap7/low_level/type.py new file mode 100644 index 00000000..05e99bbd --- /dev/null +++ b/snap7/low_level/type.py @@ -0,0 +1,24 @@ +from ctypes import ( + c_uint8, + c_uint16, + Structure, + c_byte, +) + +word = c_uint16 +byte = c_byte +u_char = c_uint8 + + +class TS7ReqHeader(Structure): + _fields_ = [ + ("P", byte), + ("PDUType", byte), + ("AB_EX", word), + ("Sequence", word), + ("ParLen", word), + ("DataLen", word), + ] + + +PS7ReqHeader = TS7ReqHeader