Skip to content

Commit

Permalink
MQTT cu_id request
Browse files Browse the repository at this point in the history
  • Loading branch information
javibu13 committed Oct 23, 2023
1 parent 94b0e30 commit 3479a3d
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 1 deletion.
2 changes: 1 addition & 1 deletion code/src/wattrex_battery_cycler/mid/mid_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
import sys
from .mid_data_devices import (MidDataDeviceStatusE, MidDataDeviceTypeE, MidDataDeviceStatusC,
MidDataDeviceC, MidDataLinkConfC, MidDataCyclerStationC)
MidDataDeviceC, MidDataLinkConfC, MidDataCyclerStationC, MidDataCuC)
from .mid_data_experiment import (MidDataPwrLimitE, MidDataPwrModeE, MidDataProfileC, MidDataAlarmC,
MidDataPwrRangeC, MidDataExperimentC, MidDataExpStatusE, MidDataInstructionC)
from .mid_data_common import MidDataAllStatusC, MidDataExtMeasC, MidDataGenMeasC
Expand Down
39 changes: 39 additions & 0 deletions code/src/wattrex_battery_cycler/mid/mid_data/mid_data_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from __future__ import annotations

####################### GENERIC IMPORTS #######################
from os import getenv
from typing import Dict, List
from socket import gethostname, gethostbyname
from datetime import datetime
from uuid import getnode
from json import dumps

