diff --git a/CMakeLists.txt b/CMakeLists.txt index 19bdc7dc57..ac2d487a99 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -783,7 +783,7 @@ if(openPMD_BUILD_TESTING) macro(additional_testing_sources test_name out_list) if(${test_name} STREQUAL "SerialIO") list(APPEND ${out_list} - test/Files_SerialIO/close_iteration_test.cpp + test/Files_SerialIO/close_and_reopen_test.cpp test/Files_SerialIO/filebased_write_test.cpp ) endif() diff --git a/check_recursive_include.py b/check_recursive_include.py deleted file mode 100755 index e5b37cb6e9..0000000000 --- a/check_recursive_include.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 - -import re -import subprocess -import sys - - -def track_includes(start_file): - remove_start = re.compile("include/") - - def clean(file): - return re.sub(remove_start, "", file) - - unseen_files = [start_file] - res = {} - # import ipdb - # ipdb.set_trace(context=30) - while unseen_files: - current_file = unseen_files[0] - del unseen_files[0] - - cmd = ["grep", "-Rl", clean(current_file), "include/"] - try: - next_files = subprocess.check_output(cmd) - lines = [line for line in next_files.decode().splitlines() if line] - except subprocess.CalledProcessError: - lines = [] - - res[current_file] = lines - for line in lines: - if line not in res: - unseen_files.append(line) - return res - - -if __name__ == "__main__": - - remove_start = re.compile("include/(openPMD/)?") - remove_slash = re.compile("/|\\.") - - def clean(file): - return re.sub(remove_slash, "_", re.sub(remove_start, "", file)) - - res = track_includes(sys.argv[1]) - print("digraph {") - for key, values in res.items(): - key = clean(key) - for target in values: - print("\t{} -> {};".format(key, clean(target))) - print("}") diff --git a/examples/10_streaming_read.cpp b/examples/10_streaming_read.cpp index cef55fd790..eb02f3b393 100644 --- a/examples/10_streaming_read.cpp +++ b/examples/10_streaming_read.cpp @@ -18,6 +18,15 @@ int main() return 0; } + // Access the Series linearly. This means that upon opening the Series, no + // data is accessed yet. Instead, the single Iterations are processed + // collectively, one after the other, and data access only happens upon + // explicitly accessing an Iteration from `Series::snapshots()`. Note that + // the Container API of `Series::snapshots()` will work in a restricted mode + // compared to the `READ_RANDOM_ACCESS` access type, refer also to the + // documentation of the `Snapshots` class in `snapshots/Snapshots.hpp`. This + // restricted workflow enables performance optimizations in the backends, + // and more importantly is compatible with streaming I/O. Series series = Series("electrons.sst", Access::READ_LINEAR, R"( { "adios2": { @@ -29,11 +38,6 @@ int main() } })"); - // `Series::writeIterations()` and `Series::readIterations()` are - // intentionally restricted APIs that ensure a workflow which also works - // in streaming setups, e.g. an iteration cannot be opened again once - // it has been closed. - // `Series::iterations` can be directly accessed in random-access workflows. for (auto &[index, iteration] : series.snapshots()) { std::cout << "Current iteration: " << index << std::endl; diff --git a/examples/10_streaming_write.cpp b/examples/10_streaming_write.cpp index fc609dcf10..d64bee6d79 100644 --- a/examples/10_streaming_write.cpp +++ b/examples/10_streaming_write.cpp @@ -40,11 +40,11 @@ int main() std::shared_ptr local_data( new position_t[length], [](position_t const *ptr) { delete[] ptr; }); - // `Series::writeIterations()` and `Series::readIterations()` are - // intentionally restricted APIs that ensure a workflow which also works - // in streaming setups, e.g. an iteration cannot be opened again once - // it has been closed. - // `Series::iterations` can be directly accessed in random-access workflows. + // Create the Series with synchronous snapshots, i.e. one Iteration after + // the other. The alternative would be random-access where multiple + // Iterations can be accessed independently from one another. This more + // restricted mode enables performance optimizations in the backends, and + // more importantly is compatible with streaming I/O. auto iterations = series.snapshots(SnapshotWorkflow::Synchronous); for (size_t i = 0; i < 100; ++i) { diff --git a/include/openPMD/ReadIterations.hpp b/include/openPMD/ReadIterations.hpp index d393d99706..9a4eff9221 100644 --- a/include/openPMD/ReadIterations.hpp +++ b/include/openPMD/ReadIterations.hpp @@ -38,7 +38,7 @@ class LegacyIteratorAdaptor * This is a feature-restricted subset for the functionality of * `Series::snapshots()`, prefer using that. The compatibility layer is needed * due to the different value_type for `Series::readIterations()`-based - * iteration (`IterationIndex` instead of `std::pair`). + * iteration (`IndexedIteration` instead of `std::pair`). * * Create instance via Series::readIterations(). * For use in a C++11-style foreach loop over iterations. diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 176b8cbe4f..c387c803f7 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -708,7 +708,8 @@ class Series : public Attributable * * Conversely, the Access::CREATE and Access::APPEND access modes both * resolve to random-access by default, but can be specified to use - * Synchronous workflow if needed. + * Synchronous workflow if needed. A shorthand for Synchronous workflows can + * be found with Series::writeIterations(). * * @param snapshot_workflow Specify the intended workflow * in Access::CREATE and Access::APPEND. Leave unspecified in @@ -736,17 +737,9 @@ class Series : public Attributable /** * @brief Entry point to the writing end of the streaming API. * - * Creates and returns an instance of the WriteIterations class which is an - * intentionally restricted container of iterations that takes care of - * streaming semantics, e.g. ensuring that an iteration cannot be reopened - * once closed. - * For a less restrictive API in non-streaming situations, - * `Series::iterations` can be accessed directly. - * The created object is stored as member of the Series object, hence this - * method may be called as many times as a user wishes. - * There is only one shared iterator state per Series, even when calling - * this method twice. - * Look for the WriteIterations class for further documentation. + * Shorthand for `Series::snapshots()` for access types CREATE and APPEND + * called with parameter SnapshotWorkflow::Synchronous, i.e. for + * streaming-aware data producers. * * @return WriteIterations */ diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 902ce7e17c..931a1f9e3d 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -145,7 +145,13 @@ Iteration &Iteration::open() auto &it = get(); // figure out my iteration number auto begin = s.indexOf(*this); - // ensure that files are accessed + if (it.m_closed == internal::CloseStatus::ClosedInFrontend) + { + // Iteration is only logically closed, we can simply unmark it + it.m_closed = internal::CloseStatus::Open; + } + // Ensure that files are accessed. + // If the close status was Closed, this will open it. s.openIteration(begin->first, *this); if (it.m_closed == CloseStatus::ParseAccessDeferred) { diff --git a/src/Series.cpp b/src/Series.cpp index adc87971f3..cfd92f84e7 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -2705,9 +2705,27 @@ void Series::openIteration(IterationIndex_t index, Iteration &iteration) { auto oldStatus = iteration.get().m_closed; using CL = internal::CloseStatus; + /* + * Closed and ClosedInFrontend need to be treated different here. + * Closed means that the Iteration is actually closed, but we need it again, + * so it should be opened again. + * ClosedInFrontend means that the Iteration is about to be closed, but + * still open. Nothing needs to be done, the enqueued operations can be + * performed, and the Iteration will be closed afterwards. + */ switch (oldStatus) { case CL::Closed: + if (access::writeOnly(IOHandler()->m_frontendAccess)) + { + std::cerr << &R"( +[Series::openIteration] + Warning: Reopening closed Iterations in write modes is currently experimental. + Note that an ADIOS2 step/file cannot be modified once closed, just appended + to with a new step. Support for this is not yet feature-complete (pre-alpha). +)"[1]; + } + [[fallthrough]]; case CL::Open: iteration.get().m_closed = CL::Open; break; @@ -3188,14 +3206,15 @@ void Series::parseBase() WriteIterations Series::writeIterations() { - auto &series = get(); - if (series.m_deferred_initialization.has_value()) + auto const access = IOHandler()->m_frontendAccess; + if (access != Access::CREATE && access != Access::APPEND) { - runDeferredInitialization(); + throw error::WrongAPIUsage( + "[Series::writeIterations()] May only be applied for access modes " + "CREATE or APPEND. Use Series::snapshots() for random-access-type " + "or for read-type workflows."); } - auto begin = make_writing_stateful_iterator(*this, series); - return Snapshots(std::shared_ptr( - new StatefulSnapshotsContainer(std::move(begin)))); + return snapshots(SnapshotWorkflow::Synchronous); } void Series::close() diff --git a/test/Files_SerialIO/close_iteration_test.cpp b/test/Files_SerialIO/close_and_reopen_test.cpp similarity index 100% rename from test/Files_SerialIO/close_iteration_test.cpp rename to test/Files_SerialIO/close_and_reopen_test.cpp diff --git a/test/Files_SerialIO/filebased_write_test.cpp b/test/Files_SerialIO/filebased_write_test.cpp index ba2780a71e..c7b200b70e 100644 --- a/test/Files_SerialIO/filebased_write_test.cpp +++ b/test/Files_SerialIO/filebased_write_test.cpp @@ -5,6 +5,19 @@ namespace filebased_write_test { using namespace openPMD; +#define OPENPMD_TEST_VERBOSE 0 + +namespace +{ + template + auto write_to_stdout([[maybe_unused]] Args &&...args) -> void + { +#if OPENPMD_TEST_VERBOSE + (std::cout << ... << args); +#endif + } +} // namespace + auto close_and_reopen_iterations( const std::string &filename, openPMD::Access access, @@ -18,32 +31,32 @@ auto close_and_reopen_iterations( auto chunk = component.loadChunkVariant(); iteration.seriesFlush(); auto num_particles = component.getExtent()[0]; - std::cout << "Particles: "; + write_to_stdout("Particles: "); if (num_particles > 0) { std::visit( [&](auto const &shared_ptr) { auto it = shared_ptr.get(); auto end = it + num_particles; - std::cout << '[' << *it++; + write_to_stdout('[', *it++); for (; it != end; ++it) { - std::cout << ", " << *it; + write_to_stdout(", ", *it); } }, chunk); - std::cout << "]"; + write_to_stdout("]"); } else { - std::cout << "[]"; + write_to_stdout("[]"); } - std::cout << std::endl; + write_to_stdout('\n'); }; for (auto &[idx, iteration] : list.snapshots()) { - std::cout << "Seeing iteration " << idx << std::endl; + write_to_stdout("Seeing iteration ", idx, '\n'); if (need_to_explitly_open_iterations) { iteration.open(); @@ -52,20 +65,20 @@ auto close_and_reopen_iterations( { test_read(iteration); } - std::cout << "Closing iteration " << idx << std::endl; + write_to_stdout("Closing iteration ", idx, '\n'); iteration.close(); } - std::cout << "Trying to read iteration 3 out of line" << std::endl; + write_to_stdout("Trying to read iteration 3 out of line", '\n'); if (need_to_explitly_open_iterations || access == Access::READ_ONLY) { list.snapshots()[3].open(); } test_read(list.snapshots()[3]); - std::cout << "----------\nGoing again\n----------" << std::endl; + write_to_stdout("----------\nGoing again\n----------", '\n'); for (auto &[idx, iteration] : list.snapshots()) { - std::cout << "Seeing iteration " << idx << std::endl; + write_to_stdout("Seeing iteration ", idx, '\n'); if (need_to_explitly_open_iterations || access == Access::READ_ONLY) { iteration.open();