From ff0c66966116dbb9869338927bf652cd563c0173 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 17 Sep 2024 19:24:39 +0200 Subject: [PATCH] Independent writes: Make workaround more flexible (#1660) * Add `flush_entire_series` parameter to Attributable::seriesFlush * Add Attributable::touch() More flexible base API * Split this into seriesFlush() and iterationFlush() * Block execution from Series * Avoid name conflicts * CI fixes --- include/openPMD/Iteration.hpp | 1 + include/openPMD/RecordComponent.tpp | 3 +- include/openPMD/Series.hpp | 12 ++++++ include/openPMD/backend/Attributable.hpp | 23 +++++++++- include/openPMD/backend/Writable.hpp | 2 + src/backend/Attributable.cpp | 23 ++++++++-- src/backend/Writable.cpp | 54 +++++++++++++++++++++--- src/binding/python/Attributable.cpp | 4 ++ test/ParallelIOTest.cpp | 5 +-- 9 files changed, 112 insertions(+), 15 deletions(-) diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index 52bf43293a..a35ceccd74 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -133,6 +133,7 @@ class Iteration : public Attributable friend class internal::AttributableData; template friend T &internal::makeOwning(T &self, Series); + friend class Writable; public: Iteration(Iteration const &) = default; diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 0a4086e3d8..542503e806 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -294,7 +294,8 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) * Flush the openPMD hierarchy to the backend without flushing any actual * data yet. */ - seriesFlush({FlushLevel::SkeletonOnly}); + seriesFlush_impl( + {FlushLevel::SkeletonOnly}); size_t size = 1; for (auto ext : e) diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 04907eda40..dbad461dc1 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -28,6 +28,7 @@ #include "openPMD/IterationEncoding.hpp" #include "openPMD/Streaming.hpp" #include "openPMD/WriteIterations.hpp" +#include "openPMD/auxiliary/TypeTraits.hpp" #include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attributable.hpp" #include "openPMD/backend/Container.hpp" @@ -692,6 +693,17 @@ class Series : public Attributable */ void close(); + /** + * This overrides Attributable::iterationFlush() which will fail on Series. + */ + template + auto iterationFlush(Args &&...) + { + static_assert( + auxiliary::dependent_false_v, + "Cannot call this on an instance of Series."); + } + // clang-format off OPENPMD_private // clang-format on diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 0f7b722ae5..08090ce2e3 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -273,6 +273,20 @@ class Attributable */ void seriesFlush(std::string backendConfig = "{}"); + /** Flush the containing Iteration. + * + * Writable connects all objects of an openPMD series through a linked list + * of parents. This method will walk up the parent list to find + * the containing Iteration. + * The Iteration will be flushed regardless if it is dirty. + * + * @param backendConfig Further backend-specific instructions on how to + * implement this flush call. + * Must be provided in-line, configuration is not read + * from files. + */ + void iterationFlush(std::string backendConfig = "{}"); + /** String serialization to describe an Attributable * * This object contains the Series data path as well as the openPMD object @@ -308,6 +322,12 @@ class Attributable */ MyPath myPath() const; + /** + * @brief Sets the object dirty to make internal procedures think it has + * been modified. + */ + void touch(); + // clang-format off OPENPMD_protected // clang-format on @@ -330,7 +350,8 @@ OPENPMD_protected internal::SeriesData *>; /** @} */ - void seriesFlush(internal::FlushParams const &); + template + void seriesFlush_impl(internal::FlushParams const &); void flushAttributes(internal::FlushParams const &); diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index d0b8b4f3c7..25a2154665 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -120,12 +120,14 @@ class Writable final * an object that has no parent, which is the Series object, and flush()-es * it. */ + template void seriesFlush(std::string backendConfig = "{}"); // clang-format off OPENPMD_private // clang-format on + template void seriesFlush(internal::FlushParams const &); /* * These members need to be shared pointers since distinct instances of diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index d5ff005389..018a776a58 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -120,7 +120,14 @@ Attributable &Attributable::setComment(std::string const &c) void Attributable::seriesFlush(std::string backendConfig) { - writable().seriesFlush(std::move(backendConfig)); + writable().seriesFlush( + std::move(backendConfig)); +} + +void Attributable::iterationFlush(std::string backendConfig) +{ + writable().seriesFlush( + std::move(backendConfig)); } Series Attributable::retrieveSeries() const @@ -240,10 +247,20 @@ auto Attributable::myPath() const -> MyPath return res; } -void Attributable::seriesFlush(internal::FlushParams const &flushParams) +void Attributable::touch() +{ + setDirtyRecursive(true); +} + +template +void Attributable::seriesFlush_impl(internal::FlushParams const &flushParams) { - writable().seriesFlush(flushParams); + writable().seriesFlush(flushParams); } +template void +Attributable::seriesFlush_impl(internal::FlushParams const &flushParams); +template void +Attributable::seriesFlush_impl(internal::FlushParams const &flushParams); void Attributable::flushAttributes(internal::FlushParams const &flushParams) { diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index 0e399a3a81..ee15f2a1a0 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -19,8 +19,11 @@ * If not, see . */ #include "openPMD/backend/Writable.hpp" +#include "openPMD/Error.hpp" +#include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/Series.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" +#include namespace openPMD { @@ -42,11 +45,16 @@ Writable::~Writable() IOTask(this, Parameter(parent))); } +template void Writable::seriesFlush(std::string backendConfig) { - seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)}); + seriesFlush( + internal::FlushParams{FlushLevel::UserFlush, std::move(backendConfig)}); } +template void Writable::seriesFlush(std::string backendConfig); +template void Writable::seriesFlush(std::string backendConfig); +template void Writable::seriesFlush(internal::FlushParams const &flushParams) { Attributable impl; @@ -54,13 +62,45 @@ void Writable::seriesFlush(internal::FlushParams const &flushParams) auto [iteration_internal, series_internal] = impl.containingIteration(); if (iteration_internal) { - (*iteration_internal) - ->asInternalCopyOf() - .setDirtyRecursive(true); + (*iteration_internal)->asInternalCopyOf().touch(); } auto series = series_internal->asInternalCopyOf(); - series.flush_impl( - series.iterations.begin(), series.iterations.end(), flushParams); + auto [begin, end] = [&, &iteration_internal_lambda = iteration_internal]() + -> std::pair { + if (!flush_entire_series) + { + if (!iteration_internal_lambda.has_value()) + { + throw std::runtime_error( + "[Writable::seriesFlush()] Requested flushing the " + "containing Iteration, but no Iteration was found?"); + } + auto it = series.iterations.begin(); + auto end_lambda = series.iterations.end(); + for (; it != end_lambda; ++it) + { + if (&it->second.Iteration::get() == *iteration_internal_lambda) + { + auto next = it; + ++next; + return {it, next}; + } + } + throw std::runtime_error( + "[Writable::seriesFlush()] Found a containing Iteration that " + "seems to not be part of the containing Series?? You might try " + "running this with `flushing_entire_series=false` as a " + "workaround, but something is still wrong."); + } + else + { + return {series.iterations.begin(), series.iterations.end()}; + } + }(); + series.flush_impl(begin, end, flushParams); } - +template void +Writable::seriesFlush(internal::FlushParams const &flushParams); +template void +Writable::seriesFlush(internal::FlushParams const &flushParams); } // namespace openPMD diff --git a/src/binding/python/Attributable.cpp b/src/binding/python/Attributable.cpp index 806a57b666..206e3741aa 100644 --- a/src/binding/python/Attributable.cpp +++ b/src/binding/python/Attributable.cpp @@ -517,6 +517,10 @@ void init_Attributable(py::module &m) "series_flush", py::overload_cast(&Attributable::seriesFlush), py::arg("backend_config") = "{}") + .def( + "iteration_flush", + py::overload_cast(&Attributable::iterationFlush), + py::arg("backend_config") = "{}") .def_property_readonly( "attributes", diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index c6d90d773e..58dc0b06a2 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1163,6 +1163,7 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]") Access::CREATE, MPI_COMM_WORLD, "adios2.engine.preferred_flush_target = \"buffer\""); + write.seriesFlush(); int size, rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); @@ -1182,11 +1183,9 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]") * conflict with the default buffer target that will run in the destructor, * unless the flush in the next line really is collective. */ - std::cout << "ENTER" << std::endl; MPI_Barrier(MPI_COMM_WORLD); - iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\""); + iteration.iterationFlush("adios2.engine.preferred_flush_target = \"disk\""); MPI_Barrier(MPI_COMM_WORLD); - std::cout << "LEAVE" << std::endl; } #endif