Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.10.0 - New Device Vitals #95

Merged
merged 10 commits into from
Jun 3, 2024
Prev Previous commit
Next Next commit
Vitals improvements - Fix neurio add more error handling
jasonacox committed Jun 2, 2024
commit daac01dfad39c0541896ca1696f944aacc08c97d
149 changes: 85 additions & 64 deletions pypowerwall/tedapi/__init__.py
Original file line number Diff line number Diff line change
@@ -76,9 +76,8 @@ def __init__(self, gw_pwd, debug=False, pwcacheexpire: int = 5, timeout: int = 5
self.timeout = timeout
self.pwcooldown = 0
self.din = None
# Require both gw_ip and gw_pwd
if not gw_pwd:
raise ValueError("Missing gw_ip or gw_pwd")
raise ValueError("Missing gw_pwd")
if self.debug:
log.setLevel(logging.DEBUG)
self.gw_pwd = gw_pwd
@@ -367,26 +366,30 @@ def calculate_dc_power(V, I):

# Create NEURIO block
neurio = {}
c = 1000
# Loop through each Neurio device serial number
for n in lookup(status, ['neurio', 'readings']):
for n in lookup(status, ['neurio', 'readings']) or {}:
# Loop through each CT on the Neurio device
sn = n.get('serial', str(c))
cts = {}
i = 0
for ct in n['dataRead']:
c = c + 1
for ct in n['dataRead'] or {}:
device = f"NEURIO_CT{i}_"
cts[device + "InstRealPower"] = ct['realPowerW']
cts[device + "Location"] = "solarRGM"
i = i + 1
rest = {
"componentParentDin": lookup(config, ['vin']),
"firmwareVersion": None,
"lastCommunicationTime": n['timestamp'],
"lastCommunicationTime": n.get('timestamp', None),
"manufacturer": "NEURIO",
"meterAttributes": {
"meterLocation": []
},
"serialNumber": n['serial']
"serialNumber": sn
}
neurio[f"NEURIO--{n['serial']}"] = {**cts, **rest}
neurio[f"NEURIO--{sn}"] = {**cts, **rest}

# Create PVAC, PVS, and TESLA blocks - Assume the are aligned
pvac = {}
@@ -395,12 +398,14 @@ def calculate_dc_power(V, I):
i = 0
num = len(lookup(status, ['esCan', 'bus', 'PVAC']))
if num != len(lookup(status, ['esCan', 'bus', 'PVS'])):
log.error("PVAC and PVS device count mismatch")
log.warning("PVAC and PVS device count mismatch")
# Loop through each device serial number
for p in lookup(status, ['esCan', 'bus', 'PVAC']):
for p in lookup(status, ['esCan', 'bus', 'PVAC']) or {}:
if not p['packageSerialNumber']:
continue
pvac_name = f"PVAC--{p['packagePartNumber']}--{p['packageSerialNumber']}"
packagePartNumber = p.get('packagePartNumber', str(i))
packageSerialNumber = p.get('packageSerialNumber', str(i))
pvac_name = f"PVAC--{packagePartNumber}--{packageSerialNumber}"
V_A = p['PVAC_Logging']['PVAC_PVMeasuredVoltage_A']
V_B = p['PVAC_Logging']['PVAC_PVMeasuredVoltage_B']
V_C = p['PVAC_Logging']['PVAC_PVMeasuredVoltage_C']
@@ -414,7 +419,7 @@ def calculate_dc_power(V, I):
P_C = calculate_dc_power(V_C, I_C)
P_D = calculate_dc_power(V_D, I_D)
pvac[pvac_name] = {
"PVAC_Fout": p['PVAC_Status']['PVAC_Fout'],
"PVAC_Fout": lookup(p, ['PVAC_Status', 'PVAC_Fout']),
"PVAC_GridState": None,
"PVAC_InvState": None,
"PVAC_Iout": None,
@@ -431,51 +436,63 @@ def calculate_dc_power(V, I):
"PVAC_PVMeasuredVoltage_B": V_B,
"PVAC_PVMeasuredVoltage_C": V_C,
"PVAC_PVMeasuredVoltage_D": V_D,
"PVAC_Pout": p['PVAC_Status']['PVAC_Pout'],
"PVAC_PvState_A": None, # possibly from PVS.PVS_Status.PVS_StringA_Connected
"PVAC_PvState_B": None,
"PVAC_PvState_C": None,
"PVAC_PvState_D": None,
"PVAC_Pout": lookup(p, ['PVAC_Status', 'PVAC_Pout']),
"PVAC_PvState_A": None, # These are placeholders
"PVAC_PvState_B": None, # Compute from PVS below
"PVAC_PvState_C": None, # PV_Disabled, PV_Active, PV_Active_Parallel
"PVAC_PvState_D": None, # Not available in TEDAPI
"PVAC_Qout": None,
"PVAC_State": p['PVAC_Status']['PVAC_State'],
"PVAC_State": lookup(p, ['PVAC_Status', 'PVAC_State']),
"PVAC_VHvMinusChassisDC": None,
"PVAC_VL1Ground": p['PVAC_Logging']['PVAC_VL1Ground'],
"PVAC_VL2Ground": p['PVAC_Logging']['PVAC_VL2Ground'],
"PVAC_Vout": p['PVAC_Status']['PVAC_Vout'],
"PVAC_VL1Ground": lookup(p, ['PVAC_Logging', 'PVAC_VL1Ground']),
"PVAC_VL2Ground": lookup(p, ['PVAC_Logging', 'PVAC_VL2Ground']),
"PVAC_Vout": lookup(p, ['PVAC_Status', 'PVAC_Vout']),
"PVI-PowerStatusSetpoint": None,
"componentParentDin": None,
"firmwareVersion": None,
"lastCommunicationTime": None,
"manufacturer": "TESLA",
"partNumber": p['packagePartNumber'],
"serialNumber": p['packageSerialNumber'],
"partNumber": packagePartNumber,
"serialNumber": packageSerialNumber,
"teslaEnergyEcuAttributes": {
"ecuType": 296
}
}
pvs_name = f"PVS--{p['packagePartNumber']}--{p['packageSerialNumber']}"
pvs_data = lookup(status, ['esCan', 'bus', 'PVS'])[i]
pvs[pvs_name] = {
"PVS_EnableOutput": None,
"PVS_SelfTestState": pvs_data['PVS_Status']['PVS_SelfTestState'],
"PVS_State": pvs_data['PVS_Status']['PVS_State'],
"PVS_StringA_Connected": pvs_data['PVS_Status']['PVS_StringA_Connected'],
"PVS_StringB_Connected": pvs_data['PVS_Status']['PVS_StringB_Connected'],
"PVS_StringC_Connected": pvs_data['PVS_Status']['PVS_StringC_Connected'],
"PVS_StringD_Connected": pvs_data['PVS_Status']['PVS_StringD_Connected'],
"PVS_vLL": pvs_data['PVS_Status']['PVS_vLL'],
"alerts": lookup(pvs_data, ['alerts', 'active']) or [],
"componentParentDin": None,
"firmwareVersion": None,
"lastCommunicationTime": None,
"manufacturer": "TESLA",
"partNumber": p['packagePartNumber'],
"serialNumber": p['packageSerialNumber'],
"teslaEnergyEcuAttributes": {
"ecuType": 297
pvs_name = f"PVS--{packagePartNumber}--{packageSerialNumber}"
pvs_data = lookup(status, ['esCan', 'bus', 'PVS'])
if i < len(pvs_data):
pvs_data = pvs_data[i]
# Set String Connected states
string_a = lookup(pvs_data, ['PVS_Status', 'PVS_StringA_Connected'])
string_b = lookup(pvs_data, ['PVS_Status', 'PVS_StringB_Connected'])
string_c = lookup(pvs_data, ['PVS_Status', 'PVS_StringC_Connected'])
string_d = lookup(pvs_data, ['PVS_Status', 'PVS_StringD_Connected'])
# Set PVAC PvState based on PVS String Connected states
pvac[pvac_name]["PVAC_PvState_A"] = "PV_Active" if string_a else "PV_Disabled"
pvac[pvac_name]["PVAC_PvState_B"] = "PV_Active" if string_b else "PV_Disabled"
pvac[pvac_name]["PVAC_PvState_C"] = "PV_Active" if string_c else "PV_Disabled"
pvac[pvac_name]["PVAC_PvState_D"] = "PV_Active" if string_d else "PV_Disabled"
pvs[pvs_name] = {
"PVS_EnableOutput": None,
"PVS_SelfTestState": lookup(pvs_data, ['PVS_Status', 'PVS_SelfTestState']),
"PVS_State": lookup(pvs_data, ['PVS_Status', 'PVS_State']),
"PVS_StringA_Connected": string_a,
"PVS_StringB_Connected": string_b,
"PVS_StringC_Connected": string_c,
"PVS_StringD_Connected": string_d,
"PVS_vLL": lookup(pvs_data, ['PVS_Status', 'PVS_vLL']),
"alerts": lookup(pvs_data, ['alerts', 'active']) or [],
"componentParentDin": None,
"firmwareVersion": None,
"lastCommunicationTime": None,
"manufacturer": "TESLA",
"partNumber": packagePartNumber,
"serialNumber": packageSerialNumber,
"teslaEnergyEcuAttributes": {
"ecuType": 297
}
}
}
tesla_name = f"TESLA--{p['packagePartNumber']}--{p['packageSerialNumber']}"
tesla_name = f"TESLA--{packagePartNumber}--{packageSerialNumber}"
if i < len(config.get('solars', [{}])):
tesla_nameplate = config['solars'][i].get('power_rating_watts', None)
brand = config['solars'][i].get('brand', None)
@@ -490,7 +507,7 @@ def calculate_dc_power(V, I):
"pvInverterAttributes": {
"nameplateRealPowerW": tesla_nameplate,
},
"serialNumber": p['packageSerialNumber']
"serialNumber": packageSerialNumber,
}
i = i + 1

@@ -516,29 +533,29 @@ def calculate_dc_power(V, I):
bat_name = {}
battry_blocks = config.get('battery_blocks', [])
for bat in battry_blocks:
bat_name[i] = f"TEPINV--{bat['vin']}"
bat_name[i] = f"TEPINV--{bat.get('vin', str(i))}"
i = i + 1
i = 0
for p in lookup(status, ['esCan', 'bus', 'PINV']):
for p in lookup(status, ['esCan', 'bus', 'PINV']) or {}:
if i > len(bat_name) - 1:
break
name = bat_name[i]
tepinv[name] = {
"PINV_EnergyCharged": None,
"PINV_EnergyDischarged": None,
"PINV_Fout": p['PINV_Status']['PINV_Fout'],
"PINV_GridState": p['PINV_Status']['PINV_GridState'],
"PINV_Fout": lookup(p, ['PINV_Status', 'PINV_Fout']),
"PINV_GridState": lookup(p, ['PINV_Status', 'PINV_GridState']),
"PINV_HardwareEnableLine": None,
"PINV_PllFrequency": None,
"PINV_PllLocked": None,
"PINV_Pout": p['PINV_Status']['PINV_Pout'],
"PINV_Pout": lookup(p, ['PINV_Status', 'PINV_Pout']),
"PINV_PowerLimiter": None,
"PINV_Qout": None,
"PINV_ReadyForGridForming": None,
"PINV_State": p['PINV_Status']['PINV_State'],
"PINV_VSplit1": p['PINV_AcMeasurements']['PINV_VSplit1'],
"PINV_VSplit2": p['PINV_AcMeasurements']['PINV_VSplit2'],
"PINV_Vout": p['PINV_Status']['PINV_Vout'],
"PINV_State": lookup(p, ['PINV_Status', 'PINV_State']),
"PINV_VSplit1": lookup(p, ['PINV_AcMeasurements', 'PINV_VSplit1']),
"PINV_VSplit2": lookup(p, ['PINV_AcMeasurements', 'PINV_VSplit2']),
"PINV_Vout": lookup(p, ['PINV_Status', 'PINV_Vout']),
"alerts": lookup(p, ['alerts', 'active']) or [],
"componentParentDin": None,
"firmwareVersion": None,
@@ -557,10 +574,12 @@ def calculate_dc_power(V, I):
tethc = {}
i = 0
# Loop through each THC device serial number
for p in lookup(status, ['esCan', 'bus', 'THC']):
for p in lookup(status, ['esCan', 'bus', 'THC']) or {}:
if not p['packageSerialNumber']:
continue
name = f"TEPOD--{p['packagePartNumber']}--{p['packageSerialNumber']}"
packagePartNumber = p.get('packagePartNumber', str(i))
packageSerialNumber = p.get('packageSerialNumber', str(i))
name = f"TEPOD--{packagePartNumber}--{packageSerialNumber}"
pod = lookup(status, ['esCan', 'bus', 'POD'])[i]
energy_remaining = lookup(pod, ['POD_EnergyStatus', 'POD_nom_energy_remaining'])
full_pack_energy = lookup(pod, ['POD_EnergyStatus', 'POD_nom_full_pack_energy'])
@@ -588,14 +607,14 @@ def calculate_dc_power(V, I):
"firmwareVersion": None,
"lastCommunicationTime": None,
"manufacturer": "TESLA",
"partNumber": p['packagePartNumber'],
"serialNumber": p['packageSerialNumber'],
"partNumber": packagePartNumber,
"serialNumber": packageSerialNumber,
"teslaEnergyEcuAttributes": {
"ecuType": 226
}
}
i = i + 1
name = f"TETHC--{p['packagePartNumber']}--{p['packageSerialNumber']}"
name = f"TETHC--{packagePartNumber}--{packageSerialNumber}"
tethc[name] = {
"THC_AmbientTemp": None,
"THC_State": None,
@@ -604,8 +623,8 @@ def calculate_dc_power(V, I):
"firmwareVersion": None,
"lastCommunicationTime": None,
"manufacturer": "TESLA",
"partNumber": p['packagePartNumber'],
"serialNumber": p['packageSerialNumber'],
"partNumber": packagePartNumber,
"serialNumber": packageSerialNumber,
"teslaEnergyEcuAttributes": {
"ecuType": 224
}
@@ -629,7 +648,9 @@ def calculate_dc_power(V, I):
tesync = {}
sync = lookup(status, ['esCan', 'bus', 'SYNC'])
islander = lookup(status, ['esCan', 'bus', 'ISLANDER'])
name = f"TESYNC--{sync['packagePartNumber']}--{sync['packageSerialNumber']}"
packagePartNumber = sync.get('packagePartNumber', None)
packageSerialNumber = sync.get('packageSerialNumber', None)
name = f"TESYNC--{packagePartNumber}--{packageSerialNumber}"
tesync[name] = {
"ISLAND_FreqL1_Load": lookup(islander, ['ISLAND_AcMeasurements', 'ISLAND_FreqL1_Load']),
"ISLAND_FreqL1_Main": lookup(islander, ['ISLAND_AcMeasurements', 'ISLAND_FreqL1_Main']),
@@ -689,8 +710,8 @@ def calculate_dc_power(V, I):
"componentParentDin": None,
"firmwareVersion": None,
"manufacturer": "TESLA",
"partNumber": lookup(sync, ['packagePartNumber']),
"serialNumber": lookup(sync, ['packageSerialNumber']),
"partNumber": packagePartNumber,
"serialNumber": packageSerialNumber,
"teslaEnergyEcuAttributes": {
"ecuType": 259
}