Move some M2ulPhyS IO support into standalone fcns (#244)
As a step toward making the IO capabilities more reusable, this commit
extracts five member functions from M2ulPhyS and turns them into
standalone functions. The refactored functions are:

write_soln_data
read_partitioned_soln_data
read_serialized_soln_data
partitioning_file_hdf5
serialize_soln_for_write
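
For reference, their new standalone signatures, as declared in src/io.hpp later in this diff, are:

void read_partitioned_soln_data(hid_t file, std::string varName, size_t index, double *data);
void read_serialized_soln_data(hid_t file, std::string varName, int numDof, int varOffset, double *data, IOFamily &fam,
                               MPI_Groups *groupsMPI, mfem::Array<int> partitioning, int nelemGlobal);
void write_soln_data(hid_t group, std::string varName, hid_t dataspace, double *data, bool rank0);
void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal,
                            mfem::Array<int> &partitioning);
void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne,
                              const int *locToGlobElem, const mfem::Array<int> &partitioning);

Each function now receives the formerly implicit M2ulPhyS member state (config, groupsMPI, nelemGlobal_, partitioning_, locToGlobElem, rank0_) as explicit arguments.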
trevilo committed Feb 2, 2024
1 parent 4ab7af7 commit 617e730
Showing 5 changed files with 81 additions and 77 deletions.
6 changes: 3 additions & 3 deletions src/M2ulPhyS.cpp
@@ -329,9 +329,9 @@ void M2ulPhyS::initVariables() {
if (config.isRestartSerialized("read")) {
assert(serial_mesh->Conforming());
partitioning_ = Array<int>(serial_mesh->GeneratePartitioning(nprocs_, defaultPartMethod), nelemGlobal_);
partitioning_file_hdf5("write");
partitioning_file_hdf5("write", config, groupsMPI, nelemGlobal_, partitioning_);
} else {
partitioning_file_hdf5("read");
partitioning_file_hdf5("read", config, groupsMPI, nelemGlobal_, partitioning_);
}
}

@@ -359,7 +359,7 @@ void M2ulPhyS::initVariables() {
if (nprocs_ > 1) {
assert(serial_mesh->Conforming());
partitioning_ = Array<int>(serial_mesh->GeneratePartitioning(nprocs_, defaultPartMethod), nelemGlobal_);
if (rank0_) partitioning_file_hdf5("write");
if (rank0_) partitioning_file_hdf5("write", config, groupsMPI, nelemGlobal_, partitioning_);
MPI_Barrier(groupsMPI->getTPSCommWorld());
}

5 changes: 0 additions & 5 deletions src/M2ulPhyS.hpp
@@ -344,12 +344,7 @@ class M2ulPhyS : public TPS::Solver {
void initilizeSpeciesFromLTE();

// i/o routines
void read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data);
void read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data, IOFamily &fam);
void restart_files_hdf5(string mode, string inputFileName = std::string());
void partitioning_file_hdf5(string mode);
void serialize_soln_for_write(IOFamily &fam);
void write_soln_data(hid_t group, string varName, hid_t dataspace, double *data);

void Check_NAN();
bool Check_JobResubmit();
132 changes: 64 additions & 68 deletions src/io.cpp
@@ -265,7 +265,7 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) {
double *data = fam.pfunc_->HostReadWrite();
// special case if writing a serial restart
if ((config.isRestartSerialized(mode)) && (nprocs_ > 1)) {
serialize_soln_for_write(fam);
serialize_soln_for_write(fam, groupsMPI, mesh->GetNE(), nelemGlobal_, locToGlobElem, partitioning_);
if (rank0_) data = fam.serial_sol->HostReadWrite();
}

Expand All @@ -274,7 +274,7 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) {

// save raw data
if (rank0_ || config.isRestartPartitioned(mode)) {
for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0]);
for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0], rank0_);
}

if (group >= 0) H5Gclose(group);
@@ -326,10 +326,14 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) {
if (var.inRestartFile_) {
std::string h5Path = fam.group_ + "/" + var.varName_;
if (rank0_) grvy_printf(ginfo, "--> Reading h5 path = %s\n", h5Path.c_str());
if (config.isRestartPartitioned(mode))
if (config.isRestartPartitioned(mode)) {
read_partitioned_soln_data(file, h5Path.c_str(), var.index_ * numInSoln, data);
else
read_serialized_soln_data(file, h5Path.c_str(), dof, var.index_, data, fam);
} else {
assert(partitioning_ != NULL);
assert(config.isRestartSerialized("read"));
read_serialized_soln_data(file, h5Path.c_str(), dof, var.index_, data, fam, groupsMPI, partitioning_,
nelemGlobal_);
}
}
}
}
@@ -381,23 +385,27 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) {
return;
}

void M2ulPhyS::partitioning_file_hdf5(std::string mode) {
void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal,
Array<int> &partitioning) {
MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld();
const bool rank0 = groupsMPI->isWorldRoot();
const int nprocs = groupsMPI->getTPSWorldSize();

grvy_timer_begin(__func__);

// only rank 0 writes partitioning file
if (!rank0_ && (mode == "write")) return;
if (!rank0 && (mode == "write")) return;

// hid_t file, dataspace, data_soln;
hid_t file = -1, dataspace;
herr_t status;
std::string fileName = config.GetPartitionBaseName();
fileName += "." + std::to_string(nprocs_) + "p.h5";
fileName += "." + std::to_string(nprocs) + "p.h5";

assert((mode == "read") || (mode == "write"));

if (mode == "write") {
assert(partitioning_.Size() > 0);
assert(partitioning.Size() > 0);

if (file_exists(fileName)) {
grvy_printf(gwarn, "Removing existing partition file: %s\n", fileName.c_str());
@@ -421,35 +429,35 @@ void M2ulPhyS::partitioning_file_hdf5(std::string mode) {

if (mode == "write") {
// Attributes
h5_save_attribute(file, "numProcs", nprocs_);
h5_save_attribute(file, "numProcs", nprocs);

// Raw partition info
hsize_t dims[1];
hid_t data;

dims[0] = partitioning_.Size();
dims[0] = partitioning.Size();
dataspace = H5Screate_simple(1, dims, NULL);
assert(dataspace >= 0);

data = H5Dcreate2(file, "partitioning", H5T_NATIVE_INT, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
assert(data >= 0);
status = H5Dwrite(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning_.GetData());
status = H5Dwrite(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning.HostRead());
assert(status >= 0);
H5Dclose(data);
H5Fclose(file);
} else if (mode == "read") {
partitioning_.SetSize(nelemGlobal_);
partitioning.SetSize(nelemGlobal);

if (rank0_) {
if (rank0) {
grvy_printf(ginfo, "Reading original domain decomposition partition file: %s\n", fileName.c_str());

// verify partition info matches current proc count
{
int np = 0;
h5_read_attribute(file, "numProcs", np);
grvy_printf(ginfo, "--> # partitions defined = %i\n", np);
if (np != nprocs_) {
grvy_printf(gerror, "[ERROR]: Partition info does not match current processor count -> %i\n", nprocs_);
if (np != nprocs) {
grvy_printf(gerror, "[ERROR]: Partition info does not match current processor count -> %i\n", nprocs);
exit(ERROR);
}
}
@@ -462,55 +470,50 @@ void M2ulPhyS::partitioning_file_hdf5(std::string mode) {
dataspace = H5Dget_space(data);
numInFile = H5Sget_simple_extent_npoints(dataspace);

assert((int)numInFile == nelemGlobal_);
assert((int)numInFile == nelemGlobal);

status = H5Dread(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning_.GetData());
status = H5Dread(data, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, partitioning.HostWrite());
assert(status >= 0);
H5Dclose(data);
} // <-- end rank0_

#if 0
// distribute partition vector to all procs
MPI_Bcast(partitioning_.GetData(), nelemGlobal_, MPI_INT, 0, TPSCommWorld);
MPI_Bcast(partitioning_.GetData(), nelemGlobal, MPI_INT, 0, TPSCommWorld);
#endif

// distribute partition vector to all procs (serialzed per process variant)
int tag = 21;

if (rank0_) {
for (int rank = 1; rank < nprocs_; rank++) {
MPI_Send(partitioning_.GetData(), nelemGlobal_, MPI_INT, rank, tag, TPSCommWorld);
if (rank0) {
for (int rank = 1; rank < nprocs; rank++) {
MPI_Send(partitioning.HostRead(), nelemGlobal, MPI_INT, rank, tag, TPSCommWorld);
grvy_printf(gdebug, "Sent partitioning data to rank %i\n", rank);
}
} else {
MPI_Recv(partitioning_.GetData(), nelemGlobal_, MPI_INT, 0, tag, TPSCommWorld, MPI_STATUS_IGNORE);
MPI_Recv(partitioning.HostWrite(), nelemGlobal, MPI_INT, 0, tag, TPSCommWorld, MPI_STATUS_IGNORE);
}
if (rank0_) grvy_printf(ginfo, "--> partition file read complete\n");
if (rank0) grvy_printf(ginfo, "--> partition file read complete\n");
}
grvy_timer_end(__func__);
}

void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) {
void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne,
const int *locToGlobElem, const Array<int> &partitioning) {
MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld();
// Get total number of elements
const int local_ne = mesh->GetNE();
int global_ne;
MPI_Reduce(&local_ne, &global_ne, 1, MPI_INT, MPI_SUM, 0, TPSCommWorld);

// ks note: need to double check this routine when additional solution
// families are added and then remove next assert
// assert(ioFamily == "/solution");

// get pargridfunction for this IO family
// int iFamily = ioData.getIOFamilyIndex(ioFamily);
// assert(iFamily >= 0);
ParGridFunction *pfunc = fam.pfunc_;

if (rank0_) assert(global_ne == nelemGlobal_);

assert((locToGlobElem != NULL) && (partitioning_ != NULL));
const bool rank0 = groupsMPI->isWorldRoot();

// Ensure consistency
int global_ne_check;
MPI_Reduce(&local_ne, &global_ne_check, 1, MPI_INT, MPI_SUM, 0, TPSCommWorld);
if (rank0) {
assert(global_ne_check == global_ne);
assert(partitioning.Size() == global_ne);
}
assert(locToGlobElem != NULL);

if (rank0_) {
ParGridFunction *pfunc = fam.pfunc_;
if (rank0) {
grvy_printf(ginfo, "Generating serialized restart file (group %s...)\n", fam.group_.c_str());
// copy my own data
Array<int> lvdofs, gvdofs;
@@ -525,7 +528,7 @@ void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) {

// have rank 0 receive data from other tasks and copy its own
for (int gelem = 0; gelem < global_ne; gelem++) {
int from_rank = partitioning_[gelem];
int from_rank = partitioning[gelem];
if (from_rank != 0) {
fam.serial_fes->GetElementVDofs(gelem, gvdofs);
lsoln.SetSize(gvdofs.Size());
@@ -553,9 +556,7 @@ void M2ulPhyS::serialize_soln_for_write(IOFamily &fam) {
} // end function: serialize_soln_for_write()

// convenience function to read solution data for parallel restarts
void M2ulPhyS::read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data) {
assert(config.isRestartPartitioned("read"));

void read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data) {
hid_t data_soln;
herr_t status;

@@ -567,10 +568,12 @@ void M2ulPhyS::restart_files_hdf5(string mode, string inputFileName) {
}

// convenience function to read and distribute solution data for serialized restarts
void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data,
IOFamily &fam) {
MPI_Comm TPSCommWorld = this->groupsMPI->getTPSCommWorld();
assert(config.isRestartSerialized("read"));
void read_serialized_soln_data(hid_t file, string varName, int numDof, int varOffset, double *data, IOFamily &fam,
MPI_Groups *groupsMPI, Array<int> partitioning, int nelemGlobal) {
MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld();
const bool rank0 = groupsMPI->isWorldRoot();
const int nprocs = groupsMPI->getTPSWorldSize();
const int rank = groupsMPI->getTPSWorldRank();

hid_t data_soln;
herr_t status;
@@ -580,13 +583,7 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof,

const int tag = 20;

// assert( (dim == 2) || (dim == 3) );
// if(dim == 2)
// numStateVars = 4;
// else
// numStateVars = 5;

if (rank0_) {
if (rank0) {
grvy_printf(ginfo, "[RestartSerial]: Reading %s for distribution\n", varName.c_str());

Vector data_serial;
@@ -598,15 +595,15 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof,
H5Dclose(data_soln);

// assign solution owned by rank 0
assert(partitioning_ != NULL);
assert(partitioning != NULL);

Array<int> lvdofs, gvdofs;
Vector lnodes;
int counter = 0;
// int ndof_per_elem;

for (int gelem = 0; gelem < nelemGlobal_; gelem++) {
if (partitioning_[gelem] == 0) {
for (int gelem = 0; gelem < nelemGlobal; gelem++) {
if (partitioning[gelem] == 0) {
// cull out subset of local vdof vector to use for this solution var
fam.pfunc_->ParFESpace()->GetElementVDofs(counter, lvdofs);
int numDof_per_this_elem = lvdofs.Size() / numStateVars;
@@ -625,10 +622,10 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof,
}

// pack remaining data and send to other processors
for (int rank = 1; rank < nprocs_; rank++) {
for (int rank = 1; rank < nprocs; rank++) {
std::vector<double> packedData;
for (int gelem = 0; gelem < nelemGlobal_; gelem++)
if (partitioning_[gelem] == rank) {
for (int gelem = 0; gelem < nelemGlobal; gelem++)
if (partitioning[gelem] == rank) {
fam.serial_fes->GetElementVDofs(gelem, gvdofs);
int numDof_per_this_elem = gvdofs.Size() / numStateVars;
for (int i = 0; i < numDof_per_this_elem; i++) packedData.push_back(data_serial[gvdofs[i]]);
@@ -638,8 +635,7 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof,
}
} else { // <-- end rank 0
int numlDofs = fam.pfunc_->ParFESpace()->GetNDofs();
grvy_printf(gdebug, "[%i]: local number of state vars to receive = %i (var=%s)\n", rank_, numlDofs,
varName.c_str());
grvy_printf(gdebug, "[%i]: local number of state vars to receive = %i (var=%s)\n", rank, numlDofs, varName.c_str());

std::vector<double> packedData(numlDofs);

@@ -652,12 +648,12 @@ void M2ulPhyS::read_serialized_soln_data(hid_t file, string varName, int numDof,
}

// convenience function to write HDF5 data
void M2ulPhyS::write_soln_data(hid_t group, string varName, hid_t dataspace, double *data) {
void write_soln_data(hid_t group, string varName, hid_t dataspace, double *data, bool rank0) {
hid_t data_soln;
herr_t status;
assert(group >= 0);

if (rank0_) grvy_printf(ginfo, " --> Saving (%s)\n", varName.c_str());
if (rank0) grvy_printf(ginfo, " --> Saving (%s)\n", varName.c_str());

data_soln = H5Dcreate2(group, varName.c_str(), H5T_NATIVE_DOUBLE, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
assert(data_soln >= 0);
13 changes: 13 additions & 0 deletions src/io.hpp
@@ -32,8 +32,13 @@
#ifndef IO_HPP_
#define IO_HPP_

#include <hdf5.h>

#include <string>
#include <vector>

#include "mpi_groups.hpp"
#include "run_configuration.hpp"
#include "tps_mfem_wrap.hpp"

class IOFamily {
@@ -73,4 +78,12 @@ class IODataOrganizer {
void initializeSerial(bool root, bool serial, mfem::Mesh *serial_mesh);
};

void read_partitioned_soln_data(hid_t file, std::string varName, size_t index, double *data);
void read_serialized_soln_data(hid_t file, std::string varName, int numDof, int varOffset, double *data, IOFamily &fam,
MPI_Groups *groupsMPI, mfem::Array<int> partitioning, int nelemGlobal);
void write_soln_data(hid_t group, std::string varName, hid_t dataspace, double *data, bool rank0);
void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal,
mfem::Array<int> &partitioning);
void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne,
const int *locToGlobElem, const mfem::Array<int> &partitioning);
#endif // IO_HPP_
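
A rough usage sketch (illustrative only, not part of this commit): with the free functions declared above, a driver outside M2ulPhyS could manage the element partitioning directly. The helper name setup_partitioning and its parameter list are hypothetical; the calls themselves mirror the M2ulPhyS.cpp hunks earlier in this diff, and the sketch assumes the TPS build environment (io.hpp, mfem, MPI_Groups, RunConfiguration).

#include "io.hpp"

// Hypothetical helper: generate-and-write or read-and-distribute the element
// partitioning using the new standalone partitioning_file_hdf5().
void setup_partitioning(RunConfiguration &config, MPI_Groups *groupsMPI, mfem::Mesh *serial_mesh, int nprocs,
                        int defaultPartMethod, int nelemGlobal, mfem::Array<int> &partitioning) {
  if (config.isRestartSerialized("read")) {
    // partition the serial mesh; rank 0 writes the result to the HDF5 partition file
    partitioning = mfem::Array<int>(serial_mesh->GeneratePartitioning(nprocs, defaultPartMethod), nelemGlobal);
    partitioning_file_hdf5("write", config, groupsMPI, nelemGlobal, partitioning);
  } else {
    // rank 0 reads the stored partitioning and distributes it to every rank
    partitioning_file_hdf5("read", config, groupsMPI, nelemGlobal, partitioning);
  }
}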
2 changes: 1 addition & 1 deletion src/run_configuration.hpp
@@ -298,7 +298,7 @@ class RunConfiguration {

string GetMeshFileName() { return meshFile; }
string GetOutputName() { return outputFile; }
string GetPartitionBaseName() { return partFile; }
string GetPartitionBaseName() const { return partFile; }
int GetUniformRefLevels() { return ref_levels; }

int GetTimeIntegratorType() { return timeIntegratorType; }
