From 4335a39f02d042902bbe133bd5f194e84d101b60 Mon Sep 17 00:00:00 2001 From: "Todd A. Oliver" Date: Mon, 29 Jan 2024 18:34:45 -0600 Subject: [PATCH] Move some M2ulPhyS IO support into standalone fcns (#244) On the path to making the IO capabilities more reusable, in this commit we extract 5 functions out of M2ulPhyS and into standalone functions. The refactored functions are write_soln_data read_partitioned_soln_data read_serialized_soln_data partitioning_file_hdf5 serialize_soln_for_write --- src/M2ulPhyS.cpp | 6 +- src/M2ulPhyS.hpp | 5 -- src/io.cpp | 132 ++++++++++++++++++-------------------- src/io.hpp | 13 ++++ src/run_configuration.hpp | 2 +- 5 files changed, 81 insertions(+), 77 deletions(-) diff --git a/src/M2ulPhyS.cpp b/src/M2ulPhyS.cpp index 4e9b6d687..de7be70ec 100644 --- a/src/M2ulPhyS.cpp +++ b/src/M2ulPhyS.cpp @@ -329,9 +329,9 @@ void M2ulPhyS::initVariables() { if (config.isRestartSerialized("read")) { assert(serial_mesh->Conforming()); partitioning_ = Array(serial_mesh->GeneratePartitioning(nprocs_, defaultPartMethod), nelemGlobal_); - partitioning_file_hdf5("write"); + partitioning_file_hdf5("write", config, groupsMPI, nelemGlobal_, partitioning_); } else { - partitioning_file_hdf5("read"); + partitioning_file_hdf5("read", config, groupsMPI, nelemGlobal_, partitioning_); } } @@ -359,7 +359,7 @@ void M2ulPhyS::initVariables() { if (nprocs_ > 1) { assert(serial_mesh->Conforming()); partitioning_ = Array(serial_mesh->GeneratePartitioning(nprocs_, defaultPartMethod), nelemGlobal_); - if (rank0_) partitioning_file_hdf5("write"); + if (rank0_) partitioning_file_hdf5("write", config, groupsMPI, nelemGlobal_, partitioning_); MPI_Barrier(groupsMPI->getTPSCommWorld()); } diff --git a/src/M2ulPhyS.hpp b/src/M2ulPhyS.hpp index 5bfb3f33a..05e47db78 100644 --- a/src/M2ulPhyS.hpp +++ b/src/M2ulPhyS.hpp @@ -344,12 +344,7 @@ class M2ulPhyS : public TPS::Solver { void initilizeSpeciesFromLTE(); // i/o routines - void read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data); - void read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data, IOFamily &fam); void restart_files_hdf5(string mode, string inputFileName = std::string()); - void partitioning_file_hdf5(string mode); - void serialize_soln_for_write(IOFamily &fam); - void write_soln_data(hid_t group, string varName, hid_t dataspace, double *data); void Check_NAN(); bool Check_JobResubmit(); diff --git a/src/io.cpp b/src/io.cpp index e6f98f017..c97a40f7e 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -265,7 +265,7 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) { double *data = fam.pfunc_->HostReadWrite(); // special case if writing a serial restart if ((config.isRestartSerialized(mode)) && (nprocs_ > 1)) { - serialize_soln_for_write(fam); + serialize_soln_for_write(fam, groupsMPI, mesh->GetNE(), nelemGlobal_, locToGlobElem, partitioning_); if (rank0_) data = fam.serial_sol->HostReadWrite(); } @@ -274,7 +274,7 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) { // save raw data if (rank0_ || config.isRestartPartitioned(mode)) { - for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0]); + for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0], rank0_); } if (group >= 0) H5Gclose(group); @@ -326,10 +326,14 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) { if (var.inRestartFile_) { std::string h5Path = fam.group_ + "/" + var.varName_; if (rank0_) grvy_printf(ginfo, "--> Reading h5 path = %s\n", h5Path.c_str()); - if (config.isRestartPartitioned(mode)) + if (config.isRestartPartitioned(mode)) { read_partitioned_soln_data(file, h5Path.c_str(), var.index_ * numInSoln, data); - else - read_serialized_soln_data(file, h5Path.c_str(), dof, var.index_, data, fam); + } else { + assert(partitioning_ != NULL); + assert(config.isRestartSerialized("read")); + read_serialized_soln_data(file, h5Path.c_str(), dof, var.index_, data, fam, groupsMPI, partitioning_, + nelemGlobal_); + } } } } @@ -381,23 +385,27 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) { return; } -void M2ulPhyS::partitioning_file_hdf5(std::string mode) { +void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal, + Array &partitioning) { MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld(); + const bool rank0 = groupsMPI->isWorldRoot(); + const int nprocs = groupsMPI->getTPSWorldSize(); + grvy_timer_begin(__func__); // only rank 0 writes partitioning file - if (!rank0_ && (mode == "write")) return; + if (!rank0 && (mode == "write")) return; // hid_t file, dataspace, data_soln; hid_t file = -1, dataspace; herr_t status; std::string fileName = config.GetPartitionBaseName(); - fileName += "." + std::to_string(nprocs_) + "p.h5"; + fileName += "." + std::to_string(nprocs) + "p.h5"; assert((mode == "read") || (mode == "write")); if (mode == "write") { - assert(partitioning_.Size() > 0); + assert(partitioning.Size() > 0); if (file_exists(fileName)) { grvy_printf(gwarn, "Removing existing partition file: %s\n", fileName.c_str()); @@ -421,26 +429,26 @@ void M2ulPhyS::partitioning_file_hdf5(std::string mode) { if (mode == "write") { // Attributes - h5_save_attribute(file, "numProcs", nprocs_); + h5_save_attribute(file, "numProcs", nprocs); // Raw partition info hsize_t dims[1]; hid_t data; - dims[0] = partitioning_.Size(); + dims[0] = partitioning.Size(); dataspace = H5Screate_simple(1, dims, NULL); assert(dataspace >= 0); data = H5Dcreate2(file, "partitioning", H5T_NATIVE_INT, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); assert(data >= 0); - status = H5Dwrite(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning_.GetData()); + status = H5Dwrite(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning.HostRead()); assert(status >= 0); H5Dclose(data); H5Fclose(file); } else if (mode == "read") { - partitioning_.SetSize(nelemGlobal_); + partitioning.SetSize(nelemGlobal); - if (rank0_) { + if (rank0) { grvy_printf(ginfo, "Reading original domain decomposition partition file: %s\n", fileName.c_str()); // verify partition info matches current proc count @@ -448,8 +456,8 @@ void M2ulPhyS::partitioning_file_hdf5(std::string mode) { int np = 0; h5_read_attribute(file, "numProcs", np); grvy_printf(ginfo, "--> # partitions defined = %i\n", np); - if (np != nprocs_) { - grvy_printf(gerror, "[ERROR]: Partition info does not match current processor count -> %i\n", nprocs_); + if (np != nprocs) { + grvy_printf(gerror, "[ERROR]: Partition info does not match current processor count -> %i\n", nprocs); exit(ERROR); } } @@ -462,55 +470,50 @@ void M2ulPhyS::partitioning_file_hdf5(std::string mode) { dataspace = H5Dget_space(data); numInFile = H5Sget_simple_extent_npoints(dataspace); - assert((int)numInFile == nelemGlobal_); + assert((int)numInFile == nelemGlobal); - status = H5Dread(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning_.GetData()); + status = H5Dread(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning.HostWrite()); assert(status >= 0); H5Dclose(data); } // <-- end rank0_ #if 0 // distribute partition vector to all procs - MPI_Bcast(partitioning_.GetData(), nelemGlobal_, MPI_INT, 0, TPSCommWorld); + MPI_Bcast(partitioning_.GetData(), nelemGlobal, MPI_INT, 0, TPSCommWorld); #endif // distribute partition vector to all procs (serialzed per process variant) int tag = 21; - if (rank0_) { - for (int rank = 1; rank < nprocs_; rank++) { - MPI_Send(partitioning_.GetData(), nelemGlobal_, MPI_INT, rank, tag, TPSCommWorld); + if (rank0) { + for (int rank = 1; rank < nprocs; rank++) { + MPI_Send(partitioning.HostRead(), nelemGlobal, MPI_INT, rank, tag, TPSCommWorld); grvy_printf(gdebug, "Sent partitioning data to rank %i\n", rank); } } else { - MPI_Recv(partitioning_.GetData(), nelemGlobal_, MPI_INT, 0, tag, TPSCommWorld, MPI_STATUS_IGNORE); + MPI_Recv(partitioning.HostWrite(), nelemGlobal, MPI_INT, 0, tag, TPSCommWorld, MPI_STATUS_IGNORE); } - if (rank0_) grvy_printf(ginfo, "--> partition file read complete\n"); + if (rank0) grvy_printf(ginfo, "--> partition file read complete\n"); } grvy_timer_end(__func__); } -void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) { +void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne, + const int *locToGlobElem, const Array &partitioning) { MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld(); - // Get total number of elements - const int local_ne = mesh->GetNE(); - int global_ne; - MPI_Reduce(&local_ne, &global_ne, 1, MPI_INT, MPI_SUM, 0, TPSCommWorld); - - // ks note: need to double check this routine when additional solution - // families are added and then remove next assert - // assert(ioFamily == "/solution"); - - // get pargridfunction for this IO family - // int iFamily = ioData.getIOFamilyIndex(ioFamily); - // assert(iFamily >= 0); - ParGridFunction *pfunc = fam.pfunc_; - - if (rank0_) assert(global_ne == nelemGlobal_); - - assert((locToGlobElem != NULL) && (partitioning_ != NULL)); + const bool rank0 = groupsMPI->isWorldRoot(); + + // Ensure consistency + int global_ne_check; + MPI_Reduce(&local_ne, &global_ne_check, 1, MPI_INT, MPI_SUM, 0, TPSCommWorld); + if (rank0) { + assert(global_ne_check == global_ne); + assert(partitioning.Size() == global_ne); + } + assert(locToGlobElem != NULL); - if (rank0_) { + ParGridFunction *pfunc = fam.pfunc_; + if (rank0) { grvy_printf(ginfo, "Generating serialized restart file (group %s...)\n", fam.group_.c_str()); // copy my own data Array lvdofs, gvdofs; @@ -525,7 +528,7 @@ void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) { // have rank 0 receive data from other tasks and copy its own for (int gelem = 0; gelem < global_ne; gelem++) { - int from_rank = partitioning_[gelem]; + int from_rank = partitioning[gelem]; if (from_rank != 0) { fam.serial_fes->GetElementVDofs(gelem, gvdofs); lsoln.SetSize(gvdofs.Size()); @@ -553,9 +556,7 @@ void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) { } // end function: serialize_soln_for_write() // convenience function to read solution data for parallel restarts -void M2ulPhyS::read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data) { - assert(config.isRestartPartitioned("read")); - +void read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data) { hid_t data_soln; herr_t status; @@ -567,10 +568,12 @@ void M2ulPhyS::read_partitioned_soln_data(hid_t file, string varName, size_t ind } // convenience function to read and distribute solution data for serialized restarts -void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data, - IOFamily &fam) { - MPI_Comm TPSCommWorld = this->groupsMPI->getTPSCommWorld(); - assert(config.isRestartSerialized("read")); +void read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data, IOFamily &fam, + MPI_Groups *groupsMPI, Array partitioning, int nelemGlobal) { + MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld(); + const bool rank0 = groupsMPI->isWorldRoot(); + const int nprocs = groupsMPI->getTPSWorldSize(); + const int rank = groupsMPI->getTPSWorldRank(); hid_t data_soln; herr_t status; @@ -580,13 +583,7 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, const int tag = 20; - // assert( (dim == 2) || (dim == 3) ); - // if(dim == 2) - // numStateVars = 4; - // else - // numStateVars = 5; - - if (rank0_) { + if (rank0) { grvy_printf(ginfo, "[RestartSerial]: Reading %s for distribution\n", varName.c_str()); Vector data_serial; @@ -598,15 +595,15 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, H5Dclose(data_soln); // assign solution owned by rank 0 - assert(partitioning_ != NULL); + assert(partitioning != NULL); Array lvdofs, gvdofs; Vector lnodes; int counter = 0; // int ndof_per_elem; - for (int gelem = 0; gelem < nelemGlobal_; gelem++) { - if (partitioning_[gelem] == 0) { + for (int gelem = 0; gelem < nelemGlobal; gelem++) { + if (partitioning[gelem] == 0) { // cull out subset of local vdof vector to use for this solution var fam.pfunc_->ParFESpace()->GetElementVDofs(counter, lvdofs); int numDof_per_this_elem = lvdofs.Size() / numStateVars; @@ -625,10 +622,10 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, } // pack remaining data and send to other processors - for (int rank = 1; rank < nprocs_; rank++) { + for (int rank = 1; rank < nprocs; rank++) { std::vector packedData; - for (int gelem = 0; gelem < nelemGlobal_; gelem++) - if (partitioning_[gelem] == rank) { + for (int gelem = 0; gelem < nelemGlobal; gelem++) + if (partitioning[gelem] == rank) { fam.serial_fes->GetElementVDofs(gelem, gvdofs); int numDof_per_this_elem = gvdofs.Size() / numStateVars; for (int i = 0; i < numDof_per_this_elem; i++) packedData.push_back(data_serial[gvdofs[i]]); @@ -638,8 +635,7 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, } } else { // <-- end rank 0 int numlDofs = fam.pfunc_->ParFESpace()->GetNDofs(); - grvy_printf(gdebug, "[%i]: local number of state vars to receive = %i (var=%s)\n", rank_, numlDofs, - varName.c_str()); + grvy_printf(gdebug, "[%i]: local number of state vars to receive = %i (var=%s)\n", rank, numlDofs, varName.c_str()); std::vector packedData(numlDofs); @@ -652,12 +648,12 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, } // convenience function to write HDF5 data -void M2ulPhyS::write_soln_data(hid_t group, string varName, hid_t dataspace, double *data) { +void write_soln_data(hid_t group, string varName, hid_t dataspace, double *data, bool rank0) { hid_t data_soln; herr_t status; assert(group >= 0); - if (rank0_) grvy_printf(ginfo, " --> Saving (%s)\n", varName.c_str()); + if (rank0) grvy_printf(ginfo, " --> Saving (%s)\n", varName.c_str()); data_soln = H5Dcreate2(group, varName.c_str(), H5T_NATIVE_DOUBLE, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); assert(data_soln >= 0); diff --git a/src/io.hpp b/src/io.hpp index 2e632dacb..0e7637f12 100644 --- a/src/io.hpp +++ b/src/io.hpp @@ -32,8 +32,13 @@ #ifndef IO_HPP_ #define IO_HPP_ +#include + +#include #include +#include "mpi_groups.hpp" +#include "run_configuration.hpp" #include "tps_mfem_wrap.hpp" class IOFamily { @@ -73,4 +78,12 @@ class IODataOrganizer { void initializeSerial(bool root, bool serial, mfem::Mesh *serial_mesh); }; +void read_partitioned_soln_data(hid_t file, std::string varName, size_t index, double *data); +void read_serialized_soln_data(hid_t file, std::string varName, int numDof, int varOffset, double *data, IOFamily &fam, + MPI_Groups *groupsMPI, mfem::Array partitioning, int nelemGlobal); +void write_soln_data(hid_t group, std::string varName, hid_t dataspace, double *data, bool rank0); +void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal, + mfem::Array &partitioning); +void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne, + const int *locToGlobElem, const mfem::Array &partitioning); #endif // IO_HPP_ diff --git a/src/run_configuration.hpp b/src/run_configuration.hpp index 6cd5e0e5f..c56359d57 100644 --- a/src/run_configuration.hpp +++ b/src/run_configuration.hpp @@ -298,7 +298,7 @@ class RunConfiguration { string GetMeshFileName() { return meshFile; } string GetOutputName() { return outputFile; } - string GetPartitionBaseName() { return partFile; } + string GetPartitionBaseName() const { return partFile; } int GetUniformRefLevels() { return ref_levels; } int GetTimeIntegratorType() { return timeIntegratorType; }