Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize feats extraction with opensmile #181

Merged
merged 8 commits into from
Nov 15, 2024
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ torch = "~=2.4"
torchvision = "~=0.19"
torchaudio = "~=2.4"
transformers = "~=4.46.2"
pydra = "~=0.23"
pydra = "~=0.25"
pydantic = "~=2.7"
accelerate = "*"
huggingface-hub = "~=0.23"
Expand Down
4 changes: 4 additions & 0 deletions src/senselab/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
""".. include:: ../../README.md""" # noqa: D415

from multiprocessing import set_start_method

import nest_asyncio

nest_asyncio.apply()

from senselab.utils.data_structures.pydra_helpers import * # NOQA

set_start_method("spawn", force=True)
satra marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 9 additions & 3 deletions src/senselab/audio/tasks/features_extraction/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,13 @@ def extract_features_from_audios(
'si_sdr': 11.71167278289795}}]
"""
if opensmile:
default_opensmile = {"feature_set": "eGeMAPSv02", "feature_level": "Functionals"}
default_opensmile = {
"feature_set": "eGeMAPSv02",
"feature_level": "Functionals",
"plugin": "serial",
"plugin_args": {},
"cache_dir": None,
}
if isinstance(opensmile, dict):
my_opensmile = {**default_opensmile, **opensmile}
else:
Expand All @@ -330,7 +336,7 @@ def extract_features_from_audios(
"duration": True,
"jitter": True,
"shimmer": True,
"plugin": "cf",
"plugin": "serial",
"plugin_args": {},
}
# Update default_parselmouth with provided parselmouth dictionary
Expand All @@ -350,7 +356,7 @@ def extract_features_from_audios(
"n_mfcc": 40,
"win_length": None,
"hop_length": None,
"plugin": "cf",
"plugin": "serial",
"plugin_args": {},
"cache_dir": None,
}
Expand Down
89 changes: 72 additions & 17 deletions src/senselab/audio/tasks/features_extraction/opensmile.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
"""This module contains functions for extracting openSMILE features."""
"""This module contains functions for extracting openSMILE features.

from typing import Any, Dict, List
It includes a factory class for managing openSMILE feature extractors, ensuring
each extractor is created only once per feature set and feature level. The main
function, `extract_opensmile_features_from_audios`, applies feature extraction
across a list of audio samples using openSMILE, managed as a Pydra workflow
for parallel processing. This approach supports efficient and scalable feature
extraction across multiple audio files.
"""

import os
from typing import Any, Dict, List, Optional

import numpy as np
import opensmile
import pydra

from senselab.audio.data_structures import Audio


class OpenSmileFeatureExtractorFactory:
"""A factory for managing openSMILE feature extractors."""
"""A factory for managing openSMILE feature extractors.

This class creates and caches openSMILE feature extractors, allowing for
efficient reuse. It ensures only one instance of each feature extractor
exists per unique combination of `feature_set` and `feature_level`.
"""

_extractors: Dict[str, opensmile.Smile] = {}
_extractors: Dict[str, opensmile.Smile] = {} # Cache for feature extractors

@classmethod
def get_opensmile_extractor(cls, feature_set: str, feature_level: str) -> opensmile.Smile:
Expand All @@ -24,31 +39,40 @@ def get_opensmile_extractor(cls, feature_set: str, feature_level: str) -> opensm
Returns:
opensmile.Smile: The openSMILE feature extractor.
"""
key = f"{feature_set}-{feature_level}"
if key not in cls._extractors:
key = f"{feature_set}-{feature_level}" # Unique key for each feature extractor
if key not in cls._extractors: # Check if extractor exists in cache
# Create and store a new extractor if not found in cache
cls._extractors[key] = opensmile.Smile(
feature_set=opensmile.FeatureSet[feature_set],
feature_level=opensmile.FeatureLevel[feature_level],
)
return cls._extractors[key]
return cls._extractors[key] # Return cached or newly created extractor


