Skip to content

Commit

Permalink
add first low level code
Browse files Browse the repository at this point in the history
  • Loading branch information
gijzelaerr committed Nov 2, 2024
1 parent ad5bf4e commit d8916d7
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 6 deletions.
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ repos:
- id: check-docstring-first
- id: detect-private-key

- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.10.0'
hooks:
- id: mypy
additional_dependencies: [types-setuptools, types-click]
files: ^snap7
# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: 'v1.10.0'
# hooks:
# - id: mypy
# additional_dependencies: [types-setuptools, types-click]
# files: ^snap7

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.4.2'
Expand Down
Empty file added snap7/low_level/__init__.py
Empty file.
153 changes: 153 additions & 0 deletions snap7/low_level/iso_tcp_socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from ctypes import (
c_uint8,
c_uint16,
Structure,
c_byte,
)

from .snap_msg_sock import TMsgSocket

word = c_uint16
byte = c_byte
u_char = c_uint8

isoTcpVersion = 3 # RFC 1006
iso_tcp_port = 102 # RFC 1006
isoInvalidHandle = 0
MaxTSAPLength = 16 # Max Lenght for Src and Dst TSAP
MaxIsoFragments = 64 # Max fragments
IsoPayload_Size = 4096 # Iso telegram Buffer size

noError = 0


class TTPKT(Structure):
_fields_ = [
("Version", u_char), # Always 3 for RFC 1006
("Reserved", u_char), # 0
("HI_Lenght", u_char), # High part of packet length (entire frame, payload and TPDU included)
("LO_Lenght", u_char), # Low part of packet length (entire frame, payload and TPDU included)
]


class TCOTP_DT(Structure):
_fields_ = [
("HLength", u_char), # Header length : 3 for this header
("PDUType", u_char), # 0xF0 for this header
("EoT_Num", u_char), # EOT (bit 7) + PDU Number (bits 0..6)
# EOT = 1 -> End of Trasmission Packet (This packet is complete)
# PDU Number : Always 0
]


class TIsoDataPDU(Structure):
_fields_ = [
("TPKT", TTPKT), # TPKT Header
("COTP", TCOTP_DT), # COPT Header for DATA EXCHANGE
]


class TIsoTcpSocket(TMsgSocket):
def __init__(self):
super().__init__()
self.FControlPDU = None
self.IsoMaxFragments = MaxIsoFragments
self.PDU = TIsoDataPDU()
self.LastIsoError = 0
self.SrcTSap = 0
self.DstTSap = 0
self.SrcRef = 0
self.DstRef = 0
self.IsoPDUSize = 0

self.recv_timeout = 3000 # Some old equipments are a bit slow to answer....
self.remote_port = iso_tcp_port
# These fields should be $0000 and in any case RFC says that they are not considered.
# But some equipment...need a non zero value for the source reference.
self.dst_ref = 0x0000
self.src_ref = 0x0100
# PDU size requested
self.iso_pdu_size = 1024
self.iso_max_fragments = MaxIsoFragments
self.last_iso_error = 0

def CheckPDU(self, pPDU, PduTypeExpected):
pass

def isoRecvFragment(self, From, Max, Size, EoT):
pass

def SetIsoError(self, Error):
pass

def BuildControlPDU(self):
pass

def PDUSize(self, pPDU):
pass

def IsoParsePDU(self, PDU):
pass

def IsoConfirmConnection(self, PDUType):
pass

def ClrIsoError(self):
pass

def FragmentSkipped(self, Size):
pass

def isoConnect(self):
pass

def isoDisconnect(self, OnlyTCP):
if self.Connected:
self.Purge() # Flush pending
self.last_iso_error = 0
# OnlyTCP true -> Disconnect Request telegram is not required : only TCP disconnection
if not OnlyTCP:
# if we are connected -> we have a valid connection telegram
if self.Connected:
self.FControlPDU.COTP.PDUType = self.pdu_type_DR
# Checks the format
Result = self.CheckPDU(self.FControlPDU, self.pdu_type_DR)
if Result != 0:
return Result
# Sends Disconnect request
self.SendPacket(self.FControlPDU, self.PDUSize(self.FControlPDU))
if self.LastTcpError != 0:
Result = self.SetIsoError(self.errIsoSendPacket)
return Result
# TCP disconnect
self.SckDisconnect()
if self.LastTcpError != 0:
Result = self.SetIsoError(self.errIsoDisconnect)
else:
Result = 0

return Result

def isoSendBuffer(self, Data, Size):
pass

def isoRecvBuffer(self, Data, Size):
pass

def isoExchangeBuffer(self, Data, Size):
pass

def IsoPDUReady(self):
pass

def isoSendPDU(self, Data):
pass

def isoRecvPDU(self, Data):
pass

def isoExchangePDU(self, Data):
pass

def IsoPeek(self, pPDU, PduKind):
pass
83 changes: 83 additions & 0 deletions snap7/low_level/peer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from .iso_tcp_socket import IsoTcpSocket
from .type import PS7ReqHeader


