diff --git a/CHANGES.md b/CHANGES.md
index 3a90b16b..9969f5af 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,7 +1,18 @@
 # Release notes
 
 See also the
-[unreleased changes](https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.1...branch%2Fdefault).
+[unreleased changes](https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.2...branch%2Fdefault).
+
+## [0.5.2] (2024-05-??)
+
+- New command `fluidimage-mean` to compute the mean of images with the new topology
+  {class}`fluidimage.topologies.mean.TopologyMeanImage`.
+
+- New executors `multi_exec_sync` ({mod}`fluidimage.executors.multi_exec_sync`) and
+  `multi_exec_subproc_sync` ({mod}`fluidimage.executors.multi_exec_subproc_sync`).
+
+- [!108](https://foss.heptapod.net/fluiddyn/fluidimage/-/merge_requests/108): save UVmat
+  error codes in PIV files.
 
 ## [0.5.1] (2024-05-06)
 
@@ -217,3 +228,4 @@ This version contains incompatible API changes documented here.
 [0.4.6]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.4.5...0.4.6
 [0.5.0]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.4.6...0.5.0
 [0.5.1]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.0...0.5.1
+[0.5.2]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.1...0.5.2
diff --git a/bench/profile_mean.py b/bench/profile_mean.py
new file mode 100644
index 00000000..78f203bf
--- /dev/null
+++ b/bench/profile_mean.py
@@ -0,0 +1,38 @@
+from pathlib import Path
+from shutil import rmtree
+
+from pyinstrument import Profiler
+from pyinstrument.renderers import ConsoleRenderer
+
+from fluidimage.topologies.mean import Topology
+
+path_images = Path("/fsnet/project/watu/2022/22INTERNAL_F/DATA/EXP44/PCO_50mm")
+path_images = Path("/data/PCO_50mm")
+rmtree(path_images.parent / "PCO_50mm.mean", ignore_errors=True)
+
+params = Topology.create_default_params()
+params.images.path = str(path_images / "im*.png")
+params.images.str_subset = ":100"
+
+topology = Topology(params)
+executor = "exec_sequential"
+executor = "exec_async_sequential"
+# executor = "multi_exec_sync"
+executor = "multi_exec_async"
+
+profiler = Profiler()
+profiler.start()
+topology.compute(executor=executor, nb_max_workers=4)
+profiler.stop()
+
+print(
+    profiler.output(
+        renderer=ConsoleRenderer(
+            unicode=True,
+            color=True,
+            show_all=False,
+            # time="percent_of_total",
+            # flat=True,  # buggy with pyinstrument 4.6.2!
+        )
+    )
+)
diff --git a/doc/commands.md b/doc/commands.md
new file mode 100644
index 00000000..9b3edbef
--- /dev/null
+++ b/doc/commands.md
@@ -0,0 +1,8 @@
+# Commands
+
+Fluidimage provides a few useful commands:
+
+- `fluidimage-monitor`: monitor Fluidimage computations.
+- `fluidimviewer`: visualize images.
+- `fluidpivviewer`: visualize PIV fields.
+- `fluidimage-mean`: compute the mean of images.
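The new `fluidimage-mean` entry point and `fluidimage.topologies.mean.TopologyMeanImage` introduced above can be driven from Python as well as from the command line. The following is a minimal sketch mirroring the API used in `bench/profile_mean.py`; the input directory, glob pattern, and worker count are placeholders to adapt.

```python
from fluidimage.topologies.mean import TopologyMeanImage

# Placeholder paths: adapt the directory and glob to your image files.
params = TopologyMeanImage.create_default_params()
params.images.path = "/path/to/images/im*.png"
params.images.str_subset = ":100"  # optional: restrict to the first 100 images

topology = TopologyMeanImage(params)
# "multi_exec_sync" is one of the executors added in this release;
# any other registered executor name also works.
topology.compute(executor="multi_exec_sync", nb_max_workers=4)

# The mean image is written next to the input directory as a PNG file.
print(topology.path_result)
```

Using the options defined in `parse_args` of `fluidimage/topologies/mean.py`, the command-line equivalent would be something like `fluidimage-mean /path/to/images --executor multi_exec_sync -np 4`.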
diff --git a/doc/examples/pivchallenge/possible_paths.txt b/doc/examples/pivchallenge/possible_paths.txt index 03378030..12513177 100644 --- a/doc/examples/pivchallenge/possible_paths.txt +++ b/doc/examples/pivchallenge/possible_paths.txt @@ -1,4 +1,5 @@ /data/pivchallenge +/data1/pivchallenge /fsnet/project/meige/2016/16FLUIDIMAGE/samples/pivchallenge /storage2/pivchallenge ~/pivchallenge diff --git a/doc/examples/pivchallenge/profile_mean.py b/doc/examples/pivchallenge/profile_mean.py new file mode 100644 index 00000000..ade4150b --- /dev/null +++ b/doc/examples/pivchallenge/profile_mean.py @@ -0,0 +1,36 @@ +from shutil import rmtree + +from path_images import get_path +from pyinstrument import Profiler +from pyinstrument.renderers import ConsoleRenderer + +from fluidimage.topologies.mean import Topology + +path_images = get_path("2005C") +rmtree(path_images.parent / "Images.mean", ignore_errors=True) + +params = Topology.create_default_params() +params.images.path = str(path_images / "c*.bmp") + +topology = Topology(params) +executor = "exec_sequential" +# executor = "exec_async_sequential" +executor = "multi_exec_sync" +# executor = "multi_exec_async" + +profiler = Profiler() +profiler.start() +topology.compute(executor=executor, nb_max_workers=2) +profiler.stop() + +print( + profiler.output( + renderer=ConsoleRenderer( + unicode=True, + color=True, + show_all=False, + # time="percent_of_total", + # flat=True, # buggy with pyinstrument 4.6.2! + ) + ) +) diff --git a/doc/index.md b/doc/index.md index 4052d277..11c64a2e 100644 --- a/doc/index.md +++ b/doc/index.md @@ -14,6 +14,7 @@ overview install tutorial examples +commands build-from-source ``` diff --git a/pyproject.toml b/pyproject.toml index 6d0a1dd9..72794db4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = 'mesonpy' [project] name = "fluidimage" -version = "0.5.1" +version = "0.5.2" description = "Fluid image processing with Python." 
authors = [ {name = "Pierre Augier", email = "pierre.augier@legi.cnrs.fr"}, @@ -58,6 +58,7 @@ fluidimage-monitor = "fluidimage.gui.monitor:main" fluidimviewer = "fluidimage.gui.imviewer:main" fluidimviewer-pg = "fluidimage.gui.pg_main:main" fluidpivviewer = "fluidimage.gui.piv_viewer:main" +fluidimage-mean = "fluidimage.topologies.mean:main" [project.optional-dependencies] pims = ["pims"] @@ -72,8 +73,11 @@ exec_sequential = "fluidimage.executors.exec_sequential" exec_async = "fluidimage.executors.exec_async" exec_async_sequential = "fluidimage.executors.exec_async_sequential" exec_async_seq_for_multi = "fluidimage.executors.exec_async_seq_for_multi" +exec_seq_for_multi = "fluidimage.executors.exec_seq_for_multi" multi_exec_async = "fluidimage.executors.multi_exec_async" +multi_exec_sync = "fluidimage.executors.multi_exec_sync" multi_exec_subproc = "fluidimage.executors.multi_exec_subproc" +multi_exec_subproc_sync = "fluidimage.executors.multi_exec_subproc_sync" exec_async_multi = "fluidimage.executors.exec_async_multiproc" exec_async_servers = "fluidimage.executors.exec_async_servers" exec_async_servers_threading = "fluidimage.executors.exec_async_servers_threading" diff --git a/src/fluidimage/executors/__init__.py b/src/fluidimage/executors/__init__.py index e5b288ce..be613207 100644 --- a/src/fluidimage/executors/__init__.py +++ b/src/fluidimage/executors/__init__.py @@ -19,8 +19,11 @@ exec_async exec_async_sequential multi_exec_async + multi_exec_sync multi_exec_subproc + multi_exec_subproc_sync exec_async_seq_for_multi + exec_seq_for_multi exec_async_multiproc exec_async_servers servers @@ -58,8 +61,9 @@ def afterfork(): # because the OS does not support forks (or not fully) and # multiprocessing works differently than on Linux # see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods -supported_multi_executors = ["multi_exec_subproc"] +supported_multi_executors = ["multi_exec_subproc", "multi_exec_subproc_sync"] if sys.platform == "linux": + supported_multi_executors.insert(0, "multi_exec_sync") supported_multi_executors.insert(0, "multi_exec_async") diff --git a/src/fluidimage/executors/base.py b/src/fluidimage/executors/base.py index 9490a86a..02d84a25 100644 --- a/src/fluidimage/executors/base.py +++ b/src/fluidimage/executors/base.py @@ -75,6 +75,12 @@ class ExecutorBase(ABC): _path_lockfile: Path num_expected_results: int time_start: str + _final_seq_work_run: bool + + # for results log + _path_results: Path + _path_num_results: Path + _len_saved_results: int def _init_log_path(self): self.time_start_str = time_as_str() @@ -202,6 +208,7 @@ def _get_file_object_for_logger(self): return sys.stdout def _init_compute(self): + self._final_seq_work_run = False self.t_start = time() self._init_num_expected_results() if self.num_expected_results == 0: @@ -218,7 +225,7 @@ def _init_compute_log(self): print(" executor:", executor_name) print(" nb_cpus_allowed =", nb_cores) print(" nb_max_workers =", self.nb_max_workers) - print(" num_expected_results", self.num_expected_results) + print(" num_expected_results =", self.num_expected_results) print(" path_dir_result =", self.path_dir_result) print( "Monitoring app can be launched with:\n" @@ -276,7 +283,16 @@ def _reset_std_as_default(self): reset_logger() self._log_file.close() + def _run_final_seq_work(self): + if ( + hasattr(self.topology, "final_seq_work") + and not self._final_seq_work_run + ): + self.topology.final_seq_work() + self._final_seq_work_run = True + def _finalize_compute(self): + 
self._run_final_seq_work() log_memory_usage(time_as_str(2) + ": end of `compute`. mem usage") self.topology.print_at_exit(time() - self.t_start) self._reset_std_as_default() @@ -361,6 +377,42 @@ def _init_num_expected_results_first_queue(self): self._keys_first_queue = list(self._first_queue.keys()) self.num_expected_results = len(self._keys_first_queue) + def _init_results_log(self, path_job_data): + + if hasattr(self, "index_process"): + str_index_process = f"_{self.index_process:03}" + else: + str_index_process = "" + + self._path_results = path_job_data / f"results{str_index_process}.txt" + self._path_num_results = ( + self._path_results.parent / f"len_results{str_index_process}.txt" + ) + self._len_saved_results = 0 + + def _save_results_names(self): + + new_results = self.topology.results[self._len_saved_results :] + self._len_saved_results = len(self.topology.results) + + with open(self._path_num_results, "w", encoding="utf-8") as file: + file.write(f"{self._len_saved_results}\n") + + if new_results: + if isinstance(new_results[0], str): + new_results = [Path(path).name for path in new_results] + elif hasattr(new_results[0], "name"): + new_results = [_r.name for _r in new_results] + else: + new_results = [str(_r) for _r in new_results] + new_results = "\n".join(new_results) + "\n" + + with open(self._path_results, "a", encoding="utf-8") as file: + file.write(new_results) + + if not self._log_file.closed: + self._log_file.flush() + class MultiExecutorBase(ExecutorBase): """Manage the multi-executor mode @@ -454,16 +506,6 @@ def handler_signals(signal_number, stack): self._start_processes() self.nb_processes = len(self.processes) - self._wait_for_all_processes() - self._finalize_compute() - - def _poll_return_code(self, process): - return process.poll() - - def _join_processes(self): - """Join the processes""" - - def _wait_for_all_processes(self): running_processes = { idx: process for idx, process in enumerate(self.processes) @@ -523,13 +565,26 @@ def _wait_for_all_processes(self): ) running_processes_updated.clear() - self._join_processes() + self._join_processes() + + self.topology.results = results = [] + for path in self.path_job_data.glob("results_*.txt"): + with open(path, encoding="utf-8") as file: + results.extend(line.strip() for line in file.readlines()) + + self._run_final_seq_work() + + progress.update(progress_task, completed=len(self.topology.results)) + + self._finalize_compute() + + def _poll_return_code(self, process): + return process.poll() + + def _join_processes(self): + """Join the processes""" def _finalize_compute(self): - self.topology.results = results = [] - for path in self.path_job_data.glob("results_*.txt"): - with open(path, encoding="utf-8") as file: - results.extend(line.strip() for line in file.readlines()) if self.errors: text_error = ( diff --git a/src/fluidimage/executors/exec_async.py b/src/fluidimage/executors/exec_async.py index e7bfd088..2de8281f 100644 --- a/src/fluidimage/executors/exec_async.py +++ b/src/fluidimage/executors/exec_async.py @@ -119,6 +119,11 @@ def define_functions(self): # global functions if work.kind is not None and "global" in work.kind: + try: + work.func_or_cls.__self__.executor = self + except AttributeError: + pass + async def func(work=work): while True: while ( diff --git a/src/fluidimage/executors/exec_async_seq_for_multi.py b/src/fluidimage/executors/exec_async_seq_for_multi.py index 39450f35..cc2704c5 100644 --- a/src/fluidimage/executors/exec_async_seq_for_multi.py +++ 
b/src/fluidimage/executors/exec_async_seq_for_multi.py @@ -38,6 +38,7 @@ def __init__( ) self._log_path = path_log + topology.executor = self super().__init__( topology, path_dir_result, @@ -59,17 +60,10 @@ def __init__( self._save_topology_results ) path_log_dir = Path(self._log_path).parent - path_job_data = path_log_dir.with_name("job" + path_log_dir.name[3:]) - self._path_results = ( - path_job_data / f"results_{self.index_process:03}.txt" + self.path_job_data = path_log_dir.with_name( + "job" + path_log_dir.name[3:] ) - - self._path_num_results = ( - self._path_results.parent - / f"len_results_{self.index_process:03}.txt" - ) - - self._len_saved_results = 0 + self._init_results_log(self.path_job_data) sys.stdout = self._log_file @@ -100,29 +94,6 @@ def _finalize_compute(self): if hasattr(self.topology, "results"): self._save_results_names() - def _save_results_names(self): - - new_results = self.topology.results[self._len_saved_results :] - self._len_saved_results = len(self.topology.results) - - with open(self._path_num_results, "w", encoding="utf-8") as file: - file.write(f"{self._len_saved_results}\n") - - if new_results: - if isinstance(new_results[0], str): - new_results = [Path(path).name for path in new_results] - elif hasattr(new_results[0], "name"): - new_results = [_r.name for _r in new_results] - else: - new_results = [str(_r) for _r in new_results] - new_results = "\n".join(new_results) + "\n" - - with open(self._path_results, "a", encoding="utf-8") as file: - file.write(new_results) - - if not self._log_file.closed: - self._log_file.flush() - async def _save_topology_results(self): while not self._has_to_stop: self._save_results_names() diff --git a/src/fluidimage/executors/exec_async_servers.py b/src/fluidimage/executors/exec_async_servers.py index 22656e86..a5cbb78f 100644 --- a/src/fluidimage/executors/exec_async_servers.py +++ b/src/fluidimage/executors/exec_async_servers.py @@ -9,13 +9,11 @@ """ -import os import signal import numpy as np import trio -from fluiddyn import time_as_str from fluidimage.util import log_debug, logger from .exec_async import ExecutorAsync diff --git a/src/fluidimage/executors/exec_seq_for_multi.py b/src/fluidimage/executors/exec_seq_for_multi.py new file mode 100644 index 00000000..16f777b9 --- /dev/null +++ b/src/fluidimage/executors/exec_seq_for_multi.py @@ -0,0 +1,17 @@ +"""Sequential executor for multi executor + +.. 
autoclass:: ExecutorSeqForMulti + :members: + :private-members: + +""" + +from fluidimage.executors.exec_async_seq_for_multi import ExecutorAsyncSeqForMulti +from fluidimage.executors.exec_sequential import ExecutorSequential + + +class ExecutorSeqForMulti(ExecutorSequential, ExecutorAsyncSeqForMulti): + """Sequential executor modified for multi executors""" + + +Executor = ExecutorSeqForMulti diff --git a/src/fluidimage/executors/exec_sequential.py b/src/fluidimage/executors/exec_sequential.py index 306f0252..d036b58f 100644 --- a/src/fluidimage/executors/exec_sequential.py +++ b/src/fluidimage/executors/exec_sequential.py @@ -21,13 +21,18 @@ def compute(self): self.exec_one_shot_works() self._run_works() self._finalize_compute() + if hasattr(self.topology, "results"): + self._save_results_names() def _run_works(self): while not all([len(queue) == 0 for queue in self.topology.queues]): for work in self.works: # global functions if work.kind is not None and "global" in work.kind: - if len(work.output_queue) > self.nb_items_queue_max: + if ( + work.output_queue is not None + and len(work.output_queue) > self.nb_items_queue_max + ): continue work.func_or_cls(work.input_queue, work.output_queue) @@ -67,5 +72,17 @@ def _run_works(self): if work.output_queue is not None: work.output_queue[key] = ret + if hasattr(self.topology, "results"): + self._save_results_names() + + def _init_compute_log(self): + self.nb_max_workers = 1 + super()._init_compute_log() + + def _save_job_data(self): + super()._save_job_data() + if hasattr(self.topology, "results"): + self._init_results_log(self.path_job_data) + Executor = ExecutorSequential diff --git a/src/fluidimage/executors/meson.build b/src/fluidimage/executors/meson.build index 73bc317a..cbe267f1 100644 --- a/src/fluidimage/executors/meson.build +++ b/src/fluidimage/executors/meson.build @@ -6,11 +6,14 @@ python_sources = [ 'exec_async_multiproc.py', 'exec_async_sequential.py', 'exec_async_seq_for_multi.py', + 'exec_seq_for_multi.py', 'exec_async_servers.py', 'exec_async_servers_threading.py', 'exec_sequential.py', 'multi_exec_async.py', + 'multi_exec_sync.py', 'multi_exec_subproc.py', + 'multi_exec_subproc_sync.py', 'servers.py', ] diff --git a/src/fluidimage/executors/multi_exec_async.py b/src/fluidimage/executors/multi_exec_async.py index c8cd28ba..027fd756 100644 --- a/src/fluidimage/executors/multi_exec_async.py +++ b/src/fluidimage/executors/multi_exec_async.py @@ -6,23 +6,17 @@ :members: :private-members: -.. 
autoclass:: ExecutorAsyncForMulti - :members: - :private-members: - """ import copy from multiprocessing import Process +from fluidimage.topologies.splitters import split_list + from .base import MultiExecutorBase from .exec_async_seq_for_multi import ExecutorAsyncSeqForMulti -class ExecutorAsyncForMulti(ExecutorAsyncSeqForMulti): - """Slightly modified ExecutorAsync""" - - class MultiExecutorAsync(MultiExecutorBase): """Manage the multi-executor mode @@ -48,6 +42,8 @@ class MultiExecutorAsync(MultiExecutorBase): """ + ExecutorForMulti = ExecutorAsyncSeqForMulti + def _start_processes(self): """ There are two ways to split self.topology work: @@ -72,16 +68,7 @@ def _start_processes(self): def _start_multiprocess_first_queue(self): """Start the processes spitting the work with the first queue""" - nb_keys_per_process = max( - 1, int(len(self._keys_first_queue) / self.nb_processes) - ) - - keys_for_processes = [] - for iproc in range(self.nb_processes): - istart = iproc * nb_keys_per_process - keys_for_processes.append( - self._keys_first_queue[istart : istart + nb_keys_per_process] - ) + keys_for_processes = split_list(self._keys_first_queue, self.nb_processes) # change topology self.topology.first_queue = self.topology.works[0].output_queue @@ -172,7 +159,7 @@ def _start_multiprocess_series(self): def init_and_compute(self, topology_this_process, log_path, idx_process): """Create an executor and start it in a process""" - executor = ExecutorAsyncForMulti( + executor = self.ExecutorForMulti( topology_this_process, self.path_dir_result, sleep_time=self.sleep_time, diff --git a/src/fluidimage/executors/multi_exec_subproc.py b/src/fluidimage/executors/multi_exec_subproc.py index d8836f12..75ad0133 100644 --- a/src/fluidimage/executors/multi_exec_subproc.py +++ b/src/fluidimage/executors/multi_exec_subproc.py @@ -22,6 +22,7 @@ class MultiExecutorSubproc(MultiExecutorBase): """Multi executor based on subprocesses and splitters""" splitter: Splitter + executor_for_multi = "exec_async_seq_for_multi" def _init_num_expected_results(self): @@ -29,7 +30,7 @@ def _init_num_expected_results(self): splitter_cls = self.topology.Splitter except AttributeError as error: raise ValueError( - "MultiExecutorSubproc can only execute " + f"{type(self).__name__} can only execute " "topologies with a Splitter." ) from error @@ -39,12 +40,12 @@ def _init_num_expected_results(self): params._set_child( "compute_kwargs", attribs={ - "executor": "exec_async_seq_for_multi", + "executor": self.executor_for_multi, "nb_max_workers": 1, }, ) except ValueError: - params.compute_kwargs.executor = "exec_async_seq_for_multi" + params.compute_kwargs.executor = self.executor_for_multi params.compute_kwargs.nb_max_workers = 1 try: diff --git a/src/fluidimage/executors/multi_exec_subproc_sync.py b/src/fluidimage/executors/multi_exec_subproc_sync.py new file mode 100644 index 00000000..041fd6a5 --- /dev/null +++ b/src/fluidimage/executors/multi_exec_subproc_sync.py @@ -0,0 +1,16 @@ +"""Multi executor based on subprocesses using exec_seq_for_multi + +.. 
autoclass:: MultiExecutorSubprocSync + :members: + :private-members: + +""" + +from fluidimage.executors.multi_exec_subproc import MultiExecutorSubproc + + +class MultiExecutorSubprocSync(MultiExecutorSubproc): + executor_for_multi = "exec_seq_for_multi" + + +Executor = MultiExecutorSubprocSync diff --git a/src/fluidimage/executors/multi_exec_sync.py b/src/fluidimage/executors/multi_exec_sync.py new file mode 100644 index 00000000..4c402822 --- /dev/null +++ b/src/fluidimage/executors/multi_exec_sync.py @@ -0,0 +1,19 @@ +""" +Multi executors sync +===================== + +.. autoclass:: MultiExecutorSync + :members: + :private-members: + +""" + +from fluidimage.executors.exec_seq_for_multi import ExecutorSeqForMulti +from fluidimage.executors.multi_exec_async import MultiExecutorAsync + + +class MultiExecutorSync(MultiExecutorAsync): + ExecutorForMulti = ExecutorSeqForMulti + + +Executor = MultiExecutorSync diff --git a/src/fluidimage/executors/servers.py b/src/fluidimage/executors/servers.py index bb279ec8..637cc134 100644 --- a/src/fluidimage/executors/servers.py +++ b/src/fluidimage/executors/servers.py @@ -177,6 +177,15 @@ def signal_handler(sig, frame): self.conn = conn self.event_has_to_stop = event_has_to_stop + + try: + params.saving.how + except AttributeError: + pass + else: + if params.saving.how == "ask": + params.saving.how = "complete" + self.topology = topology_cls(params) self._log_file = open(log_path, "w", encoding="utf-8") diff --git a/src/fluidimage/gui/monitor.py b/src/fluidimage/gui/monitor.py index 4d757440..301eaf9b 100644 --- a/src/fluidimage/gui/monitor.py +++ b/src/fluidimage/gui/monitor.py @@ -153,7 +153,7 @@ def __init__(self, args): copy_doc(self.params, params_default) self.paths_len_results = sorted( - self.path_job_info.glob("len_results_*.txt") + self.path_job_info.glob("len_results*.txt") ) assert self.paths_len_results diff --git a/src/fluidimage/test_run_from_xml.py b/src/fluidimage/test_run_from_xml.py index bb3663c2..14947f85 100644 --- a/src/fluidimage/test_run_from_xml.py +++ b/src/fluidimage/test_run_from_xml.py @@ -34,7 +34,7 @@ def test_uvmat(tmp_path, monkeypatch, name): action = main() path_results = tmp_path / "Images.civ" - assert action.params.saving.path == str(path_results) + assert str(action.params.saving.path) == str(path_results) paths_piv = sorted(p.name for p in path_results.glob("piv*.h5")) diff --git a/src/fluidimage/topologies/__init__.py b/src/fluidimage/topologies/__init__.py index 7a8c736b..8efa858d 100644 --- a/src/fluidimage/topologies/__init__.py +++ b/src/fluidimage/topologies/__init__.py @@ -18,6 +18,7 @@ piv bos preproc + mean image2image surface_tracking optical_flow @@ -34,70 +35,7 @@ """ -import os -import sys -from pathlib import Path - -from fluiddyn.io.query import query - from .base import TopologyBase, TopologyBaseFromSeries from .log import LogTopology __all__ = ["LogTopology", "TopologyBase"] - -how_values = ("ask", "new_dir", "complete", "recompute", "from_path_indices") - - -def prepare_path_dir_result( - path_dir_input, path_saving, postfix_saving, how_saving -): - """Makes new directory for results, if required, and returns its path.""" - - if how_saving not in how_values: - raise ValueError( - f"how_saving (here equal to '{how_saving}') " - f"should be in {how_values}" - ) - - path_dir_input = str(path_dir_input) - - if path_saving is not None: - path_dir_result = path_saving - else: - path_dir_result = path_dir_input + "." 
+ postfix_saving - - how = how_saving - if os.path.exists(path_dir_result): - if how == "ask": - answer = query( - f"The directory {path_dir_result} " - + "already exists. What do you want to do?\n" - "New dir, Complete, Recompute or Stop?\n" - ) - - while answer.lower() not in ["n", "c", "r", "s"]: - answer = query( - "The answer should be in ['n', 'c', 'r', 's']\n" - "Please type your answer again...\n" - ) - - if answer == "s": - print("Stopped by the user.") - sys.exit() - - elif answer == "n": - how = "new_dir" - elif answer == "c": - how = "complete" - elif answer == "r": - how = "recompute" - - if how == "new_dir": - i = 0 - while os.path.exists(path_dir_result + str(i)): - i += 1 - path_dir_result += str(i) - - path_dir_result = Path(path_dir_result) - path_dir_result.mkdir(exist_ok=True) - return path_dir_result, how diff --git a/src/fluidimage/topologies/base.py b/src/fluidimage/topologies/base.py index 2754ae2f..5e235486 100644 --- a/src/fluidimage/topologies/base.py +++ b/src/fluidimage/topologies/base.py @@ -24,12 +24,15 @@ """ import json +import os +import sys from abc import ABC, abstractmethod from collections import OrderedDict from pathlib import Path from typing import Sequence, Union from warnings import warn +from fluiddyn.io.query import query from fluidimage import ParamContainer, SerieOfArraysFromFiles, SeriesOfArrays from fluidimage.util import DEBUG, cstring, logger @@ -40,6 +43,66 @@ supported_multi_executors, ) +how_values = ("ask", "new_dir", "complete", "recompute", "from_path_indices") + + +def prepare_path_dir_result( + path_dir_input, path_saving, postfix_saving, how_saving +): + """Makes new directory for results, if required, and returns its path.""" + + if how_saving not in how_values: + raise ValueError( + f"how_saving (here equal to '{how_saving}') " + f"should be in {how_values}" + ) + + path_dir_input = str(path_dir_input) + + if path_saving is not None: + path_dir_result = path_saving + else: + path_dir_result = path_dir_input + "." + postfix_saving + + how = how_saving + if not os.path.exists(path_dir_result): + if how == "ask": + how = "recompute" + else: + if how == "ask": + answer = query( + f"The directory {path_dir_result} " + + "already exists. What do you want to do?\n" + "New dir, Complete, Recompute or Stop?\n" + ) + + while answer.lower() not in ["n", "c", "r", "s"]: + answer = query( + "The answer should be in ['n', 'c', 'r', 's']\n" + "Please type your answer again...\n" + ) + + if answer == "s": + print("Stopped by the user.") + sys.exit() + + elif answer == "n": + how = "new_dir" + elif answer == "c": + how = "complete" + elif answer == "r": + how = "recompute" + + if how == "new_dir": + i = 0 + while os.path.exists(path_dir_result + str(i)): + i += 1 + path_dir_result += str(i) + + path_dir_result = Path(path_dir_result) + path_dir_result.mkdir(exist_ok=True) + return path_dir_result, how + class Work: """Represent a work @@ -146,6 +209,13 @@ def is_name_in_values(self, image_name): return False +class QueueList(list): + + def __init__(self, name): + self.name = name + super().__init__() + + class TopologyBase: """Base class for topologies of processing. 
@@ -199,9 +269,19 @@ def _add_default_params_saving(cls, params): ) def __init__( - self, path_dir_result=None, logging_level="info", nb_max_workers=None + self, + params=None, + path_dir_src=None, + path_dir_result=None, + logging_level="info", + nb_max_workers=None, ): - self.path_dir_result = path_dir_result + self.params = params + self.path_dir_src = Path(path_dir_src) + if path_dir_result is None: + self._init_path_dir_result(path_dir_src) + else: + self.path_dir_result = path_dir_result self.logging_level = logging_level self.nb_max_workers = nb_max_workers @@ -210,9 +290,19 @@ def __init__( self.works_dict = {} self.executor = None + def _init_path_dir_result(self, path_dir_src): + p_saving = self.params.saving + self.path_dir_result, self.how_saving = prepare_path_dir_result( + path_dir_src, p_saving.path, p_saving.postfix, p_saving.how + ) + p_saving.path = self.path_dir_result + def add_queue(self, name: str, kind: str = None): """Create a new queue.""" - queue = Queue(name=name, kind=kind) + if kind == "list": + queue = QueueList(name) + else: + queue = Queue(name=name, kind=kind) self.queues.append(queue) return queue diff --git a/src/fluidimage/topologies/bos.py b/src/fluidimage/topologies/bos.py index 937b250e..6caacba6 100644 --- a/src/fluidimage/topologies/bos.py +++ b/src/fluidimage/topologies/bos.py @@ -12,7 +12,6 @@ from fluidimage import ParamContainer from fluidimage.data_objects.piv import get_name_bos -from fluidimage.topologies import prepare_path_dir_result from fluidimage.topologies.base import TopologyBaseFromImages from fluidimage.topologies.splitters import SplitterFromImages from fluidimage.util import imread @@ -94,22 +93,13 @@ def create_default_params(cls): return params def __init__(self, params, logging_level="info", nb_max_workers=None): - self.params = params - self.main_work = WorkBOS(params) self.serie = self.main_work.serie self.path_reference = self.main_work.path_reference - path_dir = Path(self.serie.path_dir) - path_dir_result, self.how_saving = prepare_path_dir_result( - path_dir, params.saving.path, params.saving.postfix, params.saving.how - ) - - self.path_dir_result = path_dir_result - self.path_dir_src = Path(path_dir) - super().__init__( - path_dir_result=path_dir_result, + params=params, + path_dir_src=self.serie.path_dir, logging_level=logging_level, nb_max_workers=nb_max_workers, ) diff --git a/src/fluidimage/topologies/example.py b/src/fluidimage/topologies/example.py index d00202ad..9fe6b3d5 100644 --- a/src/fluidimage/topologies/example.py +++ b/src/fluidimage/topologies/example.py @@ -92,23 +92,21 @@ def create_default_params(cls): return params def __init__(self, params, logging_level="info", nb_max_workers=None): - self.params = params - path_input = params["path_input"] path_dir_result = params["path_dir_result"] nloops = params["nloops"] self.multiplicator_nb_images = params["multiplicator_nb_images"] - self.path_input = path_input - super().__init__( + params=params, + path_dir_src=params["path_input"], path_dir_result=path_dir_result, logging_level=logging_level, nb_max_workers=nb_max_workers, ) - if not self.path_dir_result.exists(): - self.path_dir_result.mkdir() + if not path_dir_result.exists(): + path_dir_result.mkdir() self.img_counter = 0 @@ -167,8 +165,9 @@ def __init__(self, params, logging_level="info", nb_max_workers=None): ) def fill_names(self, input_queue, output_queue): + del input_queue for ind in range(self.multiplicator_nb_images): - for name in sorted(os.listdir(self.path_input)): + for name in 
sorted(os.listdir(self.path_dir_src)): key = name.split(".bmp")[0] + f"_{ind:02}" output_queue[key] = name @@ -180,7 +179,7 @@ def read_array(self, name): if name == "Karman_03.bmp": raise ValueError("For testing") - image = imread(self.path_input / name) + image = imread(self.path_dir_src / name) return image def fill_couples_arrays(self, input_queues, output_queue): diff --git a/src/fluidimage/topologies/image2image.py b/src/fluidimage/topologies/image2image.py index a18fd5b0..9837ffeb 100644 --- a/src/fluidimage/topologies/image2image.py +++ b/src/fluidimage/topologies/image2image.py @@ -12,7 +12,6 @@ from fluiddyn.io.image import imsave from fluidimage import ParamContainer -from fluidimage.topologies import prepare_path_dir_result from fluidimage.topologies.splitters import SplitterFromImages from fluidimage.util import imread from fluidimage.works.image2image import WorkImage2Image @@ -71,7 +70,6 @@ def create_default_params(cls): return params def __init__(self, params, logging_level="info", nb_max_workers=None): - self.params = params if params.im2im is None: raise ValueError("params.im2im has to be set.") @@ -80,16 +78,9 @@ def __init__(self, params, logging_level="info", nb_max_workers=None): self.serie = self.work.serie im2im_func = self.work.im2im_func - path_dir = self.serie.path_dir - path_dir_result, self.how_saving = prepare_path_dir_result( - path_dir, params.saving.path, params.saving.postfix, params.saving.how - ) - - self.path_dir_result = path_dir_result - self.path_dir_src = Path(path_dir) - super().__init__( - path_dir_result=path_dir_result, + params=params, + path_dir_src=self.serie.path_dir, logging_level=logging_level, nb_max_workers=nb_max_workers, ) diff --git a/src/fluidimage/topologies/mean.py b/src/fluidimage/topologies/mean.py new file mode 100644 index 00000000..9cdbba07 --- /dev/null +++ b/src/fluidimage/topologies/mean.py @@ -0,0 +1,271 @@ +"""Mean images topology + +.. autoclass:: TopologyMeanImage + :members: + :private-members: + +""" + +import argparse +import os +from pathlib import Path + +import h5py +import numpy as np +from PIL import Image + +import fluidimage +from fluiddyn.util.serieofarrays import SerieOfArraysFromFiles +from fluidimage import ParamContainer +from fluidimage.topologies.base import TopologyBaseFromImages +from fluidimage.topologies.splitters import SplitterFromImages +from fluidimage.util import imread +from fluidimage.works import BaseWorkFromImage + +# from transonic import boost, Array + + +# A2d = Array[np.uint32, "2d", "C"] + + +# x2 speedup of this operation but this is clearly not the bottleneck yet... 
+# @boost +# def sum_4_2darrays(arr0: A2d, arr1: A2d, arr2: A2d, arr3: A2d): +# """Sum 4 2d arrays""" +# n0, n1 = arr0.shape +# for i0 in range(n0): +# for i1 in range(n1): +# arr0[i0, i1] = ( +# arr0[i0, i1] + arr1[i0, i1] + arr2[i0, i1] + arr3[i0, i1] +# ) +# return arr0 + + +class TopologyMeanImage(TopologyBaseFromImages): + """Compute in parallel the mean image.""" + + _short_name = "mean" + Splitter = SplitterFromImages + result: np.ndarray + path_result: Path + + @classmethod + def create_default_params(cls): + params = ParamContainer(tag="params") + super()._add_default_params_saving(params) + BaseWorkFromImage._complete_params_with_default(params) + return params + + def __init__(self, params, logging_level="info", nb_max_workers=None): + + p_images = params.images + self.serie = SerieOfArraysFromFiles(p_images.path, p_images.str_subset) + + super().__init__( + params=params, + path_dir_src=self.serie.path_dir, + logging_level=logging_level, + nb_max_workers=nb_max_workers, + ) + + queue_paths = self.add_queue("paths") + queue_arrays = self.add_queue("arrays") + queue_tmp_arrays = self.add_queue("tmp_arrays", "list") + + self.add_work( + "fill_path", + self.fill_queue_paths, + output_queue=queue_paths, + kind="one shot", + ) + + self.add_work( + "read_array", + imread, + input_queue=queue_paths, + output_queue=queue_arrays, + kind="io", + ) + + self.add_work( + "main", + self.main, + input_queue=(queue_paths, queue_arrays, queue_tmp_arrays), + kind="global", + ) + + self.results = [] + + image = self.serie.get_array_from_index(0) + self.original_dtype = image.dtype + + def reduce_queue_tmp_arrays4(self, queue_tmp_arrays): + while len(queue_tmp_arrays) >= 4: + arr0, n0 = queue_tmp_arrays.pop() + arr1, n1 = queue_tmp_arrays.pop() + arr2, n2 = queue_tmp_arrays.pop() + arr3, n3 = queue_tmp_arrays.pop() + arr_sum = arr0 + arr1 + arr2 + arr3 + # arr_sum = sum_4_2darrays(arr0, arr1, arr2, arr3) + n_sum = n0 + n1 + n2 + n3 + # print("reduce_queue_tmp_arrays4", n_sum) + queue_tmp_arrays.insert(0, (arr_sum, n_sum)) + self.results.extend([n_sum] * 3) + + def reduce_queue_tmp_arrays2(self, queue_tmp_arrays): + while len(queue_tmp_arrays) >= 2: + arr0, n0 = queue_tmp_arrays.pop() + arr1, n1 = queue_tmp_arrays.pop() + arr_sum = arr0 + arr1 + n_sum = n0 + n1 + # print("reduce_queue_tmp_arrays2", n_sum) + queue_tmp_arrays.insert(0, (arr_sum, n_sum)) + self.results.extend([n_sum]) + + def main(self, input_queues, output_queue): + del output_queue + queue_paths, queue_arrays, queue_tmp_arrays = input_queues + assert isinstance(queue_tmp_arrays, list) + + while queue_arrays: + name, arr = queue_arrays.pop_first_item() + queue_tmp_arrays.append((arr.astype(np.uint32), 1)) + + self.reduce_queue_tmp_arrays4(queue_tmp_arrays) + + if ( + not queue_paths + and not queue_arrays + and ( + not hasattr(self.executor, "nb_working_workers_io") + or self.executor.nb_working_workers_io == 0 + ) + ): + self.reduce_queue_tmp_arrays2(queue_tmp_arrays) + if not queue_tmp_arrays: + return + assert len(queue_tmp_arrays) == 1, queue_tmp_arrays + arr_result, n_result = queue_tmp_arrays.pop() + + executor = self.executor + + try: + index_process = executor.index_process + except AttributeError: + index_process = 0 + + path = executor.path_job_data / f"tmp_sum{index_process:03d}.h5" + with h5py.File(path, "w") as file: + file.create_dataset("arr", data=arr_result) + file.attrs["num_images"] = n_result + + def final_seq_work(self): + path_tmp_files = sorted(self.executor.path_job_data.glob("tmp_sum*.h5")) + + 
queue_tmp_arrays = [] + for path_tmp_file in path_tmp_files: + with h5py.File(path_tmp_file, "r") as file: + arr = file["arr"][...] + num_images = file.attrs["num_images"] + queue_tmp_arrays.append((arr, num_images)) + + self.reduce_queue_tmp_arrays4(queue_tmp_arrays) + self.reduce_queue_tmp_arrays2(queue_tmp_arrays) + + assert len(queue_tmp_arrays) == 1 + arr, num_images = queue_tmp_arrays[0] + self.result = (arr / num_images).astype(self.original_dtype) + self.results.append(num_images) + + self.path_result = self.path_dir_result.with_name( + self.path_dir_result.name + ".png" + ) + + im = Image.fromarray(self.result) + im.save(self.path_result, "PNG") + im.close() + + for path_tmp_file in path_tmp_files: + path_tmp_file.unlink() + + +def parse_args(): + """Parse the arguments of the command line""" + + parser = argparse.ArgumentParser( + description=TopologyMeanImage.__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "path", + help="Path file or directory.", + type=str, + nargs="?", + default=os.getcwd(), + ) + parser.add_argument("-v", "--verbose", help="verbose mode", action="count") + parser.add_argument( + "-V", + "--version", + help="Print fluidimage version and exit", + action="count", + ) + + parser.add_argument( + "--executor", + help="Name of the executor.", + type=str, + default=None, + ) + + parser.add_argument( + "-np", + "--nb-max-workers", + help="Maximum number of workers/processes.", + type=int, + default=None, + ) + + parser.add_argument( + "--subset", + help="Subset of images.", + type=str, + default=None, + ) + + parser.add_argument( + "-o", + "--output", + help="Output path (without the .png extension).", + type=str, + default=None, + ) + + return parser.parse_args() + + +Topology = TopologyMeanImage + + +def main(): + """Main function for fluidimage-mean""" + args = parse_args() + + if args.version: + print(f"fluidimage {fluidimage.__version__}") + return + + if args.executor is None: + if args.nb_max_workers == 1: + args.executor = "exec_sequential" + else: + args.executor = "exec_async_sequential" + + print(args) + + params = Topology.create_default_params() + params.images.path = str(args.path) + params.images.str_subset = args.subset + params.saving.path = args.output + topology = Topology(params) + topology.compute(args.executor, nb_max_workers=args.nb_max_workers) diff --git a/src/fluidimage/topologies/meson.build b/src/fluidimage/topologies/meson.build index b3428380..78d2221d 100644 --- a/src/fluidimage/topologies/meson.build +++ b/src/fluidimage/topologies/meson.build @@ -7,6 +7,7 @@ python_sources = [ 'image2image.py', 'launcher.py', 'log.py', + 'mean.py', 'nb_cpu_cores.py', 'optical_flow.py', 'piv.py', @@ -16,6 +17,7 @@ python_sources = [ 'test_bos.py', 'test_example.py', 'test_image2image.py', + 'test_mean.py', 'test_optical_flow.py', 'test_piv.py', 'test_preproc.py', diff --git a/src/fluidimage/topologies/piv.py b/src/fluidimage/topologies/piv.py index 51599e49..28cf46c8 100644 --- a/src/fluidimage/topologies/piv.py +++ b/src/fluidimage/topologies/piv.py @@ -13,7 +13,7 @@ from fluidimage import ParamContainer, SeriesOfArrays from fluidimage.data_objects.piv import ArrayCouple, get_name_piv -from fluidimage.topologies import TopologyBaseFromSeries, prepare_path_dir_result +from fluidimage.topologies import TopologyBaseFromSeries from fluidimage.topologies.nb_cpu_cores import nb_cores from fluidimage.topologies.splitters import SplitterFromSeries from fluidimage.util import imread, logger @@ -88,13 +88,9 @@ def 
__init__(self, params, logging_level="info", nb_max_workers=None): ind_step=params.series.ind_step, ) - path_dir = self.series.serie.path_dir - path_dir_result, self.how_saving = prepare_path_dir_result( - path_dir, params.saving.path, params.saving.postfix, params.saving.how - ) - super().__init__( - path_dir_result=path_dir_result, + params=params, + path_dir_src=self.series.serie.path_dir, logging_level=logging_level, nb_max_workers=nb_max_workers, ) diff --git a/src/fluidimage/topologies/preproc.py b/src/fluidimage/topologies/preproc.py index 98ae099a..4aabad27 100644 --- a/src/fluidimage/topologies/preproc.py +++ b/src/fluidimage/topologies/preproc.py @@ -16,7 +16,7 @@ from fluidimage import SeriesOfArrays from fluidimage.data_objects.preproc import ArraySerie as ArraySubset from fluidimage.data_objects.preproc import PreprocResults, get_name_preproc -from fluidimage.topologies import TopologyBaseFromSeries, prepare_path_dir_result +from fluidimage.topologies import TopologyBaseFromSeries from fluidimage.topologies.splitters import SplitterFromSeries from fluidimage.util import imread from fluidimage.works import image2image @@ -191,23 +191,14 @@ def __init__( path_dir = params.series.path else: path_dir = os.path.dirname(params.series.path) - self.path_dir_input = path_dir - - path_dir_result, self.how_saving = prepare_path_dir_result( - path_dir, - params.saving.path, - params.saving.postfix, - params.saving.how, - ) super().__init__( - path_dir_result=path_dir_result, + params=params, + path_dir_src=path_dir, logging_level=logging_level, nb_max_workers=nb_max_workers, ) - self.params.saving.path = self.path_dir_result - # Define waiting queues queue_subsets_of_names = self.add_queue("subsets of filenames") queue_paths = self.add_queue("image paths") diff --git a/src/fluidimage/topologies/surface_tracking.py b/src/fluidimage/topologies/surface_tracking.py index a2041cef..ac85126c 100644 --- a/src/fluidimage/topologies/surface_tracking.py +++ b/src/fluidimage/topologies/surface_tracking.py @@ -20,7 +20,7 @@ from fluiddyn.io.image import imsave_h5 from fluidimage import ParamContainer, SerieOfArraysFromFiles, SeriesOfArrays -from fluidimage.topologies import TopologyBase, prepare_path_dir_result +from fluidimage.topologies import TopologyBase from fluidimage.util import imread, logger from fluidimage.works import image2image from fluidimage.works.surface_tracking import WorkSurfaceTracking @@ -123,18 +123,11 @@ def __init__(self, params, logging_level="info", nb_max_workers=None): ind_stop=self.serie.get_slicing_tuples()[0][1] - 1, ind_step=self.serie.get_slicing_tuples()[0][2], ) - path_dir = self.serie.path_dir - path_dir_result, self.how_saving = prepare_path_dir_result( - path_dir, params.saving.path, params.saving.postfix, params.saving.how - ) - - self.path_dir_result = path_dir_result - self.path_dir_src = Path(path_dir) - self.surface_tracking_work = WorkSurfaceTracking(params) super().__init__( - path_dir_result=path_dir_result, + params=params, + path_dir_src=self.serie.path_dir, logging_level=logging_level, nb_max_workers=nb_max_workers, ) diff --git a/src/fluidimage/topologies/test_example.py b/src/fluidimage/topologies/test_example.py index 5fe0ff3c..f1b0a00d 100644 --- a/src/fluidimage/topologies/test_example.py +++ b/src/fluidimage/topologies/test_example.py @@ -16,8 +16,10 @@ # tmp: TopologyExample doesn't have a Splitter executors.remove("multi_exec_subproc") +executors.remove("multi_exec_subproc_sync") +@pytest.mark.usefixtures("close_plt_figs") 
 @pytest.mark.parametrize("executor", executors)
 def test_topo_example(tmp_path_karman, executor):
 
@@ -38,7 +40,9 @@ def test_topo_example(tmp_path_karman, executor):
     assert (
         log.topology_name == "fluidimage.topologies.example.TopologyExample"
     )
-    assert log.nb_max_workers == 2
+
+    if executor != "exec_sequential":
+        assert log.nb_max_workers == 2
 
     if [
         len(log.durations[key])
diff --git a/src/fluidimage/topologies/test_mean.py b/src/fluidimage/topologies/test_mean.py
new file mode 100644
index 00000000..dd7eb877
--- /dev/null
+++ b/src/fluidimage/topologies/test_mean.py
@@ -0,0 +1,87 @@
+import sys
+
+import numpy as np
+import pytest
+
+from fluiddyn.io.image import imsave
+from fluidimage.executors import supported_multi_executors
+from fluidimage.topologies.mean import Topology, main
+
+executors = [
+    "exec_sequential",
+    "exec_async_sequential",
+    "exec_async",
+    "exec_async_multi",
+    "exec_async_servers",
+    "exec_async_servers_threading",
+]
+
+executors.extend(supported_multi_executors)
+
+num_images = 19
+
+
+@pytest.fixture(scope="session")
+def tmp_path_images(tmp_path_factory):
+    tmp_path = tmp_path_factory.mktemp("dir_images")
+    path_dir_images = tmp_path / "images"
+    path_dir_images.mkdir()
+
+    im = np.empty([4, 8], dtype=np.uint8)
+
+    for idx in range(num_images):
+        im.fill(idx)
+        name = f"im{idx:03d}.png"
+        imsave(path_dir_images / name, im, as_int=True)
+
+    return path_dir_images
+
+
+@pytest.mark.parametrize("executor", executors)
+def test_mean(tmp_path_images, executor):
+
+    mean_should_be = num_images // 2
+
+    tmp_path = tmp_path_images
+
+    params = Topology.create_default_params()
+    params.images.path = str(tmp_path)
+    params.saving.postfix = executor
+
+    topology = Topology(params)
+
+    topology.compute(executor, nb_max_workers=2)
+
+    result = topology.result
+
+    assert result[0, 0] == mean_should_be, (
+        result[0, 0],
+        mean_should_be,
+    )
+    assert np.all(np.isclose(result, result[0, 0]))
+
+    assert len(topology.results) == num_images
+    assert topology.path_result.exists()
+
+
+def test_mean_help(monkeypatch):
+
+    command = "fluidimage-mean -h"
+
+    with monkeypatch.context() as ctx:
+        ctx.setattr(sys, "argv", command.split())
+        with pytest.raises(SystemExit):
+            main()
+
+
+def test_mean_command(tmp_path_images, monkeypatch):
+
+    tmp_path = tmp_path_images
+    path_output = tmp_path / "a-great-name.png"
+
+    command = f"fluidimage-mean {tmp_path} -o {path_output.with_suffix('')}"
+    with monkeypatch.context() as ctx:
+        ctx.setattr(sys, "argv", command.split())
+        main()
+
+    assert path_output.exists()
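As a closing note on the reduction strategy used by `TopologyMeanImage` (the `reduce_queue_tmp_arrays4` / `reduce_queue_tmp_arrays2` methods and `final_seq_work` above), here is a standalone sketch of the same idea, with partial sums carried as `(array, count)` pairs. The helper names and toy data are illustrative only and are not part of the Fluidimage API.

```python
import numpy as np


def reduce4(pairs):
    """Merge partial sums four at a time (as reduce_queue_tmp_arrays4 does)."""
    while len(pairs) >= 4:
        (a0, n0), (a1, n1), (a2, n2), (a3, n3) = (pairs.pop() for _ in range(4))
        pairs.insert(0, (a0 + a1 + a2 + a3, n0 + n1 + n2 + n3))
    return pairs


def reduce2(pairs):
    """Merge the remaining partial sums two at a time."""
    while len(pairs) >= 2:
        (a0, n0), (a1, n1) = pairs.pop(), pairs.pop()
        pairs.insert(0, (a0 + a1, n0 + n1))
    return pairs


# Toy data: 19 "images" whose pixels are 0, 1, ..., 18 (as in test_mean.py).
images = [np.full((4, 8), idx, dtype=np.uint8) for idx in range(19)]

# Accumulate in uint32 so that the uint8 input dtype cannot overflow.
pairs = [(im.astype(np.uint32), 1) for im in images]
pairs = reduce2(reduce4(pairs))

arr_sum, num_images = pairs[0]
mean = (arr_sum / num_images).astype(np.uint8)
assert num_images == 19
assert mean[0, 0] == 19 // 2  # same expectation as in test_mean.py
```

In the topology itself, the same two reductions are applied first inside each process (in `main`) and then once more sequentially in `final_seq_work`, after the per-process partial sums have been reloaded from the temporary `tmp_sum*.h5` files.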