####################### THIRD PARTY IMPORTS #######################
from enum import Enum
Expand Down Expand Up @@ -189,3 +194,37 @@ def __init__(self, cs_id: int| None= None, name: str| None= None,
self.cs_id : int| None = cs_id
self.devices : List[MidDataDeviceC]| None = devices
self.deprecated: bool|None = deprecated

class MidDataCuC:
'''
Computational Unit information.
'''
def __init__(self):
'''
Initialize CU instance with the given parameters.
Args:
name (str): Name of the CU
cu_id (str): ID of the CU
devices (List[MidDataDeviceC]): List of devices included in the CU
deprecated (bool, optional): Flag that indicates if the CU is deprecated
'''

self.user : str = getenv('USER', '')
self.host_name : str = gethostname()
self.ip : str = gethostbyname(self.host_name)
self.port : int = 0
self.mac : str = hex(getnode())
self.last_connection : datetime = datetime.now()

def to_json(self):
'''
Convert the object to a json string.
'''
return dumps(self, default=lambda o: self._try(o), sort_keys=True, indent=0, separators=(',',':')).replace('\n', '')

def _try(self, o):
try:
return o.__dict__
except Exception:
return str(o)
65 changes: 65 additions & 0 deletions devops/cu_manager/broker_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/python3
"""
Wrapper for the MQTT client
"""
####################### MANDATORY IMPORTS #######################

####################### GENERIC IMPORTS #######################
from typing import Callable, List
import json

####################### THIRD PARTY IMPORTS #######################

####################### SYSTEM ABSTRACTION IMPORTS #######################
from system_logger_tool import sys_log_logger_get_module_logger, SysLogLoggerC, Logger

####################### LOGGER CONFIGURATION #######################
if __name__ == '__main__':
cycler_logger = SysLogLoggerC(file_log_levels='../log_config.yaml')
log: Logger = sys_log_logger_get_module_logger(__name__)

####################### MODULE IMPORTS #######################

####################### PROJECT IMPORTS #######################
import context
from mid.mid_data import MidDataDeviceC, MidDataCuC
from wattrex_driver_mqtt import DrvMqttDriverC

####################### ENUMS #######################

####################### CLASSES #######################

class BrokerClientC():
"""
Broker Client Class to instanciate a Broker Client object
"""
def __init__(self, error_callback : Callable,
launch_callback : Callable,
detect_callback : Callable,
cu_id_msg_callback : Callable) -> None:
self.mqtt : DrvMqttDriverC = DrvMqttDriverC(error_callback=error_callback,
cred_path='.cred.yaml')
self.__detect_cb : Callable = detect_callback
self.__launch_cb : Callable = launch_callback
self.mqtt.subscribe(topic='/cu_id_assigned', callback=cu_id_msg_callback)

def process_launch(self) -> None:
pass

def precess_detect(self) -> None:
pass

def publish_devices(self, detected_devices : List[MidDataDeviceC]) -> None:
pass

def publish_cu_info(self, cu_info : MidDataCuC) -> None:
self.mqtt.publish(topic='/register', data=cu_info.to_json())

def publish_heartbeat(self) -> None:
pass

def process_iteration(self) -> None:
pass


####################### FUNCTIONS #######################
22 changes: 22 additions & 0 deletions devops/cu_manager/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/python3
'''
project context configuration.
'''
####################### MANDATORY IMPORTS #######################

####################### GENERIC IMPORTS #######################
import os
import sys

####################### THIRD PARTY IMPORTS #######################

####################### SYSTEM ABSTRACTION IMPORTS #######################

####################### PROJECT IMPORTS #######################

####################### MODULE IMPORTS #######################

####################### ENUMS #######################

####################### CLASSES #######################
sys.path.append(os.getcwd()+'/code/src/wattrex_battery_cycler/')
168 changes: 168 additions & 0 deletions devops/cu_manager/cu_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/usr/bin/python3
"""
Cu Manager
"""
####################### MANDATORY IMPORTS #######################

####################### GENERIC IMPORTS #######################
import subprocess
import threading
from os import path
from typing import List
from time import sleep
import json

####################### THIRD PARTY IMPORTS #######################

####################### SYSTEM ABSTRACTION IMPORTS #######################
from system_logger_tool import sys_log_logger_get_module_logger, SysLogLoggerC, Logger

####################### LOGGER CONFIGURATION #######################
if __name__ == '__main__':
cycler_logger = SysLogLoggerC(file_log_levels='./log_config.yaml')
log: Logger = sys_log_logger_get_module_logger(__name__)

####################### MODULE IMPORTS #######################
import context
from broker_client import BrokerClientC

####################### PROJECT IMPORTS #######################
from system_shared_tool import SysShdIpcChanC, SysShdNodeC
from mid.mid_data import MidDataCuC, MidDataDeviceC, MidDataDeviceTypeE, MidDataLinkConfC

####################### ENUMS #######################
SECONDS_TO_WAIT_FOR_CU_ID = 60
####################### CLASSES #######################

class CuManagerNodeC(SysShdNodeC):
"""
Cu Manager Class to instanciate a CU Manager Node
"""
def __init__(self, working_flag : threading.Event, cycle_period : int) -> None:
''' Initialize the node.
'''
super().__init__(name='cu_manager_node', cycle_period=cycle_period, working_flag=working_flag)
self.heartbeat_queue : SysShdIpcChanC = SysShdIpcChanC(name='heartbeat_queue')
self.active_cs : dict = {} #{cs_id : last_connection}
self.client_mqtt : BrokerClientC = BrokerClientC(error_callback=self.on_mqtt_error,
launch_callback=self.launch_cs,
detect_callback=self.detect_devices,
cu_id_msg_callback=self.on_register_answer)
# self.sync_node : MidSyncNoceC = MidSyncNoceC()
self.working_flag_sync : threading.Event = working_flag
self.cycle_period : int = cycle_period

self.cu_id = None
if path.exists('./devops/cu_manager/.cu_id'):
with open('./devops/cu_manager/.cu_id', 'r', encoding='utf-8') as cu_id_file:
self.cu_id = int(cu_id_file.read())
else:
self.register_cu()


def process_heartbeat(self) -> None:
pass

def __gather_heartbeat(self) -> None:
pass

def register_cu(self) -> None:
'''
Register the CU in the broker to get from it an id.
'''
self.client_mqtt.publish_cu_info(MidDataCuC())
for _ in range(SECONDS_TO_WAIT_FOR_CU_ID*10):
if self.cu_id is not None:
break
self.client_mqtt.mqtt.process_data()
sleep(0.1)

if self.cu_id is None:
log.critical('No CU_ID assigned')
raise RuntimeError('No CU_ID assigned')

log.info('CU_ID assigned: %s', self.cu_id)
with open('./devops/cu_manager/.cu_id', 'w', encoding='utf-8') as cu_id_file:
cu_id_file.write(str(self.cu_id))


def detect_devices(self) -> List[MidDataDeviceC]:
'''Detect the devices that are currently connected to the device.
Returns:
List[MidDataDeviceC]: List of devices connected to the device that are detected and
which type has been previously defined.
'''

test_detected_device = MidDataDeviceC(dev_id=1,
manufacturer='test',
model='test',
serial_number='test',
device_type=MidDataDeviceTypeE.EPC,
iface_name='test',
mapping_names={'test':'test'},
link_configuration=MidDataLinkConfC(
baudrate=9600,
parity='N',
stopbits=1,
bytesize=8,
timeout=0.1,
write_timeout=0.1,
inter_byte_timeout=0.1,
separator='\n'))
return [test_detected_device]

def launch_cs(self, mqtt_msg) -> None:
'''Callback function executed from the Broker Client when a message is received from the
broker in the CU_ID/launch/ topic.
Args:
mqtt_msg ([type]): [description]
'''
if mqtt_msg.payload == 'launch':
result = subprocess.run(['./devops/deploy.sh', '', 'cs', '1'],
stdout=subprocess.PIPE,
universal_newlines=True,
check=False)
log.info(result.stdout)

def on_mqtt_error(self) -> None:
''' Callback function executed from the Broker Client when an error is detected
'''
log.critical('MQTT Error')

def on_register_answer(self, msg_data) -> None:
'''
Callback function executed from the Broker Client when
a message is received from the mqtt broker
Args:
msg_data (MidDataCuC): Message data
'''
log.critical(msg_data)
log.critical(json.loads(msg_data))
msg_data = json.loads(msg_data)
if msg_data['mac'] == MidDataCuC().mac:
self.cu_id = msg_data['cu_id']
log.info('Cu_id received from mqtt: %s', self.cu_id)
else:
log.info('Cu_id received from mqtt but is not for this CU: %s', msg_data['cu_id'])


def sync_shd_data(self) -> None:
pass

def process_iteration(self) -> None:
pass

def stop(self) -> None:
pass


####################### FUNCTIONS #######################

if __name__ == '__main__':
working_flag_event : threading.Event = threading.Event()
working_flag_event.set()
cu_manager_node = CuManagerNodeC(working_flag=working_flag_event, cycle_period=1)
cu_manager_node.run()

0 comments on commit 3479a3d

Please sign in to comment.