-
Notifications
You must be signed in to change notification settings - Fork 36
/
pyPlcHomeplug.py
1279 lines (1194 loc) · 67.2 KB
/
pyPlcHomeplug.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Preconditions:
# Library pcap-ct (not libpcap, not pylibpcap, not pypcap)
#
# Version 2022-08-14:
# - Selection of interfaces ok
# - Sniffing of the SLAC-request ok
# - Transmission of a demo message ok
#
# Test results 2022-10-15
# 1. GET_SW.REQ broadcast is answered by both TPlinks, while the first is connected to eth, the other via PLC at the first.
# 2. Step 1 works also, if we use a different MAC address than the original laptop ethernet MAC.
# 3. CM_SET_KEY addressed to the correct destination works, with the following results:
# - there were cases, when the TPlink responded "negative", but with a "valid" none (each time it used a new mynonce, and correctly
# reflected our mynonce.
# - but also there is "positive" response, also with correct nonces.
# - also the Devolo reponds positive and with correct nonces.
# - not yet checked, whether the NMK is really set
# 4. CM_GET_KEY with wrong NID is refused by the devolo. But at least it delivers the NID (e.g. d57c1fe9544e01), and if we use this NID in the next
# request, it responds with KEY_GRANTED, Keytype=NMK, and a key 7f19ba0261892d59b7ea42aed875d2320000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
# This key delivers exactly the NMK, which we have set in the CM_SET_KEY.
# But: There are some pitfalls:
# The CM_SET_KEY responds in some cases (wrong request) positive, but does not apply the NMK.
# If the CM_SET_KEY is well-formatted, including the correct NID, we get a false-negative response, and
# we see the LEDs on the adaptor shortly going completely off, completely on, and back to normal state. This
# is the sign, that the new key was accepted. It means, the
# adaptor is making a reset, to apply the new key.
# 5. CM_SET_KEY and CM_GET_KEY works also when sent to broadcast address. For both, devolo and tpLink.
# 2022-10-18 further tests
# 6. The devolo reports the SLAC_PARAM from the standalone-IONIQ to the wirkshark. Even in the case, when the devolo is paired to a tpLink.
# 7. The tpLink does NOT report the SLAC_PARAM to ethernet. Bad.
# 8. The tpLink has software from 2017, maybe the SLAC was removed at this version.
# 9. Article regarding firmware- and configuration update: https://fitzcarraldoblog.wordpress.com/2020/07/22/updating-the-powerline-adapters-in-my-home-network/
import pcap
import pyPlcIpv6
import udplog
import time
import os
from helpers import * # prettyMac etc
from pyPlcModes import *
from mytestsuite import *
from random import random
from configmodule import getConfigValue, getConfigValueBool
from datetime import datetime
import sys
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
CM_SET_KEY = 0x6008
CM_GET_KEY = 0x600C
CM_SC_JOIN = 0x6010
CM_CHAN_EST = 0x6014
CM_TM_UPDATE = 0x6018
CM_AMP_MAP = 0x601C
CM_BRG_INFO = 0x6020
CM_CONN_NEW = 0x6024
CM_CONN_REL = 0x6028
CM_CONN_MOD = 0x602C
CM_CONN_INFO = 0x6030
CM_STA_CAP = 0x6034
CM_NW_INFO = 0x6038
CM_GET_BEACON = 0x603C
CM_HFID = 0x6040
CM_MME_ERROR = 0x6044
CM_NW_STATS = 0x6048
CM_SLAC_PARAM = 0x6064
CM_START_ATTEN_CHAR = 0x6068
CM_ATTEN_CHAR = 0x606C
CM_PKCS_CERT = 0x6070
CM_MNBC_SOUND = 0x6074
CM_VALIDATE = 0x6078
CM_SLAC_MATCH = 0x607C
CM_SLAC_USER_DATA = 0x6080
CM_ATTEN_PROFILE = 0x6084
CM_GET_SW = 0xA000
MMTYPE_REQ = 0x0000
MMTYPE_CNF = 0x0001
MMTYPE_IND = 0x0002
MMTYPE_RSP = 0x0003
STATE_INITIAL = 0
STATE_MODEM_SEARCH_ONGOING = 1
STATE_READY_FOR_SLAC = 2
STATE_WAITING_FOR_MODEM_RESTARTED = 3
STATE_WAITING_FOR_SLAC_PARAM_CNF = 4
STATE_SLAC_PARAM_CNF_RECEIVED = 5
STATE_BEFORE_START_ATTEN_CHAR = 6
STATE_SOUNDING = 7
STATE_WAIT_FOR_ATTEN_CHAR_IND = 8
STATE_ATTEN_CHAR_IND_RECEIVED = 9
STATE_DELAY_BEFORE_MATCH = 10
STATE_WAITING_FOR_SLAC_MATCH_CNF = 11
STATE_WAITING_FOR_RESTART2 = 12
STATE_FIND_MODEMS2 = 13
STATE_WAITING_FOR_SW_VERSIONS = 14
STATE_READY_FOR_SDP = 15
STATE_SDP = 16
class pyPlcHomeplug():
def showIpAddresses(self, mybytearray):
addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4))
self.addToTrace('SRC %-16s\tDST %-16s' % (addr(mybytearray, self.sniffer.dloff + 12), addr(mybytearray, self.sniffer.dloff + 16)))
def showMacAddresses(self, mybytearray):
strDestMac = ""
for i in range(0, 6):
strDestMac = strDestMac + twoCharHex(mybytearray[i]) + ":"
strSourceMac = ""
for i in range(5, 12):
strSourceMac = strSourceMac + twoCharHex(mybytearray[i]) + ":"
lastThreeOfSource = mybytearray[6]*256*256 + mybytearray[7]*256 + mybytearray[8]
strSourceFriendlyName = ""
if (lastThreeOfSource == 0x0a663a):
strSourceFriendlyName="Fritzbox"
if (lastThreeOfSource == 0x0064c3):
strSourceFriendlyName="Ioniq"
self.addToTrace("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac)
def getSourceMacAddressAsString(self):
strSourceMac = ""
for i in range(6, 12):
strSourceMac = strSourceMac + twoCharHex(self.myreceivebuffer[i])
if (i<11):
strSourceMac = strSourceMac + ":"
return strSourceMac
def getEtherType(self, messagebufferbytearray):
etherType=0
if len(messagebufferbytearray)>(6+6+2):
etherType=messagebufferbytearray[12]*256 + messagebufferbytearray[13]
return etherType
def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC
# we can give a different offset, to re-use the MAC also in the data area
for i in range(0, 6):
self.mytransmitbuffer[offset+i]=mac[i]
def fillDestinationMac(self, mac, offset=0): # at offset 0 in the ethernet frame, we have the destination MAC
# we can give a different offset, to re-use the MAC also in the data area
for i in range(0, 6):
self.mytransmitbuffer[offset+i]=mac[i]
def fillRunId(self, offset):
# at the given offset in the transmit buffer, fill the 8-bytes-RunId.
for i in range(0, 8):
self.mytransmitbuffer[offset+i]=self.pevRunId[i]
def cleanTransmitBuffer(self): # fill the complete ethernet transmit buffer with 0x00
for i in range(0, len(self.mytransmitbuffer)):
self.mytransmitbuffer[i]=0
def setNmkAt(self, index):
# sets the Network Membership Key (NMK) at a certain position in the transmit buffer
for i in range(0, 16):
if (self.iAmEvse):
# In EvseMode, the NMK is freely chosen:
self.mytransmitbuffer[index+i]=self.NMK_EVSE_random[i] # NMK
else:
# In PevMode, the NMK is the one which was received in the SlacMatchConf. Or a default, if we did not receive any.
self.mytransmitbuffer[index+i]=self.NMK[i] # NMK
def setNidAt(self, index):
# (b0f2e695666b03 was NID of TPlink)
# copies the network ID (NID, 7 bytes) into the wished position in the transmit buffer
for i in range(0, 7):
self.mytransmitbuffer[index+i]=self.NID[i]
def getManagementMessageType(self):
# calculates the MMTYPE (base value + lower two bits), see Table 11-2 of homeplug spec
return (self.myreceivebuffer[16]<<8) + self.myreceivebuffer[15]
def composeGetSwReq(self):
# GET_SW.REQ request, as used by the win10 laptop
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x00 # version
self.mytransmitbuffer[15]=0x00 # GET_SW.REQ
self.mytransmitbuffer[16]=0xA0 #
self.mytransmitbuffer[17]=0x00 # Vendor OUI
self.mytransmitbuffer[18]=0xB0 #
self.mytransmitbuffer[19]=0x52 #
def composeGetSwWithRamdomMac(self):
# GET_SW.REQ request, as used by the win10 laptop
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# patch the lower three bytes of the MAC with a random value
self.mytransmitbuffer[8] = self.randomMac & 0xff
self.mytransmitbuffer[9] = (self.randomMac>>16) & 0xff
self.mytransmitbuffer[10] = (self.randomMac>>8) & 0xff
self.mytransmitbuffer[11] = self.randomMac & 0xff
if (1):
if ((self.randomMac%16)==0):
self.fillSourceMac([0xb8, 0x27, 0xeb, 0xa3, 0xaf, 0x34 ])
if ((self.randomMac%16)==1):
self.fillSourceMac([0xb8, 0x27, 0xeb, 0x72, 0x66, 0x06 ])
self.randomMac += 1 # new MAC for the next round
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x00 # version
self.mytransmitbuffer[15]=0x00 # GET_SW.REQ
self.mytransmitbuffer[16]=0xA0 #
self.mytransmitbuffer[17]=0x00 # Vendor OUI
self.mytransmitbuffer[18]=0xB0 #
self.mytransmitbuffer[19]=0x52 #
def composeSetKey(self, variation=0):
# CM_SET_KEY.REQ request
# From example trace from catphish https://openinverter.org/forum/viewtopic.php?p=40558&sid=9c23d8c3842e95c4cf42173996803241#p40558
# Table 11-88 in the homeplug_av21_specification_final_public.pdf
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
#self.fillDestinationMac(MAC_DEVOLO_26)
#self.fillDestinationMac(MAC_TPLINK_E4)
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x08 # CM_SET_KEY.REQ
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # frag_index
self.mytransmitbuffer[18]=0x00 # frag_seqnum
self.mytransmitbuffer[19]=0x01 # 0 key info type
self.mytransmitbuffer[20]=0xaa # 1 my nonce
self.mytransmitbuffer[21]=0xaa # 2
self.mytransmitbuffer[22]=0xaa # 3
self.mytransmitbuffer[23]=0xaa # 4
self.mytransmitbuffer[24]=0x00 # 5 your nonce
self.mytransmitbuffer[25]=0x00 # 6
self.mytransmitbuffer[26]=0x00 # 7
self.mytransmitbuffer[27]=0x00 # 8
self.mytransmitbuffer[28]=0x04 # 9 nw info pid
self.mytransmitbuffer[29]=0x00 # 10 info prn
self.mytransmitbuffer[30]=0x00 # 11
self.mytransmitbuffer[31]=0x00 # 12 pmn
self.mytransmitbuffer[32]=0x00 # 13 cco cap
self.setNidAt(33) # 14-20 nid 7 bytes from 33 to 39
# Network ID to be associated with the key distributed herein.
# The 54 LSBs of this field contain the NID (refer to Section 3.4.3.1). The
# two MSBs shall be set to 0b00.
self.addToTrace("NID is " + prettyHexMessage(self.NID))
self.mytransmitbuffer[40]=0x01 # 21 peks (payload encryption key select) Table 11-83. 01 is NMK. We had 02 here, why???
# with 0x0F we could choose "no key, payload is sent in the clear"
self.setNmkAt(41)
self.mytransmitbuffer[41]+=variation # to try different NMKs
# and three remaining zeros
def composeGetKey(self):
# CM_GET_KEY.REQ request
# from https://github.com/uhi22/plctool2/blob/master/listen_to_eth.c
# and homeplug_av21_specification_final_public.pdf
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
#self.fillDestinationMac(MAC_DEVOLO_26)
#self.fillDestinationMac(MAC_TPLINK_E4)
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x0C # CM_GET_KEY.REQ https://github.com/uhi22/plctool2/blob/master/plc_homeplug.h
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # 0 Request Type 0=direct
self.mytransmitbuffer[20]=0x01 # 1 RequestedKeyType only "NMK" is permitted over the H1 interface.
# value see HomeplugAV2.1 spec table 11-89. 1 means AES-128.
self.setNidAt(21)# NID starts here (table 11-91 Homeplug spec is wrong. Verified by accepted command.)
self.mytransmitbuffer[28]=0xaa # 10-13 mynonce. The position at 28 is verified by the response of the devolo.
self.mytransmitbuffer[29]=0xaa #
self.mytransmitbuffer[30]=0xaa #
self.mytransmitbuffer[31]=0xaa #
self.mytransmitbuffer[32]=0x04 # 14 PID. According to ISO15118-3 fix value 4, "HLE protocol"
self.mytransmitbuffer[33]=0x00 # 15-16 PRN Protocol run number
self.mytransmitbuffer[34]=0x00 #
self.mytransmitbuffer[35]=0x00 # 17 PMN Protocol message number
def composeSlacParamReq(self):
# SLAC_PARAM request, as it was recorded 2021-12-17 WP charger 2
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.pevMac)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x64 # SLAC_PARAM.REQ
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 #
self.mytransmitbuffer[20]=0x00 #
self.fillRunId(21) # 21 to 28: 8 bytes runid. The Ioniq uses the PEV mac plus 00 00. Tesla uses "TESLA EV".
# rest is 00
def composeSlacParamCnf(self):
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(self.pevMac)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x65 # SLAC_PARAM.confirm
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0xff # 19-24 sound target
self.mytransmitbuffer[20]=0xff #
self.mytransmitbuffer[21]=0xff #
self.mytransmitbuffer[22]=0xff #
self.mytransmitbuffer[23]=0xff #
self.mytransmitbuffer[24]=0xff #
self.mytransmitbuffer[25]=0x0A # sound count
self.mytransmitbuffer[26]=0x06 # timeout
self.mytransmitbuffer[27]=0x01 # resptype
self.fillDestinationMac(self.pevMac, 28) # forwarding_sta, same as PEV MAC, plus 2 bytes 00 00
self.mytransmitbuffer[34]=0x00 #
self.mytransmitbuffer[35]=0x00 #
self.fillRunId(36) # 36 to 43 runid 8 bytes
# rest is 00
def composeSpecialMessage(self):
# special "homeplug" message, to control a hardware device.
# We re-purpose the ATTEN_CHAR.IND, because a AR4720 PEV modem is transparent for it also in unpaired state,
# and it contains a lot of space which can be used to transfer data. Also it is not expected to disturb the
# normal traffic, because it may be also caused by cross-coupling from an other charger, and the normal
# communication should be immune to such things.
self.mytransmitbuffer = bytearray(129)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x6E # ATTEN_CHAR.IND
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # security
self.fillDestinationMac(MAC_BROADCAST, 21) # The wireshark calls it source_mac, but alpitronic fills it with PEV mac.
self.fillRunId(27) # runid 8 bytes
self.mytransmitbuffer[35]=0x00 # 35 - 51 source_id, 17 bytes. The alpitronic fills it with 00
self.mytransmitbuffer[52]=0x00 # 52 - 68 response_id, 17 bytes. The alpitronic fills it with 00.
self.mytransmitbuffer[69]=0x0A # Number of sounds. 10 in normal case.
self.mytransmitbuffer[70]=0x3A # Number of groups = 58.
for i in range(71, 129): # 71 to 128: 58 special-purpose-bytes
self.mytransmitbuffer[i]=self.specialMessageTransmitBuffer[i-71]
def composeStartAttenCharInd(self):
# reference: see wireshark interpreted frame from ioniq
self.mytransmitbuffer = bytearray(60)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x6A # START_ATTEN_CHAR.IND
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # sectype
self.mytransmitbuffer[21]=0x0a # number of sounds: 10
self.mytransmitbuffer[22]=6 # timeout N*100ms. Normally 6, means in 600ms all sounds must have been tranmitted.
# Todo: As long we are a little bit slow, lets give 1000ms instead of 600, so that the
# charger is able to catch it all.
self.mytransmitbuffer[23]=0x01 # response type
self.fillSourceMac(self.myMAC, 24) # 24 to 29: sound_forwarding_sta, MAC of the PEV
self.fillRunId(30) # 30 to 37: runid 8 bytes
# rest is 00
def composeNmbcSoundInd(self):
# reference: see wireshark interpreted frame from Ioniq
self.mytransmitbuffer = bytearray(71)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_BROADCAST)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x76 # NMBC_SOUND.IND
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # sectype
self.mytransmitbuffer[21]=0x00 # 21 to 37 sender ID, all 00
self.mytransmitbuffer[38]=self.remainingNumberOfSounds # countdown. Remaining number of sounds. Starts with 9 and counts down to 0.
self.fillRunId(39) # 39 to 46: runid
self.mytransmitbuffer[47]=0x00 # 47 to 54: reserved, all 00
# 55 to 70: random number. All 0xff in the ioniq message.
for i in range(55, 71):
self.mytransmitbuffer[i]=0xFF
def composeAttenCharInd(self):
self.mytransmitbuffer = bytearray(129)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(self.pevMac)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x6E # ATTEN_CHAR.IND
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # security
self.fillDestinationMac(self.pevMac, 21) # The wireshark calls it source_mac, but alpitronic fills it with PEV mac. We use the PEV MAC.
self.fillRunId(27) # runid 8 bytes
self.mytransmitbuffer[35]=0x00 # 35 - 51 source_id, 17 bytes. The alpitronic fills it with 00
self.mytransmitbuffer[52]=0x00 # 52 - 68 response_id, 17 bytes. The alpitronic fills it with 00.
self.mytransmitbuffer[69]=0x0A # Number of sounds. 10 in normal case. Should this be more flexible, e.g. using the counter from first MNBC_SOUND?
self.mytransmitbuffer[70]=0x3A # Number of groups = 58. Should this be more flexible?
for i in range(71, 129): # 71 to 128: The group attenuation for the 58 announced groups.
self.mytransmitbuffer[i]=9 # Typical values are between 1 and 0x19. Since we have no real measurements from the AR7020,
# we just simulate something. 0 seems to be interpreted as "defect", the IONIQ does not send
# a positive response in this case.
# higher attenuation for the higher frequencies, to be a little bit realistic (real data from alpitronic trace)
self.mytransmitbuffer[126]=0x0f
self.mytransmitbuffer[127]=0x13
self.mytransmitbuffer[128]=0x19
def composeAttenCharRsp(self):
# reference: see wireshark interpreted frame from Ioniq
self.mytransmitbuffer = bytearray(70)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(self.evseMac)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x6F # ATTEN_CHAR.RSP
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # sectype
self.fillSourceMac(self.myMAC, 21) # 21 to 26: source MAC
self.fillRunId(27) # 27 to 34: runid
# 35 to 51: source_id, all 00
# 52 to 68: resp_id, all 00
# 69: result. 0 is ok
def composeSlacMatchReq(self):
# reference: see wireshark interpreted frame from Ioniq
self.mytransmitbuffer = bytearray(85)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(self.evseMac)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x7C # SLAC_MATCH.REQ
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # sectype
self.mytransmitbuffer[21]=0x3E # 21 to 22: length
self.mytransmitbuffer[22]=0x00 #
# 23 to 39: pev_id, all 00
self.fillSourceMac(self.myMAC, 40) # 40 to 45: PEV MAC
# 46 to 62: evse_id, all 00
self.fillDestinationMac(self.evseMac, 63) # 63 to 68: EVSE MAC
self.fillRunId(69) # 69 to 76: runid
# 77 to 84: reserved, all 00
def composeSlacMatchCnf(self):
self.mytransmitbuffer = bytearray(109)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(self.pevMac)
# Source MAC
self.fillSourceMac(self.myMAC)
# Protocol
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
self.mytransmitbuffer[13]=0xE1
self.mytransmitbuffer[14]=0x01 # version
self.mytransmitbuffer[15]=0x7D # SLAC_MATCH.CNF
self.mytransmitbuffer[16]=0x60 #
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
self.mytransmitbuffer[18]=0x00 #
self.mytransmitbuffer[19]=0x00 # apptype
self.mytransmitbuffer[20]=0x00 # security
self.mytransmitbuffer[21]=0x56 # length 2 byte
self.mytransmitbuffer[22]=0x00 #
# 23 - 39: pev_id 17 bytes. All zero in alpi/Ioniq trace.
self.fillDestinationMac(self.pevMac, 40) # 40 - 45 pev_mac
# 46 - 62: evse_id 17 bytes. All zero in alpi/Ioniq trace.
self.fillSourceMac(self.myMAC, 63) # 63 - 68 evse_mac
self.fillRunId(69) # runid 8 bytes 69-76 run_id. Is the ioniq mac plus 00 00.
# 77 to 84 reserved 0
self.setNidAt(85) # 85-91 NID. We can nearly freely choose this, but the upper two bits need to be zero
# 92 reserved 0
self.setNmkAt(93) # 93 to 108 NMK. We can freely choose this. Normally we should use a random number.
def sendTestFrame(self, selection):
if (selection=="1"):
self.composeSlacParamReq()
self.addToTrace("transmitting SLAC_PARAM.REQ...")
self.transmit(self.mytransmitbuffer)
if (selection=="2"):
self.composeSlacParamCnf()
self.addToTrace("transmitting SLAC_PARAM.CNF...")
self.transmit(self.mytransmitbuffer)
if (selection=="S"):
self.composeGetSwReq()
self.addToTrace("transmitting GetSwReq...")
self.transmit(self.mytransmitbuffer)
if (selection=="s"):
self.composeSetKey(0)
self.addToTrace("transmitting SET_KEY.REQ (key 0)")
self.transmit(self.mytransmitbuffer)
if (selection=="t"):
self.composeSetKey(2) # set key with modified content
self.addToTrace("transmitting SET_KEY.REQ (key 2)")
self.transmit(self.mytransmitbuffer)
if (selection=="G"):
self.composeGetKey()
self.addToTrace("transmitting GET_KEY")
self.transmit(self.mytransmitbuffer)
if (selection=="M"):
self.composeGetSwWithRamdomMac()
self.addToTrace("transmitting GetSwWithRamdomMac")
self.transmit(self.mytransmitbuffer)
if (selection=="5"):
self.sendSpecialMessageToControlThePowerSupply(20, 1)
if (selection=="6"):
self.sendSpecialMessageToControlThePowerSupply(100, 1)
if (selection=="7"):
self.sendSpecialMessageToControlThePowerSupply(200, 1)
if (selection=="8"):
self.sendSpecialMessageToControlThePowerSupply(300, 1)
if (selection=="9"):
self.sendSpecialMessageToControlThePowerSupply(400, 1)
if (selection=="0"):
self.sendSpecialMessageToControlThePowerSupply(0, 0)
def transmit(self, pkt):
self.sniffer.sendpacket(bytes(pkt))
def evaluateGetKeyCnf(self):
self.addToTrace("received GET_KEY.CNF")
self.numberOfFoundModems += 1
sourceMac=bytearray(6)
for i in range(0, 6):
sourceMac[i] = self.myreceivebuffer[6+i]
strMac=prettyMac(sourceMac)
result = self.myreceivebuffer[19] # 0 in case of success
if (result==0):
strResult="(OK)"
else:
strResult="(NOK)"
self.addToTrace("Modem #" + str(self.numberOfFoundModems) + " has " + strMac + " and result code is " + str(result) + strResult)
if (self.numberOfFoundModems>1):
self.addToTrace("Info: NOK is normal for remote modems.")
# We observed the following cases:
# (A) Result=1 (NOK), NID all 00, key all 00: We requested the key with the wrong NID.
# (B) Result=0 (OK), NID all 00, key non-zero: We used the correct NID for the request.
# It is the local TPlink adaptor. A fresh started non-coordinator, like the PEV side.
# (C) Result=0 (OK), NID non-zero, key non-zero: We used the correct NID for the request.
# It is the local TPlink adaptor.
# (D) Result=1 (NOK), NID non-zero, key all 00: It was a remote device. They are rejecting the GET_KEY.
if (result==0):
# The ok case is for sure the local modem. Let's store its data.
self.localModemMac = sourceMac
self.localModemCurrentKey=bytearray(16)
s=""
for i in range(0, 16): # NMK has 16 bytes
self.localModemCurrentKey[i] = self.myreceivebuffer[41+i]
s=s+hex(self.localModemCurrentKey[i])+ " "
self.addToTrace("The local modem has key " + s)
if (self.localModemCurrentKey == bytearray(self.NMKdevelopment)):
self.addToTrace("This is the developer NMK.")
self.isDeveloperLocalKey = 1
else:
self.addToTrace("This is NOT the developer NMK.")
s = ""
# The getkey response contains the Network ID (NID), even if the request was rejected. We store the NID,
# to have it available for the next request. Use case: A fresh started, unconnected non-Coordinator
# modem has the default-NID all 00. On the other hand, a fresh started coordinator has the
# NID which he was configured before. We want to be able to cover both cases. That's why we
# ask GET_KEY, it will tell the NID (even if response code is 1 (NOK), and we will use this
# received NID for the next request. This will be ansered positive (for the local modem).
for i in range(0, 7): # NID has 7 bytes
self.NID[i] = self.myreceivebuffer[29+i]
s=s+hex(self.NID[i])+ " "
self.addToTrace("From GetKeyCnf, got network ID (NID) " + s)
def evaluateSetKeyCnf(self):
# The Setkey confirmation
# In spec, the result 0 means "success". But in reality, the 0 means: did not work. When it works,
# then the LEDs are blinking (device is restarting), and the response is 1.
self.addToTrace("received SET_KEY.CNF")
result = self.myreceivebuffer[19]
if (result == 0):
self.addToTrace("SetKeyCnf says 0, this would be a bad sign for local modem, but normal for remote.")
else:
self.addToTrace("SetKeyCnf says " + str(result) + ", this is formally 'rejected', but indeed ok.")
self.publishStatus("modem is", "restarting")
self.connMgr.SlacOk()
def evaluateGetSwCnf(self):
# The GET_SW confirmation. This contains the software version of the homeplug modem.
# Reference: see wireshark interpreted frame from TPlink, Ioniq and Alpitronic charger
self.addToTrace("[SNIFFER] received GET_SW.CNF")
self.numberOfSoftwareVersionResponses+=1
sourceMac=bytearray(6)
for i in range(0, 6):
sourceMac[i] = self.myreceivebuffer[6+i]
strMac=prettyMac(sourceMac)
verLen = self.myreceivebuffer[22]
strVersion = ""
if ((verLen>0) and (verLen<0x30)):
for i in range(0, verLen):
x = self.myreceivebuffer[23+i]
if (x<0x20):
x=0x20 # make unprintable character to space.
strVersion+=chr(x) # convert ASCII code to string
self.addToTrace("[SNIFFER] For " + strMac + " the software version is " + strVersion)
def evaluateSlacParamReq(self):
# We received a SLAC_PARAM request from the PEV. This is the initiation of a SLAC procedure.
# We extract the pev MAC from it.
if (self.iAmEvse==1):
self.addToTrace("received SLAC_PARAM.REQ")
for i in range(0, 6):
self.pevMac[i] = self.myreceivebuffer[6+i]
self.addressManager.setPevMac(self.pevMac)
self.showStatus(prettyMac(self.pevMac), "pevmac")
# extract the RunId from the SlacParamReq, and store it for later use
for i in range(0, 8):
self.pevRunId[i] = self.myreceivebuffer[21+i]
# We are EVSE, we want to answer.
self.showStatus("SLAC started", "evseState")
self.composeSlacParamCnf()
self.addToTrace("[EVSE] transmitting CM_SLAC_PARAM.CNF")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
def evaluateSlacParamCnf(self):
# As PEV, we receive the first response from the charger.
self.addToTrace("Checkpoint102: received SLAC_PARAM.CNF")
if (self.iAmPev==1):
if (self.pevSequenceState==STATE_WAITING_FOR_SLAC_PARAM_CNF): # we were waiting for the SlacParamCnf
self.pevSequenceDelayCycles = 4 # original Ioniq is waiting 200ms
self.enterState(STATE_SLAC_PARAM_CNF_RECEIVED) # enter next state. Will be handled in the cyclic runPevSequencer
if ((self.iAmListener==1) or (self.iAmPev==1)):
# Take the MAC of the charger from the frame, and store it for later use.
for i in range(0, 6):
self.evseMac[i] = self.myreceivebuffer[6+i] # source MAC starts at offset 6
self.addressManager.setEvseMac(self.evseMac)
if getConfigValueBool("log_the_evse_mac_to_file"):
# Write the MAC address of the charger to a log file
self.addToTrace("SECC MAC is " + self.getSourceMacAddressAsString())
strDateTime=datetime.today().strftime('%Y-%m-%dT%H:%M:%S.%f')
MacLogFile = open('MacLog.txt', 'a')
MacLogFile.write(strDateTime + " SECC MAC " + self.getSourceMacAddressAsString() + "\n") # write the MAC to the MacLogFile
MacLogFile.close()
def evaluateMnbcSoundInd(self):
# We received MNBC_SOUND.IND from the PEV. Normally this happens 10times, with a countdown (remaining number of sounds)
# running from 9 to 0. If the countdown is 0, this is the last message. In case we are the EVSE, we need
# to answer with a ATTEN_CHAR.IND, which normally contains the attenuation for 10 sounds, 58 groups.
self.addToTrace("received MNBC_SOUND.IND")
if (self.iAmEvse==1):
self.showStatus("SLAC 2", "evseState")
countdown = self.myreceivebuffer[38]
if (countdown == 0):
self.composeAttenCharInd()
self.addToTrace("[EVSE] transmitting ATTEN_CHAR.IND")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
def evaluateStartAttenCharInd(self):
# self.addToTrace("received START_ATTEN_CHAR.IND")
# nothing to do as PEV or EVSE.
# interpretation just in case we use it as special message in EVSE mode to get information from the power supply
if (self.iAmEvse==1):
if ((self.myreceivebuffer[38] == 0xDC) and (self.myreceivebuffer[39] == 0x55) and (self.myreceivebuffer[40] == 0xAA)):
uPresent = self.myreceivebuffer[43]
uPresent*=256
uPresent+=self.myreceivebuffer[44]
uPresent/=10 # scaling in the message is 0.1V
self.callbackShowStatus(str(uPresent), "PowerSupplyUPresent")
# Todo: evaluate other information of the power supply, like cable check result, current, temperature, ...
def evaluateAttenCharInd(self):
self.addToTrace("received ATTEN_CHAR.IND")
if (self.iAmPev==1):
self.addToTrace("[PEVSLAC] received AttenCharInd in state " + str(self.pevSequenceState))
if (self.pevSequenceState==STATE_WAIT_FOR_ATTEN_CHAR_IND): # we were waiting for the AttenCharInd
# todo: Handle the case when we receive multiple responses from different chargers.
# Wait a certain time, and compare the attenuation profiles. Decide for the nearest charger.
self.AttenCharIndNumberOfSounds = self.myreceivebuffer[69]
self.addToTrace("[PEVSLAC] number of sounds reported by the EVSE (should be 10): " + str(self.AttenCharIndNumberOfSounds))
self.composeAttenCharRsp()
self.addToTrace("[PEVSLAC] transmitting ATTEN_CHAR.RSP...")
self.transmit(self.mytransmitbuffer)
self.pevSequenceState=STATE_ATTEN_CHAR_IND_RECEIVED # enter next state. Will be handled in the cyclic runPevSequencer
def evaluateSlacMatchReq(self):
# We received SLAC_MATCH.REQ from the PEV.
# If we are EVSE, we send the response.
self.addToTrace("received SLAC_MATCH.REQ")
if (self.iAmEvse==1):
self.showStatus("SLAC match", "evseState")
self.composeSlacMatchCnf()
self.addToTrace("[EVSE] transmitting SLAC_MATCH.CNF")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
def evaluateSlacMatchCnf(self):
# The SLAC_MATCH.CNF contains the NMK and the NID.
# We extract this information, so that we can use it for the CM_SET_KEY afterwards.
# References: https://github.com/qca/open-plc-utils/blob/master/slac/evse_cm_slac_match.c
# 2021-12-16_HPC_säule1_full_slac.pcapng
if (self.iAmEvse==1):
# If we are EVSE, nothing to do. We have sent the match.CNF by our own.
# The SET_KEY was already done at startup.
pass
else:
self.addToTrace("received SLAC_MATCH.CNF")
s = ""
for i in range(0, 7): # NID has 7 bytes
self.NID[i] = self.myreceivebuffer[85+i]
s=s+hex(self.NID[i])+ " "
self.addToTrace("From SlacMatchCnf, got network ID (NID) " + s)
s = ""
for i in range(0, 16):
self.NMK[i] = self.myreceivebuffer[93+i]
s=s+hex(self.NMK[i])+ " "
self.addToTrace("From SlacMatchCnf, got network membership key (NMK) " + s)
if (self.iAmPev==1):
# use the extracted NMK and NID to set the key in the adaptor:
self.composeSetKey(0)
self.addToTrace("Checkpoint170: transmitting CM_SET_KEY.REQ")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
if (self.pevSequenceState==STATE_WAITING_FOR_SLAC_MATCH_CNF): # we were waiting for finishing the SLAC_MATCH.CNF and SET_KEY.REQ
if (self.isSimulationMode!=0):
# In simulation mode, we pretend a successful SetKey response:
self.connMgr.SlacOk()
self.enterState(STATE_WAITING_FOR_RESTART2)
else:
# We are neither Evse nor PEV, so we are just listener. Do not set the key, to avoid disturbing the two participants.
self.enterState(STATE_WAITING_FOR_RESTART2) # does not really matter
def evaluateReceivedHomeplugPacket(self):
mmt = self.getManagementMessageType()
# print(hex(mmt))
if (mmt == CM_GET_KEY + MMTYPE_CNF):
self.evaluateGetKeyCnf()
if (mmt == CM_SLAC_MATCH + MMTYPE_REQ):
self.evaluateSlacMatchReq()
if (mmt == CM_SLAC_MATCH + MMTYPE_CNF):
self.evaluateSlacMatchCnf()
if (mmt == CM_SLAC_PARAM + MMTYPE_REQ):
self.evaluateSlacParamReq()
if (mmt == CM_SLAC_PARAM + MMTYPE_CNF):
self.evaluateSlacParamCnf()
if (mmt == CM_START_ATTEN_CHAR + MMTYPE_IND):
self.evaluateStartAttenCharInd()
if (mmt == CM_MNBC_SOUND + MMTYPE_IND):
self.evaluateMnbcSoundInd()
if (mmt == CM_ATTEN_CHAR + MMTYPE_IND):
self.evaluateAttenCharInd()
if (mmt == CM_SET_KEY + MMTYPE_CNF):
self.evaluateSetKeyCnf()
if (mmt == CM_GET_SW + MMTYPE_CNF):
self.evaluateGetSwCnf()
def isEvseModemFound(self):
#return 0 # todo: look whether the MAC of the EVSE modem is in the list of detected modems
return self.numberOfFoundModems>1
def enterState(self, n):
self.addToTrace("[PEVSLAC] from " + str(self.pevSequenceState) + " entering " + str(n))
self.pevSequenceState = n
self.pevSequenceCyclesInState = 0
def isTooLong(self):
# The timeout handling function.
return (self.pevSequenceCyclesInState > 500)
def runEvseSlacHandler(self):
if (self.evseSlacHandlerState==0):
# we did not yet configure our EVSE modem with the random key. Do it now.
# Fill some of the bytes of the NMK with random numbers. The others stay at 0x77 for easy visibility.
self.NMK_EVSE_random[2] = int(random()*255)
self.NMK_EVSE_random[3] = int(random()*255)
self.NMK_EVSE_random[4] = int(random()*255)
self.NMK_EVSE_random[5] = int(random()*255)
self.NMK_EVSE_random[6] = int(random()*255)
self.composeSetKey(0)
self.addToTrace("transmitting SET_KEY.REQ, to configure the EVSE modem with random NMK " +prettyHexMessage(self.NMK_EVSE_random))
self.transmit(self.mytransmitbuffer)
self.evseSlacHandlerState = 1 # setkey was done
return
def publishStatus(self, s1, s2="", s3=""):
self.showStatus(s1+s2+s3, "pevState")
def modemFinder_Mainfunction(self):
if ((self.connMgr.getConnectionLevel()==5) and (self.mofi_state==0)):
# We want the modem search only, if no connection is present at all.
if (self.isSimulationMode!=0):
self.addToTrace("[ModemFinder] We are in SimulationMode. Pretending that one modem is present.")
self.composeGetSwReq() # Send a GetSoftwareVersionRequest never the less. Just to have it in the trace.
self.transmit(self.mytransmitbuffer)
self.numberOfSoftwareVersionResponses = 1 # One pretended modem
self.connMgr.ModemFinderOk(self.numberOfSoftwareVersionResponses) # report "success" to the connection manager
self.mofi_state=2
return
self.addToTrace("[ModemFinder] Starting modem search")
self.publishStatus("Modem search")
self.composeGetSwReq()
self.transmit(self.mytransmitbuffer)
self.numberOfSoftwareVersionResponses = 0 # we want to count the modems. Start from zero.
self.mofi_stateDelay = 15 # 0.5s should be sufficient to receive the software versions from the modems
self.mofi_state = 1
return
if (self.mofi_state==1):
# waiting for responses of the modems
if (self.mofi_stateDelay>0):
self.mofi_stateDelay-=1
return
# waiting time is expired. Lets look how many responses we got.
self.addToTrace("[ModemFinder] Number of modems:" + str(self.numberOfSoftwareVersionResponses))
self.publishStatus("Modems:", str(self.numberOfSoftwareVersionResponses))
if (self.numberOfSoftwareVersionResponses>0):
self.connMgr.ModemFinderOk(self.numberOfSoftwareVersionResponses)
self.mofi_stateDelay = 15 # 0.5s to show the number of modems, before we start a new search if necessary
self.mofi_state=2
return
if (self.mofi_state==2):
# just waiting, to give the user time to read the result.
if (self.mofi_stateDelay>0):
self.mofi_stateDelay-=1
return
self.mofi_state=0 # back to idle state
def runPevSequencer(self):
# in PevMode, check whether homeplug modem is connected, run the SLAC and SDP
self.pevSequenceCyclesInState+=1
if (self.connMgr.getConnectionLevel()<10):
# we have no modem seen. --> nothing to do for the SLAC
if (self.pevSequenceState!=STATE_INITIAL):
self.enterState(STATE_INITIAL)
return
if (self.connMgr.getConnectionLevel()>=20):
# we have two modems in the AVLN. This means, the modem pairing is already done. --> nothing to do for the SLAC
if (self.pevSequenceState!=STATE_INITIAL):
self.enterState(STATE_INITIAL)
return
if (self.pevSequenceState==STATE_INITIAL): # Initial state.
# In real life we would check whether we see 5% PWM on the pilot line. We skip this check.
self.isSDPDone = 0
self.isDeveloperLocalKey = 0
self.nEvseModemMissingCounter = 0
self.enterState(STATE_READY_FOR_SLAC)
return
if (self.pevSequenceState==STATE_READY_FOR_SLAC):
if (self.isSimulationMode!=0):
self.showStatus("Simu SLAC", "pevState")
else:
self.showStatus("Starting SLAC", "pevState")
self.addToTrace("[PEVSLAC] Checkpoint100: Sending SLAC_PARAM.REQ...")
self.composeSlacParamReq()
self.transmit(self.mytransmitbuffer)
self.enterState(STATE_WAITING_FOR_SLAC_PARAM_CNF)
return
if (self.pevSequenceState==STATE_WAITING_FOR_SLAC_PARAM_CNF): # Waiting for slac_param confirmation.
if (self.pevSequenceCyclesInState>=30):
# No response for 1s, this is an error.
self.addToTrace("[PEVSLAC] Timeout while waiting for SLAC_PARAM.CNF")
self.enterState(STATE_INITIAL)
# (the normal state transition is done in the reception handler)
return
if (self.pevSequenceState==STATE_SLAC_PARAM_CNF_RECEIVED): # slac_param confirmation was received.
self.pevSequenceDelayCycles = 1 # 1*30=30ms as preparation for the next state.
# Between the SLAC_PARAM.CNF and the first START_ATTEN_CHAR.IND the Ioniq waits 100ms.
# The allowed time TP_match_sequence is 0 to 100ms.
# Alpitronic and ABB chargers are more tolerant, they worked with a delay of approx
# 250ms. In contrast, Supercharger and Compleo do not respond anymore if we
# wait so long.
self.nRemainingStartAttenChar = 3 # There shall be 3 START_ATTEN_CHAR messages.
self.enterState(STATE_BEFORE_START_ATTEN_CHAR)
return
if (self.pevSequenceState==STATE_BEFORE_START_ATTEN_CHAR): # received SLAC_PARAM.CNF. Multiple transmissions of START_ATTEN_CHAR.
if (self.pevSequenceDelayCycles>0):
self.pevSequenceDelayCycles-=1
return
# The delay time is over. Let's transmit.
if (self.nRemainingStartAttenChar>0):
self.nRemainingStartAttenChar-=1
self.composeStartAttenCharInd()
self.addToTrace("[PEVSLAC] transmitting START_ATTEN_CHAR.IND...")
self.transmit(self.mytransmitbuffer)
self.pevSequenceDelayCycles = 0 # original from ioniq is 20ms between the START_ATTEN_CHAR. Shall be 20ms to 50ms. So we set to 0 and the normal 30ms call cycle is perfect.
return
else:
# all three START_ATTEN_CHAR.IND are finished. Now we send 10 MNBC_SOUND.IND
self.pevSequenceDelayCycles = 0 # original from ioniq is 40ms after the last START_ATTEN_CHAR.IND.
# Shall be 20ms to 50ms. So we set to 0 and the normal 30ms call cycle is perfect.
self.remainingNumberOfSounds = 10 # We shall transmit 10 sound messages.
self.enterState(STATE_SOUNDING)
return
if (self.pevSequenceState==STATE_SOUNDING): # Multiple transmissions of MNBC_SOUND.IND.
if (self.pevSequenceDelayCycles>0):
self.pevSequenceDelayCycles-=1
return
if (self.remainingNumberOfSounds>0):
self.remainingNumberOfSounds-=1
self.composeNmbcSoundInd()
self.addToTrace("[PEVSLAC] transmitting MNBC_SOUND.IND...") # original from ioniq is 40ms after the last START_ATTEN_CHAR.IND
self.transmit(self.mytransmitbuffer)
if (self.remainingNumberOfSounds==0):
self.enterState(STATE_WAIT_FOR_ATTEN_CHAR_IND) # move fast to the next state, so that a fast response is catched in the correct state
self.pevSequenceDelayCycles = 0 # original from ioniq is 20ms between the messages.
# Shall be 20ms to 50ms. So we set to 0 and the normal 30ms call cycle is perfect.
return
if (self.pevSequenceState==STATE_WAIT_FOR_ATTEN_CHAR_IND): # waiting for ATTEN_CHAR.IND
# todo: it is possible that we receive this message from multiple chargers. We need
# to select the charger with the loudest reported signals.
if (self.isTooLong()):
self.enterState(STATE_INITIAL)
return
#(the normal state transition is done in the reception handler)
if (self.pevSequenceState==STATE_ATTEN_CHAR_IND_RECEIVED): # ATTEN_CHAR.IND was received and the
# nearest charger decided and the
# ATTEN_CHAR.RSP was sent.
self.enterState(STATE_DELAY_BEFORE_MATCH)
self.pevSequenceDelayCycles = 30 # original from ioniq is 860ms to 980ms from ATTEN_CHAR.RSP to SLAC_MATCH.REQ
return