From 67b7f4877bde5a13141ddadbccf76a5903b3fd12 Mon Sep 17 00:00:00 2001 From: Stefan Fuertinger Date: Wed, 27 Oct 2021 14:34:02 +0200 Subject: [PATCH] FIX: More testing road bumps - check for open file descriptor limit before running the tests - use `/cs/home` by default as temp storage on the ESI cluster - new routine `flush_local_cluster` is used throughout testing pipeline to free up memory for `LocalCluster` computing On branch acme_amputation Changes to be committed: modified: syncopy/__init__.py modified: syncopy/tests/conftest.py modified: syncopy/tests/misc.py modified: syncopy/tests/test_continuousdata.py modified: syncopy/tests/test_discretedata.py modified: syncopy/tests/test_selectdata.py modified: syncopy/tests/test_specest.py --- syncopy/__init__.py | 7 +- syncopy/tests/conftest.py | 7 +- syncopy/tests/misc.py | 116 ++++++++++++++++----------- syncopy/tests/test_continuousdata.py | 4 +- syncopy/tests/test_discretedata.py | 20 ++--- syncopy/tests/test_selectdata.py | 2 + syncopy/tests/test_specest.py | 84 ++----------------- 7 files changed, 101 insertions(+), 139 deletions(-) diff --git a/syncopy/__init__.py b/syncopy/__init__.py index aaab625aa..a4ac6bd63 100644 --- a/syncopy/__init__.py +++ b/syncopy/__init__.py @@ -7,6 +7,7 @@ import os import sys import subprocess +import getpass import numpy as np from hashlib import blake2b, sha1 from importlib.metadata import version, PackageNotFoundError @@ -59,10 +60,14 @@ __plt__ = False # Set package-wide temp directory +csHome = "/cs/home/{}".format(getpass.getuser()) if os.environ.get("SPYTMPDIR"): __storage__ = os.path.abspath(os.path.expanduser(os.environ["SPYTMPDIR"])) else: - __storage__ = os.path.join(os.path.expanduser("~"), ".spy") + if os.path.exists(csHome): + __storage__ = os.path.join(csHome, ".spy") + else: + __storage__ = os.path.join(os.path.expanduser("~"), ".spy") # Set upper bound for temp directory size (in GB) __storagelimit__ = 10 diff --git a/syncopy/tests/conftest.py b/syncopy/tests/conftest.py index 1e5ecf323..fa4503435 100644 --- a/syncopy/tests/conftest.py +++ b/syncopy/tests/conftest.py @@ -17,11 +17,14 @@ # skipped anyway) if __acme__: import dask.distributed as dd + import resource from acme.dask_helpers import esi_cluster_setup from syncopy.tests.misc import is_slurm_node + if max(resource.getrlimit(resource.RLIMIT_NOFILE)) < 1024: + msg = "Not enough open file descriptors allowed. Consider increasing " +\ + "the limit using, e.g., `ulimit -Sn 1024`" + raise ValueError(msg) if is_slurm_node(): - os.environ["SPYTMPDIR"] = "/cs/home/{}/.spy".format(os.environ["USER"]) - importlib.reload(syncopy) cluster = esi_cluster_setup(partition="8GBS", n_jobs=10, timeout=360, interactive=False, start_client=False) diff --git a/syncopy/tests/misc.py b/syncopy/tests/misc.py index 0ceb566ee..2b7e7d3c5 100644 --- a/syncopy/tests/misc.py +++ b/syncopy/tests/misc.py @@ -9,15 +9,19 @@ import os import h5py import tempfile +import time import numpy as np # Local imports from syncopy.datatype import AnalogData from syncopy.shared.filetypes import _data_classname_to_extension, FILE_EXT -from syncopy import __plt__ +from syncopy import __plt__, __acme__ if __plt__: import matplotlib.pyplot as plt from matplotlib.backends.backend_agg import FigureCanvasAgg +if __acme__: + import dask.distributed as dd + def is_win_vm(): """ @@ -59,12 +63,12 @@ def is_slurm_node(): else: return False - + def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None, overlapping=False, inmemory=True, dimord="default"): """ Create :class:`~syncopy.AnalogData` object with synthetic harmonic signal(s) - + Parameters ---------- nTrials : int @@ -74,45 +78,45 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None equidistant : bool If `True`, trials of equal length are defined seed : None or int - If `None`, imposed noise is completely random. If `seed` is an integer, - it is used to fix the (initial) state of NumPy's random number generator - :func:`numpy.random.default_rng`, i.e., objects created wtih same `seed` - will be populated with identical artificial signals. + If `None`, imposed noise is completely random. If `seed` is an integer, + it is used to fix the (initial) state of NumPy's random number generator + :func:`numpy.random.default_rng`, i.e., objects created wtih same `seed` + will be populated with identical artificial signals. overlapping : bool If `True`, constructed trials overlap inmemory : bool If `True`, the full `data` array (all channels across all trials) is allocated - in memory (fast but dangerous for large arrays), otherwise the output data - object's corresponding backing HDF5 file in `__storage__` is filled with - synthetic data in a trial-by-trial manner (slow but safe even for very - large datasets). + in memory (fast but dangerous for large arrays), otherwise the output data + object's corresponding backing HDF5 file in `__storage__` is filled with + synthetic data in a trial-by-trial manner (slow but safe even for very + large datasets). dimord : str or list If `dimord` is "default", the constructed output object uses the default - dimensional layout of a standard :class:`~syncopy.AnalogData` object. + dimensional layout of a standard :class:`~syncopy.AnalogData` object. If `dimord` is a list (i.e., ``["channel", "time"]``) the provided sequence - of dimensions is used. - + of dimensions is used. + Returns ------- out : :class:`~syncopy.AnalogData` object - Syncopy :class:`~syncopy.AnalogData` object with specified properties - populated with a synthetic multivariate trigonometric signal. + Syncopy :class:`~syncopy.AnalogData` object with specified properties + populated with a synthetic multivariate trigonometric signal. Notes ----- - This is an auxiliary method that is intended purely for internal use. Thus, - no error checking is performed. - + This is an auxiliary method that is intended purely for internal use. Thus, + no error checking is performed. + Examples -------- Generate small artificial :class:`~syncopy.AnalogData` object in memory - + .. code-block:: python - + >>> iAmSmall = generate_artificial_data(nTrials=5, nChannels=10, inmemory=True) >>> iAmSmall Syncopy AnalogData object with fields - + cfg : dictionary with keys '' channel : [10] element container : None @@ -126,18 +130,18 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None time : 5 element list trialinfo : [5 x 0] element trials : 5 element iterable - + Use `.log` to see object history - - Generate artificial :class:`~syncopy.AnalogData` object of more substantial + + Generate artificial :class:`~syncopy.AnalogData` object of more substantial size on disk - + .. code-block:: python - + >>> iAmBig = generate_artificial_data(nTrials=50, nChannels=1024, inmemory=False) >>> iAmBig Syncopy AnalogData object with fields - + cfg : dictionary with keys '' channel : [1024] element container : None @@ -151,9 +155,9 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None time : 200 element list trialinfo : [200 x 0] element trials : 200 element iterable - + Use `.log` to see object history - + """ # Create dummy 1d signal that will be blown up to fill channels later @@ -170,23 +174,23 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None idx[timeAxis] = -1 sig = np.repeat(sig.reshape(*idx), axis=idx.index(1), repeats=nChannels) - # Initialize random number generator (with possibly user-provided seed-value) - rng = np.random.default_rng(seed) + # Initialize random number generator (with possibly user-provided seed-value) + rng = np.random.default_rng(seed) # Either construct the full data array in memory using tiling or create # an HDF5 container in `__storage__` and fill it trial-by-trial - # NOTE: use `swapaxes` here to ensure two objects created w/same seed really - # are affected w/identical additive noise patterns, no matter their respective + # NOTE: use `swapaxes` here to ensure two objects created w/same seed really + # are affected w/identical additive noise patterns, no matter their respective # `dimord`. out = AnalogData(samplerate=1/dt, dimord=dimord) if inmemory: - idx[timeAxis] = nTrials + idx[timeAxis] = nTrials sig = np.tile(sig, idx) shp = [slice(None), slice(None)] for iTrial in range(nTrials): shp[timeAxis] = slice(iTrial*t.size, (iTrial + 1)*t.size) noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5 - sig[tuple(shp)] += np.swapaxes(noise, timeAxis, 0) + sig[tuple(shp)] += np.swapaxes(noise, timeAxis, 0) out.data = sig else: with h5py.File(out.filename, "w") as h5f: @@ -197,7 +201,7 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None for iTrial in range(nTrials): shp[timeAxis] = slice(iTrial*t.size, (iTrial + 1)*t.size) noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5 - dset[tuple(shp)] = sig + np.swapaxes(noise, timeAxis, 0) + dset[tuple(shp)] = sig + np.swapaxes(noise, timeAxis, 0) dset.flush() out.data = h5py.File(out.filename, "r+")["data"] @@ -235,11 +239,11 @@ def construct_spy_filename(basepath, obj): objext = _data_classname_to_extension(obj.__class__.__name__) return os.path.join(basepath + FILE_EXT["dir"], basename + objext) - + def figs_equal(fig1, fig2, tol=None): """ Test if two figures are identical - + Parameters ---------- fig1 : matplotlib figure object @@ -247,20 +251,20 @@ def figs_equal(fig1, fig2, tol=None): fig2 : matplotlib figure object Template figure tol : float - Positive scalar (b/w 0 and 1) specifying tolerance level for considering - `fig1` and `fig2` identical. If `None`, two figures have to be exact - pixel-perfect copies to be qualified as identical. - + Positive scalar (b/w 0 and 1) specifying tolerance level for considering + `fig1` and `fig2` identical. If `None`, two figures have to be exact + pixel-perfect copies to be qualified as identical. + Returns ------- equal : bool `True` if `fig1` and `fig2` are identical, `False` otherwise - + Notes ----- - This is an auxiliary method that is intended purely for internal use. Thus, - no error checking is performed. - + This is an auxiliary method that is intended purely for internal use. Thus, + no error checking is performed. + Examples -------- >>> import numpy as np @@ -283,3 +287,21 @@ def figs_equal(fig1, fig2, tol=None): if tol is None: return np.array_equal(plt.imread(img1.name), plt.imread(img2.name)) return np.allclose(plt.imread(img1.name), plt.imread(img2.name), atol=tol) + + +def flush_local_cluster(testcluster, timeout=10): + """ + Resets a parallel computing client to avoid memory spilling + """ + if isinstance(testcluster, dd.LocalCluster): + # client.restart() + client = dd.get_client() + client.close() + time.sleep(1.0) + client = dd.Client(testcluster) + waiting = 0 + while len([w["memory_limit"] for w in testcluster.scheduler_info["workers"].values()]) == 0 \ + and waiting < timeout: + time.sleep(1.0) + waiting += 1 + return diff --git a/syncopy/tests/test_continuousdata.py b/syncopy/tests/test_continuousdata.py index 05c2ccac6..6d1bf4db0 100644 --- a/syncopy/tests/test_continuousdata.py +++ b/syncopy/tests/test_continuousdata.py @@ -18,7 +18,7 @@ from syncopy.datatype.methods.selectdata import selectdata from syncopy.shared.errors import SPYValueError, SPYTypeError from syncopy.shared.tools import StructDict -from syncopy.tests.misc import generate_artificial_data, construct_spy_filename +from syncopy.tests.misc import flush_local_cluster, generate_artificial_data, construct_spy_filename from syncopy import __acme__ if __acme__: import dask.distributed as dd @@ -455,6 +455,7 @@ def test_parallel(self, testcluster): "test_dataselection"] for test in par_tests: getattr(self, test)() + flush_local_cluster(testcluster) client.close() @@ -668,4 +669,5 @@ def test_sd_parallel(self, testcluster): par_tests = ["test_sd_dataselection"] for test in par_tests: getattr(self, test)() + flush_local_cluster(testcluster) client.close() diff --git a/syncopy/tests/test_discretedata.py b/syncopy/tests/test_discretedata.py index 6680f66e0..eb37714b2 100644 --- a/syncopy/tests/test_discretedata.py +++ b/syncopy/tests/test_discretedata.py @@ -17,7 +17,7 @@ from syncopy.datatype.methods.selectdata import selectdata from syncopy.io import save, load from syncopy.shared.errors import SPYValueError, SPYTypeError -from syncopy.tests.misc import construct_spy_filename +from syncopy.tests.misc import construct_spy_filename, flush_local_cluster from syncopy import __acme__ if __acme__: import dask.distributed as dd @@ -222,6 +222,7 @@ def test_parallel(self, testcluster): par_tests = ["test_dataselection"] for test in par_tests: getattr(self, test)() + flush_local_cluster(testcluster) client.close() class TestEventData(): @@ -244,7 +245,7 @@ class TestEventData(): adata = np.arange(1, nc * ns + 1).reshape(ns, nc) - def test_empty(self): + def test_ed_empty(self): dummy = EventData() assert len(dummy.cfg) == 0 assert dummy.dimord == None @@ -253,7 +254,7 @@ def test_empty(self): with pytest.raises(SPYTypeError): EventData({}) - def test_nparray(self): + def test_ed_nparray(self): dummy = EventData(self.data) assert dummy.dimord == ["sample", "eventid"] assert dummy.eventid.size == self.num_evt @@ -267,7 +268,7 @@ def test_nparray(self): with pytest.raises(SPYValueError): EventData(np.ones((3,))) - def test_trialretrieval(self): + def test_ed_trialretrieval(self): # test ``_get_trial`` with NumPy array: regular order dummy = EventData(self.data, trialdefinition=self.trl) smp = self.data[:, 0] @@ -287,7 +288,7 @@ def test_trialretrieval(self): trl_ref = self.data2[idx, ...] assert np.array_equal(dummy._get_trial(trlno), trl_ref) - def test_saveload(self): + def test_ed_saveload(self): with tempfile.TemporaryDirectory() as tdir: fname = os.path.join(tdir, "dummy") @@ -338,7 +339,7 @@ def test_saveload(self): del dummy, dummy2 time.sleep(0.1) - def test_trialsetting(self): + def test_ed_trialsetting(self): # Create sampleinfo w/ EventData vs. AnalogData samplerate sr_e = 2 @@ -471,7 +472,7 @@ def test_trialsetting(self): ang_dummy.definetrial(evt_dummy, pre=pre, post=post, trigger=1) # test data-selection via class method - def test_dataselection(self): + def test_ed_dataselection(self): dummy = EventData(data=self.data, trialdefinition=self.trl, samplerate=2.0) @@ -525,10 +526,11 @@ def test_dataselection(self): assert np.array_equal(cfg.out.data, selected.data) @skip_without_acme - def test_parallel(self, testcluster): + def test_ed_parallel(self, testcluster): # repeat selected test w/parallel processing engine client = dd.Client(testcluster) - par_tests = ["test_dataselection"] + par_tests = ["test_ed_dataselection"] for test in par_tests: getattr(self, test)() + flush_local_cluster(testcluster) client.close() diff --git a/syncopy/tests/test_selectdata.py b/syncopy/tests/test_selectdata.py index 17ceb7a9a..365ea61f1 100644 --- a/syncopy/tests/test_selectdata.py +++ b/syncopy/tests/test_selectdata.py @@ -10,6 +10,7 @@ # Local imports import syncopy.datatype as spd +from syncopy.tests.misc import flush_local_cluster from syncopy.datatype import AnalogData, SpectralData from syncopy.datatype.base_data import Selector from syncopy.datatype.methods.selectdata import selectdata @@ -733,6 +734,7 @@ def test_parallel(self, testcluster): if (inspect.ismethod(getattr(self, attr)) and attr != "test_parallel")] for test in all_tests: getattr(self, test)() + flush_local_cluster(testcluster) client.close() diff --git a/syncopy/tests/test_specest.py b/syncopy/tests/test_specest.py index 075238019..f4db5fff0 100644 --- a/syncopy/tests/test_specest.py +++ b/syncopy/tests/test_specest.py @@ -19,7 +19,7 @@ import dask.distributed as dd # Local imports -from syncopy.tests.misc import generate_artificial_data +from syncopy.tests.misc import generate_artificial_data, flush_local_cluster from syncopy.specest.freqanalysis import freqanalysis from syncopy.shared.errors import SPYValueError from syncopy.datatype.methods.padding import _nextpow2 @@ -31,20 +31,6 @@ skip_without_acme = pytest.mark.skipif(not __acme__, reason="acme not available") -# Workaround: close and re-open `LocalCluster` to free up memory accumulated over tests -def _flush_local_cluster(): - if __acme__: - try: - client = dd.get_client() - if isinstance(client.cluster, dd.LocalCluster): - cluster = client.cluster - client.close() - del client - client = dd.Client(cluster) - except ValueError: - pass - return - # Local helper for constructing TF testing signals def _make_tf_signal(nChannels, nTrials, seed, fadeIn=None, fadeOut=None): @@ -166,15 +152,13 @@ def test_output(self): spec = freqanalysis(self.adata, method="mtmfft", taper="hann", output="fourier", select=select) assert "complex" in spec.data.dtype.name - _flush_local_cluster() spec = freqanalysis(self.adata, method="mtmfft", taper="hann", output="abs", select=select) assert "float" in spec.data.dtype.name - _flush_local_cluster() spec = freqanalysis(self.adata, method="mtmfft", taper="hann", output="pow", select=select) assert "float" in spec.data.dtype.name - _flush_local_cluster() + def test_allocout(self): # call `freqanalysis` w/pre-allocated output object @@ -185,7 +169,6 @@ def test_allocout(self): assert out.freq.size == self.fband.size + 1 assert np.allclose([0] + self.fband.tolist(), out.freq) assert out.channel.size == self.nChannels - _flush_local_cluster() # build `cfg` object for calling cfg = StructDict() @@ -201,7 +184,6 @@ def test_allocout(self): assert len(cfg.out.time[0]) == 1 assert np.all(cfg.out.sampleinfo == [0, 1]) assert cfg.out.data.shape[0] == 1 # ensure trial-count == 1 - _flush_local_cluster() # keep trials but throw away tapers out = SpectralData(dimord=SpectralData._defaultDimord) @@ -209,7 +191,6 @@ def test_allocout(self): keeptapers=False, output="pow", out=out) assert out.sampleinfo.shape == (self.nTrials, 2) assert out.taper.size == 1 - _flush_local_cluster() # re-use `cfg` from above and additionally throw away `tapers` cfg.dataset = self.adata @@ -219,7 +200,6 @@ def test_allocout(self): cfg.keeptapers = False freqanalysis(cfg) assert cfg.out.taper.size == 1 - _flush_local_cluster() def test_solution(self): # ensure channel-specific frequencies are identified correctly @@ -227,7 +207,6 @@ def test_solution(self): sel = Selector(self.adata, select) spec = freqanalysis(self.adata, method="mtmfft", taper="hann", output="pow", select=select) - _flush_local_cluster() chanList = np.arange(self.nChannels)[sel.channel] amps = np.empty((len(sel.trials) * len(chanList),)) @@ -261,14 +240,12 @@ def test_foi(self): spec = freqanalysis(self.adata, method="mtmfft", taper="hann", foi=ftmp, select=select) assert np.all(spec.freq == foi) - _flush_local_cluster() # unsorted, duplicate entries in `foi` - result must stay the same ftmp = np.hstack([foi, np.full(20, foi[0])]) spec = freqanalysis(self.adata, method="mtmfft", taper="hann", foi=ftmp, select=select) assert np.all(spec.freq == foi) - _flush_local_cluster() def test_dpss(self): @@ -281,14 +258,12 @@ def test_dpss(self): taper="dpss", output="pow", select=select) assert spec.taper.size == 1 assert spec.channel.size == len(chanList) - _flush_local_cluster() # specify tapers spec = freqanalysis(self.adata, method="mtmfft", taper="dpss", tapsmofrq=7, keeptapers=True, select=select) assert spec.taper.size == 7 assert spec.channel.size == len(chanList) - _flush_local_cluster() # non-equidistant data w/multiple tapers artdata = generate_artificial_data(nTrials=5, nChannels=16, @@ -315,7 +290,6 @@ def test_dpss(self): sel = Selector(artdata, select) cfg.select = select spec = freqanalysis(cfg, artdata) - _flush_local_cluster() # ensure correctness of padding (respecting min. trial length + time-selection) if select is None: @@ -351,7 +325,6 @@ def test_dpss(self): cfg.select = select spec = freqanalysis(cfg) - _flush_local_cluster() # ensure correctness of padding (respecting min. trial length + time-selection) if select is None: @@ -389,7 +362,6 @@ def test_dpss(self): cfg.select = select spec = freqanalysis(cfg) - _flush_local_cluster() # ensure correctness of padding (respecting min. trial length + time-selection) if select is None: @@ -436,7 +408,7 @@ def test_parallel(self, testcluster): all_tests.remove("test_vdata") for test in all_tests: getattr(self, test)() - _flush_local_cluster() + flush_local_cluster(testcluster) # now create uniform `cfg` for remaining SLURM tests cfg = StructDict() @@ -459,7 +431,6 @@ def test_parallel(self, testcluster): spec = freqanalysis(artdata, cfg) assert spec.data.is_virtual assert len(spec.data.virtual_sources()) == fileCount[k] - _flush_local_cluster() # non-equidistant trial spacing cfg.keeptapers = False @@ -476,7 +447,6 @@ def test_parallel(self, testcluster): assert spec.freq.size == freqs.size assert np.allclose(spec.freq, freqs) assert spec.taper.size == 1 - _flush_local_cluster() # equidistant trial spacing, keep tapers cfg.output = "abs" @@ -486,7 +456,6 @@ def test_parallel(self, testcluster): for k, chan_per_worker in enumerate([None, chanPerWrkr]): spec = freqanalysis(artdata, cfg) assert spec.taper.size > 1 - _flush_local_cluster() # non-equidistant, overlapping trial spacing, throw away trials and tapers cfg.keeptapers = False @@ -496,7 +465,6 @@ def test_parallel(self, testcluster): inmemory=False, equidistant=False, overlapping=True) spec = freqanalysis(artdata, cfg) - _flush_local_cluster() timeAxis = artdata.dimord.index("time") maxtrlno = np.diff(artdata.sampleinfo).argmax() tmp = padding(artdata.trials[maxtrlno], "zero", spec.cfg.pad, @@ -549,15 +517,12 @@ def test_tf_output(self): cfg.select = select cfg.output = "fourier" tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() assert "complex" in tfSpec.data.dtype.name cfg.output = "abs" tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() assert "float" in tfSpec.data.dtype.name cfg.output = "pow" tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() assert "float" in tfSpec.data.dtype.name def test_tf_allocout(self): @@ -569,7 +534,6 @@ def test_tf_allocout(self): assert out.taper.size == 1 assert out.freq.size == self.tfData.samplerate / 2 + 1 assert out.channel.size == self.nChannels - _flush_local_cluster() # build `cfg` object for calling cfg = StructDict() @@ -588,7 +552,6 @@ def test_tf_allocout(self): assert len(cfg.out.time[0]) == trLen assert np.all(cfg.out.sampleinfo == [0, trLen]) assert cfg.out.data.shape[0] == trLen # ensure trial-count == 1 - _flush_local_cluster() # keep trials but throw away tapers out = SpectralData(dimord=SpectralData._defaultDimord) @@ -597,7 +560,6 @@ def test_tf_allocout(self): out=out) assert out.sampleinfo.shape == (self.nTrials, 2) assert out.taper.size == 1 - _flush_local_cluster() # re-use `cfg` from above and additionally throw away `tapers` cfg.dataset = self.tfData @@ -607,7 +569,6 @@ def test_tf_allocout(self): cfg.output = "pow" freqanalysis(cfg) assert cfg.out.taper.size == 1 - _flush_local_cluster() def test_tf_solution(self): # Compute "full" non-overlapping TF spectrum, i.e., center analysis windows @@ -633,14 +594,11 @@ def test_tf_solution(self): # Compute TF objects w\w/o`foi`/`foilim` cfg.select = select tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foi = maxFreqs tfSpecFoi = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foi = None cfg.foilim = [maxFreqs.min(), maxFreqs.max()] tfSpecFoiLim = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foilim = None # Ensure TF objects contain expected/requested frequencies @@ -741,7 +699,6 @@ def test_tf_toi(self): tStep = winsize - toi * winsize timeArr = np.arange(tStart, tStop, tStep) assert np.allclose(timeArr, tfSpec.time[0]) - _flush_local_cluster() # Test window-centroids specified as time-point arrays cfg.t_ftimwin = 0.05 @@ -750,13 +707,11 @@ def test_tf_toi(self): tfSpec = freqanalysis(cfg, self.tfData) assert np.allclose(cfg.toi, tfSpec.time[0]) assert tfSpec.samplerate == 1/(toi[1] - toi[0]) - _flush_local_cluster() # Unevenly sampled array: timing currently in lala-land, but sizes must match cfg.toi = [-5, 3, 10] tfSpec = freqanalysis(cfg, self.tfData) assert tfSpec.time[0].size == len(cfg.toi) - _flush_local_cluster() # Test correct time-array assembly for ``toi = "all"`` (cut down data signifcantly # to not overflow memory here); same for ``toi = 1.0``` @@ -767,7 +722,6 @@ def test_tf_toi(self): cfg.toi = "all" cfg.t_ftimwin = 0.05 tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() assert tfSpec.taper.size > 1 dt = 1/self.tfData.samplerate timeArr = np.arange(cfg.select["toilim"][0], cfg.select["toilim"][1] + dt, dt) @@ -775,7 +729,6 @@ def test_tf_toi(self): cfg.toi = 1.0 tfSpec = freqanalysis(cfg, self.tfData) assert np.allclose(tfSpec.time[0], timeArr) - _flush_local_cluster() # Use a window-size larger than the pre-selected interval defined above cfg.t_ftimwin = 5.0 @@ -814,7 +767,6 @@ def test_tf_irregular_trials(self): artdata = generate_artificial_data(nTrials=5, nChannels=16, equidistant=True, inmemory=False) tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() assert tfSpec.taper.size > 1 for tk, origTime in enumerate(artdata.time): assert np.array_equal(np.unique(np.floor(origTime)), tfSpec.time[tk]) @@ -824,7 +776,6 @@ def test_tf_irregular_trials(self): artdata = generate_artificial_data(nTrials=5, nChannels=4, equidistant=True, inmemory=False) tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() for tk, origTime in enumerate(artdata.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -833,13 +784,11 @@ def test_tf_irregular_trials(self): artdata = generate_artificial_data(nTrials=5, nChannels=8, equidistant=False, inmemory=False) tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() assert tfSpec.taper.size > 1 for tk, origTime in enumerate(artdata.time): assert np.array_equal(np.unique(np.floor(origTime)), tfSpec.time[tk]) cfg.toi = "all" tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() for tk, origTime in enumerate(artdata.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -849,13 +798,11 @@ def test_tf_irregular_trials(self): equidistant=False, inmemory=False, dimord=AnalogData._defaultDimord[::-1]) tfSpec = freqanalysis(cfg) - _flush_local_cluster() assert tfSpec.taper.size > 1 for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(np.unique(np.floor(origTime)), tfSpec.time[tk]) cfg.toi = "all" tfSpec = freqanalysis(cfg) - _flush_local_cluster() for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -866,13 +813,11 @@ def test_tf_irregular_trials(self): dimord=AnalogData._defaultDimord[::-1], overlapping=True) tfSpec = freqanalysis(cfg) - _flush_local_cluster() assert tfSpec.taper.size > 1 for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(np.unique(np.floor(origTime)), tfSpec.time[tk]) cfg.toi = "all" tfSpec = freqanalysis(cfg) - _flush_local_cluster() for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -884,7 +829,7 @@ def test_tf_parallel(self, testcluster): if (inspect.ismethod(getattr(self, attr)) and attr != "test_tf_parallel")] for test in all_tests: getattr(self, test)() - _flush_local_cluster() + flush_local_cluster(testcluster) # now create uniform `cfg` for remaining SLURM tests cfg = StructDict() @@ -908,7 +853,6 @@ def test_tf_parallel(self, testcluster): tfSpec = freqanalysis(artdata, cfg) assert tfSpec.data.is_virtual assert len(tfSpec.data.virtual_sources()) == fileCount[k] - _flush_local_cluster() # non-equidistant trial spacing cfg.keeptapers = False @@ -919,7 +863,6 @@ def test_tf_parallel(self, testcluster): tfSpec = freqanalysis(artdata, cfg) assert np.array_equal(tfSpec.freq, expectedFreqs) assert tfSpec.taper.size == 1 - _flush_local_cluster() # equidistant trial spacing, keep tapers cfg.output = "abs" @@ -931,7 +874,6 @@ def test_tf_parallel(self, testcluster): for chan_per_worker in enumerate([None, chanPerWrkr]): tfSpec = freqanalysis(artdata, cfg) assert tfSpec.taper.size > 1 - _flush_local_cluster() # overlapping trial spacing, throw away trials and tapers cfg.keeptapers = False @@ -946,7 +888,6 @@ def test_tf_parallel(self, testcluster): assert tfSpec.taper.size == 1 assert np.array_equal(np.unique(np.floor(artdata.time[0])), tfSpec.time[0]) assert tfSpec.data.shape == (tfSpec.time[0].size, 1, expectedFreqs.size, self.nChannels) - _flush_local_cluster() client.close() @@ -1009,14 +950,11 @@ def test_wav_solution(self): # Compute TF objects w\w/o`foi`/`foilim` cfg.select = select tfSpec = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foi = maxFreqs tfSpecFoi = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foi = None cfg.foilim = [maxFreqs.min(), maxFreqs.max()] tfSpecFoiLim = freqanalysis(cfg, self.tfData) - _flush_local_cluster() cfg.foilim = None # Ensure TF objects contain expected/requested frequencies @@ -1096,7 +1034,6 @@ def test_wav_toi(self): cfg.wav = "Morlet" cfg.output = "pow" cfg.keeptrials = False - _flush_local_cluster() # Test time-point arrays comprising onset, purely pre-onset, purely after # onset and non-unit spacing @@ -1112,7 +1049,6 @@ def test_wav_toi(self): tfSpec = freqanalysis(cfg, self.tfData) assert np.allclose(cfg.toi, tfSpec.time[0]) assert tfSpec.samplerate == 1/(toi[1] - toi[0]) - _flush_local_cluster() # Test correct time-array assembly for ``toi = "all"`` (cut down data signifcantly # to not overflow memory here) @@ -1122,7 +1058,6 @@ def test_wav_toi(self): dt = 1/self.tfData.samplerate timeArr = np.arange(cfg.select["toilim"][0], cfg.select["toilim"][1] + dt, dt) assert np.allclose(tfSpec.time[0], timeArr) - _flush_local_cluster() # Use `toi` array outside trial boundaries cfg.toi = self.tfData.time[0][:10] @@ -1144,13 +1079,11 @@ def test_wav_irregular_trials(self): cfg.wav = "Morlet" cfg.output = "pow" cfg.toi = "all" - _flush_local_cluster() # start harmless: equidistant trials w/multiple tapers artdata = generate_artificial_data(nTrials=5, nChannels=16, equidistant=True, inmemory=False) tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() for tk, origTime in enumerate(artdata.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -1158,7 +1091,6 @@ def test_wav_irregular_trials(self): artdata = generate_artificial_data(nTrials=5, nChannels=16, equidistant=False, inmemory=False) tfSpec = freqanalysis(artdata, **cfg) - _flush_local_cluster() for tk, origTime in enumerate(artdata.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -1167,7 +1099,6 @@ def test_wav_irregular_trials(self): equidistant=False, inmemory=False, dimord=AnalogData._defaultDimord[::-1]) tfSpec = freqanalysis(cfg) - _flush_local_cluster() for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -1177,7 +1108,6 @@ def test_wav_irregular_trials(self): dimord=AnalogData._defaultDimord[::-1], overlapping=True) tfSpec = freqanalysis(cfg) - _flush_local_cluster() for tk, origTime in enumerate(cfg.data.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -1189,7 +1119,7 @@ def test_wav_parallel(self, testcluster): if (inspect.ismethod(getattr(self, attr)) and attr != "test_wav_parallel")] for test in all_tests: getattr(self, test)() - _flush_local_cluster() + flush_local_cluster(testcluster) # now create uniform `cfg` for remaining SLURM tests cfg = StructDict() @@ -1212,7 +1142,6 @@ def test_wav_parallel(self, testcluster): tfSpec = freqanalysis(artdata, cfg) assert tfSpec.data.is_virtual assert len(tfSpec.data.virtual_sources()) == fileCount[k] - _flush_local_cluster() # non-equidistant trial spacing cfg.keeptapers = False @@ -1222,7 +1151,6 @@ def test_wav_parallel(self, testcluster): tfSpec = freqanalysis(artdata, cfg) assert 1 > tfSpec.freq.min() > 0 assert tfSpec.freq.max() == (self.tfData.samplerate / 2) - _flush_local_cluster() # equidistant trial spacing cfg.output = "abs" @@ -1230,7 +1158,6 @@ def test_wav_parallel(self, testcluster): inmemory=False) for chan_per_worker in enumerate([None, chanPerWrkr]): tfSpec = freqanalysis(artdata, cfg) - _flush_local_cluster() for tk, origTime in enumerate(artdata.time): assert np.array_equal(origTime, tfSpec.time[tk]) @@ -1244,6 +1171,5 @@ def test_wav_parallel(self, testcluster): tfSpec = freqanalysis(artdata, cfg) assert np.allclose(tfSpec.freq, expectedFreqs) assert tfSpec.data.shape == (tfSpec.time[0].size, 1, expectedFreqs.size, self.nChannels) - _flush_local_cluster() client.close()