Skip to content

Commit

Permalink
Merge branch 'topic/default/topo_mean' into 'branch/default'
Browse files Browse the repository at this point in the history
topology mean

See merge request fluiddyn/fluidimage!109
  • Loading branch information
paugier committed May 24, 2024
2 parents e99a984 + 5d2a1c1 commit 538844e
Show file tree
Hide file tree
Showing 34 changed files with 758 additions and 206 deletions.
14 changes: 13 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Release notes

See also the
[unreleased changes](https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.1...branch%2Fdefault).
[unreleased changes](https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.2...branch%2Fdefault).

## [0.5.2] (2024-05-??)

- New command `fluidimage-mean` to compute mean of images with the new topology
{class}`fluidimage.topologies.mean.TopologyMeanImage`.

- New executors `multi_exec_sync` ({mod}`fluidimage.executors.multi_exec_sync`) and
`multi_exec_subproc_sync` ({mod}`fluidimage.executors.multi_exec_subproc_sync`).

- [!108](https://foss.heptapod.net/fluiddyn/fluidimage/-/merge_requests/108): save UVmat
error codes in PIV files.

## [0.5.1] (2024-05-06)

Expand Down Expand Up @@ -217,3 +228,4 @@ This version contains incompatible API changes documented here.
[0.4.6]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.4.5...0.4.6
[0.5.0]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.4.6...0.5.0
[0.5.1]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.0...0.5.1
[0.5.2]: https://foss.heptapod.net/fluiddyn/fluidimage/-/compare/0.5.1...0.5.2
38 changes: 38 additions & 0 deletions bench/profile_mean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pathlib import Path
from shutil import rmtree

from pyinstrument import Profiler
from pyinstrument.renderers import ConsoleRenderer

from fluidimage.topologies.mean import Topology

path_images = Path("/fsnet/project/watu/2022/22INTERNAL_F/DATA/EXP44/PCO_50mm")
path_images = Path("/data/PCO_50mm")
rmtree(path_images.parent / "PCO_50mm.mean", ignore_errors=True)

params = Topology.create_default_params()
params.images.path = str(path_images / "im*.png")
params.images.str_subset = ":100"

topology = Topology(params)
executor = "exec_sequential"
executor = "exec_async_sequential"
# executor = "multi_exec_sync"
executor = "multi_exec_async"

profiler = Profiler()
profiler.start()
topology.compute(executor=executor, nb_max_workers=4)
profiler.stop()

print(
profiler.output(
renderer=ConsoleRenderer(
unicode=True,
color=True,
show_all=False,
# time="percent_of_total",
# flat=True, # buggy with pyinstrument 4.6.2!
)
)
)
8 changes: 8 additions & 0 deletions doc/commands.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Commands

Fluidimage provides few useful commands:

- `fluidimage-monitor`: monitor Fluidimage computations.
- `fluidimviewer`: visualize images.
- `fluidpivviewer`: visualize PIV fields.
- `fluidimage-mean`: compute the mean of images.
1 change: 1 addition & 0 deletions doc/examples/pivchallenge/possible_paths.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/data/pivchallenge
/data1/pivchallenge
/fsnet/project/meige/2016/16FLUIDIMAGE/samples/pivchallenge
/storage2/pivchallenge
~/pivchallenge
36 changes: 36 additions & 0 deletions doc/examples/pivchallenge/profile_mean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from shutil import rmtree

from path_images import get_path
from pyinstrument import Profiler
from pyinstrument.renderers import ConsoleRenderer

from fluidimage.topologies.mean import Topology

path_images = get_path("2005C")
rmtree(path_images.parent / "Images.mean", ignore_errors=True)

params = Topology.create_default_params()
params.images.path = str(path_images / "c*.bmp")

topology = Topology(params)
executor = "exec_sequential"
# executor = "exec_async_sequential"
executor = "multi_exec_sync"
# executor = "multi_exec_async"

profiler = Profiler()
profiler.start()
topology.compute(executor=executor, nb_max_workers=2)
profiler.stop()

print(
profiler.output(
renderer=ConsoleRenderer(
unicode=True,
color=True,
show_all=False,
# time="percent_of_total",
# flat=True, # buggy with pyinstrument 4.6.2!
)
)
)
1 change: 1 addition & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ overview
install
tutorial
examples
commands
build-from-source
```

Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build-backend = 'mesonpy'

[project]
name = "fluidimage"
version = "0.5.1"
version = "0.5.2"
description = "Fluid image processing with Python."
authors = [
{name = "Pierre Augier", email = "pierre.augier@legi.cnrs.fr"},
Expand Down Expand Up @@ -58,6 +58,7 @@ fluidimage-monitor = "fluidimage.gui.monitor:main"
fluidimviewer = "fluidimage.gui.imviewer:main"
fluidimviewer-pg = "fluidimage.gui.pg_main:main"
fluidpivviewer = "fluidimage.gui.piv_viewer:main"
fluidimage-mean = "fluidimage.topologies.mean:main"

[project.optional-dependencies]
pims = ["pims"]
Expand All @@ -72,8 +73,11 @@ exec_sequential = "fluidimage.executors.exec_sequential"
exec_async = "fluidimage.executors.exec_async"
exec_async_sequential = "fluidimage.executors.exec_async_sequential"
exec_async_seq_for_multi = "fluidimage.executors.exec_async_seq_for_multi"
exec_seq_for_multi = "fluidimage.executors.exec_seq_for_multi"
multi_exec_async = "fluidimage.executors.multi_exec_async"
multi_exec_sync = "fluidimage.executors.multi_exec_sync"
multi_exec_subproc = "fluidimage.executors.multi_exec_subproc"
multi_exec_subproc_sync = "fluidimage.executors.multi_exec_subproc_sync"
exec_async_multi = "fluidimage.executors.exec_async_multiproc"
exec_async_servers = "fluidimage.executors.exec_async_servers"
exec_async_servers_threading = "fluidimage.executors.exec_async_servers_threading"
Expand Down
6 changes: 5 additions & 1 deletion src/fluidimage/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
exec_async
exec_async_sequential
multi_exec_async
multi_exec_sync
multi_exec_subproc
multi_exec_subproc_sync
exec_async_seq_for_multi
exec_seq_for_multi
exec_async_multiproc
exec_async_servers
servers
Expand Down Expand Up @@ -58,8 +61,9 @@ def afterfork():
# because the OS does not support forks (or not fully) and
# multiprocessing works differently than on Linux
# see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
supported_multi_executors = ["multi_exec_subproc"]
supported_multi_executors = ["multi_exec_subproc", "multi_exec_subproc_sync"]
if sys.platform == "linux":
supported_multi_executors.insert(0, "multi_exec_sync")
supported_multi_executors.insert(0, "multi_exec_async")


Expand Down
87 changes: 71 additions & 16 deletions src/fluidimage/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class ExecutorBase(ABC):
_path_lockfile: Path
num_expected_results: int
time_start: str
_final_seq_work_run: bool

# for results log
_path_results: Path
_path_num_results: Path
_len_saved_results: int

def _init_log_path(self):
self.time_start_str = time_as_str()
Expand Down Expand Up @@ -202,6 +208,7 @@ def _get_file_object_for_logger(self):
return sys.stdout

def _init_compute(self):
self._final_seq_work_run = False
self.t_start = time()
self._init_num_expected_results()
if self.num_expected_results == 0:
Expand All @@ -218,7 +225,7 @@ def _init_compute_log(self):
print(" executor:", executor_name)
print(" nb_cpus_allowed =", nb_cores)
print(" nb_max_workers =", self.nb_max_workers)
print(" num_expected_results", self.num_expected_results)
print(" num_expected_results =", self.num_expected_results)
print(" path_dir_result =", self.path_dir_result)
print(
"Monitoring app can be launched with:\n"
Expand Down Expand Up @@ -276,7 +283,16 @@ def _reset_std_as_default(self):
reset_logger()
self._log_file.close()

def _run_final_seq_work(self):
if (
hasattr(self.topology, "final_seq_work")
and not self._final_seq_work_run
):
self.topology.final_seq_work()
self._final_seq_work_run = True

def _finalize_compute(self):
self._run_final_seq_work()
log_memory_usage(time_as_str(2) + ": end of `compute`. mem usage")
self.topology.print_at_exit(time() - self.t_start)
self._reset_std_as_default()
Expand Down Expand Up @@ -361,6 +377,42 @@ def _init_num_expected_results_first_queue(self):
self._keys_first_queue = list(self._first_queue.keys())
self.num_expected_results = len(self._keys_first_queue)

def _init_results_log(self, path_job_data):

if hasattr(self, "index_process"):
str_index_process = f"_{self.index_process:03}"
else:
str_index_process = ""

self._path_results = path_job_data / f"results{str_index_process}.txt"
self._path_num_results = (
self._path_results.parent / f"len_results{str_index_process}.txt"
)
self._len_saved_results = 0

def _save_results_names(self):

new_results = self.topology.results[self._len_saved_results :]
self._len_saved_results = len(self.topology.results)

with open(self._path_num_results, "w", encoding="utf-8") as file:
file.write(f"{self._len_saved_results}\n")

if new_results:
if isinstance(new_results[0], str):
new_results = [Path(path).name for path in new_results]
elif hasattr(new_results[0], "name"):
new_results = [_r.name for _r in new_results]
else:
new_results = [str(_r) for _r in new_results]
new_results = "\n".join(new_results) + "\n"

with open(self._path_results, "a", encoding="utf-8") as file:
file.write(new_results)

if not self._log_file.closed:
self._log_file.flush()


class MultiExecutorBase(ExecutorBase):
"""Manage the multi-executor mode
Expand Down Expand Up @@ -454,16 +506,6 @@ def handler_signals(signal_number, stack):

self._start_processes()
self.nb_processes = len(self.processes)
self._wait_for_all_processes()
self._finalize_compute()

def _poll_return_code(self, process):
return process.poll()

def _join_processes(self):
"""Join the processes"""

def _wait_for_all_processes(self):

running_processes = {
idx: process for idx, process in enumerate(self.processes)
Expand Down Expand Up @@ -523,13 +565,26 @@ def _wait_for_all_processes(self):
)
running_processes_updated.clear()

self._join_processes()
self._join_processes()

self.topology.results = results = []
for path in self.path_job_data.glob("results_*.txt"):
with open(path, encoding="utf-8") as file:
results.extend(line.strip() for line in file.readlines())

self._run_final_seq_work()

progress.update(progress_task, completed=len(self.topology.results))

self._finalize_compute()

def _poll_return_code(self, process):
return process.poll()

def _join_processes(self):
"""Join the processes"""

def _finalize_compute(self):
self.topology.results = results = []
for path in self.path_job_data.glob("results_*.txt"):
with open(path, encoding="utf-8") as file:
results.extend(line.strip() for line in file.readlines())

if self.errors:
text_error = (
Expand Down
5 changes: 5 additions & 0 deletions src/fluidimage/executors/exec_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ def define_functions(self):
# global functions
if work.kind is not None and "global" in work.kind:

try:
work.func_or_cls.__self__.executor = self
except AttributeError:
pass

async def func(work=work):
while True:
while (
Expand Down
37 changes: 4 additions & 33 deletions src/fluidimage/executors/exec_async_seq_for_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
)

self._log_path = path_log
topology.executor = self
super().__init__(
topology,
path_dir_result,
Expand All @@ -59,17 +60,10 @@ def __init__(
self._save_topology_results
)
path_log_dir = Path(self._log_path).parent
path_job_data = path_log_dir.with_name("job" + path_log_dir.name[3:])
self._path_results = (
path_job_data / f"results_{self.index_process:03}.txt"
self.path_job_data = path_log_dir.with_name(
"job" + path_log_dir.name[3:]
)

self._path_num_results = (
self._path_results.parent
/ f"len_results_{self.index_process:03}.txt"
)

self._len_saved_results = 0
self._init_results_log(self.path_job_data)

sys.stdout = self._log_file

Expand Down Expand Up @@ -100,29 +94,6 @@ def _finalize_compute(self):
if hasattr(self.topology, "results"):
self._save_results_names()

def _save_results_names(self):

new_results = self.topology.results[self._len_saved_results :]
self._len_saved_results = len(self.topology.results)

with open(self._path_num_results, "w", encoding="utf-8") as file:
file.write(f"{self._len_saved_results}\n")

if new_results:
if isinstance(new_results[0], str):
new_results = [Path(path).name for path in new_results]
elif hasattr(new_results[0], "name"):
new_results = [_r.name for _r in new_results]
else:
new_results = [str(_r) for _r in new_results]
new_results = "\n".join(new_results) + "\n"

with open(self._path_results, "a", encoding="utf-8") as file:
file.write(new_results)

if not self._log_file.closed:
self._log_file.flush()

async def _save_topology_results(self):
while not self._has_to_stop:
self._save_results_names()
Expand Down
2 changes: 0 additions & 2 deletions src/fluidimage/executors/exec_async_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
"""

import os
import signal

import numpy as np
import trio

from fluiddyn import time_as_str
from fluidimage.util import log_debug, logger

from .exec_async import ExecutorAsync
Expand Down
Loading

0 comments on commit 538844e

Please sign in to comment.