Skip to content

Commit

Permalink
Independent writes: Make workaround more flexible (openPMD#1660)
Browse files Browse the repository at this point in the history
* Add `flush_entire_series` parameter to Attributable::seriesFlush

* Add Attributable::touch()

More flexible base API

* Split this into seriesFlush() and iterationFlush()

* Block execution from Series

* Avoid name conflicts

* CI fixes
  • Loading branch information
franzpoeschel authored Sep 17, 2024
1 parent 016bb8a commit ff0c669
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 15 deletions.
1 change: 1 addition & 0 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class Iteration : public Attributable
friend class internal::AttributableData;
template <typename T>
friend T &internal::makeOwning(T &self, Series);
friend class Writable;

public:
Iteration(Iteration const &) = default;
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush({FlushLevel::SkeletonOnly});
seriesFlush_impl</* flush_entire_series = */ false>(
{FlushLevel::SkeletonOnly});

size_t size = 1;
for (auto ext : e)
Expand Down
12 changes: 12 additions & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/WriteIterations.hpp"
#include "openPMD/auxiliary/TypeTraits.hpp"
#include "openPMD/auxiliary/Variant.hpp"
#include "openPMD/backend/Attributable.hpp"
#include "openPMD/backend/Container.hpp"
Expand Down Expand Up @@ -692,6 +693,17 @@ class Series : public Attributable
*/
void close();

/**
* This overrides Attributable::iterationFlush() which will fail on Series.
*/
template <typename X = void, typename... Args>
auto iterationFlush(Args &&...)
{
static_assert(
auxiliary::dependent_false_v<X>,
"Cannot call this on an instance of Series.");
}

// clang-format off
OPENPMD_private
// clang-format on
Expand Down
23 changes: 22 additions & 1 deletion include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ class Attributable
*/
void seriesFlush(std::string backendConfig = "{}");

/** Flush the containing Iteration.
*
* Writable connects all objects of an openPMD series through a linked list
* of parents. This method will walk up the parent list to find
* the containing Iteration.
* The Iteration will be flushed regardless if it is dirty.
*
* @param backendConfig Further backend-specific instructions on how to
* implement this flush call.
* Must be provided in-line, configuration is not read
* from files.
*/
void iterationFlush(std::string backendConfig = "{}");

/** String serialization to describe an Attributable
*
* This object contains the Series data path as well as the openPMD object
Expand Down Expand Up @@ -308,6 +322,12 @@ class Attributable
*/
MyPath myPath() const;

/**
* @brief Sets the object dirty to make internal procedures think it has
* been modified.
*/
void touch();

// clang-format off
OPENPMD_protected
// clang-format on
Expand All @@ -330,7 +350,8 @@ OPENPMD_protected
internal::SeriesData *>;
/** @} */

void seriesFlush(internal::FlushParams const &);
template <bool flush_entire_series>
void seriesFlush_impl(internal::FlushParams const &);

void flushAttributes(internal::FlushParams const &);

Expand Down
2 changes: 2 additions & 0 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ class Writable final
* an object that has no parent, which is the Series object, and flush()-es
* it.
*/
template <bool flush_entire_series>
void seriesFlush(std::string backendConfig = "{}");

// clang-format off
OPENPMD_private
// clang-format on

template <bool flush_entire_series>
void seriesFlush(internal::FlushParams const &);
/*
* These members need to be shared pointers since distinct instances of
Expand Down
23 changes: 20 additions & 3 deletions src/backend/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ Attributable &Attributable::setComment(std::string const &c)

void Attributable::seriesFlush(std::string backendConfig)
{
writable().seriesFlush(std::move(backendConfig));
writable().seriesFlush</* flush_entire_series = */ true>(
std::move(backendConfig));
}

void Attributable::iterationFlush(std::string backendConfig)
{
writable().seriesFlush</* flush_entire_series = */ false>(
std::move(backendConfig));
}

Series Attributable::retrieveSeries() const
Expand Down Expand Up @@ -240,10 +247,20 @@ auto Attributable::myPath() const -> MyPath
return res;
}

void Attributable::seriesFlush(internal::FlushParams const &flushParams)
void Attributable::touch()
{
setDirtyRecursive(true);
}