class TSnap7Peer(IsoTcpSocket):
def __init__(self):
super().__init__()
self.PDUH_out: PS7ReqHeader = PS7ReqHeader(self.PDU.Payload)
self.PDURequest: int = 480 # Our request, FPDULength will contain the CPU answer
self.last_error = 0
self.cntword = 0
self.destroying = False

def __del__(self):
self.destroying = True

def set_error(self, error: int):
if error == 0:
self.clear_error()
else:
self.last_error = error | self.last_iso_error | self.last_tcp_error
return error

def clear_error(self):
self.last_error = 0
self.last_iso_error = 0
self.last_tcp_error = 0

def GetNextWord(self):
if self.cntword == 0xFFFF:
self.cntword = 0
self.cntword += 1
return self.cntword

def peer_disconnect(self):
self.clear_error()
self.iso_disconnect(True)

def peer_connect(self):
self.clear_error()
Result = self.iso_connect()
if Result == 0:
Result = self.negotiate_pdu_length()
if Result != 0:
self.peer_disconnect()
return Result

def negotiate_pdu_length(self):
"""
Result, IsoSize = 0
PReqFunNegotiateParams ReqNegotiate
PResFunNegotiateParams ResNegotiate
PS7ResHeader23 Answer
self.clear_error()
# Setup Pointers
ReqNegotiate = PReqFunNegotiateParams(pbyte(PDUH_out) + sizeof(TS7ReqHeader))
// Header
PDUH_out->P = 0x32; // Always $32
PDUH_out->PDUType = PduType_request; // $01
PDUH_out->AB_EX = 0x0000; // Always $0000
PDUH_out->Sequence = GetNextWord(); // AutoInc
PDUH_out->ParLen = SwapWord(sizeof(TReqFunNegotiateParams)); // 8 bytes
PDUH_out->DataLen = 0x0000;
// Params
ReqNegotiate->FunNegotiate = pduNegotiate;
ReqNegotiate->Unknown = 0x00;
ReqNegotiate->ParallelJobs_1 = 0x0100;
ReqNegotiate->ParallelJobs_2 = 0x0100;
ReqNegotiate->PDULength = SwapWord(PDURequest);
IsoSize = sizeof( TS7ReqHeader ) + sizeof( TReqFunNegotiateParams );
Result = isoExchangeBuffer(NULL, IsoSize);
if ((Result == 0) && (IsoSize == int(sizeof(TS7ResHeader23) + sizeof(TResFunNegotiateParams)))):
# Setup pointers
Answer = PS7ResHeader23(&PDU.Payload);
ResNegotiate = PResFunNegotiateParams(pbyte(Answer) + sizeof(TS7ResHeader23));
if ( Answer->Error != 0 ):
Result = SetError(errNegotiatingPDU);
if ( Result == 0 ):
PDULength = SwapWord(ResNegotiate->PDULength);
return Result
"""
...
14 changes: 14 additions & 0 deletions snap7/low_level/snap_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class TSnapBase:
def __init__(self):
self.LittleEndian = True

def SwapDWord(self, value):
return (
((value & 0xFF000000) >> 24)
| ((value & 0x00FF0000) >> 8)
| ((value & 0x0000FF00) << 8)
| ((value & 0x000000FF) << 24)
)

def SwapWord(self, value):
return ((value & 0xFF00) >> 8) | ((value & 0x00FF) << 8)
109 changes: 109 additions & 0 deletions snap7/low_level/snap_msg_sock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from .snap_base import TSnapBase


class TMsgSocket(TSnapBase):
def __init__(self):
super().__init__()
self.Pinger = None
self.FSocket = None
self.LocalSin = None
self.RemoteSin = None
self.ClientHandle = 0
self.LocalBind = 0
self.LocalAddress = ""
self.RemoteAddress = ""
self.LocalPort = 0
self.RemotePort = 0
self.WorkInterval = 0
self.PingTimeout = 750
self.RecvTimeout = 0
self.SendTimeout = 0
self.LastTcpError = 0
self.Connected = False

def GetLastSocketError(self):
pass

def SockCheck(self, SockResult):
pass

def DestroySocket(self):
pass

def SetSocketOptions(self):
pass

def CanWrite(self, Timeout):
pass

def GetLocal(self):
pass

def GetRemote(self):
pass

def SetSin(self, sin, Address, Port):
pass

def GetSin(self, sin, Address, Port):
pass

def CreateSocket(self):
pass

def GotSocket(self):
pass

def WaitingData(self):
pass

def WaitForData(self, Size, Timeout):
pass

def Purge(self):
pass

def CanRead(self, Timeout):
pass

def SckConnect(self):
pass

def SckDisconnect(self):
pass

def ForceClose(self):
pass

def SckBind(self):
pass

def SckListen(self):
pass

def SetSocket(self, s):
pass

def SckAccept(self):
pass

def Ping(self, Host):
pass

def SendPacket(self, Data, Size):
pass

def PacketReady(self, Size):
pass

def Receive(self, Data, BufSize):
pass

def RecvPacket(self, Data, Size):
pass

def PeekPacket(self, Data, Size):
pass

def Execute(self):
pass
Loading

0 comments on commit d8916d7

Please sign in to comment.