diff --git a/docs/basics/remote_components.rst b/docs/basics/remote_components.rst
new file mode 100644
index 00000000..1b2ba290
--- /dev/null
+++ b/docs/basics/remote_components.rst
@@ -0,0 +1,96 @@
+.. _remote_components:
+
+*****************
+Remote Components
+*****************
+
+The purpose of remote components is to provide a means of adding a remote physics analysis to a local OpenMDAO problem.
+One situation in which this may be desirable is when the time to carry out a full optimization exceeds an HPC job time limit.
+Without remote components, such a situation would normally require manual restarts of the optimization, limiting one to optimizers with restart capability.
+Using remote components, one can keep a serial OpenMDAO optimization running continuously on a login node (e.g., using the nohup or screen Linux commands) while the parallel physics analyses are evaluated across several HPC jobs.
+Another situation where these components may be advantageous is when the OpenMDAO problem contains components not streamlined for massively parallel environments.
+
+In general, remote components use nested OpenMDAO problems in a server-client arrangement.
+The outer, client-side OpenMDAO model serves as the overarching analysis/optimization problem, while the inner, server-side model serves as the isolated high-fidelity analysis.
+The server inside the HPC job remains open to evaluate function or gradient calls.
+Wall times for function and gradient calls are saved, and when the maximum previous time multiplied by a scale factor exceeds the remaining job time, the server will be relaunched.
+
+Three general base classes are used to achieve this.
+
+* :class:`~mphys.network.remote_component.RemoteComp`: Explicit component that wraps communication with the server, replicating inputs/outputs to/from the server-side group and requesting a new server when the estimated analysis time exceeds the remaining job time.
+* :class:`~mphys.network.server_manager.ServerManager`: Used by ``RemoteComp`` to control and communicate with the server.
+* :class:`~mphys.network.server.Server`: Loads the inner OpenMDAO problem and evaluates function or gradient calls as requested by the ``ServerManager``.
+
+Currently, there is one derived class for each, which together use pbs4py for HPC job control and ZeroMQ for network communication.
+
+* :class:`~mphys.network.zmq_pbs.RemoteZeroMQComp`: Through the use of ``MPhysZeroMQServerManager``, uses encoded JSON dictionaries to send and receive the necessary information to and from the server.
+* :class:`~mphys.network.zmq_pbs.MPhysZeroMQServerManager`: Uses a ZeroMQ socket and ssh port forwarding from the login node to the compute node to communicate with the server, and pbs4py to start, stop, and check the status of HPC jobs.
+* :class:`~mphys.network.zmq_pbs.MPhysZeroMQServer`: Uses a ZeroMQ socket to send and receive encoded JSON dictionaries.
+
+RemoteZeroMQComp Options
+========================
+.. embed-options::
+    mphys.network.zmq_pbs
+    RemoteZeroMQComp
+    options
+
+Usage
+=====
+When adding a :code:`RemoteZeroMQComp` component, the two required options are :code:`run_server_filename`, the server script to be launched on an HPC job, and :code:`pbs`, the pbs4py Launcher object.
+The server file should accept a port number as an argument to facilitate communication with the client.
+Within this file, the :code:`MPhysZeroMQServer` class's :code:`get_om_group_function_pointer` option is the pointer to the OpenMDAO Group or Multipoint class to be evaluated.
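+
+For illustration, a minimal client-side script might look like the following sketch, patterned after the serial example described below; the :code:`K4` launcher and the server filename are placeholders for your own HPC environment.
+
+.. code-block:: python
+
+    import openmdao.api as om
+    from mphys.network.zmq_pbs import RemoteZeroMQComp
+    from pbs4py import PBS
+
+    # pbs4py Launcher that will submit, monitor, and stop the server's HPC job
+    pbs = PBS.k4(time=1)
+    pbs.mpiexec = 'mpirun'
+    pbs.requested_number_of_nodes = 1
+
+    prob = om.Problem()
+    prob.model.add_subsystem('remote',
+                             RemoteZeroMQComp(run_server_filename='mphys_server.py',
+                                              pbs=pbs))
+
+    prob.setup(mode='rev')
+    prob.run_model()  # the function evaluation is carried out on the remote server
+    prob.model.remote.stop_server()  # stop the HPC job and ssh port forwarding
+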
+By default, any design variables, objectives, and constraints defined in the group will be added on the client side.
+Any other desired inputs or outputs must be added in the :code:`additional_remote_inputs` or :code:`additional_remote_outputs` options.
+On the client side, any "." characters in these input and output names will be replaced by :code:`var_naming_dot_replacement`.
+
+The screen output from a particular remote component's Nth server will be sent to :code:`mphys_<component name>_serverN.out`, where :code:`<component name>` is the subsystem name of the :code:`RemoteZeroMQComp` instance.
+Searching for the keyword "SERVER" will display what the server is currently doing; the keyword "CLIENT" will do the same on the client side.
+The HPC job for the component's server is named :code:`MPhys<port number>`; the pbs4py-generated job submission script is the same name followed by ".pbs".
+Note that running the remote component in parallel is not supported; a SystemError will be raised if this is attempted.
+
+Example
+=======
+Two examples are provided for the `supersonic panel aerostructural case `_: :code:`as_opt_remote_serial.py` and :code:`as_opt_remote_parallel.py`.
+Both run the optimization problem defined in :code:`as_opt_parallel.py`, which contains a :code:`MultipointParallel` class and thus evaluates two aerostructural scenarios in parallel.
+The serial remote example runs this group on one server.
+The parallel remote example, on the other hand, contains an OpenMDAO parallel group which runs two servers in parallel.
+Both examples use the same server file, :code:`mphys_server.py`, but point to either :code:`as_opt_parallel.py` or :code:`run.py` by sending the model's filename through the :code:`RemoteZeroMQComp`'s :code:`additional_server_args` option.
+As demonstrated in this server file, additional configuration options may be sent to the server-side OpenMDAO group through the use of a functor (called :code:`GetModel` in this case) in combination with :code:`additional_server_args`.
+In this particular case, scenario name(s) are sent as :code:`additional_server_args` from the client side; on the server side, the :code:`GetModel` functor allows the scenario name(s) to be sent as OpenMDAO options to the server-side group.
+Using the scenario :code:`run_directory` option, the scenarios can then be evaluated in different directories.
+In both examples, the remote component(s) use a :code:`K4` pbs4py Launcher object, which will launch, monitor, and stop jobs using the K4 queue of the NASA K-cluster.
+
+Troubleshooting
+===============
+The :code:`dump_json` option for :code:`RemoteZeroMQComp` will make the component write input and output JSON files, which contain all data sent to and received from the server.
+An exception is the :code:`wall_time` entry (given in seconds) in the output JSON file, which is added on the client side after the server has completed the design evaluation.
+Another entry that is only provided for informational purposes is :code:`design_counter`, which keeps track of how many different designs have been evaluated on the current server.
+If :code:`dump_separate_json` is set to True, then separate files will be written for each design evaluation.
+On the server side, an n2 file titled :code:`n2_inner_analysis_<component name>.html` will be written after each evaluation.
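+
+For reference, the output JSON has roughly the following structure, shown here as a Python dictionary; the entries mirror what the ``Server`` class gathers from the inner OpenMDAO problem, but the variable names and values below are invented for illustration.
+
+.. code-block:: python
+
+    # illustrative structure of a function-evaluation output file (invented values)
+    output_json = {
+        "design_vars": {
+            "dv_struct": {"val": [0.001], "ref": 0.001, "ref0": 0.0,
+                          "lower": 0.0001, "upper": 0.01,
+                          "units": None, "scaler": None, "adder": None},
+        },
+        "objective": {
+            "mass": {"val": [0.04], "ref": 0.01, "ref0": 0.0,
+                     "scaler": None, "adder": None},
+        },
+        "constraints": {
+            "func_struct": {"val": [0.9], "ref": None, "ref0": None,
+                            "lower": -1e+30, "upper": 1.0, "equals": None,
+                            "scaler": 1.0, "adder": 0.0},
+        },
+        "additional_inputs": {"mach": {"val": [5.0]}},
+        "additional_outputs": {"C_L": {"val": [0.2]}},
+        "design_counter": 3,  # number of designs evaluated on the current server
+        "wall_time": 42.0,    # seconds; added on the client side
+    }
+
+For derivative evaluations, each entry under "objective", "constraints", and "additional_outputs" additionally carries a "derivatives" dictionary keyed by design variable and additional input names.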
+
+Current Limitations
+===================
+* A pbs4py Launcher must be implemented for your HPC environment.
+* On the client side, :code:`RemoteZeroMQComp.stop_server()` should be added after your analysis/optimization to stop the HPC job and the ssh port forwarding, which the server manager starts as a background process.
+* If :code:`stop_server` is not called or the server stops unexpectedly, stopping the port forwarding manually is difficult, as it involves finding the ssh process associated with the remote server's port number. This must be done on the same login node that the server was launched from.
+* Stopping the HPC job is somewhat easier, as the job name will be :code:`MPhys` followed by the port number; however, if runs are launched from multiple login nodes, then one may have multiple jobs with the same name.
+* Currently, the :code:`of` option (as well as :code:`wrt`) for :code:`check_totals` or :code:`compute_totals` is not used by the remote component; on the server side, :code:`compute_totals` will be evaluated for all responses (objectives, constraints, and :code:`additional_remote_outputs`). Depending on how many :code:`of` responses are desired, this may be more costly than not using remote components.
+* The HPC environment must allow ssh port forwarding from the login node to a compute node.
+
+.. autoclass:: mphys.network.remote_component.RemoteComp
+    :members:
+
+.. autoclass:: mphys.network.server_manager.ServerManager
+    :members:
+
+.. autoclass:: mphys.network.server.Server
+    :members:
+
+.. autoclass:: mphys.network.zmq_pbs.RemoteZeroMQComp
+    :members:
+
+.. autoclass:: mphys.network.zmq_pbs.MPhysZeroMQServerManager
+    :members:
+
+.. autoclass:: mphys.network.zmq_pbs.MPhysZeroMQServer
+    :members:
diff --git a/docs/index.rst b/docs/index.rst
index 9ae8e425..64570468 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -23,6 +23,7 @@ These are descriptions of how MPhys works and how it interfaces with solvers and
   basics/tagged_promotion.rst
   basics/builders.rst
   basics/naming_conventions.rst
+  basics/remote_components.rst

..
_scenario_library: diff --git a/examples/aerostructural/supersonic_panel/as_opt_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_parallel.py new file mode 100644 index 00000000..85b764ba --- /dev/null +++ b/examples/aerostructural/supersonic_panel/as_opt_parallel.py @@ -0,0 +1,208 @@ +import numpy as np +import openmdao.api as om +import os + +from mphys import Multipoint, MultipointParallel +from mphys.scenario_aerostructural import ScenarioAeroStructural + +from structures_mphys import StructBuilder +from aerodynamics_mphys import AeroBuilder +from xfer_mphys import XferBuilder +from geometry_morph import GeometryBuilder + +check_totals = False # True=check objective/constraint derivatives, False=run optimization + +# panel geometry +panel_chord = 0.3 +panel_width = 0.01 + +# panel discretization +N_el_struct = 20 +N_el_aero = 7 + +# Mphys parallel multipoint scenarios +class AerostructParallel(MultipointParallel): + + def initialize(self): + self.options.declare('aero_builder') + self.options.declare('struct_builder') + self.options.declare('xfer_builder') + self.options.declare('geometry_builder') + self.options.declare('scenario_names') + + def setup(self): + for i in range(len(self.options['scenario_names'])): + + # create the run directory + if self.comm.rank==0: + if not os.path.isdir(self.options['scenario_names'][i]): + os.mkdir(self.options['scenario_names'][i]) + self.comm.Barrier() + + nonlinear_solver = om.NonlinearBlockGS(maxiter=100, iprint=2, use_aitken=True, aitken_initial_factor=0.5) + linear_solver = om.LinearBlockGS(maxiter=40, iprint=2, use_aitken=True, aitken_initial_factor=0.5) + self.mphys_add_scenario(self.options['scenario_names'][i], + ScenarioAeroStructural( + aero_builder=self.options['aero_builder'], + struct_builder=self.options['struct_builder'], + ldxfer_builder=self.options['xfer_builder'], + geometry_builder=self.options['geometry_builder'], + in_MultipointParallel=True, + run_directory=self.options['scenario_names'][i]), + coupling_nonlinear_solver=nonlinear_solver, + coupling_linear_solver=linear_solver) + +# OM group +class Model(om.Group): + def initialize(self): + self.options.declare('scenario_names', default=['aerostructural1','aerostructural2']) + + def setup(self): + self.scenario_names = self.options['scenario_names'] + + # ivc + self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) + self.ivc.add_output('modulus', val=70E9) + self.ivc.add_output('yield_stress', val=270E6) + self.ivc.add_output('density', val=2800.) + self.ivc.add_output('mach', val=[5.,3.]) + self.ivc.add_output('qdyn', val=[3E4,1E4]) + #self.ivc.add_output('aoa', val=[3.,2.], units='deg') # derivatives are wrong when using vector aoa and coloring; see OpenMDAO issue 2919 + self.ivc.add_output('aoa1', val=3., units='deg') + self.ivc.add_output('aoa2', val=2., units='deg') + self.ivc.add_output('geometry_morph_param', val=1.) 
+ + # create dv_struct, which is the thickness of each structural element + thickness = 0.001*np.ones(N_el_struct) + self.ivc.add_output('dv_struct', thickness) + + # structure setup and builder + structure_setup = {'panel_chord' : panel_chord, + 'panel_width' : panel_width, + 'N_el' : N_el_struct} + struct_builder = StructBuilder(structure_setup) + + # aero builder + aero_setup = {'panel_chord' : panel_chord, + 'panel_width' : panel_width, + 'N_el' : N_el_aero} + aero_builder = AeroBuilder(aero_setup) + + # xfer builder + xfer_builder = XferBuilder( + aero_builder=aero_builder, + struct_builder=struct_builder + ) + + # geometry + builders = {'struct': struct_builder, 'aero': aero_builder} + geometry_builder = GeometryBuilder(builders) + + # add parallel multipoint group + self.add_subsystem('multipoint',AerostructParallel( + aero_builder=aero_builder, + struct_builder=struct_builder, + xfer_builder=xfer_builder, + geometry_builder=geometry_builder, + scenario_names=self.scenario_names)) + + for i in range(len(self.scenario_names)): + + # connect scalar inputs to the scenario + for var in ['modulus', 'yield_stress', 'density', 'dv_struct']: + self.connect(var, 'multipoint.'+self.scenario_names[i]+'.'+var) + + # connect vector inputs + for var in ['mach', 'qdyn']: #, 'aoa']: + self.connect(var, 'multipoint.'+self.scenario_names[i]+'.'+var, src_indices=[i]) + + self.connect(f'aoa{i+1}', 'multipoint.'+self.scenario_names[i]+'.aoa') + + # connect top-level geom parameter + self.connect('geometry_morph_param', 'multipoint.'+self.scenario_names[i]+'.geometry.geometry_morph_param') + + # add design vars + self.add_design_var('geometry_morph_param', lower=0.1, upper=10.0) + self.add_design_var('dv_struct', lower=1.e-4, upper=1.e-2, ref=1.e-3) + #self.add_design_var('aoa', lower=-10., upper=10.) + self.add_design_var('aoa1', lower=-20., upper=20.) + self.add_design_var('aoa2', lower=-20., upper=20.) 
+ + # add objective/constraints + self.add_objective(f'multipoint.{self.scenario_names[0]}.mass', ref=0.01) + self.add_constraint(f'multipoint.{self.scenario_names[0]}.func_struct', upper=1.0, parallel_deriv_color='struct_cons') # run func_struct derivatives in parallel + self.add_constraint(f'multipoint.{self.scenario_names[1]}.func_struct', upper=1.0, parallel_deriv_color='struct_cons') + self.add_constraint(f'multipoint.{self.scenario_names[0]}.C_L', lower=0.15, ref=0.1, parallel_deriv_color='lift_cons') # run C_L derivatives in parallel + self.add_constraint(f'multipoint.{self.scenario_names[1]}.C_L', lower=0.45, ref=0.1, parallel_deriv_color='lift_cons') + +def get_model(scenario_names): + return Model(scenario_names=scenario_names) + +# run model and check derivatives +if __name__ == "__main__": + + prob = om.Problem() + prob.model = Model() + + if check_totals: + prob.setup(mode='rev') + om.n2(prob, show_browser=False, outfile='n2.html') + prob.run_model() + prob.check_totals(step_calc='rel_avg', + compact_print=True, + directional=False, + show_progress=True) + + else: + + # setup optimization driver + prob.driver = om.ScipyOptimizeDriver(debug_print=['nl_cons','objs','desvars','totals']) + prob.driver.options['optimizer'] = 'SLSQP' + prob.driver.options['tol'] = 1e-5 + prob.driver.options['disp'] = True + prob.driver.options['maxiter'] = 300 + + # add optimization recorder + prob.driver.recording_options['record_objectives'] = True + prob.driver.recording_options['record_constraints'] = True + prob.driver.recording_options['record_desvars'] = True + prob.driver.recording_options['record_derivatives'] = True + + recorder = om.SqliteRecorder("optimization_history.sql") + prob.driver.add_recorder(recorder) + + # run the optimization + prob.setup(mode='rev') + prob.run_driver() + prob.cleanup() + + # write out data + cr = om.CaseReader("optimization_history.sql") + driver_cases = cr.list_cases('driver') + + case = cr.get_case(0) + cons = case.get_constraints() + dvs = case.get_design_vars() + objs = case.get_objectives() + + f = open("optimization_history.dat","w+") + + for i, k in enumerate(objs.keys()): + f.write('objective: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(cons.keys()): + f.write('constraint: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(dvs.keys()): + f.write('DV: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + f.close() diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py new file mode 100644 index 00000000..c859d079 --- /dev/null +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py @@ -0,0 +1,151 @@ +import openmdao.api as om +from mphys.network.zmq_pbs import RemoteZeroMQComp +from pbs4py import PBS +import numpy as np + +check_totals = False # True=check objective/constraint derivatives, False=run optimization + +# for running scenarios on different servers in parallel +class ParallelRemoteGroup(om.ParallelGroup): + def initialize(self): + self.options.declare('num_scenarios') + def 
setup(self): + # NOTE: make sure setup isn't called multiple times, otherwise the first jobs/port forwarding will go unused and you'll have to stop them manually + for i in range(self.options['num_scenarios']): + + # initialize pbs4py Launcher + pbs_launcher = PBS.k4(time=1) + pbs_launcher.mpiexec = 'mpirun' + pbs_launcher.requested_number_of_nodes = 1 + + # output functions of interest, which aren't already added as objective/constraints on server side + if i==0: + additional_remote_outputs = [f'aerostructural{i}.mass', f'aerostructural{i}.C_L', f'aerostructural{i}.func_struct'] + else: # exclude mass (which comes from first scenario), otherwise mass derivatives will be computed needlessly + additional_remote_outputs = [f'aerostructural{i}.C_L', f'aerostructural{i}.func_struct'] + + # add the remote server component + self.add_subsystem(f'remote_scenario{i}', + RemoteZeroMQComp( + run_server_filename='mphys_server.py', + pbs=pbs_launcher, + port=5054+i*4, + acceptable_port_range=[5054+i*4, 5054+(i+1)*4-1], + dump_separate_json=True, + additional_remote_inputs=['mach', 'qdyn', 'aoa'], + additional_remote_outputs=additional_remote_outputs, + additional_server_args=f'--model_filename run --scenario_name aerostructural{i}'), + promotes_inputs=['geometry_morph_param', 'dv_struct'], # non-distributed IVCs + promotes_outputs=['*']) + +class TopLevelGroup(om.Group): + def setup(self): + if self.comm.size!=2: + raise SystemError('Please launch with 2 processors') + + # IVCs that feed into both parallel groups + self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) + + # distributed IVCs + self.ivc.add_output('mach', [5., 3.]) + self.ivc.add_output('qdyn', [3E4, 1E4]) + #self.ivc.add_output('aoa', [3., 2.]) # derivatives are wrong when using vector aoa and coloring; see OpenMDAO issue 2919 + self.ivc.add_output('aoa1', 3.) + self.ivc.add_output('aoa2', 2.) + + # add distributed design vars + #self.add_design_var('aoa', lower=-20., upper=20.) + self.add_design_var('aoa1', lower=-20., upper=20.) + self.add_design_var('aoa2', lower=-20., upper=20.) 
+ + # add the parallel servers + self.add_subsystem('multipoint', ParallelRemoteGroup(num_scenarios=2), promotes=['*']) + + # connect distributed IVCs to servers, which are size (2,) and (1,) on client and server sides + for i in range(2): + for var in ['mach', 'qdyn']: #, 'aoa']: + self.connect(var, f'remote_scenario{i}.{var}', src_indices=[i]) + self.connect(f'aoa{i+1}', f'remote_scenario{i}.aoa') + + # add CL and stress constraints + min_CL = [0.15, 0.45] + for i in range(2): + self.add_constraint(f'aerostructural{i}:C_L', + lower=min_CL[i], + ref=0.1, + parallel_deriv_color='lift_cons') + self.add_constraint(f'aerostructural{i}:func_struct', + upper=1.0, + parallel_deriv_color='struct_cons') + + # add objective + self.add_objective('aerostructural0:mass', ref=0.01) + +# add remote component to the model +prob = om.Problem() +prob.model = TopLevelGroup() + +if check_totals: + prob.setup(mode='rev') + om.n2(prob, show_browser=False, outfile='n2.html') + prob.run_model() + prob.check_totals(step_calc='rel_avg', + compact_print=True, + directional=False, + show_progress=True) + +else: + + # setup optimization driver + prob.driver = om.ScipyOptimizeDriver(debug_print=['nl_cons','objs','desvars','totals']) + prob.driver.options['optimizer'] = 'SLSQP' + prob.driver.options['tol'] = 1e-5 + prob.driver.options['disp'] = True + prob.driver.options['maxiter'] = 300 + + # add optimization recorder + prob.driver.recording_options['record_objectives'] = True + prob.driver.recording_options['record_constraints'] = True + prob.driver.recording_options['record_desvars'] = True + prob.driver.recording_options['record_derivatives'] = True + + recorder = om.SqliteRecorder("optimization_history_parallel.sql") + prob.driver.add_recorder(recorder) + + # run the optimization + prob.setup(mode='rev') + prob.run_driver() + prob.cleanup() + + # write out data + if prob.model.comm.rank==0: + cr = om.CaseReader("optimization_history_parallel.sql") + driver_cases = cr.list_cases('driver') + + case = cr.get_case(0) + cons = case.get_constraints() + dvs = case.get_design_vars() + objs = case.get_objectives() + + with open("optimization_history.dat","w+") as f: + + for i, k in enumerate(objs.keys()): + f.write('objective: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(cons.keys()): + f.write('constraint: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(dvs.keys()): + f.write('DV: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + +# shutdown each rank's server +eval(f'prob.model.multipoint.remote_scenario{prob.model.comm.rank}.stop_server()') diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py new file mode 100644 index 00000000..16a985c3 --- /dev/null +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py @@ -0,0 +1,93 @@ +import openmdao.api as om +from mphys.network.zmq_pbs import RemoteZeroMQComp +from pbs4py import PBS + +check_totals = False # True=check objective/constraint derivatives, False=run optimization + +# initialize pbs4py +pbs = PBS.k4(time=1) 
+pbs.mpiexec = 'mpirun' +pbs.requested_number_of_nodes = 1 + +# add remote component to the model +prob = om.Problem() +prob.model.add_subsystem('remote', + RemoteZeroMQComp( + run_server_filename='mphys_server.py', # default server filename + pbs=pbs, + additional_server_args='--model_filename as_opt_parallel ' + + '--scenario_name cruise pullup') # customizable options for server file + ) + +if check_totals: + prob.setup(mode='rev') + om.n2(prob, show_browser=False, outfile='n2.html') + prob.run_model() + prob.check_totals(step_calc='rel_avg', + compact_print=True, + directional=False, + show_progress=True) + prob.model.remote.stop_server() + +else: + + # setup optimization driver + prob.driver = om.ScipyOptimizeDriver(debug_print=['nl_cons','objs','desvars','totals']) + prob.driver.options['optimizer'] = 'SLSQP' + prob.driver.options['tol'] = 1e-5 + prob.driver.options['disp'] = True + prob.driver.options['maxiter'] = 300 + + # add optimization recorder + prob.driver.recording_options['record_objectives'] = True + prob.driver.recording_options['record_constraints'] = True + prob.driver.recording_options['record_desvars'] = True + prob.driver.recording_options['record_derivatives'] = True + + recorder = om.SqliteRecorder("optimization_history.sql") + prob.driver.add_recorder(recorder) + + # run the optimization + prob.setup(mode='rev') + prob.run_driver() + prob.model.remote.stop_server() + prob.cleanup() + + # write out data + cr = om.CaseReader("optimization_history.sql") + driver_cases = cr.list_cases('driver') + + case = cr.get_case(0) + cons = case.get_constraints() + dvs = case.get_design_vars() + objs = case.get_objectives() + + with open("optimization_history.dat","w+") as f: + + for i, k in enumerate(objs.keys()): + f.write('objective: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(cons.keys()): + f.write('constraint: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(dvs.keys()): + f.write('DV: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + f.write('run times, function\n') + for i in range(len(prob.model.remote.times_function)): + f.write(f'{prob.model.remote.times_function[i]}\n') + f.write(' ' + '\n') + + f.write('run times, gradient\n') + for i in range(len(prob.model.remote.times_gradient)): + f.write(f'{prob.model.remote.times_gradient[i]}\n') + f.write(' ' + '\n') diff --git a/examples/aerostructural/supersonic_panel/mphys_server.py b/examples/aerostructural/supersonic_panel/mphys_server.py new file mode 100644 index 00000000..63448f5d --- /dev/null +++ b/examples/aerostructural/supersonic_panel/mphys_server.py @@ -0,0 +1,30 @@ +from mphys.network.zmq_pbs import MPhysZeroMQServer, get_default_zmq_pbs_argparser + +class GetModel: + def __init__(self, scenario_name: str, model_filename: str): + self.scenario_name = scenario_name + self.model_filename = model_filename + def __call__(self): + return __import__(self.model_filename).get_model(self.scenario_name) + +def run_server(args): + get_model = GetModel(args.scenario_name, args.model_filename) + server = MPhysZeroMQServer(args.port, + 
get_om_group_function_pointer=get_model, + ignore_setup_warnings=True, + ignore_runtime_warnings=True, + rerun_initial_design=True) + server.run() + +if __name__ == "__main__": + # argparse with port number input + parser = get_default_zmq_pbs_argparser() + + # add some customizable options + parser.add_argument('--model_filename', type=str, default='run', + help='filename (excluding .py) containing the get_model function to set as get_om_group_function_pointer') + parser.add_argument('--scenario_name', nargs='+', type=str, default=['cruise'], + help='scenario name to feed into get_om_group_function_pointer') + + args = parser.parse_args() + run_server(args) diff --git a/examples/aerostructural/supersonic_panel/run.py b/examples/aerostructural/supersonic_panel/run.py index bb208499..a645abe8 100644 --- a/examples/aerostructural/supersonic_panel/run.py +++ b/examples/aerostructural/supersonic_panel/run.py @@ -1,13 +1,14 @@ import numpy as np from mpi4py import MPI import openmdao.api as om +import os from mphys import Multipoint from mphys.scenario_aerostructural import ScenarioAeroStructural from structures_mphys import StructBuilder from aerodynamics_mphys import AeroBuilder -from xfer_mphys import XferBuilder +from xfer_mphys import XferBuilder from geometry_morph import GeometryBuilder @@ -24,7 +25,11 @@ # Mphys class Model(Multipoint): + def initialize(self): + self.options.declare('scenario_name', default='aerostructural') + def setup(self): + self.scenario_name = self.options['scenario_name'] # ivc self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) @@ -73,19 +78,34 @@ def setup(self): self.connect('struct_mesh.x_struct0', 'x_struct_in') self.connect('aero_mesh.x_aero0', 'x_aero_in') - + + # create the run directory + if self.comm.rank==0: + if not os.path.isdir(self.scenario_name): + os.mkdir(self.scenario_name) + self.comm.Barrier() + # aerostructural analysis nonlinear_solver = om.NonlinearBlockGS(maxiter=100, iprint=2, use_aitken=True, aitken_initial_factor=0.5) linear_solver = om.LinearBlockGS(maxiter=40, iprint=2, use_aitken=True, aitken_initial_factor=0.5) - self.mphys_add_scenario('aerostructural', + self.mphys_add_scenario(self.scenario_name, ScenarioAeroStructural( - aero_builder=aero_builder, struct_builder=struct_builder, - ldxfer_builder=xfer_builder), + aero_builder=aero_builder, + struct_builder=struct_builder, + ldxfer_builder=xfer_builder, + run_directory=self.scenario_name), coupling_nonlinear_solver=nonlinear_solver, coupling_linear_solver=linear_solver) for var in ['modulus', 'yield_stress', 'density', 'mach', 'qdyn', 'aoa', 'dv_struct', 'x_struct0', 'x_aero0']: - self.connect(var, 'aerostructural.'+var) + self.connect(var, self.scenario_name+'.'+var) + + # add design variables, to simplify remote setup + self.add_design_var('geometry_morph_param', lower=0.1, upper=10.0) + self.add_design_var('dv_struct', lower=1.e-4, upper=1.e-2, ref=1.e-3) + +def get_model(scenario_name): + return Model(scenario_name=scenario_name[0]) # run model and check derivatives if __name__ == "__main__": @@ -95,24 +115,24 @@ def setup(self): prob.setup(mode='rev') om.n2(prob, show_browser=False, outfile='n2.html') - + prob.run_model() print('mass = ' + str(prob['aerostructural.mass'])) print('func_struct = ' + str(prob['aerostructural.func_struct'])) print('C_L = ' + str(prob['aerostructural.C_L'])) - + prob.check_totals( of=['aerostructural.mass', 'aerostructural.func_struct', - 'aerostructural.C_L'], + 'aerostructural.C_L'], wrt=['modulus', 'yield_stress', 'density', 'mach', 
'qdyn', - 'aoa', + 'aoa', 'dv_struct', - 'geometry_morph_param'], + 'geometry_morph_param'], step_calc='rel_avg', compact_print=True ) diff --git a/mphys/network/__init__.py b/mphys/network/__init__.py new file mode 100644 index 00000000..5585d43e --- /dev/null +++ b/mphys/network/__init__.py @@ -0,0 +1,3 @@ +from .server_manager import ServerManager +from .remote_component import RemoteComp +from .server import Server diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py new file mode 100644 index 00000000..2800a05c --- /dev/null +++ b/mphys/network/remote_component.py @@ -0,0 +1,279 @@ +import openmdao.api as om +import json, time, os +import numpy as np + +class RemoteComp(om.ExplicitComponent): + """ + A component used for network communication between top-level OpenMDAO + problem and remote problem evaluated on an HPC job. Serves as the + top-level component on the client side. + + To make a particular derived class, implement the _setup_server_manager, + _send_inputs_to_server, and _receive_outputs_from_server functions. + """ + def stop_server(self): + # shortcut for stopping server from top level + self.server_manager.stop_server() + + def initialize(self): + self.options.declare('run_server_filename', default="mphys_server.py", desc="python file that will launch the Server class") + self.options.declare('time_estimate_multiplier', default=2.0, desc="when determining whether to reboot the server, estimate model run time as this times max prior run time") + self.options.declare('reboot_only_on_function_call', default=True, desc="only allows server reboot before function call, not gradient call. " + +"Avoids having to rerun forward solution on next job, but shortens current job time") + self.options.declare('dump_json', default=False, desc="dump input/output json file in client") + self.options.declare('dump_separate_json', default=False, desc="dump a separate input/output json file for each evaluation") + self.options.declare('var_naming_dot_replacement', default=":", desc="what to replace '.' within dv/response name trees") + self.options.declare('additional_remote_inputs', default=[], types=list, desc="additional inputs not defined as design vars in the remote component") + self.options.declare('additional_remote_outputs', default=[], types=list, desc="additional outputs not defined as objective/constraints in the remote component") + self.options.declare('use_derivative_coloring', default=False, types=bool, desc="assign derivative coloring to objective/constraints. 
Only for cases with parallel servers") + + def setup(self): + if self.comm.size>1: + raise SystemError('Using Remote Component on more than 1 rank is not supported') + self.time_estimate_multiplier = self.options['time_estimate_multiplier'] + self.reboot_only_on_function_call = self.options['reboot_only_on_function_call'] + self.dump_json = self.options['dump_json'] + self.dump_separate_json = self.options['dump_separate_json'] + self.var_naming_dot_replacement = self.options['var_naming_dot_replacement'] + self.additional_remote_inputs = self.options['additional_remote_inputs'] + self.additional_remote_outputs = self.options['additional_remote_outputs'] + self.use_derivative_coloring = self.options['use_derivative_coloring'] + self.derivative_coloring_num = 0 + if self.dump_separate_json: + self.dump_json = True + + self._setup_server_manager() + + # for tracking model times, and determining whether to relaunch servers + self.times_function = np.array([]) + self.times_gradient = np.array([]) + + # get baseline model + print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True) + output_dict = self.evaluate_model(command='initialize', + remote_input_dict={'additional_inputs': self.additional_remote_inputs, + 'additional_outputs': self.additional_remote_outputs, + 'component_name': self.name}) + + self._add_design_inputs_from_baseline_model(output_dict) + self._add_objectives_from_baseline_model(output_dict) + self._add_constraints_from_baseline_model(output_dict) + + self._add_additional_inputs_from_baseline_model(output_dict) + self._add_additional_outputs_from_baseline_model(output_dict) + + self.declare_partials('*', '*') + + def compute(self,inputs,outputs): + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate') + + self._assign_objectives_from_remote_output(remote_dict, outputs) + self._assign_constraints_from_remote_output(remote_dict, outputs) + self._assign_additional_outputs_from_remote_output(remote_dict, outputs) + + def compute_partials(self, inputs, partials): + # NOTE: this will not use of and wrt inputs, if given in outer script's compute_totals/check_totals + + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives') + + self._assign_objective_partials_from_remote_output(remote_dict, partials) + self._assign_constraint_partials_from_remote_output(remote_dict, partials) + self._assign_additional_partials_from_remote_output(remote_dict, partials) + + def evaluate_model(self, remote_input_dict=None, command='initialize'): + if self._need_to_restart_server(command): + self.server_manager.stop_server() + self.server_manager.start_server() + + if self.dump_json: + self._dump_json(remote_input_dict, command) + + model_start_time = time.time() + self._send_inputs_to_server(remote_input_dict, command) + remote_output_dict = self._receive_outputs_from_server() + + model_time_elapsed = time.time() - model_start_time + + if self.dump_json: + remote_output_dict.update({'wall_time': model_time_elapsed}) + self._dump_json(remote_output_dict, command) + + if self._doing_derivative_evaluation(command): + self.times_gradient = np.hstack([self.times_gradient, model_time_elapsed]) + else: + self.times_function = np.hstack([self.times_function, model_time_elapsed]) + + return remote_output_dict + + def _assign_objective_partials_from_remote_output(self, remote_dict, 
partials): + for obj in remote_dict['objective'].keys(): + for dv in remote_dict['design_vars'].keys(): + partials[( obj.replace('.',self.var_naming_dot_replacement), dv.replace('.',self.var_naming_dot_replacement) )] = remote_dict['objective'][obj]['derivatives'][dv] + for inp in remote_dict['additional_inputs'].keys(): + partials[( obj.replace('.',self.var_naming_dot_replacement), inp.replace('.',self.var_naming_dot_replacement) )] = remote_dict['objective'][obj]['derivatives'][inp] + + def _assign_constraint_partials_from_remote_output(self, remote_dict, partials): + for con in remote_dict['constraints'].keys(): + for dv in remote_dict['design_vars'].keys(): + partials[( con.replace('.',self.var_naming_dot_replacement), dv.replace('.',self.var_naming_dot_replacement) )] = remote_dict['constraints'][con]['derivatives'][dv] + for inp in remote_dict['additional_inputs'].keys(): + partials[( con.replace('.',self.var_naming_dot_replacement), inp.replace('.',self.var_naming_dot_replacement) )] = remote_dict['constraints'][con]['derivatives'][inp] + + def _assign_additional_partials_from_remote_output(self, remote_dict, partials): + for output in remote_dict['additional_outputs'].keys(): + for dv in remote_dict['design_vars'].keys(): + partials[( output.replace('.',self.var_naming_dot_replacement), dv.replace('.',self.var_naming_dot_replacement) )] = remote_dict['additional_outputs'][output]['derivatives'][dv] + for inp in remote_dict['additional_inputs'].keys(): + partials[( output.replace('.',self.var_naming_dot_replacement), inp.replace('.',self.var_naming_dot_replacement) )] = remote_dict['additional_outputs'][output]['derivatives'][inp] + + def _create_input_dict_for_server(self, inputs): + input_dict = {'design_vars': {}, 'additional_inputs': {}, 'additional_outputs': self.additional_remote_outputs, 'component_name': self.name} + for dv in self.design_var_keys: + input_dict['design_vars'][dv.replace('.',self.var_naming_dot_replacement)] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()} + for input in self.additional_remote_inputs: + input_dict['additional_inputs'][input] = {'val': inputs[input.replace('.',self.var_naming_dot_replacement)].tolist()} + return input_dict + + def _doing_derivative_evaluation(self, command: str): + return command == 'evaluate derivatives' + + def _is_first_function_evaluation(self): + return len(self.times_function) == 0 + + def _is_first_gradient_evaluation(self): + return len(self.times_gradient) == 0 + + def _need_to_restart_server(self, command: str): + if self._doing_derivative_evaluation(command): + if self._is_first_gradient_evaluation() or self.reboot_only_on_function_call: + return False + else: + estimated_model_time = self.time_estimate_multiplier*max(self.times_gradient) + + else: + if self._is_first_function_evaluation(): + return False + else: + if self.reboot_only_on_function_call and not self._is_first_gradient_evaluation(): + estimated_model_time = self.time_estimate_multiplier*(max(self.times_function)+max(self.times_gradient)) + else: + estimated_model_time = self.time_estimate_multiplier*max(self.times_function) + return not self.server_manager.enough_time_is_remaining(estimated_model_time) + + def _dump_json(self, remote_dict: dict, command: str): + if 'objective' in remote_dict.keys(): + dict_type = 'outputs' + else: + dict_type = 'inputs' + if self.dump_separate_json: + save_dir = 'remote_json_files' + if not os.path.isdir(save_dir): + try: + os.mkdir(save_dir) + except: pass # may have been created by now, by 
a parallel server + if self._doing_derivative_evaluation(command): + filename = f'{save_dir}/{self.name}_{dict_type}_derivative{len(self.times_gradient)}.json' + else: + filename = f'{save_dir}/{self.name}_{dict_type}_function{len(self.times_function)}.json' + else: + filename = f'{self.name}_{dict_type}.json' + with open(filename, 'w') as f: + json.dump(remote_dict, f, indent=4) + + def _add_design_inputs_from_baseline_model(self, output_dict): + self.design_var_keys = output_dict['design_vars'].keys() + for dv in self.design_var_keys: + self.add_input(dv.replace('.',self.var_naming_dot_replacement), output_dict['design_vars'][dv]['val']) + if dv not in self._design_vars.keys(): + self.add_design_var(dv.replace('.',self.var_naming_dot_replacement), + ref=output_dict['design_vars'][dv]['ref'], + ref0=output_dict['design_vars'][dv]['ref0'], + lower=output_dict['design_vars'][dv]['lower'], + upper=output_dict['design_vars'][dv]['upper'], + scaler=output_dict['design_vars'][dv]['scaler'], + adder=output_dict['design_vars'][dv]['adder']) + + def _add_additional_inputs_from_baseline_model(self, output_dict): + self.additional_remote_inputs = list(output_dict['additional_inputs'].keys()) + for input in self.additional_remote_inputs: + self.add_input(input.replace('.',self.var_naming_dot_replacement), + output_dict['additional_inputs'][input]['val']) + + def _add_additional_outputs_from_baseline_model(self, output_dict): + self.additional_remote_outputs = list(output_dict['additional_outputs'].keys()) + for output in self.additional_remote_outputs: + self.add_output(output.replace('.',self.var_naming_dot_replacement), + output_dict['additional_outputs'][output]['val']) + + def _add_objectives_from_baseline_model(self, output_dict): + for obj in output_dict['objective'].keys(): + self.add_output(obj.replace('.',self.var_naming_dot_replacement), output_dict['objective'][obj]['val']) + self.add_objective(obj.replace('.',self.var_naming_dot_replacement), + ref=output_dict['objective'][obj]['ref'], + ref0=output_dict['objective'][obj]['ref0'], + scaler=output_dict['objective'][obj]['scaler'], + adder=output_dict['objective'][obj]['adder'], + parallel_deriv_color=f'color{self.derivative_coloring_num}' if self.use_derivative_coloring else None) + self.derivative_coloring_num += 1 + + def _add_constraints_from_baseline_model(self, output_dict): + for con in output_dict['constraints'].keys(): + self.add_output(con.replace('.',self.var_naming_dot_replacement), output_dict['constraints'][con]['val']) + if output_dict['constraints'][con]['equals'] is not None: # equality constraint + self.add_constraint(con.replace('.',self.var_naming_dot_replacement), + ref=output_dict['constraints'][con]['ref'], + ref0=output_dict['constraints'][con]['ref0'], + equals=output_dict['constraints'][con]['equals'], + scaler=output_dict['constraints'][con]['scaler'], + adder=output_dict['constraints'][con]['adder'], + parallel_deriv_color=f'color{self.derivative_coloring_num}' if self.use_derivative_coloring else None) + else: + if output_dict['constraints'][con]['lower']>-1e20 and output_dict['constraints'][con]['upper']<1e20: # enforce lower and upper bounds + self.add_constraint(con.replace('.',self.var_naming_dot_replacement), + ref=output_dict['constraints'][con]['ref'], + ref0=output_dict['constraints'][con]['ref0'], + lower=output_dict['constraints'][con]['lower'], + upper=output_dict['constraints'][con]['upper'], + scaler=output_dict['constraints'][con]['scaler'], + adder=output_dict['constraints'][con]['adder'], + 
parallel_deriv_color=f'color{self.derivative_coloring_num}' if self.use_derivative_coloring else None) + elif output_dict['constraints'][con]['lower']>-1e20: # enforce lower bound + self.add_constraint(con.replace('.',self.var_naming_dot_replacement), + ref=output_dict['constraints'][con]['ref'], + ref0=output_dict['constraints'][con]['ref0'], + lower=output_dict['constraints'][con]['lower'], + scaler=output_dict['constraints'][con]['scaler'], + adder=output_dict['constraints'][con]['adder'], + parallel_deriv_color=f'color{self.derivative_coloring_num}' if self.use_derivative_coloring else None) + else: # enforce upper bound + self.add_constraint(con.replace('.',self.var_naming_dot_replacement), + ref=output_dict['constraints'][con]['ref'], + ref0=output_dict['constraints'][con]['ref0'], + upper=output_dict['constraints'][con]['upper'], + scaler=output_dict['constraints'][con]['scaler'], + adder=output_dict['constraints'][con]['adder'], + parallel_deriv_color=f'color{self.derivative_coloring_num}' if self.use_derivative_coloring else None) + self.derivative_coloring_num += 1 + + def _assign_objectives_from_remote_output(self, remote_dict, outputs): + for obj in remote_dict['objective'].keys(): + outputs[obj.replace('.',self.var_naming_dot_replacement)] = remote_dict['objective'][obj]['val'] + + def _assign_constraints_from_remote_output(self, remote_dict, outputs): + for con in remote_dict['constraints'].keys(): + outputs[con.replace('.',self.var_naming_dot_replacement)] = remote_dict['constraints'][con]['val'] + + def _assign_additional_outputs_from_remote_output(self, remote_dict, outputs): + for output in remote_dict['additional_outputs'].keys(): + outputs[output.replace('.',self.var_naming_dot_replacement)] = remote_dict['additional_outputs'][output]['val'] + + def _send_inputs_to_server(self, remote_input_dict, command: str): + raise NotImplementedError + + def _receive_outputs_from_server(self): + raise NotImplementedError + + def _setup_server_manager(self): + raise NotImplementedError diff --git a/mphys/network/server.py b/mphys/network/server.py new file mode 100644 index 00000000..1fb509bd --- /dev/null +++ b/mphys/network/server.py @@ -0,0 +1,353 @@ +import openmdao.api as om +import warnings + +class Server: + """ + A class that serves as an OpenMDAO model analysis server. Launched + by a server run file by the ServerManager and runs on an HPC job, + awaiting design variables to evaluate and sending back resulting + function or derivative information. + + To make a particular derived class, implement the _parse_incoming_message + and _send_outputs_to_client functions. 
+ + Parameters + ---------- + get_om_group_function_pointer : function pointer + Pointer to the OpenMDAO/MPhys group to evaluate on the server + ignore_setup_warnings : bool + Whether to ignore OpenMDAO setup warnings + ignore_runtime_warnings : bool + Whether to ignore OpenMDAO runtime warnings + rerun_initial_design : bool + Whether to evaluate the baseline design upon startup + """ + def __init__(self, get_om_group_function_pointer, + ignore_setup_warnings = False, + ignore_runtime_warnings = False, + rerun_initial_design = False): + + self.get_om_group_function_pointer = get_om_group_function_pointer + self.ignore_setup_warnings = ignore_setup_warnings + self.ignore_runtime_warnings = ignore_runtime_warnings + self.rerun_initial_design = rerun_initial_design + + self.current_design_has_been_evaluated = False + self.current_derivatives_have_been_evaluated = False + self.derivatives = None + self.additional_inputs = None + self.additional_outputs = None + self.design_counter = 0 # more debugging info for client side json dumping + + self._load_the_model() + + def _parse_incoming_message(self): + raise NotImplementedError + + def _send_outputs_to_client(self): + raise NotImplementedError + + def _load_the_model(self): + self.prob = om.Problem() + self.prob.model = self.get_om_group_function_pointer() + if self.ignore_setup_warnings: + with warnings.catch_warnings(record=True) as w: + self.prob.setup(mode='rev') + else: + self.prob.setup(mode='rev') + self.rank = self.prob.model.comm.rank + + # temporary fix for MELD initialization issue + if self.rerun_initial_design: + if self.rank==0: + print('SERVER: Evaluating baseline design', flush=True) + self._run_model() + + def _run_model(self): + if self.ignore_runtime_warnings: + with warnings.catch_warnings(record=True) as w: + self.prob.run_model() + else: + self.prob.run_model() + self.current_design_has_been_evaluated = True + self.derivatives = None + self.design_counter += 1 + + def _compute_totals(self): + of, wrt = self._get_derivative_inputs_outputs() + if self.ignore_runtime_warnings: + with warnings.catch_warnings(record=True) as w: + self.derivatives = self.prob.compute_totals(of=of, wrt=wrt) + else: + self.derivatives = self.prob.compute_totals(of=of, wrt=wrt) + self.current_derivatives_have_been_evaluated = True + + def _get_derivative_inputs_outputs(self): + of = [] + for r in self.prob.model._responses.keys(): + of += [self.prob.model._responses[r]['source']] + of += self.additional_outputs + + wrt = [] + for dv in self.prob.model._design_vars.keys(): + wrt += [self.prob.model._design_vars[dv]['source']] + wrt += self.additional_inputs + + return of, wrt + + def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}): + design_vars = self.prob.model._design_vars + remote_output_dict['design_vars'] = {} + for dv in design_vars.keys(): + remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv), + 'ref': design_vars[dv]['ref'], + 'ref0': design_vars[dv]['ref0'], + 'lower': design_vars[dv]['lower'], + 'upper': design_vars[dv]['upper'], + 'units': design_vars[dv]['units']} + remote_output_dict['design_vars'][dv] = self._set_reference_vals(remote_output_dict['design_vars'][dv], design_vars[dv]) + remote_output_dict['design_vars'][dv] = self._apply_reference_vals_to_desvar_bounds(remote_output_dict['design_vars'][dv]) + + # convert to lists for json input/output + for key in remote_output_dict['design_vars'][dv].keys(): + if hasattr(remote_output_dict['design_vars'][dv][key], 'tolist'): +
remote_output_dict['design_vars'][dv][key] = remote_output_dict['design_vars'][dv][key].tolist() + return remote_output_dict + + def _gather_additional_inputs_from_om_problem(self, remote_output_dict = {}): + remote_output_dict['additional_inputs'] = {} + for input in self.additional_inputs: + remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input)} + if hasattr(remote_output_dict['additional_inputs'][input]['val'], 'tolist'): + remote_output_dict['additional_inputs'][input]['val'] = remote_output_dict['additional_inputs'][input]['val'].tolist() + return remote_output_dict + + def _gather_design_outputs_from_om_problem(self, remote_output_dict = {}): + responses = self.prob.model._responses + remote_output_dict.update({'objective':{}, 'constraints':{}}) + for r in responses.keys(): + + if responses[r]['type']=='obj': + response_type = 'objective' + elif responses[r]['type']=='con': + response_type = 'constraints' + + remote_output_dict[response_type][r] = {'val': self.prob.get_val(r, get_remote=True), + 'ref': responses[r]['ref'], + 'ref0': responses[r]['ref0']} + remote_output_dict[response_type][r] = self._set_reference_vals(remote_output_dict[response_type][r], responses[r]) + + if response_type=='constraints': # get constraint bounds + remote_output_dict[response_type][r].update({'lower': responses[r]['lower'], + 'upper': responses[r]['upper'], + 'equals': responses[r]['equals']}) + remote_output_dict[response_type][r] = self._apply_reference_vals_to_constraint_bounds(remote_output_dict[response_type][r]) + + # convert to lists for json input/output + for key in remote_output_dict[response_type][r].keys(): + if hasattr(remote_output_dict[response_type][r][key], 'tolist'): + remote_output_dict[response_type][r][key] = remote_output_dict[response_type][r][key].tolist() + return remote_output_dict + + def _set_reference_vals(self, remote_dict, om_dict): + if remote_dict['ref'] is not None or remote_dict['ref0'] is not None: # using ref/ref0 + remote_dict.update({'scaler': None, + 'adder': None}) + if remote_dict['ref'] is None: + remote_dict['ref'] = 1.0 + if remote_dict['ref0'] is None: + remote_dict['ref0'] = 0.0 + else: # using adder/scaler + remote_dict.update({'scaler': om_dict['scaler'], + 'adder': om_dict['adder']}) + if remote_dict['scaler'] is None: + remote_dict['scaler'] = 1.0 + if remote_dict['adder'] is None: + remote_dict['adder'] = 0.0 + return remote_dict + + def _apply_reference_vals_to_desvar_bounds(self, desvar_dict): + if desvar_dict['adder'] is None and desvar_dict['scaler'] is None: # using ref/ref0 + desvar_dict['lower'] = desvar_dict['lower']*(desvar_dict['ref']-desvar_dict['ref0']) + desvar_dict['ref0'] + desvar_dict['upper'] = desvar_dict['upper']*(desvar_dict['ref']-desvar_dict['ref0']) + desvar_dict['ref0'] + else: # using adder/scaler + desvar_dict['lower'] = desvar_dict['lower']/desvar_dict['scaler'] - desvar_dict['adder'] + desvar_dict['upper'] = desvar_dict['upper']/desvar_dict['scaler'] - desvar_dict['adder'] + return desvar_dict + + def _apply_reference_vals_to_constraint_bounds(self, constraint_dict): + if constraint_dict['adder'] is None and constraint_dict['scaler'] is None: # using ref/ref0 + if constraint_dict['equals'] is not None: # equality constraint + constraint_dict['equals'] = constraint_dict['equals']*(constraint_dict['ref']-constraint_dict['ref0']) + constraint_dict['ref0'] + else: + if constraint_dict['lower']>-1e20: + constraint_dict['lower'] = 
constraint_dict['lower']*(constraint_dict['ref']-constraint_dict['ref0']) + constraint_dict['ref0'] + if constraint_dict['upper']<1e20: + constraint_dict['upper'] = constraint_dict['upper']*(constraint_dict['ref']-constraint_dict['ref0']) + constraint_dict['ref0'] + else: # using adder/scaler + if constraint_dict['equals'] is not None: # equality constraint + constraint_dict['equals'] = constraint_dict['equals']/constraint_dict['scaler'] - constraint_dict['adder'] + else: + if constraint_dict['lower']>-1e20: + constraint_dict['lower'] = constraint_dict['lower']/constraint_dict['scaler'] - constraint_dict['adder'] + if constraint_dict['upper']<1e20: + constraint_dict['upper'] = constraint_dict['upper']/constraint_dict['scaler'] - constraint_dict['adder'] + return constraint_dict + + def _gather_additional_outputs_from_om_problem(self, remote_output_dict = {}): + remote_output_dict['additional_outputs'] = {} + for output in self.additional_outputs: + remote_output_dict['additional_outputs'][output] = {'val': self.prob.get_val(output, get_remote=True)} + if hasattr(remote_output_dict['additional_outputs'][output]['val'], 'tolist'): + remote_output_dict['additional_outputs'][output]['val'] = remote_output_dict['additional_outputs'][output]['val'].tolist() + return remote_output_dict + + def _gather_design_derivatives_from_om_problem(self, remote_output_dict): + design_vars = self.prob.model._design_vars + responses = self.prob.model._responses + for r in responses.keys(): + + if responses[r]['type']=='obj': + response_type = 'objective' + elif responses[r]['type']=='con': + response_type = 'constraints' + + remote_output_dict[response_type][r]['derivatives'] = {} + for dv in design_vars.keys(): + deriv = self.derivatives[(responses[r]['source'], design_vars[dv]['source'])] + if hasattr(deriv,'tolist'): + deriv = deriv.tolist() + remote_output_dict[response_type][r]['derivatives'][dv] = deriv + return remote_output_dict + + def _gather_additional_output_derivatives_from_om_problem(self, remote_output_dict): + for output in self.additional_outputs: + remote_output_dict['additional_outputs'][output]['derivatives'] = {} + + # wrt design vars + for dv in self.prob.model._design_vars.keys(): + deriv = self.derivatives[( output , self.prob.model._design_vars[dv]['source'] )] + if hasattr(deriv,'tolist'): + deriv = deriv.tolist() + remote_output_dict['additional_outputs'][output]['derivatives'][dv] = deriv + + # wrt additional_inputs + for dv in self.additional_inputs: + deriv = self.derivatives[( output , dv )] + if hasattr(deriv,'tolist'): + deriv = deriv.tolist() + remote_output_dict['additional_outputs'][output]['derivatives'][dv] = deriv + + return remote_output_dict + + def _gather_additional_input_derivatives_from_om_problem(self, remote_output_dict): + responses = self.prob.model._responses + for r in responses.keys(): + + if responses[r]['type']=='obj': + response_type = 'objective' + elif responses[r]['type']=='con': + response_type = 'constraints' + + for dv in self.additional_inputs: + deriv = self.derivatives[( responses[r]['source'] , dv )] + if hasattr(deriv,'tolist'): + deriv = deriv.tolist() + remote_output_dict[response_type][r]['derivatives'][dv] = deriv + return remote_output_dict + + def _gather_inputs_and_outputs_from_om_problem(self): + remote_output_dict = self._gather_design_inputs_from_om_problem() + remote_output_dict = self._gather_design_outputs_from_om_problem(remote_output_dict) + remote_output_dict = self._gather_additional_inputs_from_om_problem(remote_output_dict) + 
remote_output_dict = self._gather_additional_outputs_from_om_problem(remote_output_dict) + if self.derivatives is not None: + remote_output_dict = self._gather_design_derivatives_from_om_problem(remote_output_dict) + remote_output_dict = self._gather_additional_output_derivatives_from_om_problem(remote_output_dict) + remote_output_dict = self._gather_additional_input_derivatives_from_om_problem(remote_output_dict) + remote_output_dict['design_counter'] = self.design_counter + return remote_output_dict + + def _set_design_variables_into_the_server_problem(self, input_dict): + design_changed = False + for key in input_dict['design_vars'].keys(): + if (self.prob.get_val(key)!=input_dict['design_vars'][key]['val']).any(): + design_changed = True + self.prob.set_val(key, input_dict['design_vars'][key]['val']) + return design_changed + + def _set_additional_inputs_into_the_server_problem(self, input_dict, design_changed): + for key in input_dict['additional_inputs'].keys(): + if (self.prob.get_val(key)!=input_dict['additional_inputs'][key]['val']).any(): + design_changed = True + self.prob.set_val(key, input_dict['additional_inputs'][key]['val']) + return design_changed + + def _save_additional_variable_names(self, input_dict): + self.additional_inputs = input_dict['additional_inputs'] + self.additional_outputs = input_dict['additional_outputs'] + if hasattr(self.additional_inputs,'keys'): + self.additional_inputs = list(self.additional_inputs.keys()) + + def run(self): + """ + Run the server. + """ + while True: + + if self.rank==0: + print('SERVER: Waiting for new design...', flush=True) + + command, input_dict = self._parse_incoming_message() + + # interpret command (options are "shutdown", "initialize", "evaluate", or "evaluate derivatives") + if command=='shutdown': + if self.rank==0: + print('SERVER: Received signal to shutdown', flush=True) + break + + self._save_additional_variable_names(input_dict) + + if command=='initialize': # evaluate baseline model for RemoteComp setup + if self.rank==0: + print('SERVER: Initialization requested... using baseline design', flush=True) + if self.current_design_has_been_evaluated: + if self.rank==0: + print('SERVER: Design already evaluated, skipping run_model', flush=True) + else: + self._run_model() + else: + design_changed = self._set_design_variables_into_the_server_problem(input_dict) + design_changed = self._set_additional_inputs_into_the_server_problem(input_dict, design_changed) + if design_changed: + self.current_design_has_been_evaluated = False + self.current_derivatives_have_been_evaluated = False + + if command=='evaluate derivatives': # compute derivatives + if self.current_derivatives_have_been_evaluated: + if self.rank==0: + print('SERVER: Derivatives already evaluated, skipping compute_totals', flush=True) + else: + if not self.current_design_has_been_evaluated: + if self.rank==0: + print('SERVER: Derivative needed, but design has changed... 
evaluating forward solution first', flush=True) + self._run_model() + if self.rank==0: + print('SERVER: Evaluating derivatives', flush=True) + self._compute_totals() + + elif command=='evaluate': # run model + if self.current_design_has_been_evaluated: + if self.rank==0: + print('SERVER: Design already evaluated, skipping run_model', flush=True) + else: + if self.rank==0: + print('SERVER: Evaluating design', flush=True) + self._run_model() + + # gather/return outputs + output_dict = self._gather_inputs_and_outputs_from_om_problem() + self._send_outputs_to_client(output_dict) + + # write current n2 with values + om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html") diff --git a/mphys/network/server_manager.py b/mphys/network/server_manager.py new file mode 100644 index 00000000..dda1c75d --- /dev/null +++ b/mphys/network/server_manager.py @@ -0,0 +1,31 @@ +class ServerManager: + """ + A class used by the client-side RemoteComp to facilitate communication + with the remote, server-side OpenMDAO problem. + + To make a particular derived class, implement the start_server, + stop_server, and enough_time_is_remaining functions. + """ + def start_server(self): + """ + Start the server. + """ + pass + + def stop_server(self): + """ + Stop the server. + """ + pass + + def enough_time_is_remaining(self, estimated_model_time): + """ + Check if the current HPC job has enough time remaining + to run the next analysis. + + Parameters + ---------- + estimated_model_time : float + How much time the new analysis is estimated to take + """ + return True diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py new file mode 100644 index 00000000..85f646d6 --- /dev/null +++ b/mphys/network/zmq_pbs.py @@ -0,0 +1,202 @@ +import argparse +import json +import socket +import subprocess +import time +import zmq + +from pbs4py import PBS +from pbs4py.job import PBSJob +from mphys.network import RemoteComp, Server, ServerManager + +class RemoteZeroMQComp(RemoteComp): + """ + A derived RemoteComp class that uses pbs4py for HPC job management + and ZeroMQ for network communication. 
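+
+    Communication with the server is handled by an
+    :class:`MPhysZeroMQServerManager`, which is created on the first
+    call to ``setup`` and reused for subsequent setups.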
+    """
+    def initialize(self):
+        self.options.declare('pbs', desc="pbs4py Launcher object")
+        self.options.declare('port', default=5081, desc="port number for server/client communication")
+        self.options.declare('acceptable_port_range', default=[5081,6000], desc="port range to look through if 'port' is currently busy")
+        self.options.declare('additional_server_args', default="", desc="Optional arguments to give server, in addition to --port")
+        super().initialize()
+        self.server_manager = None # for avoiding reinitialization due to multiple setup calls
+
+    def _send_inputs_to_server(self, remote_input_dict, command: str):
+        if self._doing_derivative_evaluation(command):
+            print(f'CLIENT (subsystem {self.name}): Requesting derivative call from server', flush=True)
+        else:
+            print(f'CLIENT (subsystem {self.name}): Requesting function call from server', flush=True)
+        input_str = f"{command}|{json.dumps(remote_input_dict)}"
+        self.server_manager.socket.send(input_str.encode())
+
+    def _receive_outputs_from_server(self):
+        return json.loads(self.server_manager.socket.recv().decode())
+
+    def _setup_server_manager(self):
+        if self.server_manager is None:
+            self.server_manager = MPhysZeroMQServerManager(pbs=self.options['pbs'],
+                                                           run_server_filename=self.options['run_server_filename'],
+                                                           component_name=self.name,
+                                                           port=self.options['port'],
+                                                           acceptable_port_range=self.options['acceptable_port_range'],
+                                                           additional_server_args=self.options['additional_server_args'])
+
+class MPhysZeroMQServerManager(ServerManager):
+    """
+    A derived ServerManager class that uses pbs4py for HPC job management
+    and ZeroMQ for network communication.
+
+    Parameters
+    ----------
+    pbs : :class:`~pbs4py.PBS`
+        pbs4py launcher used for HPC job management
+    run_server_filename : str
+        Python filename that initializes and runs the :class:`~mphys.network.zmq_pbs.MPhysZeroMQServer` server
+    component_name : str
+        Name of the remote component, for capturing output from separate remote components to mphys_{component_name}_server{server_number}.out
+    port : int
+        Desired port number for ssh port forwarding
+    acceptable_port_range : list
+        Range of alternative port numbers if specified port is already in use
+    additional_server_args : str
+        Optional arguments to give server, in addition to --port
+    """
+    def __init__(self,
+                 pbs: PBS,
+                 run_server_filename: str,
+                 component_name: str,
+                 port=5081,
+                 acceptable_port_range=[5081,6000],
+                 additional_server_args=''
+                 ):
+        self.pbs = pbs
+        self.run_server_filename = run_server_filename
+        self.component_name = component_name
+        self.port = port
+        self.acceptable_port_range = acceptable_port_range
+        self.additional_server_args = additional_server_args
+        self.queue_time_delay = 5 # seconds to wait before rechecking if a job has started
+        self.server_counter = 0 # for saving output of each server to different files
+        self.start_server()
+
+    def start_server(self):
+        self._initialize_connection()
+        self.server_counter += 1
+        self._launch_job()
+
+    def stop_server(self):
+        print(f'CLIENT (subsystem {self.component_name}): Stopping the remote analysis server', flush=True)
+        self.socket.send('shutdown|null'.encode())
+        self._shutdown_server()
+        self.socket.close()
+
+    def enough_time_is_remaining(self, estimated_model_time):
+        self.job.update_job_state()
+        if self.job.walltime_remaining is None:
+            return False
+        else:
+            return estimated_model_time < self.job.walltime_remaining
+
+    def _port_is_in_use(self, port):
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            return s.connect_ex(('localhost', port))==0
+
+    def _initialize_connection(self):
+        if self._port_is_in_use(self.port):
+            print(f'CLIENT (subsystem {self.component_name}): Port {self.port} is already in use... finding first available port in the range {self.acceptable_port_range}', flush=True)
+
+            for port in range(self.acceptable_port_range[0],self.acceptable_port_range[1]+1):
+                if not self._port_is_in_use(port):
+                    self.port = port
+                    break
+            else: # loop completed without break: no port in the acceptable range was free
+                raise RuntimeError(f'CLIENT (subsystem {self.component_name}): Could not find open port')
+
+        self._initialize_zmq_socket()
+
+    def _initialize_zmq_socket(self):
+        context = zmq.Context()
+        self.socket = context.socket(zmq.REQ)
+        self.socket.connect(f"tcp://localhost:{self.port}")
+
+    def _launch_job(self):
+        print(f'CLIENT (subsystem {self.component_name}): Launching new server', flush=True)
+        python_command = f"python {self.run_server_filename} --port {self.port} {self.additional_server_args}"
+        python_mpi_command = self.pbs.create_mpi_command(python_command, output_root_name=f'mphys_{self.component_name}_server{self.server_counter}')
+        jobid = self.pbs.launch(f'MPhys{self.port}', [python_mpi_command], blocking=False)
+        self.job = PBSJob(jobid)
+        self._wait_for_job_to_start()
+        self._setup_ssh()
+
+    def _wait_for_job_to_start(self):
+        print(f'CLIENT (subsystem {self.component_name}): Waiting for job to start', flush=True)
+        job_submission_time = time.time()
+        self._setup_placeholder_ssh()
+        while self.job.state!='R':
+            time.sleep(self.queue_time_delay)
+            self.job.update_job_state()
+        self._stop_placeholder_ssh()
+        self.job_start_time = time.time()
+        print(f'CLIENT (subsystem {self.component_name}): Job started (queue wait time: {(time.time()-job_submission_time)/3600:.2f} hours)', flush=True)
+
+    def _setup_ssh(self):
+        # Popen already runs ssh in the background, so no trailing "&" is needed
+        ssh_command = f'ssh -4 -o ServerAliveCountMax=40 -o ServerAliveInterval=15 -N -L {self.port}:localhost:{self.port} {self.job.hostname}'
+        self.ssh_proc = subprocess.Popen(ssh_command.split(),
+                                         stdout=subprocess.DEVNULL,
+                                         stderr=subprocess.DEVNULL)
+
+    def _shutdown_server(self):
+        self.ssh_proc.kill()
+        time.sleep(0.1) # brief pause so the ssh tunnel is closed before the job is deleted
+        self.job.qdel()
+
+    def _setup_placeholder_ssh(self):
+        print(f'CLIENT (subsystem {self.component_name}): Starting placeholder process to hold port {self.port} while in queue', flush=True)
+        ssh_command = f'ssh -4 -o ServerAliveCountMax=40 -o ServerAliveInterval=15 -N -L {self.port}:localhost:{self.port} {socket.gethostname()}'
+        self.ssh_proc = subprocess.Popen(ssh_command.split(),
+                                         stdout=subprocess.DEVNULL,
+                                         stderr=subprocess.DEVNULL)
+
+    def _stop_placeholder_ssh(self):
+        self.ssh_proc.kill()
+
+class MPhysZeroMQServer(Server):
+    """
+    A derived Server class that uses ZeroMQ for network communication.
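+
+    Parameters
+    ----------
+    port : int
+        Port number for the ZeroMQ socket
+    get_om_group_function_pointer : function
+        Pointer to the function that returns the OpenMDAO group to be evaluated
+    ignore_setup_warnings : bool
+        Whether to ignore OpenMDAO warnings raised during problem setup
+    ignore_runtime_warnings : bool
+        Whether to ignore OpenMDAO warnings raised while running the model
+    rerun_initial_design : bool
+        Whether to re-evaluate the baseline design when the server starts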
+    """
+    def __init__(self, port, get_om_group_function_pointer,
+                 ignore_setup_warnings=False,
+                 ignore_runtime_warnings=False,
+                 rerun_initial_design=False):
+
+        super().__init__(get_om_group_function_pointer, ignore_setup_warnings,
+                         ignore_runtime_warnings, rerun_initial_design)
+        self._setup_zeromq_socket(port)
+
+    def _setup_zeromq_socket(self, port):
+        if self.rank==0:
+            context = zmq.Context()
+            self.socket = context.socket(zmq.REP)
+            self.socket.bind(f"tcp://*:{port}")
+
+    def _parse_incoming_message(self):
+        inputs = None
+        if self.rank==0:
+            inputs = self.socket.recv().decode()
+        # broadcast the message received on rank 0 to all server processes
+        inputs = self.prob.model.comm.bcast(inputs)
+
+        # split only on the first "|", in case the JSON payload contains one
+        command, input_dict = inputs.split("|", 1)
+        if command!='shutdown':
+            input_dict = json.loads(input_dict)
+        return command, input_dict
+
+    def _send_outputs_to_client(self, output_dict: dict):
+        if self.rank==0:
+            self.socket.send(json.dumps(output_dict).encode())
+
+def get_default_zmq_pbs_argparser():
+    parser = argparse.ArgumentParser(description='Python script for launching mphys analysis server',
+                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--port', type=int, help='tcp port number for zeromq socket')
+    return parser
diff --git a/setup.py b/setup.py
index 5afd2f47..9d62d02c 100644
--- a/setup.py
+++ b/setup.py
@@ -9,12 +9,18 @@ long_description = f.read()
 optional_dependencies = {
+    'remote': [
+        'pbs4py',
+        'pyzmq'
+    ],
     'test': [
         'testflo'
     ],
     'docs': [
         'sphinx',
-        'sphinxcontrib.bibtex'
+        'sphinxcontrib.bibtex',
+        'pbs4py',
+        'pyzmq'
     ]
 }

# Add an optional dependency that concatenates all others
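
For reference, a minimal sketch of a server launch script (the file passed as ``run_server_filename``) is given below; it only illustrates how the pieces above fit together. The ``get_model`` functor and the empty ``om.Group()`` are hypothetical placeholders, and a real script would return the inner analysis group to be evaluated::

    # sketch of a server launch script; get_model is a hypothetical stand-in
    # for the functor that builds the inner OpenMDAO group
    import openmdao.api as om
    from mphys.network.zmq_pbs import MPhysZeroMQServer, get_default_zmq_pbs_argparser

    def get_model():
        return om.Group()  # placeholder for the real analysis group

    if __name__ == '__main__':
        # the script must accept --port, which the client uses to reach the server
        parser = get_default_zmq_pbs_argparser()
        args = parser.parse_args()

        server = MPhysZeroMQServer(args.port,
                                   get_om_group_function_pointer=get_model)
        server.run()  # serve evaluate/derivative requests until shutdown is received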