diff --git a/docs/basics/remote_components.rst b/docs/basics/remote_components.rst index 401baf1d..1d99dd13 100644 --- a/docs/basics/remote_components.rst +++ b/docs/basics/remote_components.rst @@ -68,6 +68,7 @@ Troubleshooting --------------- The :code:`dump_json` option for :code:`RemoteZeroMQComp` will make the component write input and output JSON files, which contain all data sent to and received from the server. An exception is the :code:`wall_time` entry (given in seconds) in the output JSON file, which is added on the client-side after the server has completed the design evaluation. +Similarly, the :code:`down_time` entry keeps track of the elapsed time between the end of the previous design evaluation and the beginning of the current one. Another entry that is only provided for informational purposes is :code:`design_counter`, which keeps track of how many different designs have been evaluated on the current server. If :code:`dump_separate_json` is set to True, then separate files will be written for each design evaluation. On the server side, an n2 file titled :code:`n2_inner_analysis_.html` will be written after each evaluation. diff --git a/examples/aerostructural/supersonic_panel/README b/examples/aerostructural/supersonic_panel/README new file mode 100644 index 00000000..c5ad8454 --- /dev/null +++ b/examples/aerostructural/supersonic_panel/README @@ -0,0 +1,37 @@ +Summary of top-level codes: + + run.py: most basic code that demonstrates a single scenario, doing a derivative check at the starting design + run_parallel.py: code that demonstrates two scenarios (differing by Mach and dynamic pressure), evaluated in parallel with MultipointParallel, doing a derivative check at the starting design + as_opt_parallel.py: runs an optimization of the previous code's two scenarios (mass minimization subject to lift and stress constraints at the two flight conditions) + as_opt_remote_serial.py: runs the same optimization using one remote component that evaluates the MultipointParallel group in as_opt_parallel.py + as_opt_remote_parallel.py: runs the same optimization using two parallel remote components, which each evaluates the Multipoint analysis in run.py + +The optimizations should complete with the following metrics (with C_L being lift coefficient and func_struct being an aggregated von Mises stress). +Note that objective and constraint names can vary slightly based on the optimization script. + + Design Vars + {'aoa': array([10.52590682, 18.2314054 ]), + 'dv_struct': array([0.0001 , 0.0001 , 0.0001 , 0.0001 , 0.0001 , + 0.0001 , 0.0001 , 0.0001 , 0.0001 , 0.00010421, + 0.00010883, 0.00011221, 0.00011371, 0.00011452, 0.0001133 , + 0.00010892, 0.00010359, 0.0001 , 0.0001 , 0.0001 ]), + 'geometry_morph_param': array([0.1])} + + Nonlinear constraints + {'multipoint.aerostructural1.C_L': array([0.15]), + 'multipoint.aerostructural1.func_struct': array([1.00000023]), + 'multipoint.aerostructural2.C_L': array([0.45]), + 'multipoint.aerostructural2.func_struct': array([1.00000051])} + + Objectives + {'multipoint.aerostructural1.mass': array([8.73298752e-05])} + + Optimization terminated successfully (Exit mode 0) + Current function value: 0.008732987524877025 + Iterations: 22 + Function evaluations: 24 + Gradient evaluations: 22 + +Note that the remote scripts, which both use mphys_server.py to launch the HPC job used for the analyses, are set up to use the K4 queue of NASA Langley's K cluster. +To run this script on an HPC not supported by pbs4py, you will likely have to write your own pbs4py Launcher constructor. +Further details on remote components may be found on the document page on remote components: https://openmdao.github.io/mphys/basics/remote_components.html diff --git a/examples/aerostructural/supersonic_panel/aerodynamics_mphys.py b/examples/aerostructural/supersonic_panel/aerodynamics_mphys.py index 48f4d9d5..15512081 100644 --- a/examples/aerostructural/supersonic_panel/aerodynamics_mphys.py +++ b/examples/aerostructural/supersonic_panel/aerodynamics_mphys.py @@ -5,13 +5,16 @@ from piston_theory import PistonTheory +X_AERO0_MESH = MPhysVariables.Aerodynamics.Surface.Mesh.COORDINATES +X_AERO = MPhysVariables.Aerodynamics.Surface.COORDINATES +F_AERO = MPhysVariables.Aerodynamics.Surface.LOADS + # IVC which returns a baseline mesh class AeroMesh(om.IndepVarComp): def initialize(self): self.options.declare('x_aero0') def setup(self): - self.x_aero0_name = MPhysVariables.Aerodynamics.Surface.COORDINATES_INITIAL - self.add_output(self.x_aero0_name, val=self.options['x_aero0'], distributed=True, tags=['mphys_coordinates']) + self.add_output(X_AERO0_MESH, val=self.options['x_aero0'], distributed=True, tags=['mphys_coordinates']) # IC which computes aero pressures @@ -19,12 +22,10 @@ class AeroSolver(om.ImplicitComponent): def initialize(self): self.options.declare('solver') - self.x_aero_name = MPhysVariables.Aerodynamics.Surface.COORDINATES - def setup(self): self.solver = self.options['solver'] - self.add_input(self.x_aero_name, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(X_AERO, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) self.add_input('aoa', 0., units = 'deg', tags=['mphys_input']) self.add_input('qdyn', 0., tags=['mphys_input']) self.add_input('mach', 0., tags=['mphys_input']) @@ -38,7 +39,7 @@ def setup(self): def solve_nonlinear(self,inputs,outputs): - self.solver.xyz = inputs[self.x_aero_name] + self.solver.xyz = inputs[X_AERO] self.solver.aoa = inputs['aoa'] self.solver.qdyn = inputs['qdyn'] self.solver.mach = inputs['mach'] @@ -46,7 +47,7 @@ def solve_nonlinear(self,inputs,outputs): outputs['pressure'] = self.solver.compute_pressure() def apply_nonlinear(self,inputs,outputs,residuals): - self.solver.xyz = inputs[self.x_aero_name] + self.solver.xyz = inputs[X_AERO] self.solver.aoa = inputs['aoa'] self.solver.qdyn = inputs['qdyn'] self.solver.mach = inputs['mach'] @@ -69,8 +70,8 @@ def apply_linear(self,inputs,outputs,d_inputs,d_outputs,d_residuals,mode): adjoint=d_residuals['pressure'] ) - if self.x_aero_name in d_inputs: - d_inputs[self.x_aero_name] += d_xa + if X_AERO in d_inputs: + d_inputs[X_AERO] += d_xa if 'aoa' in d_inputs: d_inputs['aoa'] += d_aoa if 'qdyn' in d_inputs: @@ -85,30 +86,28 @@ def initialize(self): self.options.declare('solver') def setup(self): - self.x_aero_name = MPhysVariables.Aerodynamics.Surface.COORDINATES - self.f_aero_name = MPhysVariables.Aerodynamics.Surface.LOADS self.solver = self.options['solver'] - self.add_input(self.x_aero_name, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(X_AERO, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) self.add_input('pressure', shape_by_conn=True, distributed=True, tags=['mphys_coupling']) - self.add_output(self.f_aero_name, np.zeros(self.solver.n_nodes*self.solver.n_dof), distributed=True, tags=['mphys_coupling']) + self.add_output(F_AERO, np.zeros(self.solver.n_nodes*self.solver.n_dof), distributed=True, tags=['mphys_coupling']) def compute(self,inputs,outputs): - self.solver.xyz = inputs[self.x_aero_name] + self.solver.xyz = inputs[X_AERO] self.solver.pressure = inputs['pressure'] - outputs[self.f_aero_name] = self.solver.compute_force() + outputs[F_AERO] = self.solver.compute_force() def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if mode == 'rev': - if self.f_aero_name in d_outputs: + if F_AERO in d_outputs: d_xa, d_p = self.solver.compute_force_derivatives( - adjoint=d_outputs[self.f_aero_name] + adjoint=d_outputs[F_AERO] ) - if self.x_aero_name in d_inputs: - d_inputs[self.x_aero_name] += d_xa + if X_AERO in d_inputs: + d_inputs[X_AERO] += d_xa if 'pressure' in d_inputs: d_inputs['pressure'] += d_p @@ -205,4 +204,4 @@ def get_number_of_nodes(self): return self.solver.n_nodes def get_ndof(self): - return self.soler.n_dof + return self.solver.n_dof diff --git a/examples/aerostructural/supersonic_panel/as_opt_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_parallel.py index e9feed95..2b671e59 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_parallel.py @@ -32,24 +32,25 @@ def initialize(self): self.options.declare('scenario_names') def setup(self): - for i in range(len(self.options['scenario_names'])): + + for scenario_name in self.options['scenario_names']: # create the run directory if self.comm.rank==0: - if not os.path.isdir(self.options['scenario_names'][i]): - os.mkdir(self.options['scenario_names'][i]) + if not os.path.isdir(scenario_name): + os.mkdir(scenario_name) self.comm.Barrier() nonlinear_solver = om.NonlinearBlockGS(maxiter=100, iprint=2, use_aitken=True, aitken_initial_factor=0.5) linear_solver = om.LinearBlockGS(maxiter=40, iprint=2, use_aitken=True, aitken_initial_factor=0.5) - self.mphys_add_scenario(self.options['scenario_names'][i], + self.mphys_add_scenario(scenario_name, ScenarioAeroStructural( aero_builder=self.options['aero_builder'], struct_builder=self.options['struct_builder'], ldxfer_builder=self.options['xfer_builder'], geometry_builder=self.options['geometry_builder'], in_MultipointParallel=True, - run_directory=self.options['scenario_names'][i]), + run_directory=scenario_name), coupling_nonlinear_solver=nonlinear_solver, coupling_linear_solver=linear_solver) @@ -98,12 +99,14 @@ def setup(self): geometry_builder = GeometryBuilder(builders) # add parallel multipoint group - self.add_subsystem('multipoint',AerostructParallel( - aero_builder=aero_builder, - struct_builder=struct_builder, - xfer_builder=xfer_builder, - geometry_builder=geometry_builder, - scenario_names=self.scenario_names)) + self.add_subsystem('multipoint', + AerostructParallel( + aero_builder=aero_builder, + struct_builder=struct_builder, + xfer_builder=xfer_builder, + geometry_builder=geometry_builder, + scenario_names=self.scenario_names) + ) for i in range(len(self.scenario_names)): @@ -173,7 +176,7 @@ def get_model(scenario_names): prob.cleanup() if prob.model.comm.rank==0: # write out data - cr = om.CaseReader("optimization_history.sql") + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql") driver_cases = cr.list_cases('driver') case = cr.get_case(0) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py index 97534964..7e75921c 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py @@ -40,9 +40,6 @@ def setup(self): class TopLevelGroup(om.Group): def setup(self): - if self.comm.size!=2: - raise SystemError('Please launch with 2 processors') - # IVCs that feed into both parallel groups self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) @@ -114,7 +111,7 @@ def setup(self): # write out data if prob.model.comm.rank==0: - cr = om.CaseReader("optimization_history_parallel.sql") + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history_parallel.sql") driver_cases = cr.list_cases('driver') case = cr.get_case(0) @@ -142,5 +139,6 @@ def setup(self): f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') f.write(' ' + '\n') -# shutdown each rank's server -eval(f'prob.model.multipoint.remote_scenario{prob.model.comm.rank}.stop_server()') +# shutdown the servers +prob.model.multipoint.remote_scenario0.stop_server() +prob.model.multipoint.remote_scenario1.stop_server() diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py index 16a985c3..812dc67a 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py @@ -54,40 +54,41 @@ prob.cleanup() # write out data - cr = om.CaseReader("optimization_history.sql") - driver_cases = cr.list_cases('driver') - - case = cr.get_case(0) - cons = case.get_constraints() - dvs = case.get_design_vars() - objs = case.get_objectives() - - with open("optimization_history.dat","w+") as f: - - for i, k in enumerate(objs.keys()): - f.write('objective: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + if prob.model.comm.rank==0: + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql") + driver_cases = cr.list_cases('driver') + + case = cr.get_case(0) + cons = case.get_constraints() + dvs = case.get_design_vars() + objs = case.get_objectives() + + with open("optimization_history.dat","w+") as f: + + for i, k in enumerate(objs.keys()): + f.write('objective: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(cons.keys()): + f.write('constraint: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(dvs.keys()): + f.write('DV: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + f.write('run times, function\n') + for i in range(len(prob.model.remote.times_function)): + f.write(f'{prob.model.remote.times_function[i]}\n') f.write(' ' + '\n') - for i, k in enumerate(cons.keys()): - f.write('constraint: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write('run times, gradient\n') + for i in range(len(prob.model.remote.times_gradient)): + f.write(f'{prob.model.remote.times_gradient[i]}\n') f.write(' ' + '\n') - - for i, k in enumerate(dvs.keys()): - f.write('DV: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') - f.write(' ' + '\n') - - f.write('run times, function\n') - for i in range(len(prob.model.remote.times_function)): - f.write(f'{prob.model.remote.times_function[i]}\n') - f.write(' ' + '\n') - - f.write('run times, gradient\n') - for i in range(len(prob.model.remote.times_gradient)): - f.write(f'{prob.model.remote.times_gradient[i]}\n') - f.write(' ' + '\n') diff --git a/examples/aerostructural/supersonic_panel/geometry_morph.py b/examples/aerostructural/supersonic_panel/geometry_morph.py index 5f60f697..cbd9ded5 100644 --- a/examples/aerostructural/supersonic_panel/geometry_morph.py +++ b/examples/aerostructural/supersonic_panel/geometry_morph.py @@ -1,7 +1,14 @@ import numpy as np import openmdao.api as om from mpi4py import MPI -from mphys import Builder +from mphys import Builder, MPhysVariables + + +X_AERO0_GEOM_INPUT = MPhysVariables.Aerodynamics.Surface.Geometry.COORDINATES_INPUT +X_AERO0_GEOM_OUTPUT = MPhysVariables.Aerodynamics.Surface.Geometry.COORDINATES_OUTPUT + +X_STRUCT_GEOM_INPUT = MPhysVariables.Structures.Geometry.COORDINATES_INPUT +X_STRUCT_GEOM_OUTPUT = MPhysVariables.Structures.Geometry.COORDINATES_OUTPUT # EC which morphs the geometry class GeometryMorph(om.ExplicitComponent): @@ -9,26 +16,29 @@ def initialize(self): self.options.declare('names') self.options.declare('n_nodes') + self.input_names = {'aero': X_AERO0_GEOM_INPUT, 'struct': X_STRUCT_GEOM_INPUT} + self.output_names = {'aero': X_AERO0_GEOM_OUTPUT, 'struct': X_STRUCT_GEOM_OUTPUT} + def setup(self): self.add_input('geometry_morph_param') for name, n_nodes in zip(self.options['names'], self.options['n_nodes']): - self.add_input(f'x_{name}_in', distributed=True, shape_by_conn=True) - self.add_output(f'x_{name}0', shape=n_nodes*3, distributed=True, tags=['mphys_coordinates']) + self.add_input(self.input_names[name], distributed=True, shape_by_conn=True, tags=['mphys_coordinates']) + self.add_output(self.output_names[name], shape=n_nodes*3, distributed=True, tags=['mphys_coordinates']) def compute(self,inputs,outputs): for name in self.options['names']: - outputs[f'x_{name}0'] = inputs['geometry_morph_param']*inputs[f'x_{name}_in'] + outputs[self.output_names[name]] = inputs['geometry_morph_param']*inputs[self.input_names[name]] def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if mode == 'rev': for name in self.options['names']: - if f'x_{name}0' in d_outputs: + if self.output_names[name] in d_outputs: if 'geometry_morph_param' in d_inputs: - d_inputs['geometry_morph_param'] += self.comm.allreduce(np.sum(d_outputs[f'x_{name}0']*inputs[f'x_{name}_in']), op=MPI.SUM) + d_inputs['geometry_morph_param'] += self.comm.allreduce(np.sum(d_outputs[self.output_names[name]]*inputs[self.input_names[name]]), op=MPI.SUM) - if f'x_{name}_in' in d_inputs: - d_inputs[f'x_{name}_in'] += d_outputs[f'x_{name}0']*inputs['geometry_morph_param'] + if self.input_names[name] in d_inputs: + d_inputs[self.input_names[name]] += d_outputs[self.output_names[name]]*inputs['geometry_morph_param'] # Builder diff --git a/examples/aerostructural/supersonic_panel/run.py b/examples/aerostructural/supersonic_panel/run.py index 67bf0a79..57359115 100644 --- a/examples/aerostructural/supersonic_panel/run.py +++ b/examples/aerostructural/supersonic_panel/run.py @@ -3,7 +3,7 @@ import openmdao.api as om import os -from mphys import Multipoint +from mphys import Multipoint, MPhysVariables from mphys.scenarios.aerostructural import ScenarioAeroStructural from structures_mphys import StructBuilder @@ -23,6 +23,17 @@ N_el_struct = 20 N_el_aero = 7 +X_AERO = MPhysVariables.Aerodynamics.Surface.COORDINATES +X_AERO0 = MPhysVariables.Aerodynamics.Surface.COORDINATES_INITIAL +X_AERO0_MESH = MPhysVariables.Aerodynamics.Surface.Mesh.COORDINATES +X_AERO0_GEOM_INPUT = MPhysVariables.Aerodynamics.Surface.Geometry.COORDINATES_INPUT +X_AERO0_GEOM_OUTPUT = MPhysVariables.Aerodynamics.Surface.Geometry.COORDINATES_OUTPUT + +X_STRUCT = MPhysVariables.Structures.COORDINATES +X_STRUCT_MESH = MPhysVariables.Structures.Mesh.COORDINATES +X_STRUCT_GEOM_INPUT = MPhysVariables.Structures.Geometry.COORDINATES_INPUT +X_STRUCT_GEOM_OUTPUT = MPhysVariables.Structures.Geometry.COORDINATES_OUTPUT + # Mphys class Model(Multipoint): def initialize(self): @@ -74,10 +85,10 @@ def setup(self): self.add_subsystem('struct_mesh', struct_builder.get_mesh_coordinate_subsystem()) self.add_subsystem('aero_mesh', aero_builder.get_mesh_coordinate_subsystem()) - self.add_subsystem('geometry', geometry_builder.get_mesh_coordinate_subsystem(), promotes=['*']) + self.add_subsystem('geometry', geometry_builder.get_mesh_coordinate_subsystem()) - self.connect('struct_mesh.x_struct0', 'x_struct_in') - self.connect('aero_mesh.x_aero0', 'x_aero_in') + self.connect(f'struct_mesh.{X_STRUCT_MESH}', f'geometry.{X_STRUCT_GEOM_INPUT}') + self.connect(f'aero_mesh.{X_AERO0_MESH}', f'geometry.{X_AERO0_GEOM_INPUT}') # create the run directory if self.comm.rank==0: @@ -97,8 +108,11 @@ def setup(self): coupling_nonlinear_solver=nonlinear_solver, coupling_linear_solver=linear_solver) - for var in ['modulus', 'yield_stress', 'density', 'mach', 'qdyn', 'aoa', 'dv_struct', 'x_struct0', 'x_aero0']: + for var in ['modulus', 'yield_stress', 'density', 'mach', 'qdyn', 'aoa', 'dv_struct']: self.connect(var, self.scenario_name+'.'+var) + self.connect('geometry_morph_param', 'geometry.geometry_morph_param') + self.connect(f'geometry.{X_AERO0_GEOM_OUTPUT}', f'{self.scenario_name}.{X_AERO0}') + self.connect(f'geometry.{X_STRUCT_GEOM_OUTPUT}', f'{self.scenario_name}.{X_STRUCT}') # add design variables, to simplify remote setup self.add_design_var('geometry_morph_param', lower=0.1, upper=10.0) diff --git a/examples/aerostructural/supersonic_panel/run_parallel.py b/examples/aerostructural/supersonic_panel/run_parallel.py index bf2a6dcd..04b4bb86 100644 --- a/examples/aerostructural/supersonic_panel/run_parallel.py +++ b/examples/aerostructural/supersonic_panel/run_parallel.py @@ -2,7 +2,7 @@ from mpi4py import MPI import openmdao.api as om -from mphys import Multipoint, MultipointParallel +from mphys import MultipointParallel from mphys.scenarios.aerostructural import ScenarioAeroStructural from structures_mphys import StructBuilder @@ -22,34 +22,37 @@ N_el_struct = 20 N_el_aero = 7 -# Mphys parallel multipoint scenarios class AerostructParallel(MultipointParallel): -#class AerostructParallel(Multipoint): - def __init__(self, aero_builder=None, struct_builder=None, xfer_builder=None, geometry_builder=None, scenario_names=None): - super().__init__() - self.aero_builder = aero_builder - self.struct_builder = struct_builder - self.xfer_builder = xfer_builder - self.geometry_builder = geometry_builder - self.scenario_names = scenario_names + + def initialize(self): + self.options.declare('aero_builder') + self.options.declare('struct_builder') + self.options.declare('xfer_builder') + self.options.declare('geometry_builder') + self.options.declare('scenario_names') def setup(self): - for i in range(len(self.scenario_names)): + + for scenario_name in self.options['scenario_names']: nonlinear_solver = om.NonlinearBlockGS(maxiter=100, iprint=2, use_aitken=True, aitken_initial_factor=0.5) linear_solver = om.LinearBlockGS(maxiter=40, iprint=2, use_aitken=True, aitken_initial_factor=0.5) - self.mphys_add_scenario(self.scenario_names[i], + self.mphys_add_scenario(scenario_name, ScenarioAeroStructural( - aero_builder=self.aero_builder, - struct_builder=self.struct_builder, - ldxfer_builder=self.xfer_builder, - geometry_builder=self.geometry_builder, + aero_builder=self.options['aero_builder'], + struct_builder=self.options['struct_builder'], + ldxfer_builder=self.options['xfer_builder'], + geometry_builder=self.options['geometry_builder'], in_MultipointParallel=True), coupling_nonlinear_solver=nonlinear_solver, coupling_linear_solver=linear_solver) # OM group class Model(om.Group): + def initialize(self): + self.options.declare('scenario_names', default=['aerostructural1','aerostructural2']) + def setup(self): + self.scenario_names = self.options['scenario_names'] # ivc self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) @@ -78,38 +81,34 @@ def setup(self): aero_builder = AeroBuilder(aero_setup) # xfer builder - xfer_builder = XferBuilder( - aero_builder=aero_builder, - struct_builder=struct_builder - ) + xfer_builder = XferBuilder(aero_builder=aero_builder, struct_builder=struct_builder) # geometry builders = {'struct': struct_builder, 'aero': aero_builder} geometry_builder = GeometryBuilder(builders) - # list of scenario names - scenario_names = ['aerostructural1','aerostructural2'] - # add parallel multipoint group - self.add_subsystem('multipoint',AerostructParallel( - aero_builder=aero_builder, - struct_builder=struct_builder, - xfer_builder=xfer_builder, - geometry_builder=geometry_builder, - scenario_names=scenario_names)) + self.add_subsystem('multipoint', + AerostructParallel( + aero_builder=aero_builder, + struct_builder=struct_builder, + xfer_builder=xfer_builder, + geometry_builder=geometry_builder, + scenario_names=self.scenario_names) + ) - for i in range(len(scenario_names)): + for i in range(len(self.scenario_names)): # connect scalar inputs to the scenario for var in ['modulus', 'yield_stress', 'density', 'dv_struct']: - self.connect(var, 'multipoint.'+scenario_names[i]+'.'+var) + self.connect(var, 'multipoint.'+self.scenario_names[i]+'.'+var) # connect vector inputs for var in ['mach', 'qdyn', 'aoa']: - self.connect(var, 'multipoint.'+scenario_names[i]+'.'+var, src_indices=[i]) + self.connect(var, 'multipoint.'+self.scenario_names[i]+'.'+var, src_indices=[i]) # connect top-level geom parameter - self.connect('geometry_morph_param', 'multipoint.'+scenario_names[i]+'.geometry.geometry_morph_param') + self.connect('geometry_morph_param', 'multipoint.'+self.scenario_names[i]+'.geometry.geometry_morph_param') # run model and check derivatives if __name__ == "__main__": diff --git a/examples/aerostructural/supersonic_panel/structures_mphys.py b/examples/aerostructural/supersonic_panel/structures_mphys.py index 4a1b6d24..959afa2e 100644 --- a/examples/aerostructural/supersonic_panel/structures_mphys.py +++ b/examples/aerostructural/supersonic_panel/structures_mphys.py @@ -1,15 +1,21 @@ import numpy as np import openmdao.api as om -from mphys import Builder +from mphys import Builder, MPhysVariables from beam_solver import Beam + +X_STRUCT_MESH = MPhysVariables.Structures.Mesh.COORDINATES +X_STRUCT = MPhysVariables.Structures.COORDINATES +U_STRUCT = MPhysVariables.Structures.DISPLACEMENTS +F_STRUCT = MPhysVariables.Structures.Loads.AERODYNAMIC + # IVC which returns a baseline mesh class StructMesh(om.IndepVarComp): def initialize(self): self.options.declare('x_struct0') def setup(self): - self.add_output('x_struct0', val=self.options['x_struct0'], distributed=True, tags=['mphys_coordinates']) + self.add_output(X_STRUCT_MESH, val=self.options['x_struct0'], distributed=True, tags=['mphys_coordinates']) # IC which computes structural displacements @@ -21,76 +27,76 @@ def setup(self): self.solver = self.options['solver'] self.add_input('dv_struct', shape_by_conn=True, tags=['mphys_input']) - self.add_input('x_struct0', shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) - self.add_input('f_struct', shape_by_conn=True, distributed=True, tags=['mphys_coupling']) + self.add_input(X_STRUCT, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(F_STRUCT, shape_by_conn=True, distributed=True, tags=['mphys_coupling']) self.add_input('modulus', 0., tags=['mphys_input']) - self.add_output('u_struct', np.zeros(self.solver.n_dof*self.solver.n_nodes), distributed=True, tags=['mphys_coupling']) + self.add_output(U_STRUCT, np.zeros(self.solver.n_dof*self.solver.n_nodes), distributed=True, tags=['mphys_coupling']) # bc correct term, needed for LNBGS residuals to properly converge to 0 self.bc_correct = np.zeros(self.solver.n_dof*self.solver.n_nodes) def solve_nonlinear(self,inputs,outputs): self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.modulus = inputs['modulus'] - outputs['u_struct'] = self.solver.solve_system( - f=inputs['f_struct'] + outputs[U_STRUCT] = self.solver.solve_system( + f=inputs[F_STRUCT] ) def apply_nonlinear(self,inputs,outputs,residuals): self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.modulus = inputs['modulus'] - residuals['u_struct'] = self.solver.compute_residual( - u=outputs['u_struct'], - f=inputs['f_struct'] + residuals[U_STRUCT] = self.solver.compute_residual( + u=outputs[U_STRUCT], + f=inputs[F_STRUCT] ) def solve_linear(self,d_outputs,d_residuals,mode): if mode == 'rev': - d_residuals['u_struct'] = self.solver.solve_system( - f=d_outputs['u_struct'] + d_residuals[U_STRUCT] = self.solver.solve_system( + f=d_outputs[U_STRUCT] ) # correct the boundary condition dof, in order to set the LHS equal to the RHS self.bc_correct = self.solver.bc_correction( - u=d_outputs['u_struct'] + u=d_outputs[U_STRUCT] ) - d_residuals['u_struct'] += self.bc_correct + d_residuals[U_STRUCT] += self.bc_correct def apply_linear(self,inputs,outputs,d_inputs,d_outputs,d_residuals,mode): if mode == 'rev': - if 'u_struct' in d_residuals: + if U_STRUCT in d_residuals: self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] - self.solver.modulus = inputs['modulus'] + self.solver.xyz = inputs[X_STRUCT] + self.solver.modulus = inputs['modulus'] adjoint = self.solver.set_adjoint( - adjoint=d_residuals['u_struct'] + adjoint=d_residuals[U_STRUCT] ) - if 'u_struct' in d_outputs: - d_outputs['u_struct'] += self.solver.compute_residual( - u=adjoint, + if U_STRUCT in d_outputs: + d_outputs[U_STRUCT] += self.solver.compute_residual( + u=adjoint, f=np.zeros_like(adjoint) ) # add back in non-zero values at the bc DOFs, so the LNBGS residuals look correct - d_outputs['u_struct'] += self.bc_correct + d_outputs[U_STRUCT] += self.bc_correct d_dv_struct, d_xs, d_modulus = self.solver.compute_stiffness_derivatives( - u=outputs['u_struct'], + u=outputs[U_STRUCT], adjoint=adjoint ) if 'dv_struct' in d_inputs: d_inputs['dv_struct'] += d_dv_struct - if 'x_struct0' in d_inputs: - d_inputs['x_struct0'] += d_xs - if 'f_struct' in d_inputs: - d_inputs['f_struct'] -= adjoint + if X_STRUCT in d_inputs: + d_inputs[X_STRUCT] += d_xs + if F_STRUCT in d_inputs: + d_inputs[F_STRUCT] -= adjoint if 'modulus' in d_inputs: d_inputs['modulus'] += d_modulus @@ -102,29 +108,29 @@ def initialize(self): def setup(self): self.solver = self.options['solver'] - + self.aggregation_parameter = 20. self.add_input('dv_struct', shape_by_conn=True, tags = ['mphys_input']) - self.add_input('x_struct0', shape_by_conn=True, distributed=True, tags = ['mphys_coordinates']) - self.add_input('u_struct', shape_by_conn=True, distributed=True, tags = ['mphys_coupling']) + self.add_input(X_STRUCT, shape_by_conn=True, distributed=True, tags = ['mphys_coordinates']) + self.add_input(U_STRUCT, shape_by_conn=True, distributed=True, tags = ['mphys_coupling']) self.add_input('modulus', 0., tags=['mphys_input']) self.add_input('yield_stress', 0., tags=['mphys_input']) self.add_output('func_struct', 0., tags = ['mphys_result']) def compute(self,inputs,outputs): self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.modulus = inputs['modulus'] self.solver.yield_stress = inputs['yield_stress'] self.stress, outputs['func_struct'] = self.solver.compute_stress( - u=inputs['u_struct'], + u=inputs[U_STRUCT], aggregation_parameter=self.aggregation_parameter ) self.solver.write_output( - u=inputs['u_struct'], + u=inputs[U_STRUCT], stress=self.stress ) @@ -132,23 +138,23 @@ def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if mode == 'rev': if 'func_struct' in d_outputs: self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.modulus = inputs['modulus'] self.solver.yield_stress = inputs['yield_stress'] d_dv_struct, d_xs, d_us, d_modulus, d_yield_stress = self.solver.compute_stress_derivatives( - u=inputs['u_struct'], - stress=self.stress, - aggregation_parameter=self.aggregation_parameter, + u=inputs[U_STRUCT], + stress=self.stress, + aggregation_parameter=self.aggregation_parameter, adjoint=d_outputs['func_struct'] ) if 'dv_struct' in d_inputs: d_inputs['dv_struct'] += d_dv_struct - if 'x_struct0' in d_inputs: - d_inputs['x_struct0'] += d_xs - if 'u_struct' in d_inputs: - d_inputs['u_struct'] += d_us + if X_STRUCT in d_inputs: + d_inputs[X_STRUCT] += d_xs + if U_STRUCT in d_inputs: + d_inputs[U_STRUCT] += d_us if 'modulus' in d_inputs: d_inputs['modulus'] += d_modulus if 'yield_stress' in d_inputs: @@ -164,13 +170,13 @@ def setup(self): self.solver = self.options['solver'] self.add_input('dv_struct', shape_by_conn=True, tags = ['mphys_input']) - self.add_input('x_struct0', shape_by_conn=True, distributed=True, tags = ['mphys_coordinates']) + self.add_input(X_STRUCT, shape_by_conn=True, distributed=True, tags = ['mphys_coordinates']) self.add_input('density', 0., tags=['mphys_input']) self.add_output('mass', tags=['mphys_result']) def compute(self,inputs,outputs): self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.density = inputs['density'] outputs['mass'] = self.solver.compute_mass() @@ -179,7 +185,7 @@ def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if mode == 'rev': if 'mass' in d_outputs: self.solver.dv_struct = inputs['dv_struct'] - self.solver.xyz = inputs['x_struct0'] + self.solver.xyz = inputs[X_STRUCT] self.solver.density = inputs['density'] d_dv_struct, d_xs, d_density = self.solver.compute_mass_derivatives( @@ -188,12 +194,12 @@ def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if 'dv_struct' in d_inputs: d_inputs['dv_struct'] += d_dv_struct - if 'x_struct0' in d_inputs: - d_inputs['x_struct0'] += d_xs + if X_STRUCT in d_inputs: + d_inputs[X_STRUCT] += d_xs if 'density' in d_inputs: d_inputs['density'] += d_density - + # Group which holds the solver class StructSolverGroup(om.Group): def initialize(self): @@ -230,8 +236,8 @@ def __init__(self, options): def initialize(self, comm): self.solver = Beam( - panel_chord=self.options['panel_chord'], - panel_width=self.options['panel_width'], + panel_chord=self.options['panel_chord'], + panel_width=self.options['panel_width'], N_el=self.options['N_el'], comm=comm ) @@ -239,7 +245,7 @@ def initialize(self, comm): def get_mesh_coordinate_subsystem(self, scenario_name=None): if self.solver.owned is not None: x_struct0 = np.c_[self.solver.x,self.solver.y,self.solver.z].flatten(order='C') - else: + else: x_struct0 = np.zeros(0) return StructMesh(x_struct0=x_struct0) @@ -254,4 +260,3 @@ def get_number_of_nodes(self): def get_ndof(self): return self.solver.n_dof - diff --git a/examples/aerostructural/supersonic_panel/xfer_mphys.py b/examples/aerostructural/supersonic_panel/xfer_mphys.py index dcd15601..0657aeec 100644 --- a/examples/aerostructural/supersonic_panel/xfer_mphys.py +++ b/examples/aerostructural/supersonic_panel/xfer_mphys.py @@ -1,9 +1,17 @@ import numpy as np import openmdao.api as om -from mphys import Builder +from mphys import Builder, MPhysVariables from xfer import Xfer +X_STRUCT = MPhysVariables.Structures.COORDINATES +U_STRUCT = MPhysVariables.Structures.DISPLACEMENTS +F_STRUCT = MPhysVariables.Structures.Loads.AERODYNAMIC + +X_AERO0 = MPhysVariables.Aerodynamics.Surface.COORDINATES_INITIAL +U_AERO = MPhysVariables.Aerodynamics.Surface.DISPLACEMENTS +F_AERO = MPhysVariables.Aerodynamics.Surface.LOADS + # EC which transfers displacements from structure to aero class DispXfer(om.ExplicitComponent): def initialize(self): @@ -12,31 +20,31 @@ def initialize(self): def setup(self): self.solver = self.options['solver'] - self.add_input('x_struct0', shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) - self.add_input('x_aero0', shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) - self.add_input('u_struct', shape_by_conn=True, distributed=True, tags=['mphys_coupling']) - self.add_output('u_aero', np.zeros(self.solver.aero.n_dof*self.solver.aero.n_nodes), distributed=True, tags=['mphys_coupling']) + self.add_input(X_STRUCT, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(X_AERO0, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(U_STRUCT, shape_by_conn=True, distributed=True, tags=['mphys_coupling']) + self.add_output(U_AERO, np.zeros(self.solver.aero.n_dof*self.solver.aero.n_nodes), distributed=True, tags=['mphys_coupling']) def compute(self,inputs,outputs): - self.solver.xs = inputs['x_struct0'] - self.solver.xa = inputs['x_aero0'] - self.solver.us = inputs['u_struct'] + self.solver.xs = inputs[X_STRUCT] + self.solver.xa = inputs[X_AERO0] + self.solver.us = inputs[U_STRUCT] + + outputs[U_AERO] = self.solver.transfer_displacements() - outputs['u_aero'] = self.solver.transfer_displacements() - def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): if mode == 'rev': - if 'u_aero' in d_outputs: + if U_AERO in d_outputs: d_xs, d_xa, d_us = self.solver.transfer_displacements_derivatives( - adjoint=d_outputs['u_aero'] + adjoint=d_outputs[U_AERO] ) - if 'x_struct0' in d_inputs: - d_inputs['x_struct0'] += d_xs - if 'x_aero0' in d_inputs: - d_inputs['x_aero0'] += d_xa - if 'u_struct' in d_inputs: - d_inputs['u_struct'] += d_us + if X_STRUCT in d_inputs: + d_inputs[X_STRUCT] += d_xs + if X_AERO0 in d_inputs: + d_inputs[X_AERO0] += d_xa + if U_STRUCT in d_inputs: + d_inputs[U_STRUCT] += d_us # EC which transfers loads from aero to structure @@ -47,31 +55,31 @@ def initialize(self): def setup(self): self.solver = self.options['solver'] - self.add_input('x_struct0', shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) - self.add_input('x_aero0', shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) - self.add_input('f_aero', shape_by_conn=True, distributed=True, tags=['mphys_coupling']) - self.add_output('f_struct', np.zeros(self.solver.struct.n_dof*self.solver.struct.n_nodes), distributed=True, tags=['mphys_coupling']) - + self.add_input(X_STRUCT, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(X_AERO0, shape_by_conn=True, distributed=True, tags=['mphys_coordinates']) + self.add_input(F_AERO, shape_by_conn=True, distributed=True, tags=['mphys_coupling']) + self.add_output(F_STRUCT, np.zeros(self.solver.struct.n_dof*self.solver.struct.n_nodes), distributed=True, tags=['mphys_coupling']) + def compute(self,inputs,outputs): - self.solver.xs = inputs['x_struct0'] - self.solver.xa = inputs['x_aero0'] - self.solver.fa = inputs['f_aero'] + self.solver.xs = inputs[X_STRUCT] + self.solver.xa = inputs[X_AERO0] + self.solver.fa = inputs[F_AERO] + + outputs[F_STRUCT] = self.solver.transfer_loads() - outputs['f_struct'] = self.solver.transfer_loads() - def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode): - if mode == 'rev': - if 'f_struct' in d_outputs: + if mode == 'rev': + if F_STRUCT in d_outputs: d_xs, d_xa, d_fa = self.solver.transfer_loads_derivatives( - adjoint=d_outputs['f_struct'] + adjoint=d_outputs[F_STRUCT] ) - if 'x_struct0' in d_inputs: - d_inputs['x_struct0'] += d_xs - if 'x_aero0' in d_inputs: - d_inputs['x_aero0'] += d_xa - if 'f_aero' in d_inputs: - d_inputs['f_aero'] += d_fa + if X_STRUCT in d_inputs: + d_inputs[X_STRUCT] += d_xs + if X_AERO0 in d_inputs: + d_inputs[X_AERO0] += d_xa + if F_AERO in d_inputs: + d_inputs[F_AERO] += d_fa # Builder @@ -80,7 +88,7 @@ def __init__(self, aero_builder, struct_builder): self.aero_builder = aero_builder self.struct_builder = struct_builder - def initialize(self, comm): + def initialize(self, comm): self.solver = Xfer( aero = self.aero_builder.solver, struct = self.struct_builder.solver, @@ -91,5 +99,3 @@ def get_coupling_group_subsystem(self, scenario_name=None): disp_xfer = DispXfer(solver=self.solver) load_xfer = LoadXfer(solver=self.solver) return disp_xfer, load_xfer - - diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py index 6de888bc..c8394abf 100644 --- a/mphys/network/remote_component.py +++ b/mphys/network/remote_component.py @@ -13,7 +13,8 @@ class RemoteComp(om.ExplicitComponent): """ def stop_server(self): # shortcut for stopping server from top level - self.server_manager.stop_server() + if self.server_manager is not None: + self.server_manager.stop_server() def initialize(self): self.options.declare('run_server_filename', default="mphys_server.py", desc="python file that will launch the Server class") @@ -28,36 +29,42 @@ def initialize(self): self.options.declare('var_naming_dot_replacement', default=":", desc="what to replace '.' within dv/response name trees") self.options.declare('additional_remote_inputs', default=[], types=list, desc="additional inputs not defined as design vars in the remote component") self.options.declare('additional_remote_outputs', default=[], types=list, desc="additional outputs not defined as objective/constraints in the remote component") + self.options.declare('additional_remote_constants', default=[], types=list, desc="same as additional_remote_inputs, but derivatives wrt constants will not be computed") self.options.declare('use_derivative_coloring', default=False, types=bool, desc="assign derivative coloring to objective/constraints. Only for cases with parallel servers") def setup(self): - if self.comm.size>1: - raise SystemError('Using Remote Component on more than 1 rank is not supported') - self.time_estimate_multiplier = self.options['time_estimate_multiplier'] - self.time_estimate_buffer = self.options['time_estimate_buffer'] - self.reboot_only_on_function_call = self.options['reboot_only_on_function_call'] - self.dump_json = self.options['dump_json'] - self.dump_separate_json = self.options['dump_separate_json'] self.var_naming_dot_replacement = self.options['var_naming_dot_replacement'] - self.additional_remote_inputs = self.options['additional_remote_inputs'] - self.additional_remote_outputs = self.options['additional_remote_outputs'] self.use_derivative_coloring = self.options['use_derivative_coloring'] self.derivative_coloring_num = 0 - if self.dump_separate_json: - self.dump_json = True - - self._setup_server_manager() - # for tracking model times, and determining whether to relaunch servers - self.times_function = np.array([]) - self.times_gradient = np.array([]) - - # get baseline model - print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True) - output_dict = self.evaluate_model(command='initialize', - remote_input_dict={'additional_inputs': self.additional_remote_inputs, - 'additional_outputs': self.additional_remote_outputs, - 'component_name': self.name}) + output_dict = None + if self.comm.rank==0: + self.time_estimate_multiplier = self.options['time_estimate_multiplier'] + self.time_estimate_buffer = self.options['time_estimate_buffer'] + self.reboot_only_on_function_call = self.options['reboot_only_on_function_call'] + self.dump_json = self.options['dump_json'] + self.dump_separate_json = self.options['dump_separate_json'] + self.additional_remote_inputs = self.options['additional_remote_inputs'] + self.additional_remote_outputs = self.options['additional_remote_outputs'] + self.additional_remote_constants = self.options['additional_remote_constants'] + self.last_analysis_completed_time = time.time() # for tracking down time between function/gradient calls + if self.dump_separate_json: + self.dump_json = True + + self._setup_server_manager() + + # for tracking model times, and determining whether to relaunch servers + self.times_function = np.array([]) + self.times_gradient = np.array([]) + + # get baseline model + print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True) + output_dict = self.evaluate_model(command='initialize', + remote_input_dict={'additional_inputs': self.additional_remote_inputs, + 'additional_outputs': self.additional_remote_outputs, + 'additional_constants': self.additional_remote_constants, + 'component_name': self.name}) + output_dict = self.comm.bcast(output_dict) self._add_design_inputs_from_baseline_model(output_dict) self._add_objectives_from_baseline_model(output_dict) @@ -65,12 +72,16 @@ def setup(self): self._add_additional_inputs_from_baseline_model(output_dict) self._add_additional_outputs_from_baseline_model(output_dict) + self._add_additional_constants_from_baseline_model(output_dict) self.declare_partials('*', '*') def compute(self,inputs,outputs): - input_dict = self._create_input_dict_for_server(inputs) - remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate') + remote_dict = None + if self.comm.rank==0: + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate') + remote_dict = self.comm.bcast(remote_dict) self._assign_objectives_from_remote_output(remote_dict, outputs) self._assign_constraints_from_remote_output(remote_dict, outputs) @@ -79,8 +90,11 @@ def compute(self,inputs,outputs): def compute_partials(self, inputs, partials): # NOTE: this will not use of and wrt inputs, if given in outer script's compute_totals/check_totals - input_dict = self._create_input_dict_for_server(inputs) - remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives') + remote_dict = None + if self.comm.rank==0: + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives') + remote_dict = self.comm.bcast(remote_dict) self._assign_objective_partials_from_remote_output(remote_dict, partials) self._assign_constraint_partials_from_remote_output(remote_dict, partials) @@ -94,14 +108,16 @@ def evaluate_model(self, remote_input_dict=None, command='initialize'): if self.dump_json: self._dump_json(remote_input_dict, command) + down_time = time.time() - self.last_analysis_completed_time model_start_time = time.time() self._send_inputs_to_server(remote_input_dict, command) remote_output_dict = self._receive_outputs_from_server() model_time_elapsed = time.time() - model_start_time + self.last_analysis_completed_time = time.time() if self.dump_json: - remote_output_dict.update({'wall_time': model_time_elapsed}) + remote_output_dict.update({'wall_time': model_time_elapsed, 'down_time': down_time}) self._dump_json(remote_output_dict, command) if self._doing_derivative_evaluation(command): @@ -133,11 +149,13 @@ def _assign_additional_partials_from_remote_output(self, remote_dict, partials): partials[( output.replace('.',self.var_naming_dot_replacement), inp.replace('.',self.var_naming_dot_replacement) )] = remote_dict['additional_outputs'][output]['derivatives'][inp] def _create_input_dict_for_server(self, inputs): - input_dict = {'design_vars': {}, 'additional_inputs': {}, 'additional_outputs': self.additional_remote_outputs, 'component_name': self.name} + input_dict = {'design_vars': {}, 'additional_inputs': {}, 'additional_constants': {}, 'additional_outputs': self.additional_remote_outputs, 'component_name': self.name} for dv in self.design_var_keys: - input_dict['design_vars'][dv.replace('.',self.var_naming_dot_replacement)] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()} + input_dict['design_vars'][dv] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()} for input in self.additional_remote_inputs: input_dict['additional_inputs'][input] = {'val': inputs[input.replace('.',self.var_naming_dot_replacement)].tolist()} + for constant in self.additional_remote_constants: + input_dict['additional_constants'][constant] = {'val': inputs[constant.replace('.',self.var_naming_dot_replacement)].tolist()} return input_dict def _doing_derivative_evaluation(self, command: str): @@ -150,6 +168,8 @@ def _is_first_gradient_evaluation(self): return len(self.times_gradient) == 0 def _need_to_restart_server(self, command: str): + if self.server_manager.job_has_expired(): + return True if self._doing_derivative_evaluation(command): if self._is_first_gradient_evaluation() or self.reboot_only_on_function_call: return False @@ -205,6 +225,12 @@ def _add_additional_inputs_from_baseline_model(self, output_dict): self.add_input(input.replace('.',self.var_naming_dot_replacement), output_dict['additional_inputs'][input]['val']) + def _add_additional_constants_from_baseline_model(self, output_dict): + self.additional_remote_constants = list(output_dict['additional_constants'].keys()) + for constant in self.additional_remote_constants: + self.add_input(constant.replace('.',self.var_naming_dot_replacement), + output_dict['additional_constants'][constant]['val']) + def _add_additional_outputs_from_baseline_model(self, output_dict): self.additional_remote_outputs = list(output_dict['additional_outputs'].keys()) for output in self.additional_remote_outputs: diff --git a/mphys/network/server.py b/mphys/network/server.py index 1fb509bd..0d8feda7 100644 --- a/mphys/network/server.py +++ b/mphys/network/server.py @@ -1,5 +1,7 @@ import openmdao.api as om import warnings +from copy import deepcopy +import numpy as np class Server: """ @@ -25,7 +27,8 @@ class Server: def __init__(self, get_om_group_function_pointer, ignore_setup_warnings = False, ignore_runtime_warnings = False, - rerun_initial_design = False): + rerun_initial_design = False, + write_n2 = False): self.get_om_group_function_pointer = get_om_group_function_pointer self.ignore_setup_warnings = ignore_setup_warnings @@ -37,7 +40,9 @@ def __init__(self, get_om_group_function_pointer, self.derivatives = None self.additional_inputs = None self.additional_outputs = None + self.additional_constants = None self.design_counter = 0 # more debugging info for client side json dumping + self.write_n2 = write_n2 self._load_the_model() @@ -55,11 +60,11 @@ def _load_the_model(self): self.prob.setup(mode='rev') else: self.prob.setup(mode='rev') - self.rank = self.prob.model.comm.rank + self.comm = self.prob.model.comm # temporary fix for MELD initialization issue if self.rerun_initial_design: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Evaluating baseline design', flush=True) self._run_model() @@ -99,7 +104,7 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}): design_vars = self.prob.model._design_vars remote_output_dict['design_vars'] = {} for dv in design_vars.keys(): - remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv), + remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv, get_remote=True), 'ref': design_vars[dv]['ref'], 'ref0': design_vars[dv]['ref0'], 'lower': design_vars[dv]['lower'], @@ -117,11 +122,19 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}): def _gather_additional_inputs_from_om_problem(self, remote_output_dict = {}): remote_output_dict['additional_inputs'] = {} for input in self.additional_inputs: - remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input)} + remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input, get_remote=True)} if hasattr(remote_output_dict['additional_inputs'][input]['val'], 'tolist'): remote_output_dict['additional_inputs'][input]['val'] = remote_output_dict['additional_inputs'][input]['val'].tolist() return remote_output_dict + def _gather_additional_constants_from_om_problem(self, remote_output_dict = {}): + remote_output_dict['additional_constants'] = {} + for constant in self.additional_constants: + remote_output_dict['additional_constants'][constant] = {'val': self.prob.get_val(constant)} + if hasattr(remote_output_dict['additional_constants'][constant]['val'], 'tolist'): + remote_output_dict['additional_constants'][constant]['val'] = remote_output_dict['additional_constants'][constant]['val'].tolist() + return remote_output_dict + def _gather_design_outputs_from_om_problem(self, remote_output_dict = {}): responses = self.prob.model._responses remote_output_dict.update({'objective':{}, 'constraints':{}}) @@ -261,6 +274,7 @@ def _gather_inputs_and_outputs_from_om_problem(self): remote_output_dict = self._gather_design_outputs_from_om_problem(remote_output_dict) remote_output_dict = self._gather_additional_inputs_from_om_problem(remote_output_dict) remote_output_dict = self._gather_additional_outputs_from_om_problem(remote_output_dict) + remote_output_dict = self._gather_additional_constants_from_om_problem(remote_output_dict) if self.derivatives is not None: remote_output_dict = self._gather_design_derivatives_from_om_problem(remote_output_dict) remote_output_dict = self._gather_additional_output_derivatives_from_om_problem(remote_output_dict) @@ -271,23 +285,43 @@ def _gather_inputs_and_outputs_from_om_problem(self): def _set_design_variables_into_the_server_problem(self, input_dict): design_changed = False for key in input_dict['design_vars'].keys(): - if (self.prob.get_val(key)!=input_dict['design_vars'][key]['val']).any(): + if (self.prob.get_val(key, get_remote=True)!=input_dict['design_vars'][key]['val']).any(): design_changed = True self.prob.set_val(key, input_dict['design_vars'][key]['val']) return design_changed def _set_additional_inputs_into_the_server_problem(self, input_dict, design_changed): for key in input_dict['additional_inputs'].keys(): - if (self.prob.get_val(key)!=input_dict['additional_inputs'][key]['val']).any(): + design_changed_condition = self.prob.get_val(key, get_remote=True)!=input_dict['additional_inputs'][key]['val'] + if type(design_changed_condition)==bool: + design_changed = deepcopy(design_changed_condition) + elif design_changed_condition.any(): design_changed = True - self.prob.set_val(key, input_dict['additional_inputs'][key]['val']) + if np.array(input_dict['additional_inputs'][key]['val']).shape==self.prob.get_val(key, get_remote=True).shape: + self.prob.set_val(key, input_dict['additional_inputs'][key]['val']) + elif self.comm.rank==0: + print(f'SERVER: shape of additional input {key} differs from actual input size... ignoring.', flush=True) return design_changed + def _set_additional_constants_into_the_server_problem(self, input_dict, design_changed): + for key in input_dict['additional_constants'].keys(): + if np.array(input_dict['additional_constants'][key]['val']).shape==self.prob.get_val(key).shape: + design_changed_condition = self.prob.get_val(key)!=input_dict['additional_constants'][key]['val'] + if type(design_changed_condition)==bool: + design_changed = deepcopy(design_changed_condition) + elif design_changed_condition.any(): + design_changed = True + self.prob.set_val(key, input_dict['additional_constants'][key]['val']) + return max(self.comm.allgather(design_changed)) + def _save_additional_variable_names(self, input_dict): self.additional_inputs = input_dict['additional_inputs'] self.additional_outputs = input_dict['additional_outputs'] + self.additional_constants = input_dict['additional_constants'] if hasattr(self.additional_inputs,'keys'): self.additional_inputs = list(self.additional_inputs.keys()) + if hasattr(self.additional_constants,'keys'): + self.additional_constants = list(self.additional_constants.keys()) def run(self): """ @@ -295,53 +329,54 @@ def run(self): """ while True: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Waiting for new design...', flush=True) command, input_dict = self._parse_incoming_message() # interpret command (options are "shutdown", "initialize", "evaluate", or "evaluate derivatives") if command=='shutdown': - if self.rank==0: + if self.comm.rank==0: print('SERVER: Received signal to shutdown', flush=True) break self._save_additional_variable_names(input_dict) if command=='initialize': # evaluate baseline model for RemoteComp setup - if self.rank==0: + if self.comm.rank==0: print('SERVER: Initialization requested... using baseline design', flush=True) if self.current_design_has_been_evaluated: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Design already evaluated, skipping run_model', flush=True) else: self._run_model() else: design_changed = self._set_design_variables_into_the_server_problem(input_dict) design_changed = self._set_additional_inputs_into_the_server_problem(input_dict, design_changed) + design_changed = self._set_additional_constants_into_the_server_problem(input_dict, design_changed) if design_changed: self.current_design_has_been_evaluated = False self.current_derivatives_have_been_evaluated = False if command=='evaluate derivatives': # compute derivatives if self.current_derivatives_have_been_evaluated: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Derivatives already evaluated, skipping compute_totals', flush=True) else: if not self.current_design_has_been_evaluated: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Derivative needed, but design has changed... evaluating forward solution first', flush=True) self._run_model() - if self.rank==0: + if self.comm.rank==0: print('SERVER: Evaluating derivatives', flush=True) self._compute_totals() elif command=='evaluate': # run model if self.current_design_has_been_evaluated: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Design already evaluated, skipping run_model', flush=True) else: - if self.rank==0: + if self.comm.rank==0: print('SERVER: Evaluating design', flush=True) self._run_model() @@ -350,4 +385,5 @@ def run(self): self._send_outputs_to_client(output_dict) # write current n2 with values - om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html") + if self.write_n2: + om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html") diff --git a/mphys/network/server_manager.py b/mphys/network/server_manager.py index dda1c75d..376b34e2 100644 --- a/mphys/network/server_manager.py +++ b/mphys/network/server_manager.py @@ -29,3 +29,8 @@ def enough_time_is_remaining(self, estimated_model_time): How much time the new analysis is estimated to take """ return True + + def job_has_expired(self): + """ + Check if the job has run out of time. + """ diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py index 85f646d6..a36fe1b5 100644 --- a/mphys/network/zmq_pbs.py +++ b/mphys/network/zmq_pbs.py @@ -19,6 +19,7 @@ def initialize(self): self.options.declare('port', default=5081, desc="port number for server/client communication") self.options.declare('acceptable_port_range', default=[5081,6000], desc="port range to look through if 'port' is currently busy") self.options.declare('additional_server_args', default="", desc="Optional arguments to give server, in addition to --port ") + self.options.declare('job_expiration_max_restarts', default=None, desc="Optional maximum number of server restarts due to job expiration; unlimited by default") super().initialize() self.server_manager = None # for avoiding reinitialization due to multiple setup calls @@ -40,7 +41,8 @@ def _setup_server_manager(self): component_name=self.name, port=self.options['port'], acceptable_port_range=self.options['acceptable_port_range'], - additional_server_args=self.options['additional_server_args']) + additional_server_args=self.options['additional_server_args'], + job_expiration_max_restarts=self.options['job_expiration_max_restarts']) class MPhysZeroMQServerManager(ServerManager): """ @@ -61,6 +63,8 @@ class MPhysZeroMQServerManager(ServerManager): Range of alternative port numbers if specified port is already in use additional_server_args : str Optional arguments to give server, in addition to --port + job_expiration_max_restarts : int + Optional maximum number of server restarts due to job expiration; unlimited by default """ def __init__(self, pbs: PBS, @@ -68,7 +72,8 @@ def __init__(self, component_name: str, port=5081, acceptable_port_range=[5081,6000], - additional_server_args='' + additional_server_args='', + job_expiration_max_restarts=None ): self.pbs = pbs self.run_server_filename = run_server_filename @@ -76,8 +81,10 @@ def __init__(self, self.port = port self.acceptable_port_range = acceptable_port_range self.additional_server_args = additional_server_args + self.job_expiration_max_restarts = job_expiration_max_restarts self.queue_time_delay = 5 # seconds to wait before rechecking if a job has started self.server_counter = 0 # for saving output of each server to different files + self.job_expiration_restarts = 0 self.start_server() def start_server(self): @@ -87,7 +94,8 @@ def start_server(self): def stop_server(self): print(f'CLIENT (subsystem {self.component_name}): Stopping the remote analysis server', flush=True) - self.socket.send('shutdown|null'.encode()) + if self.job.state=='R': + self.socket.send('shutdown|null'.encode()) self._shutdown_server() self.socket.close() @@ -98,6 +106,19 @@ def enough_time_is_remaining(self, estimated_model_time): else: return estimated_model_time < self.job.walltime_remaining + def job_has_expired(self): + self.job.update_job_state() + if self.job.state=='R': + return False + else: + if self.job_expiration_max_restarts is not None: + if self.job_expiration_restarts+1 > self.job_expiration_max_restarts: + self.stop_server() + raise RuntimeError(f'CLIENT (subsystem {self.component_name}): Reached maximum number of job expiration restarts') + self.job_expiration_restarts += 1 + print(f'CLIENT (subsystem {self.component_name}): Job no longer running; flagging for job restart') + return True + def _port_is_in_use(self, port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(('localhost', port))==0 @@ -132,11 +153,11 @@ def _launch_job(self): def _wait_for_job_to_start(self): print(f'CLIENT (subsystem {self.component_name}): Waiting for job to start', flush=True) job_submission_time = time.time() - self._setup_placeholder_ssh() + self._setup_dummy_socket() while self.job.state!='R': time.sleep(self.queue_time_delay) self.job.update_job_state() - self._stop_placeholder_ssh() + self._stop_dummy_socket() self.job_start_time = time.time() print(f'CLIENT (subsystem {self.component_name}): Job started (queue wait time: {(time.time()-job_submission_time)/3600} hours)', flush=True) @@ -151,15 +172,14 @@ def _shutdown_server(self): time.sleep(0.1) # prevent full shutdown before job deletion? self.job.qdel() - def _setup_placeholder_ssh(self): - print(f'CLIENT (subsystem {self.component_name}): Starting placeholder process to hold port {self.port} while in queue', flush=True) - ssh_command = f'ssh -4 -o ServerAliveCountMax=40 -o ServerAliveInterval=15 -N -L {self.port}:localhost:{self.port} {socket.gethostname()} &' - self.ssh_proc = subprocess.Popen(ssh_command.split(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) + def _setup_dummy_socket(self): + print(f'CLIENT (subsystem {self.component_name}): Starting dummy ZeroMQ socket to hold port {self.port} while in queue', flush=True) + context = zmq.Context() + self.dummy_socket = context.socket(zmq.REP) + self.dummy_socket.bind(f"tcp://*:{self.port}") - def _stop_placeholder_ssh(self): - self.ssh_proc.kill() + def _stop_dummy_socket(self): + self.dummy_socket.close() class MPhysZeroMQServer(Server): """ @@ -168,21 +188,22 @@ class MPhysZeroMQServer(Server): def __init__(self, port, get_om_group_function_pointer, ignore_setup_warnings = False, ignore_runtime_warnings = False, - rerun_initial_design = False): + rerun_initial_design = False, + write_n2 = False): super().__init__(get_om_group_function_pointer, ignore_setup_warnings, - ignore_runtime_warnings, rerun_initial_design) + ignore_runtime_warnings, rerun_initial_design, write_n2) self._setup_zeromq_socket(port) def _setup_zeromq_socket(self, port): - if self.rank==0: + if self.comm.rank==0: context = zmq.Context() self.socket = context.socket(zmq.REP) self.socket.bind(f"tcp://*:{port}") def _parse_incoming_message(self): inputs = None - if self.rank==0: + if self.comm.rank==0: inputs = self.socket.recv().decode() inputs = self.prob.model.comm.bcast(inputs) @@ -192,7 +213,7 @@ def _parse_incoming_message(self): return command, input_dict def _send_outputs_to_client(self, output_dict: dict): - if self.rank==0: + if self.comm.rank==0: self.socket.send(str(json.dumps(output_dict)).encode()) def get_default_zmq_pbs_argparser():