def extract_opensmile_features_from_audios(
audios: List[Audio],
feature_set: str = "eGeMAPSv02",
feature_level: str = "Functionals",
plugin: str = "serial",
plugin_args: Optional[Dict[str, Any]] = {},
cache_dir: Optional[str | os.PathLike] = None,
) -> List[Dict[str, Any]]:
"""Apply feature extraction across a list of audio files.
"""Extract openSMILE features from a list of audio files using Pydra workflow.

This function sets up a Pydra workflow for parallel processing of openSMILE
feature extraction on a list of audio samples. Each sample's features are
extracted and formatted as dictionaries.

Args:
audios (List[Audio]): The list of audio objects to extract features from.
feature_set (str): The openSMILE feature set
(default is "eGeMAPSv02". The alternatives include "ComParE_2016").
feature_level (str): The openSMILE feature level
(default is "Functionals". The alternative is "LowLevelDescriptors").
feature_set (str): The openSMILE feature set (default is "eGeMAPSv02").
feature_level (str): The openSMILE feature level (default is "Functionals").
plugin (str): The Pydra plugin to use (default is "serial").
plugin_args (Optional[Dict[str, Any]]): Additional arguments for the Pydra plugin.
cache_dir (Optional[str | os.PathLike]): The path to the Pydra cache directory.

Returns:
List[Dict[str, Any]]: The list of feature dictionaries for each audio.
List[Dict[str, Any]]: A list of dictionaries, each containing extracted features.
"""

def _extract_feats_from_audio(sample: Audio, smile: opensmile.Smile) -> Dict[str, Any]:
Expand All @@ -61,19 +85,50 @@ def _extract_feats_from_audio(sample: Audio, smile: opensmile.Smile) -> Dict[str
Returns:
Dict[str, Any]: The extracted features as a dictionary.
"""
# Convert audio tensor to a NumPy array for processing
audio_array = sample.waveform.squeeze().numpy()
sampling_rate = sample.sampling_rate
sampling_rate = sample.sampling_rate # Get sampling rate from Audio object
try:
# Process the audio and extract features
sample_features = smile.process_signal(audio_array, sampling_rate)
# Convert to a dictionary with float values and return it
# Convert features to a dictionary and handle single-item lists
return {
k: v[0] if isinstance(v, list) and len(v) == 1 else v
for k, v in sample_features.to_dict("list").items()
}
except Exception as e:
# Log error and return NaNs if feature extraction fails
print(f"Error processing sample {sample.orig_path_or_id}: {e}")
return {feature: np.nan for feature in smile.feature_names}

# Decorate the feature extraction function for Pydra
_extract_feats_from_audio_pt = pydra.mark.task(_extract_feats_from_audio)

# Obtain the feature extractor using the factory
smile = OpenSmileFeatureExtractorFactory.get_opensmile_extractor(feature_set, feature_level)
features = [_extract_feats_from_audio(audio, smile) for audio in audios]
return features

# Create a Pydra workflow, split it over the list of audio samples
wf = pydra.Workflow(name="wf", input_spec=["x"], cache_dir=cache_dir)
wf.split("x", x=audios) # Each audio is treated as a separate task
# Add feature extraction task to the workflow
wf.add(_extract_feats_from_audio_pt(name="_extract_feats_from_audio_pt", sample=wf.lzin.x, smile=smile))

# Set workflow output to the results of each audio feature extraction
wf.set_output([("opensmile", wf._extract_feats_from_audio_pt.lzout.out)])

# Run the workflow using the specified Pydra plugin and arguments
with pydra.Submitter(plugin=plugin, **plugin_args) as sub:
sub(wf)

# Retrieve results from the completed workflow
outputs = wf.result()

