FIX: More testing road bumps
- check for open file descriptor limit before running the tests
- use `/cs/home` by default as temp storage on the ESI cluster
- new routine `flush_local_cluster` is used throughout the testing pipeline to
  free up memory when computing with a `LocalCluster`

 On branch acme_amputation
 Changes to be committed:
	modified:   syncopy/__init__.py
	modified:   syncopy/tests/conftest.py
	modified:   syncopy/tests/misc.py
	modified:   syncopy/tests/test_continuousdata.py
	modified:   syncopy/tests/test_discretedata.py
	modified:   syncopy/tests/test_selectdata.py
	modified:   syncopy/tests/test_specest.py
pantaray committed Oct 27, 2021
1 parent b00f278 commit 67b7f48
Showing 7 changed files with 101 additions and 139 deletions.
7 changes: 6 additions & 1 deletion syncopy/__init__.py
@@ -7,6 +7,7 @@
import os
import sys
import subprocess
import getpass
import numpy as np
from hashlib import blake2b, sha1
from importlib.metadata import version, PackageNotFoundError
@@ -59,10 +60,14 @@
__plt__ = False

# Set package-wide temp directory
csHome = "/cs/home/{}".format(getpass.getuser())
if os.environ.get("SPYTMPDIR"):
    __storage__ = os.path.abspath(os.path.expanduser(os.environ["SPYTMPDIR"]))
else:
    if os.path.exists(csHome):
        __storage__ = os.path.join(csHome, ".spy")
    else:
        __storage__ = os.path.join(os.path.expanduser("~"), ".spy")

# Set upper bound for temp directory size (in GB)
__storagelimit__ = 10
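The precedence implemented above (explicit `SPYTMPDIR` first, then the ESI cluster home `/cs/home/<user>`, then the user's home directory) can be exercised in isolation. A minimal sketch; `resolve_spy_storage` is a hypothetical helper name, not part of the package:

.. code-block:: python

    import os
    import getpass

    def resolve_spy_storage(env=None):
        # Mirrors the package logic above: SPYTMPDIR overrides everything,
        # otherwise prefer /cs/home/<user> (ESI cluster) if it exists, else ~/.spy
        env = os.environ if env is None else env
        cs_home = "/cs/home/{}".format(getpass.getuser())
        if env.get("SPYTMPDIR"):
            return os.path.abspath(os.path.expanduser(env["SPYTMPDIR"]))
        if os.path.exists(cs_home):
            return os.path.join(cs_home, ".spy")
        return os.path.join(os.path.expanduser("~"), ".spy")

    print(resolve_spy_storage({"SPYTMPDIR": "/tmp/spy_scratch"}))  # -> /tmp/spy_scratch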
7 changes: 5 additions & 2 deletions syncopy/tests/conftest.py
@@ -17,11 +17,14 @@
# skipped anyway)
if __acme__:
    import dask.distributed as dd
    import resource
    from acme.dask_helpers import esi_cluster_setup
    from syncopy.tests.misc import is_slurm_node
    if max(resource.getrlimit(resource.RLIMIT_NOFILE)) < 1024:
        msg = "Not enough open file descriptors allowed. Consider increasing " +\
              "the limit using, e.g., `ulimit -Sn 1024`"
        raise ValueError(msg)
    if is_slurm_node():
        os.environ["SPYTMPDIR"] = "/cs/home/{}/.spy".format(os.environ["USER"])
        importlib.reload(syncopy)
    cluster = esi_cluster_setup(partition="8GBS", n_jobs=10,
                                timeout=360, interactive=False,
                                start_client=False)
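For reference, `resource.getrlimit` returns a `(soft, hard)` pair, so the guard above only trips when even the larger of the two values is below 1024. A minimal sketch of the check plus one possible remedy; raising the soft limit programmatically is an assumption, the commit itself only suggests `ulimit -Sn 1024`:

.. code-block:: python

    import resource

    # (soft, hard) limits on open file descriptors for the current process
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if max(soft, hard) < 1024:
        # Hypothetical remedy: lift the soft limit as far as the hard limit allows
        resource.setrlimit(resource.RLIMIT_NOFILE, (min(1024, hard), hard))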
116 changes: 69 additions & 47 deletions syncopy/tests/misc.py
@@ -9,15 +9,19 @@
import os
import h5py
import tempfile
import time
import numpy as np

# Local imports
from syncopy.datatype import AnalogData
from syncopy.shared.filetypes import _data_classname_to_extension, FILE_EXT
from syncopy import __plt__, __acme__
if __plt__:
    import matplotlib.pyplot as plt
    from matplotlib.backends.backend_agg import FigureCanvasAgg
if __acme__:
    import dask.distributed as dd


def is_win_vm():
    """
@@ -59,12 +63,12 @@ def is_slurm_node():
    else:
        return False


def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
                             overlapping=False, inmemory=True, dimord="default"):
    """
    Create :class:`~syncopy.AnalogData` object with synthetic harmonic signal(s)

    Parameters
    ----------
    nTrials : int
@@ -74,45 +78,45 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
    equidistant : bool
        If `True`, trials of equal length are defined
    seed : None or int
        If `None`, imposed noise is completely random. If `seed` is an integer,
        it is used to fix the (initial) state of NumPy's random number generator
        :func:`numpy.random.default_rng`, i.e., objects created with same `seed`
        will be populated with identical artificial signals.
    overlapping : bool
        If `True`, constructed trials overlap
    inmemory : bool
        If `True`, the full `data` array (all channels across all trials) is allocated
        in memory (fast but dangerous for large arrays), otherwise the output data
        object's corresponding backing HDF5 file in `__storage__` is filled with
        synthetic data in a trial-by-trial manner (slow but safe even for very
        large datasets).
    dimord : str or list
        If `dimord` is "default", the constructed output object uses the default
        dimensional layout of a standard :class:`~syncopy.AnalogData` object.
        If `dimord` is a list (i.e., ``["channel", "time"]``) the provided sequence
        of dimensions is used.

    Returns
    -------
    out : :class:`~syncopy.AnalogData` object
        Syncopy :class:`~syncopy.AnalogData` object with specified properties
        populated with a synthetic multivariate trigonometric signal.

    Notes
    -----
    This is an auxiliary method that is intended purely for internal use. Thus,
    no error checking is performed.

    Examples
    --------
    Generate small artificial :class:`~syncopy.AnalogData` object in memory

    .. code-block:: python

        >>> iAmSmall = generate_artificial_data(nTrials=5, nChannels=10, inmemory=True)
        >>> iAmSmall
        Syncopy AnalogData object with fields

            cfg : dictionary with keys ''
            channel : [10] element <class 'numpy.ndarray'>
            container : None
@@ -126,18 +130,18 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
            time : 5 element list
            trialinfo : [5 x 0] element <class 'numpy.ndarray'>
            trials : 5 element iterable

        Use `.log` to see object history

    Generate artificial :class:`~syncopy.AnalogData` object of more substantial
    size on disk

    .. code-block:: python

        >>> iAmBig = generate_artificial_data(nTrials=50, nChannels=1024, inmemory=False)
        >>> iAmBig
        Syncopy AnalogData object with fields

            cfg : dictionary with keys ''
            channel : [1024] element <class 'numpy.ndarray'>
            container : None
@@ -151,9 +155,9 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
            time : 200 element list
            trialinfo : [200 x 0] element <class 'numpy.ndarray'>
            trials : 200 element iterable

        Use `.log` to see object history
    """

    # Create dummy 1d signal that will be blown up to fill channels later
@@ -170,23 +174,23 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
    idx[timeAxis] = -1
    sig = np.repeat(sig.reshape(*idx), axis=idx.index(1), repeats=nChannels)

    # Initialize random number generator (with possibly user-provided seed-value)
    rng = np.random.default_rng(seed)

    # Either construct the full data array in memory using tiling or create
    # an HDF5 container in `__storage__` and fill it trial-by-trial
    # NOTE: use `swapaxes` here to ensure two objects created w/same seed really
    # are affected w/identical additive noise patterns, no matter their respective
    # `dimord`.
    out = AnalogData(samplerate=1/dt, dimord=dimord)
    if inmemory:
        idx[timeAxis] = nTrials
        sig = np.tile(sig, idx)
        shp = [slice(None), slice(None)]
        for iTrial in range(nTrials):
            shp[timeAxis] = slice(iTrial*t.size, (iTrial + 1)*t.size)
            noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5
            sig[tuple(shp)] += np.swapaxes(noise, timeAxis, 0)
        out.data = sig
    else:
        with h5py.File(out.filename, "w") as h5f:
@@ -197,7 +201,7 @@ def generate_artificial_data(nTrials=2, nChannels=2, equidistant=True, seed=None,
            for iTrial in range(nTrials):
                shp[timeAxis] = slice(iTrial*t.size, (iTrial + 1)*t.size)
                noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5
                dset[tuple(shp)] = sig + np.swapaxes(noise, timeAxis, 0)
            dset.flush()
        out.data = h5py.File(out.filename, "r+")["data"]
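Given the seeding logic above, reproducibility can be spot-checked directly. A minimal sketch (assuming the function is importable as below; reading the data via `np.asarray` is an assumption about the object's array interface):

.. code-block:: python

    import numpy as np
    from syncopy.tests.misc import generate_artificial_data

    # Two objects built with the same seed should carry identical signals
    # (cf. the `seed` docstring and the `swapaxes` note above)
    obj1 = generate_artificial_data(nTrials=2, nChannels=4, seed=42)
    obj2 = generate_artificial_data(nTrials=2, nChannels=4, seed=42)
    assert np.array_equal(np.asarray(obj1.data), np.asarray(obj2.data))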

Expand Down Expand Up @@ -235,32 +239,32 @@ def construct_spy_filename(basepath, obj):
    objext = _data_classname_to_extension(obj.__class__.__name__)
    return os.path.join(basepath + FILE_EXT["dir"], basename + objext)


def figs_equal(fig1, fig2, tol=None):
    """
    Test if two figures are identical

    Parameters
    ----------
    fig1 : matplotlib figure object
        Reference figure
    fig2 : matplotlib figure object
        Template figure
    tol : float
        Positive scalar (b/w 0 and 1) specifying tolerance level for considering
        `fig1` and `fig2` identical. If `None`, two figures have to be exact
        pixel-perfect copies to be qualified as identical.

    Returns
    -------
    equal : bool
        `True` if `fig1` and `fig2` are identical, `False` otherwise

    Notes
    -----
    This is an auxiliary method that is intended purely for internal use. Thus,
    no error checking is performed.

    Examples
    --------
    >>> import numpy as np
@@ -283,3 +287,21 @@ def figs_equal(fig1, fig2, tol=None):
    if tol is None:
        return np.array_equal(plt.imread(img1.name), plt.imread(img2.name))
    return np.allclose(plt.imread(img1.name), plt.imread(img2.name), atol=tol)


def flush_local_cluster(testcluster, timeout=10):
    """
    Resets a parallel computing client to avoid memory spilling
    """
    if isinstance(testcluster, dd.LocalCluster):
        # Close the active client and re-attach to the cluster so that
        # memory held by the `LocalCluster` workers is released
        # client.restart()
        client = dd.get_client()
        client.close()
        time.sleep(1.0)
        client = dd.Client(testcluster)
        # Wait (at most `timeout` seconds) until the scheduler reports workers again
        waiting = 0
        while len([w["memory_limit"] for w in testcluster.scheduler_info["workers"].values()]) == 0 \
                and waiting < timeout:
            time.sleep(1.0)
            waiting += 1
    return
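A sketch of how the test suites below wire this helper into their parallel runs (class context elided; `testcluster` is the pytest fixture created in conftest.py):

.. code-block:: python

    import dask.distributed as dd
    from syncopy.tests.misc import flush_local_cluster

    def test_parallel(self, testcluster):
        # Re-run selected serial tests on the dask cluster, flush worker
        # memory after the batch, then detach the client
        client = dd.Client(testcluster)
        for test in ["test_dataselection"]:
            getattr(self, test)()
        flush_local_cluster(testcluster)
        client.close()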
4 changes: 3 additions & 1 deletion syncopy/tests/test_continuousdata.py
@@ -18,7 +18,7 @@
from syncopy.datatype.methods.selectdata import selectdata
from syncopy.shared.errors import SPYValueError, SPYTypeError
from syncopy.shared.tools import StructDict
from syncopy.tests.misc import flush_local_cluster, generate_artificial_data, construct_spy_filename
from syncopy import __acme__
if __acme__:
    import dask.distributed as dd
@@ -455,6 +455,7 @@ def test_parallel(self, testcluster):
"test_dataselection"]
for test in par_tests:
getattr(self, test)()
flush_local_cluster(testcluster)
client.close()


@@ -668,4 +669,5 @@ def test_sd_parallel(self, testcluster):
        par_tests = ["test_sd_dataselection"]
        for test in par_tests:
            getattr(self, test)()
            flush_local_cluster(testcluster)
        client.close()
20 changes: 11 additions & 9 deletions syncopy/tests/test_discretedata.py
@@ -17,7 +17,7 @@
from syncopy.datatype.methods.selectdata import selectdata
from syncopy.io import save, load
from syncopy.shared.errors import SPYValueError, SPYTypeError
from syncopy.tests.misc import construct_spy_filename, flush_local_cluster
from syncopy import __acme__
if __acme__:
    import dask.distributed as dd
@@ -222,6 +222,7 @@ def test_parallel(self, testcluster):
        par_tests = ["test_dataselection"]
        for test in par_tests:
            getattr(self, test)()
            flush_local_cluster(testcluster)
        client.close()

class TestEventData():
@@ -244,7 +245,7 @@ class TestEventData():

    adata = np.arange(1, nc * ns + 1).reshape(ns, nc)

    def test_ed_empty(self):
        dummy = EventData()
        assert len(dummy.cfg) == 0
        assert dummy.dimord == None
@@ -253,7 +254,7 @@ def test_empty(self):
        with pytest.raises(SPYTypeError):
            EventData({})

    def test_ed_nparray(self):
        dummy = EventData(self.data)
        assert dummy.dimord == ["sample", "eventid"]
        assert dummy.eventid.size == self.num_evt
@@ -267,7 +268,7 @@ def test_nparray(self):
        with pytest.raises(SPYValueError):
            EventData(np.ones((3,)))

    def test_ed_trialretrieval(self):
        # test ``_get_trial`` with NumPy array: regular order
        dummy = EventData(self.data, trialdefinition=self.trl)
        smp = self.data[:, 0]
@@ -287,7 +288,7 @@ def test_trialretrieval(self):
            trl_ref = self.data2[idx, ...]
            assert np.array_equal(dummy._get_trial(trlno), trl_ref)

    def test_ed_saveload(self):
        with tempfile.TemporaryDirectory() as tdir:
            fname = os.path.join(tdir, "dummy")

@@ -338,7 +339,7 @@ def test_saveload(self):
            del dummy, dummy2
            time.sleep(0.1)

    def test_ed_trialsetting(self):

        # Create sampleinfo w/ EventData vs. AnalogData samplerate
        sr_e = 2
@@ -471,7 +472,7 @@ def test_trialsetting(self):
        ang_dummy.definetrial(evt_dummy, pre=pre, post=post, trigger=1)

    # test data-selection via class method
    def test_ed_dataselection(self):
        dummy = EventData(data=self.data,
                          trialdefinition=self.trl,
                          samplerate=2.0)
@@ -525,10 +526,11 @@ def test_dataselection(self):
        assert np.array_equal(cfg.out.data, selected.data)

    @skip_without_acme
    def test_ed_parallel(self, testcluster):
        # repeat selected test w/parallel processing engine
        client = dd.Client(testcluster)
        par_tests = ["test_ed_dataselection"]
        for test in par_tests:
            getattr(self, test)()
            flush_local_cluster(testcluster)
        client.close()
