Move primary data write support into IODataOrganizer class (#244)

This involves three functions that now perform the bulk of the work.
First, the (previously standalone) function serialize_soln_for_write
is moved into the IOFamily class, as serializeForWrite.  Second, two
new functions (IODataOrganizer::write and
IODataOrganizer::writeSerial) are introduced to handle writing field
data that has been registered through the IODataOrganizer class.

At the moment these functions are separate because they require different
input data (i.e., writeSerial needs information, such as the
partitioning array, that write does not).  They could be unified,
potentially by extending IODataOrganizer::initializeSerial to obtain
this extra information rather than passing it directly to writeSerial,
but that refactor is left for later.
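
A minimal sketch of what that later unification might look like, assuming the
extra inputs are simply cached at initialization time (the class name, members,
and logic below are illustrative only, not part of this commit):

// Hypothetical pattern only: cache the serialization inputs up front so a
// single write() entry point can decide whether a serialized write is needed.
#include <mpi.h>

#include <utility>
#include <vector>

class WriteContextSketch {
 public:
  // An extended initializeSerial could capture what writeSerial currently
  // receives as arguments on every call.
  void initializeSerial(MPI_Comm comm, int local_ne, int global_ne,
                        const int *locToGlobElem, std::vector<int> partitioning) {
    comm_ = comm;
    local_ne_ = local_ne;
    global_ne_ = global_ne;
    locToGlobElem_ = locToGlobElem;
    partitioning_ = std::move(partitioning);
    serial_ready_ = true;
  }

  // Single write path: serialize first if (and only if) the serial context was
  // initialized and more than one rank is active.
  void write(bool rank0) {
    int nprocs = 1;
    if (serial_ready_) MPI_Comm_size(comm_, &nprocs);
    if (serial_ready_ && nprocs > 1) {
      // serialize each family (as IOFamily::serializeForWrite does below);
      // afterwards only rank 0 holds the full data and writes it
      if (rank0) { /* write gathered data */ }
    } else {
      // each rank writes its own partitioned data
    }
  }

 private:
  MPI_Comm comm_ = MPI_COMM_NULL;
  int local_ne_ = 0;
  int global_ne_ = 0;
  const int *locToGlobElem_ = nullptr;
  std::vector<int> partitioning_;
  bool serial_ready_ = false;
};
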
trevilo committed Feb 2, 2024
1 parent c16d6b9 commit 118ad6a
Showing 2 changed files with 176 additions and 114 deletions.
274 changes: 164 additions & 110 deletions src/io.cpp
@@ -89,58 +89,10 @@ void M2ulPhyS::write_restart_files_hdf5(hid_t file, bool serialized_write) {
h5_save_attribute(file, "dofs_global", gdofs);
}

// -------------------------------------------------------------------
// Write solution data defined by IO families
// -------------------------------------------------------------------
hsize_t dims[1];

//-------------------------------------------------------
// Loop over defined IO families to save desired output
//-------------------------------------------------------
for (size_t n = 0; n < ioData.families_.size(); n++) {
IOFamily &fam = ioData.families_[n];

if (serialized_write && (nprocs_ > 1) && (rank0_)) {
assert((locToGlobElem != NULL) && (partitioning_ != NULL));
assert(fam.serial_fes != NULL);
dims[0] = fam.serial_fes->GetNDofs();
} else {
dims[0] = fam.pfunc_->ParFESpace()->GetNDofs();
}
// define groups based on defined IO families
if (rank0_) {
grvy_printf(ginfo, "\nCreating HDF5 group for defined IO families\n");
grvy_printf(ginfo, "--> %s : %s\n", fam.group_.c_str(), fam.description_.c_str());
}

hid_t group = -1;
hid_t dataspace = -1;

if (rank0_ || !serialized_write) {
dataspace = H5Screate_simple(1, dims, NULL);
assert(dataspace >= 0);
group = H5Gcreate(file, fam.group_.c_str(), H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
assert(group >= 0);
}

// get pointer to raw data
double *data = fam.pfunc_->HostReadWrite();
// special case if writing a serial restart
if (serialized_write && (nprocs_ > 1)) {
serialize_soln_for_write(fam, groupsMPI, mesh->GetNE(), nelemGlobal_, locToGlobElem, partitioning_);
if (rank0_) data = fam.serial_sol->HostReadWrite();
}

// get defined variables for this IO family
vector<IOVar> vars = ioData.vars_[fam.group_];

// save raw data
if (rank0_ || !serialized_write) {
for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0], rank0_);
}

if (group >= 0) H5Gclose(group);
if (dataspace >= 0) H5Sclose(dataspace);
if (serialized_write) {
ioData.writeSerial(file, groupsMPI->getTPSCommWorld(), mesh->GetNE(), nelemGlobal_, locToGlobElem, partitioning_);
} else {
ioData.write(file, rank0_);
}
}

@@ -514,63 +466,6 @@ void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MP
grvy_timer_end(__func__);
}

void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne,
const int *locToGlobElem, const Array<int> &partitioning) {
MPI_Comm TPSCommWorld = groupsMPI->getTPSCommWorld();
const bool rank0 = groupsMPI->isWorldRoot();

// Ensure consistency
int global_ne_check;
MPI_Reduce(&local_ne, &global_ne_check, 1, MPI_INT, MPI_SUM, 0, TPSCommWorld);
if (rank0) {
assert(global_ne_check == global_ne);
assert(partitioning.Size() == global_ne);
}
assert(locToGlobElem != NULL);

ParGridFunction *pfunc = fam.pfunc_;
if (rank0) {
grvy_printf(ginfo, "Generating serialized restart file (group %s...)\n", fam.group_.c_str());
// copy my own data
Array<int> lvdofs, gvdofs;
Vector lsoln;
for (int elem = 0; elem < local_ne; elem++) {
int gelem = locToGlobElem[elem];
fam.pfunc_->ParFESpace()->GetElementVDofs(elem, lvdofs);
pfunc->GetSubVector(lvdofs, lsoln);
fam.serial_fes->GetElementVDofs(gelem, gvdofs);
fam.serial_sol->SetSubVector(gvdofs, lsoln);
}

// have rank 0 receive data from other tasks and copy its own
for (int gelem = 0; gelem < global_ne; gelem++) {
int from_rank = partitioning[gelem];
if (from_rank != 0) {
fam.serial_fes->GetElementVDofs(gelem, gvdofs);
lsoln.SetSize(gvdofs.Size());

MPI_Recv(lsoln.HostReadWrite(), gvdofs.Size(), MPI_DOUBLE, from_rank, gelem, TPSCommWorld, MPI_STATUS_IGNORE);

fam.serial_sol->SetSubVector(gvdofs, lsoln);
}
}

} else {
// have non-zero ranks send their data to rank 0
Array<int> lvdofs;
Vector lsoln;
for (int elem = 0; elem < local_ne; elem++) {
int gelem = locToGlobElem[elem];
// assert(gelem > 0);
fam.pfunc_->ParFESpace()->GetElementVDofs(elem, lvdofs);
pfunc->GetSubVector(lvdofs, lsoln); // work for gpu build?

// send to task 0
MPI_Send(lsoln.HostReadWrite(), lsoln.Size(), MPI_DOUBLE, 0, gelem, TPSCommWorld);
}
}
} // end function: serialize_soln_for_write()

// convenience function to read solution data for parallel restarts
void read_partitioned_soln_data(hid_t file, string varName, size_t index, double *data) {
hid_t data_soln;
@@ -728,10 +623,68 @@ void M2ulPhyS::readTable(const std::string &inputPath, TableInput &result) {
// Routines for I/O data organizer helper class
// ---------------------------------------------

void IOFamily::serializeForWrite(MPI_Comm comm, int local_ne, int global_ne, const int *locToGlobElem,
const Array<int> &partitioning) {
int rank;
MPI_Comm_rank(comm, &rank);
const bool rank0 = (rank == 0);

// Ensure consistency
int global_ne_check;
MPI_Reduce(&local_ne, &global_ne_check, 1, MPI_INT, MPI_SUM, 0, comm);
if (rank0) {
assert(global_ne_check == global_ne);
assert(partitioning.Size() == global_ne);
}
assert(locToGlobElem != NULL);

ParGridFunction *pfunc = this->pfunc_;
if (rank0) {
grvy_printf(ginfo, "Generating serialized restart file (group %s...)\n", this->group_.c_str());
// copy my own data
Array<int> lvdofs, gvdofs;
Vector lsoln;
for (int elem = 0; elem < local_ne; elem++) {
int gelem = locToGlobElem[elem];
this->pfunc_->ParFESpace()->GetElementVDofs(elem, lvdofs);
pfunc->GetSubVector(lvdofs, lsoln);
this->serial_fes->GetElementVDofs(gelem, gvdofs);
this->serial_sol->SetSubVector(gvdofs, lsoln);
}

// have rank 0 receive data from other tasks and copy its own
for (int gelem = 0; gelem < global_ne; gelem++) {
int from_rank = partitioning[gelem];
if (from_rank != 0) {
this->serial_fes->GetElementVDofs(gelem, gvdofs);
lsoln.SetSize(gvdofs.Size());

MPI_Recv(lsoln.HostReadWrite(), gvdofs.Size(), MPI_DOUBLE, from_rank, gelem, comm, MPI_STATUS_IGNORE);

this->serial_sol->SetSubVector(gvdofs, lsoln);
}
}

} else {
// have non-zero ranks send their data to rank 0
Array<int> lvdofs;
Vector lsoln;
for (int elem = 0; elem < local_ne; elem++) {
int gelem = locToGlobElem[elem];
// assert(gelem > 0);
this->pfunc_->ParFESpace()->GetElementVDofs(elem, lvdofs);
pfunc->GetSubVector(lvdofs, lsoln); // work for gpu build?

// send to task 0
MPI_Send(lsoln.HostReadWrite(), lsoln.Size(), MPI_DOUBLE, 0, gelem, comm);
}
}
}

// register a new IO family which maps to a ParGridFunction
void IODataOrganizer::registerIOFamily(std::string description, std::string group, ParGridFunction *pfunc,
bool auxRestart, bool _inRestartFile) {
IOFamily family{description, group, pfunc};
IOFamily family(description, group, pfunc);
std::vector<IOVar> vars;
family.allowsAuxRestart = auxRestart;
family.inRestartFile = _inRestartFile;
@@ -775,6 +728,107 @@ void IODataOrganizer::initializeSerial(bool root, bool serial, Mesh *serial_mesh
}
}

void IODataOrganizer::write(hid_t file, bool rank0) {
// -------------------------------------------------------------------
// Write solution data defined by IO families
// -------------------------------------------------------------------
hsize_t dims[1];

//-------------------------------------------------------
// Loop over defined IO families to save desired output
//-------------------------------------------------------
for (size_t n = 0; n < families_.size(); n++) {
IOFamily &fam = families_[n];

dims[0] = fam.pfunc_->ParFESpace()->GetNDofs();

// define groups based on defined IO families
if (rank0) {
grvy_printf(ginfo, "\nCreating HDF5 group for defined IO families\n");
grvy_printf(ginfo, "--> %s : %s\n", fam.group_.c_str(), fam.description_.c_str());
}

hid_t group = -1;
hid_t dataspace = -1;

dataspace = H5Screate_simple(1, dims, NULL);
assert(dataspace >= 0);
group = H5Gcreate(file, fam.group_.c_str(), H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
assert(group >= 0);

// get pointer to raw data
double *data = fam.pfunc_->HostReadWrite();

// get defined variables for this IO family
vector<IOVar> vars = vars_[fam.group_];

// save raw data
for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0], rank0);

if (group >= 0) H5Gclose(group);
if (dataspace >= 0) H5Sclose(dataspace);
}
}

void IODataOrganizer::writeSerial(hid_t file, MPI_Comm comm, int local_ne, int global_ne, const int *locToGlobElem,
const Array<int> &partitioning) {
int rank;
MPI_Comm_rank(comm, &rank);
int nprocs;
MPI_Comm_size(comm, &nprocs);
const bool rank0 = (rank == 0);

// if running serially, writeSerial and write are logically the
// same, but infrastructure is different. Thus, for serial run,
// just call write() here
if (nprocs == 1) {
this->write(file, rank0);
return;
}

hsize_t dims[1];

// Loop over defined IO families to save desired output
for (size_t n = 0; n < families_.size(); n++) {
IOFamily &fam = families_[n];

if (rank0) {
assert((locToGlobElem != NULL) && (partitioning.Size() == global_ne));
assert(fam.serial_fes != NULL);
dims[0] = fam.serial_fes->GetNDofs();
grvy_printf(ginfo, "\nCreating HDF5 group for defined IO families\n");
grvy_printf(ginfo, "--> %s : %s\n", fam.group_.c_str(), fam.description_.c_str());
}

hid_t group = -1;
hid_t dataspace = -1;

if (rank0) {
dataspace = H5Screate_simple(1, dims, NULL);
assert(dataspace >= 0);
group = H5Gcreate(file, fam.group_.c_str(), H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
assert(group >= 0);
}

fam.serializeForWrite(comm, local_ne, global_ne, locToGlobElem, partitioning);

// get pointer to raw data
double *data = nullptr;
if (rank0) data = fam.serial_sol->HostReadWrite();

// get defined variables for this IO family
vector<IOVar> vars = vars_[fam.group_];

// save raw data
if (rank0) {
for (auto var : vars) write_soln_data(group, var.varName_, dataspace, data + var.index_ * dims[0], rank0);
}

if (group >= 0) H5Gclose(group);
if (dataspace >= 0) H5Sclose(dataspace);
}
}

IODataOrganizer::~IODataOrganizer() {
for (size_t n = 0; n < families_.size(); n++) {
IOFamily fam = families_[n];
16 changes: 12 additions & 4 deletions src/io.hpp
@@ -52,8 +52,14 @@ class IOFamily {
// true if the family is to be found in restart files
bool inRestartFile;

mfem::FiniteElementSpace *serial_fes;
mfem::GridFunction *serial_sol;
mfem::FiniteElementSpace *serial_fes = nullptr;
mfem::GridFunction *serial_sol = nullptr;

IOFamily(std::string desc, std::string grp, mfem::ParGridFunction *pf)
: description_(desc), group_(grp), pfunc_(pf) {}

void serializeForWrite(MPI_Comm comm, int local_ne, int global_ne, const int *locToGlobElem,
const Array<int> &partitioning);
};

class IOVar {
@@ -76,6 +82,10 @@ class IODataOrganizer {
int getIOFamilyIndex(std::string group);

void initializeSerial(bool root, bool serial, mfem::Mesh *serial_mesh);

void write(hid_t file, bool rank0);
void writeSerial(hid_t file, MPI_Comm comm, int local_ne, int global_ne, const int *locToGlobElem,
const Array<int> &partitioning);
};

void read_partitioned_soln_data(hid_t file, std::string varName, size_t index, double *data);
@@ -84,6 +94,4 @@ void read_serialized_soln_data(hid_t file, std::string varName, int numDof, int
void write_soln_data(hid_t group, std::string varName, hid_t dataspace, double *data, bool rank0);
void partitioning_file_hdf5(std::string mode, const RunConfiguration &config, MPI_Groups *groupsMPI, int nelemGlobal,
mfem::Array<int> &partitioning);
void serialize_soln_for_write(IOFamily &fam, MPI_Groups *groupsMPI, int local_ne, int global_ne,
const int *locToGlobElem, const mfem::Array<int> &partitioning);
#endif // IO_HPP_
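
As a usage illustration, here is a minimal sketch of driving the reorganized
interface from a caller. The helper function, finite element space, group name,
output file name, and include paths are assumptions made for the example, not
code from the repository:

#include <string>

#include <hdf5.h>
#include <mpi.h>

#include <mfem.hpp>

#include "io.hpp"

// Illustrative caller only: register one field with the organizer and write a
// partitioned (one file per rank) restart via the new IODataOrganizer::write.
void write_example(MPI_Comm comm, mfem::ParFiniteElementSpace &fes, IODataOrganizer &ioData) {
  int rank;
  MPI_Comm_rank(comm, &rank);
  const bool rank0 = (rank == 0);

  // Field to be written; in tps this would be an actual solution field, and it
  // must outlive any later writes performed through the organizer.
  mfem::ParGridFunction u(&fes);
  u = 0.0;

  // Group name and description are arbitrary here; registration of the
  // individual variables inside the group is omitted from this sketch.
  ioData.registerIOFamily("Example solution", "/example", &u,
                          /* auxRestart = */ false, /* _inRestartFile = */ true);

  // Partitioned write path: every rank opens its own file and calls write().
  std::string fname = "restart-example." + std::to_string(rank) + ".h5";
  hid_t file = H5Fcreate(fname.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
  ioData.write(file, rank0);
  H5Fclose(file);
}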