template <bool flush_entire_series>
void Attributable::seriesFlush_impl(internal::FlushParams const &flushParams)
{
writable().seriesFlush(flushParams);
writable().seriesFlush<flush_entire_series>(flushParams);
}
template void
Attributable::seriesFlush_impl<true>(internal::FlushParams const &flushParams);
template void
Attributable::seriesFlush_impl<false>(internal::FlushParams const &flushParams);

void Attributable::flushAttributes(internal::FlushParams const &flushParams)
{
Expand Down
54 changes: 47 additions & 7 deletions src/backend/Writable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/backend/Writable.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include <stdexcept>

namespace openPMD
{
Expand All @@ -42,25 +45,62 @@ Writable::~Writable()
IOTask(this, Parameter<Operation::DEREGISTER>(parent)));
}

template <bool flush_entire_series>
void Writable::seriesFlush(std::string backendConfig)
{
seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)});
seriesFlush<flush_entire_series>(
internal::FlushParams{FlushLevel::UserFlush, std::move(backendConfig)});
}
template void Writable::seriesFlush<true>(std::string backendConfig);
template void Writable::seriesFlush<false>(std::string backendConfig);

template <bool flush_entire_series>
void Writable::seriesFlush(internal::FlushParams const &flushParams)
{
Attributable impl;
impl.setData({attributable, [](auto const *) {}});
auto [iteration_internal, series_internal] = impl.containingIteration();
if (iteration_internal)
{
(*iteration_internal)
->asInternalCopyOf<Iteration>()
.setDirtyRecursive(true);
(*iteration_internal)->asInternalCopyOf<Iteration>().touch();
}
auto series = series_internal->asInternalCopyOf<Series>();
series.flush_impl(
series.iterations.begin(), series.iterations.end(), flushParams);
auto [begin, end] = [&, &iteration_internal_lambda = iteration_internal]()
-> std::pair<Series::iterations_iterator, Series::iterations_iterator> {
if (!flush_entire_series)
{
if (!iteration_internal_lambda.has_value())
{
throw std::runtime_error(
"[Writable::seriesFlush()] Requested flushing the "
"containing Iteration, but no Iteration was found?");
}
auto it = series.iterations.begin();
auto end_lambda = series.iterations.end();
for (; it != end_lambda; ++it)
{
if (&it->second.Iteration::get() == *iteration_internal_lambda)
{
auto next = it;
++next;
return {it, next};
}
}
throw std::runtime_error(
"[Writable::seriesFlush()] Found a containing Iteration that "
"seems to not be part of the containing Series?? You might try "
"running this with `flushing_entire_series=false` as a "
"workaround, but something is still wrong.");
}
else
{
return {series.iterations.begin(), series.iterations.end()};
}
}();
series.flush_impl(begin, end, flushParams);
}

template void
Writable::seriesFlush<true>(internal::FlushParams const &flushParams);
template void
Writable::seriesFlush<false>(internal::FlushParams const &flushParams);
} // namespace openPMD
4 changes: 4 additions & 0 deletions src/binding/python/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ void init_Attributable(py::module &m)
"series_flush",
py::overload_cast<std::string>(&Attributable::seriesFlush),
py::arg("backend_config") = "{}")
.def(
"iteration_flush",
py::overload_cast<std::string>(&Attributable::iterationFlush),
py::arg("backend_config") = "{}")

.def_property_readonly(
"attributes",
Expand Down
5 changes: 2 additions & 3 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]")
Access::CREATE,
MPI_COMM_WORLD,
"adios2.engine.preferred_flush_target = \"buffer\"");
write.seriesFlush();
int size, rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
Expand All @@ -1182,11 +1183,9 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]")
* conflict with the default buffer target that will run in the destructor,
* unless the flush in the next line really is collective.
*/
std::cout << "ENTER" << std::endl;
MPI_Barrier(MPI_COMM_WORLD);
iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\"");
iteration.iterationFlush("adios2.engine.preferred_flush_target = \"disk\"");
MPI_Barrier(MPI_COMM_WORLD);
std::cout << "LEAVE" << std::endl;
}
#endif

Expand Down

0 comments on commit ff0c669

Please sign in to comment.