# Format the outputs into a list of dictionaries
formatted_output: List[Dict[str, Any]] = []
for output in outputs:
# Extract features and organize into a dictionary
formatted_output_item = {
f"{feature}": output.output.opensmile[f"{feature}"] for feature in output.output.opensmile
}
formatted_output.append(formatted_output_item) # Append to final output list
return formatted_output # Return the list of formatted feature dictionaries
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ def extract_praat_parselmouth_features_from_audios(
duration: bool = True,
jitter: bool = True,
shimmer: bool = True,
plugin: str = "cf",
plugin: str = "serial",
plugin_args: Dict[str, Any] = {},
) -> List[Dict[str, Any]]:
"""Extract features from a list of Audio objects and return a JSON-like dictionary.
Expand All @@ -1138,7 +1138,7 @@ def extract_praat_parselmouth_features_from_audios(
duration (bool): Whether to extract duration. Defaults to True.
jitter (bool): Whether to extract jitter. Defaults to True.
shimmer (bool): Whether to extract shimmer. Defaults to True.
plugin (str): Plugin to use for feature extraction. Defaults to "cf".
plugin (str): Plugin to use for feature extraction. Defaults to "serial".
plugin_args (Optional[Dict[str, Any]]): Arguments for the pydra plugin. Defaults to {}.

Returns:
Expand Down
4 changes: 2 additions & 2 deletions src/senselab/audio/tasks/features_extraction/torchaudio.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def extract_torchaudio_features_from_audios(
n_mfcc: int = 40,
win_length: Optional[int] = None,
hop_length: Optional[int] = None,
plugin: str = "cf",
plugin: str = "serial",
plugin_args: Optional[Dict[str, Any]] = {},
cache_dir: Optional[str | os.PathLike] = None,
) -> List[Dict[str, Any]]:
Expand All @@ -258,7 +258,7 @@ def extract_torchaudio_features_from_audios(
n_mfcc (int): Number of MFCCs. Default is 40.
win_length (int): Window size. Default is None, using n_fft.
hop_length (int): Length of hop between STFT windows. Default is None, using win_length // 2.
plugin (str): The plugin to use. Default is "cf".
plugin (str): The plugin to use. Default is "serial".
plugin_args (Optional[Dict[str, Any]]): The arguments to pass to the plugin. Default is {}.
cache_dir (Optional[str | os.PathLike]): The directory to cache the results. Default is None.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
from senselab.audio.tasks.features_extraction.praat_parselmouth import extract_praat_parselmouth_features_from_audios


def extract_health_measurements(audios: List[Audio], cache_dir: Optional[str] = None) -> List[Dict[str, Any]]:
def extract_health_measurements(
audios: List[Audio], plugin: str = "serial", plugin_args: Dict[str, Any] = {}, cache_dir: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Extract health measurements from audio files.

Args:
audios (List[Audio]): List of Audio objects.
plugin (str): Plugin to use for feature extraction. Defaults to "serial".
plugin_args (Dict[str, Any]): Dictionary of arguments for the feature extraction plugin.
cache_dir (Optional[str]): Directory to use for caching by pydra. Defaults to None.

Returns:
Expand Down Expand Up @@ -87,6 +91,8 @@ def extract_health_measurements(audios: List[Audio], cache_dir: Optional[str] =
return extract_praat_parselmouth_features_from_audios(
audios=audios,
cache_dir=cache_dir,
plugin=plugin,
plugin_args=plugin_args,
duration=False,
jitter=False,
shimmer=False,
Expand Down
61 changes: 61 additions & 0 deletions src/senselab/utils/data_structures/pydra_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Iterator

import numpy as np
import opensmile
import torch
from pydra.utils.hash import Cache, bytes_repr_sequence_contents, register_serializer

Expand All @@ -19,6 +20,66 @@ def bytes_repr_arraylike(obj: torch.Tensor, cache: Cache) -> Iterator[bytes]:
yield array.tobytes(order="C")


@register_serializer(opensmile.Smile)
def bytes_repr_smile(obj: opensmile.Smile, _cache: Cache) -> Iterator[bytes]:
    """Yield a byte representation of an `opensmile.Smile` instance.

    The function is registered via `register_serializer` (imported from
    `pydra.utils.hash`) as the serializer for `opensmile.Smile`. The bytes it
    produces are derived only from the extractor's attributes, never from the
    object's identity, so two instances whose attributes render to the same
    text yield the same byte stream.

    The stream consists of:
        1. A header: the class's module and class name followed by a colon.
        2. One `"<attribute>:<value>"` chunk per attribute, in this fixed order:
           `feature_set`, `feature_level`, `options`, `logfile`, `verbose`,
           `column_names`, `feature_names`, `hop_dur`, `name`, `num_channels`,
           `num_features`, `params`, `process_func_applies_sliding_window`,
           `win_dur`.

    Each value is rendered with its default string formatting and encoded with
    the default `str.encode()` encoding.

    Args:
        obj (opensmile.Smile): The `opensmile.Smile` instance to serialize.
        _cache (Cache): The Pydra cache object. It is part of the serializer
            signature but is not used here.

    Yields:
        bytes: The header chunk, then one chunk per serialized attribute.
    """
    smile_cls = obj.__class__
    yield f"{smile_cls.__module__}{smile_cls.__name__}:".encode()

    # The order matters: it fixes the byte stream, so it must stay stable.
    serialized_attributes = (
        "feature_set",
        "feature_level",
        "options",
        "logfile",
        "verbose",
        "column_names",
        "feature_names",
        "hop_dur",
        "name",
        "num_channels",
        "num_features",
        "params",
        "process_func_applies_sliding_window",
        "win_dur",
    )
    for attribute in serialized_attributes:
        yield f"{attribute}:{getattr(obj, attribute)}".encode()


# TODO: Ignore this for now, but we still need to decide how to incorporate Pydra into the package
# Pydra runner
# need function that allows for marking a task (could be obfuscated internally)
Expand Down
7 changes: 1 addition & 6 deletions src/tests/audio/tasks/features_extraction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,14 @@ def test_extract_pitch_from_audios(resampled_mono_audio_sample: Audio) -> None:
def test_extract_opensmile_features_from_audios(resampled_mono_audio_sample: Audio) -> None:
"""Test extraction of openSMILE features from audio."""
# Perform eGeMAPSv02 and Functionals features extraction
result = extract_opensmile_features_from_audios([resampled_mono_audio_sample])
result = extract_opensmile_features_from_audios([resampled_mono_audio_sample], plugin="cf")

# Assert the result is a list of dictionaries, and check each dictionary
assert isinstance(result, list)
assert all(isinstance(features, dict) for features in result)

# Ensure that each dictionary contains the expected keys (e.g., certain features from eGeMAPS)
expected_keys = {"F0semitoneFrom27.5Hz_sma3nz_amean", "jitterLocal_sma3nz_amean", "shimmerLocaldB_sma3nz_amean"}
print(result[0].keys())
for features in result:
assert set(map(str.lower, features.keys())).issuperset(map(str.lower, expected_keys))

Expand All @@ -228,7 +227,6 @@ def test_extract_opensmile_features_from_audios(resampled_mono_audio_sample: Aud
assert all(isinstance(value, (float, int)) for value in features.values())


@pytest.mark.skipif(not torch.cuda.is_available(), reason="GPU is not available")
def test_extract_objective_quality_features_from_audios(resampled_mono_audio_sample: Audio) -> None:
"""Test extraction of objective quality features from audio."""
result = extract_objective_quality_features_from_audios([resampled_mono_audio_sample])
Expand All @@ -242,14 +240,12 @@ def test_extract_objective_quality_features_from_audios(resampled_mono_audio_sam
assert isinstance(result[0]["si_sdr"], float)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="GPU is not available")
def test_extract_objective_quality_features_from_audios_invalid_audio(mono_audio_sample: Audio) -> None:
"""Test extraction of objective quality features from invalid audio."""
with pytest.raises(ValueError, match="Only 16000 Hz sampling rate is supported by Torchaudio-Squim model."):
extract_objective_quality_features_from_audios([mono_audio_sample])


@pytest.mark.skipif(not torch.cuda.is_available(), reason="GPU is not available")
def test_extract_subjective_quality_features_from_audios(resampled_mono_audio_sample: Audio) -> None:
"""Test extraction of subjective quality features from audio."""
result = extract_subjective_quality_features_from_audios(
Expand All @@ -261,7 +257,6 @@ def test_extract_subjective_quality_features_from_audios(resampled_mono_audio_sa
assert isinstance(result[0]["mos"], float)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="GPU is not available")
def test_extract_subjective_quality_features_invalid_audio(mono_audio_sample: Audio) -> None:
"""Test extraction of subjective quality features from invalid audio."""
with pytest.raises(ValueError, match="Only 16000 Hz sampling rate is supported by Torchaudio-Squim model."):
Expand Down
Loading
Loading