diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2ddf570 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.9-slim +ENV PYTHONUNBUFFERED=1 +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY . /app +RUN pip install --upgrade pip setuptools wheel +RUN pip install . +RUN pip install matplotlib pandas +ENTRYPOINT ["python", "examples/test.py"] +CMD ["--api_key", "", "--vitals_path", "examples/sample_vitals_1.csv", "--video_path", "examples/sample_video_1.mp4", "--method", "VITALLENS", "--input_str", "True"] diff --git a/README.md b/README.md index cbefb7e..8c1d1c7 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,51 @@ vl = VitalLens(method=Method.POS) result = vl(my_video_arr, fps=my_video_fps) ``` +### Example: Run example script with Docker + +If you encounter issues installing `vitallens-python` dependencies directly, you can use our Docker image, which contains all necessary tools and libraries. +This docker image is set up to execute the example Python script in `examples/test.py` for you. +Please note that the example script plots won't work when running them through Docker. + +#### Prerequisites + +- [Docker](https://docs.docker.com/engine/install/) installed on your system. + +#### Usage + +1. Clone the repository + +``` +git clone https://github.com/Rouast-Labs/vitallens-python.git && cd vitallens-python +``` + +2. Build the Docker image + +``` +docker build -t vitallens . +``` + +3. 
Run the Docker container + +To run the example script on the sample video: + +``` +docker run vitallens \ + --api_key "your_api_key_here" \ + --vitals_path "examples/sample_vitals_2.csv" \ + --video_path "examples/sample_video_2.mp4" \ + --method "VITALLENS" +``` + +You can also run it on your own video. Note that the container cannot see files on the host by default, so mount the directory containing your video (e.g. by adding `-v "/host/videos:/app/videos"` to `docker run`) and pass the path as seen inside the container: + +``` +docker run vitallens \ + --api_key "your_api_key_here" \ + --video_path "path/to/your/video.mp4" \ + --method "VITALLENS" +``` + ## Linting and tests Before running tests, please make sure that you have an environment variable `VITALLENS_DEV_API_KEY` set to a valid API Key. diff --git a/examples/test.py b/examples/test.py index 1ab8512..58e307b 100644 --- a/examples/test.py +++ b/examples/test.py @@ -57,6 +57,8 @@ def run(args=None): stop = timeit.default_timer() time_ms = (stop-start)*1000 print("Inference time: {:.2f} ms".format(time_ms)) + # Print the results + print(result) # Plot the results vital_signs = result[0]['vital_signs'] if "respiratory_waveform" in vital_signs: diff --git a/pyproject.toml b/pyproject.toml index d00ad21..d352902 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ dependencies = [ "importlib_resources", "numpy", "onnxruntime", - "prpy[ffmpeg,numpy_min]>=0.2.12", + "prpy[ffmpeg,numpy_min]>=0.2.15", "python-dotenv", "pyyaml", "requests", diff --git a/tests/test_utils.py b/tests/test_utils.py index 01974e1..0c5dc86 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -19,13 +19,14 @@ # SOFTWARE. 
import numpy as np +from prpy.numpy.image import parse_image_inputs, probe_image_inputs import pytest import sys sys.path.append('../vitallens-python') from vitallens.client import Method -from vitallens.utils import load_config, probe_video_inputs, parse_video_inputs +from vitallens.utils import load_config from vitallens.utils import merge_faces, check_faces, check_faces_in_roi @pytest.mark.parametrize("method", [m for m in Method]) @@ -37,52 +38,47 @@ def test_load_config(method): def test_probe_video_inputs(request, file): if file: test_video_path = request.getfixturevalue('test_video_path') - video_shape, fps, i = probe_video_inputs(test_video_path) + video_shape, fps, i = probe_image_inputs(test_video_path) else: test_video_ndarray = request.getfixturevalue('test_video_ndarray') test_video_fps = request.getfixturevalue('test_video_fps') - video_shape, fps, i = probe_video_inputs(test_video_ndarray, fps=test_video_fps) + video_shape, fps, i = probe_image_inputs(test_video_ndarray, fps=test_video_fps) assert video_shape == (360, 480, 768, 3) assert fps == 30 assert i == False def test_probe_video_inputs_no_file(): with pytest.raises(Exception): - _ = probe_video_inputs("does_not_exist", fps="fps") + _ = probe_image_inputs("does_not_exist", fps="fps") def test_probe_video_inputs_wrong_fps(request): with pytest.raises(Exception): test_video_path = request.getfixturevalue('test_video_path') - _ = probe_video_inputs(test_video_path, fps="fps") + _ = probe_image_inputs(test_video_path, fps="fps") def test_probe_video_inputs_no_fps(request): test_video_ndarray = request.getfixturevalue('test_video_ndarray') with pytest.raises(Exception): - _ = probe_video_inputs(test_video_ndarray) + _ = probe_image_inputs(test_video_ndarray) def test_probe_video_inputs_wrong_dtype(request): test_video_ndarray = request.getfixturevalue('test_video_ndarray') with pytest.raises(Exception): - _ = probe_video_inputs(test_video_ndarray.astype(np.float32), fps=30.) 
+ _ = probe_image_inputs(test_video_ndarray.astype(np.float32), fps=30.) def test_probe_video_inputs_wrong_shape_1(request): test_video_ndarray = request.getfixturevalue('test_video_ndarray') with pytest.raises(Exception): - _ = probe_video_inputs(test_video_ndarray[np.newaxis], fps=30.) + _ = probe_image_inputs(test_video_ndarray[np.newaxis], fps=30.) def test_probe_video_inputs_wrong_shape_2(request): test_video_ndarray = request.getfixturevalue('test_video_ndarray') with pytest.raises(Exception): - _ = probe_video_inputs(test_video_ndarray[...,0:1], fps=30.) - -def test_probe_video_inputs_wrong_shape_3(request): - test_video_ndarray = request.getfixturevalue('test_video_ndarray') - with pytest.raises(Exception): - _ = probe_video_inputs(test_video_ndarray[:10], fps=30.) + _ = probe_image_inputs(test_video_ndarray[...,0:1], fps=30.) def test_probe_video_inputs_wrong_type(): with pytest.raises(Exception): - _ = probe_video_inputs(12345, fps=30.) + _ = probe_image_inputs(12345, fps=30.) @pytest.mark.parametrize("file", [True, False]) @pytest.mark.parametrize("roi", [None, (200, 0, 500, 350)]) @@ -91,13 +87,13 @@ def test_probe_video_inputs_wrong_type(): def test_parse_video_inputs(request, file, roi, target_size, target_fps): if file: test_video_path = request.getfixturevalue('test_video_path') - parsed, fps_in, video_shape_in, ds_factor, idxs = parse_video_inputs( - test_video_path, roi=roi, target_size=target_size, target_fps=target_fps) + parsed, fps_in, video_shape_in, ds_factor, idxs = parse_image_inputs( + inputs=test_video_path, roi=roi, target_size=target_size, target_fps=target_fps) else: test_video_ndarray = request.getfixturevalue('test_video_ndarray') test_video_fps = request.getfixturevalue('test_video_fps') - parsed, fps_in, video_shape_in, ds_factor, idxs = parse_video_inputs( - test_video_ndarray, fps=test_video_fps, roi=roi, target_size=target_size, + parsed, fps_in, video_shape_in, ds_factor, idxs = parse_image_inputs( + inputs=test_video_ndarray, 
fps=test_video_fps, roi=roi, target_size=target_size, target_fps=target_fps) assert parsed.shape == (360 if target_fps is None else 360 // 2, 200 if target_size is not None else (350 if roi is not None else 480), @@ -110,11 +106,11 @@ def test_parse_video_inputs(request, file, roi, target_size, target_fps): def test_parse_video_inputs_no_file(): with pytest.raises(Exception): - _ = parse_video_inputs("does_not_exist") + _ = parse_image_inputs("does_not_exist") def test_parse_video_inputs_wrong_type(): with pytest.raises(Exception): - _ = parse_video_inputs(12345, fps=30.) + _ = parse_image_inputs(12345, fps=30.) def test_merge_faces(): np.testing.assert_equal( diff --git a/tests/test_vitallens.py b/tests/test_vitallens.py index 36d6f43..8e21617 100644 --- a/tests/test_vitallens.py +++ b/tests/test_vitallens.py @@ -21,6 +21,7 @@ import base64 import json import numpy as np +from prpy.numpy.image import parse_image_inputs import pytest import requests from unittest.mock import Mock, patch @@ -30,7 +31,7 @@ from vitallens.constants import API_MAX_FRAMES, API_MIN_FRAMES, API_URL from vitallens.methods.vitallens import VitalLensRPPGMethod -from vitallens.utils import load_config, parse_video_inputs +from vitallens.utils import load_config def create_mock_response( status_code: int, @@ -137,8 +138,8 @@ def test_VitalLens_API_valid_response(request, process_signals): test_video_ndarray = request.getfixturevalue('test_video_ndarray') test_video_fps = request.getfixturevalue('test_video_fps') test_video_faces = request.getfixturevalue('test_video_faces') - frames, *_ = parse_video_inputs( - video=test_video_ndarray, fps=test_video_fps, target_size=config['input_size'], + frames, *_ = parse_image_inputs( + inputs=test_video_ndarray, fps=test_video_fps, target_size=config['input_size'], roi=test_video_faces[0].tolist(), library='prpy', scale_algorithm='bilinear') headers = {"x-api-key": api_key} payload = {"video": base64.b64encode(frames[:16].tobytes()).decode('utf-8')} @@ 
-168,8 +169,8 @@ def test_VitalLens_API_wrong_api_key(request): test_video_ndarray = request.getfixturevalue('test_video_ndarray') test_video_fps = request.getfixturevalue('test_video_fps') test_video_faces = request.getfixturevalue('test_video_faces') - frames, *_ = parse_video_inputs( - video=test_video_ndarray, fps=test_video_fps, target_size=config['input_size'], + frames, *_ = parse_image_inputs( + inputs=test_video_ndarray, fps=test_video_fps, target_size=config['input_size'], roi=test_video_faces[0].tolist(), library='prpy', scale_algorithm='bilinear') headers = {"x-api-key": "WRONG_API_KEY"} payload = {"video": base64.b64encode(frames[:16].tobytes()).decode('utf-8')} diff --git a/vitallens/client.py b/vitallens/client.py index ab1746f..2ba1dff 100644 --- a/vitallens/client.py +++ b/vitallens/client.py @@ -25,6 +25,7 @@ import numpy as np import os from prpy.constants import SECONDS_PER_MINUTE +from prpy.numpy.image import probe_image_inputs from typing import Union from vitallens.constants import DISCLAIMER @@ -36,7 +37,7 @@ from vitallens.methods.vitallens import VitalLensRPPGMethod from vitallens.signal import windowed_freq, windowed_mean from vitallens.ssd import FaceDetector -from vitallens.utils import load_config, probe_video_inputs, check_faces, convert_ndarray_to_list +from vitallens.utils import load_config, check_faces, convert_ndarray_to_list class Method(IntEnum): VITALLENS = 1 @@ -118,10 +119,10 @@ def __call__( video file. Note that aggressive video encoding destroys the rPPG signal. faces: Face boxes in flat point form, containing [x0, y0, x1, y1] coords. Ignored unless detect_faces=False. Pass a list or np.ndarray of - shape (n_faces, n_frames, 4) for multiple faces detected on multiple frames, - shape (n_frames, 4) for single face detected on mulitple frames, or - shape (4,) for a single face detected globally, or - `None` to assume all frames already cropped to the same single face detection. 
+ - shape (n_faces, n_frames, 4) for multiple faces detected on multiple frames, + - shape (n_frames, 4) for single face detected on multiple frames, or + - shape (4,) for a single face detected globally, or + - `None` to assume all frames already cropped to the same single face detection. fps: Sampling frequency of the input video. Required if type(video) == np.ndarray. override_fps_target: Target fps at which rPPG inference should be run (optional). If not provided, will use default of the selected method. @@ -172,7 +173,7 @@ def __call__( ] """ # Probe inputs - inputs_shape, fps, _ = probe_video_inputs(video=video, fps=fps) + inputs_shape, fps, _ = probe_image_inputs(video, fps=fps, allow_image=False) # TODO: Optimize performance of simple rPPG methods for long videos # Warning if using long video target_fps = override_fps_target if override_fps_target is not None else self.rppg.fps_target diff --git a/vitallens/methods/rppg_method.py b/vitallens/methods/rppg_method.py index 9552a8e..a32e95f 100644 --- a/vitallens/methods/rppg_method.py +++ b/vitallens/methods/rppg_method.py @@ -19,7 +19,6 @@ # SOFTWARE. 
import abc -import numpy as np class RPPGMethod(metaclass=abc.ABCMeta): """Abstract superclass for rPPG methods""" diff --git a/vitallens/methods/simple_rppg_method.py b/vitallens/methods/simple_rppg_method.py index cc264ac..e5f9f15 100644 --- a/vitallens/methods/simple_rppg_method.py +++ b/vitallens/methods/simple_rppg_method.py @@ -22,13 +22,13 @@ import numpy as np from prpy.constants import SECONDS_PER_MINUTE from prpy.numpy.face import get_roi_from_det -from prpy.numpy.image import reduce_roi +from prpy.numpy.image import reduce_roi, parse_image_inputs from prpy.numpy.signal import interpolate_cubic_spline, estimate_freq from typing import Union, Tuple from vitallens.constants import CALC_HR_MIN, CALC_HR_MAX from vitallens.methods.rppg_method import RPPGMethod -from vitallens.utils import parse_video_inputs, merge_faces +from vitallens.utils import merge_faces class SimpleRPPGMethod(RPPGMethod): """A simple rPPG method using a handcrafted algorithm based on RGB signal trace""" @@ -89,9 +89,11 @@ def __call__( u_roi = merge_faces(faces) faces = faces - [u_roi[0], u_roi[1], u_roi[0], u_roi[1]] # Parse the inputs - frames_ds, fps, inputs_shape, ds_factor, _ = parse_video_inputs( - video=frames, fps=fps, target_size=None, roi=u_roi, - target_fps=override_fps_target if override_fps_target is not None else self.fps_target) + frames_ds, fps, inputs_shape, ds_factor, _ = parse_image_inputs( + inputs=frames, fps=fps, roi=u_roi, target_size=None, + target_fps=override_fps_target if override_fps_target is not None else self.fps_target, + preserve_aspect_ratio=False, library='prpy', scale_algorithm='bilinear', + trim=None, allow_image=False, videodims=True) assert inputs_shape[0] == faces.shape[0], "Need same number of frames as face detections" faces_ds = faces[0::ds_factor] assert frames_ds.shape[0] == faces_ds.shape[0], "Need same number of frames as face detections" diff --git a/vitallens/methods/vitallens.py b/vitallens/methods/vitallens.py index 6c9f7f3..4f7aa8c 
100644 --- a/vitallens/methods/vitallens.py +++ b/vitallens/methods/vitallens.py @@ -24,6 +24,7 @@ import numpy as np from prpy.constants import SECONDS_PER_MINUTE from prpy.numpy.face import get_roi_from_det +from prpy.numpy.image import probe_image_inputs, parse_image_inputs from prpy.numpy.signal import detrend, moving_average, standardize from prpy.numpy.signal import interpolate_cubic_spline, estimate_freq from prpy.numpy.utils import enough_memory_for_ndarray @@ -39,7 +40,7 @@ from vitallens.signal import detrend_lambda_for_hr_response, detrend_lambda_for_rr_response from vitallens.signal import moving_average_size_for_hr_response, moving_average_size_for_rr_response from vitallens.signal import reassemble_from_windows -from vitallens.utils import probe_video_inputs, parse_video_inputs, check_faces_in_roi +from vitallens.utils import check_faces_in_roi class VitalLensRPPGMethod(RPPGMethod): """RPPG method using the VitalLens API for inference""" @@ -83,17 +84,18 @@ def __call__( - out_unit: The estimation unit for each signal. - out_conf: The estimation confidence for each signal. - out_note: An explanatory note for each signal. - - live: The face live confidence. Shape (1, n_frames) + - live: The face live confidence. 
Shape (n_frames,) """ - inputs_shape, fps, video_issues = probe_video_inputs(video=frames, fps=fps) - video_fits_in_memory = enough_memory_for_ndarray( - shape=(inputs_shape[0], self.input_size, self.input_size, 3), dtype=np.uint8) + inputs_shape, fps, video_issues = probe_image_inputs(frames, fps=fps) # Check the number of frames to be processed inputs_n = inputs_shape[0] fps_target = override_fps_target if override_fps_target is not None else self.fps_target expected_ds_factor = round(fps / fps_target) expected_ds_n = math.ceil(inputs_n / expected_ds_factor) # Check if we can parse the video globally + video_fits_in_memory = enough_memory_for_ndarray( + shape=(expected_ds_n, self.input_size, self.input_size, 3), dtype=np.uint8, + max_fraction_of_available_memory_to_use=0.1) global_face = faces[np.argmin(np.linalg.norm(faces - np.median(faces, axis=0), axis=1))] global_roi = get_roi_from_det( global_face, roi_method=self.roi_method, clip_dims=(inputs_shape[2], inputs_shape[1])) @@ -102,9 +104,10 @@ def __call__( if override_global_parse is not None: global_parse = override_global_parse if global_parse: # Parse entire video for inference globally - frames, _, _, _, idxs = parse_video_inputs( - video=frames, fps=fps, target_size=self.input_size, roi=global_roi, target_fps=fps_target, - library='prpy', scale_algorithm='bilinear', dim_deltas=(API_OVERLAP, 0, 0)) + frames, _, _, _, idxs = parse_image_inputs( + inputs=frames, fps=fps, roi=global_roi, target_size=self.input_size, target_fps=fps_target, + preserve_aspect_ratio=False, library='prpy', scale_algorithm='bilinear', + trim=None, allow_image=False, videodims=True) # Longer videos are split up with small overlaps n_splits = 1 if expected_ds_n <= API_MAX_FRAMES else math.ceil((expected_ds_n - API_MAX_FRAMES) / (API_MAX_FRAMES - API_OVERLAP)) + 1 split_len = expected_ds_n if n_splits == 1 else math.ceil((inputs_n + (n_splits-1) * API_OVERLAP * expected_ds_factor) / n_splits) @@ -228,10 +231,11 @@ def 
process_api_batch( idxs = list(range(0, inputs_shape[0], ds_factor)) else: # Inputs have not been parsed globally. Parse the inputs - frames_ds, _, _, ds_factor, idxs = parse_video_inputs( - video=inputs, fps=fps, target_size=self.input_size, roi=roi, target_fps=fps_target, + frames_ds, _, _, ds_factor, idxs = parse_image_inputs( + inputs=inputs, fps=fps, roi=roi, target_size=self.input_size, target_fps=fps_target, + preserve_aspect_ratio=False, library='prpy', scale_algorithm='bilinear', trim=(start, end) if start is not None and end is not None else None, - library='prpy', scale_algorithm='bilinear', dim_deltas=(API_OVERLAP, 0, 0)) + allow_image=False, videodims=True) # Make sure we have the correct number of frames expected_n = math.ceil(((end-start) if start is not None and end is not None else inputs_shape[0]) / ds_factor) if frames_ds.shape[0] != expected_n or len(idxs) != expected_n: diff --git a/vitallens/ssd.py b/vitallens/ssd.py index 82f98fc..d095fe9 100644 --- a/vitallens/ssd.py +++ b/vitallens/ssd.py @@ -23,6 +23,7 @@ import math import numpy as np import os +from prpy.numpy.image import parse_image_inputs from prpy.numpy.signal import interpolate_vals import sys from typing import Tuple @@ -32,8 +33,6 @@ else: from importlib_resources import files -from vitallens.utils import parse_video_inputs - INPUT_SIZE = (240, 320) MAX_SCAN_FRAMES = 60 @@ -299,9 +298,10 @@ def scan_batch( """ logging.debug("Batch {}/{}...".format(batch, n_batches)) # Parse the inputs - inputs, fps, _, _, idxs = parse_video_inputs( - video=inputs, fps=fps, target_size=INPUT_SIZE, target_fps=self.fs, - library='prpy', scale_algorithm='bilinear', trim=(start, end)) + inputs, fps, _, _, idxs = parse_image_inputs( + inputs=inputs, fps=fps, roi=None, target_size=INPUT_SIZE, target_fps=self.fs, + preserve_aspect_ratio=False, library='prpy', scale_algorithm='bilinear', + trim=(start, end), allow_image=False, videodims=True) # Forward pass onnx_inputs = {"args_0": 
(inputs.astype(np.float32) - 127.0) / 128.0} onnx_outputs = self.model.run(None, onnx_inputs)[0] diff --git a/vitallens/utils.py b/vitallens/utils.py index b6a435e..b703de6 100644 --- a/vitallens/utils.py +++ b/vitallens/utils.py @@ -19,14 +19,10 @@ # SOFTWARE. import logging -import math import numpy as np import os -from prpy.ffmpeg.probe import probe_video -from prpy.ffmpeg.readwrite import read_video_from_path -from prpy.numpy.image import crop_slice_resize import sys -from typing import Union, Tuple +from typing import Union import urllib.request import yaml @@ -63,137 +59,6 @@ def download_file(url: str, dest: str): else: logging.info("{} already exists, skipping download.".format(dest)) -def probe_video_inputs( - video: Union[np.ndarray, str], - fps: float = None - ) -> Tuple[tuple, float, bool]: - """Check the video inputs and probe to extract metadata. - - Args: - video: The video to analyze. Either a np.ndarray of shape (n_frames, h, w, 3) - with a sequence of frames in unscaled uint8 RGB format, or a path to a - video file. - fps: Sampling frequency of the input video. Required if type(video)==np.ndarray. - Returns: - Tuple of - - video_shape: The shape of the input video as (n_frames, h, w, 3) - - fps: Sampling frequency of the input video. - - issues: True if a possible issue with the video has been detected. 
- """ - # Check that fps is correct type - if not (fps is None or isinstance(fps, (int, float))): - raise ValueError("fps should be a number, but got {}".format(type(fps))) - # Check if video is array or file name - if isinstance(video, str): - if os.path.isfile(video): - try: - fps_, n, w_, h_, _, _, r, i = probe_video(video) - if fps is None: fps = fps_ - if abs(r) == 90: h = w_; w = h_ - else: h = h_; w = w_ - return (n, h, w, 3), fps, i - except Exception as e: - raise ValueError("Problem probing video at {}: {}".format(video, e)) - else: - raise ValueError("No file found at {}".format(video)) - elif isinstance(video, np.ndarray): - if fps is None: - raise ValueError("fps must be specified for ndarray input") - if video.dtype != np.uint8: - raise ValueError("video.dtype should be uint8, but got {}".format(video.dtype)) - if len(video.shape) != 4 or video.shape[0] < API_MIN_FRAMES or video.shape[3] != 3: - raise ValueError("video should have shape (n_frames [>= {}], h, w, 3), but found {}".format(API_MIN_FRAMES, video.shape)) - return video.shape, fps, False - else: - raise ValueError("Invalid video {}, type {}".format(video, type(input))) - -def parse_video_inputs( - video: Union[np.ndarray, str], - fps: float = None, - roi: tuple = None, - target_size: Union[int, tuple] = None, - target_fps: float = None, - library: str = 'prpy', - scale_algorithm: str = 'bilinear', - trim: tuple = None, - dim_deltas: tuple = (1, 1, 1) - ) -> Tuple[np.ndarray, float, tuple, int, list]: - """Parse video inputs into required shape. - - Args: - video: The video input. Either a filepath to video file or ndarray - fps: Framerate of video input. Can be `None` if video file provided. - roi: The region of interest as (x0, y0, x1, y1). Use None to keep all. 
- target_size: Optional target size as int or tuple (h, w) - target_fps: Optional target framerate - library: Library to use for resample if video is np.ndarray - scale_algorithm: Algorithm to use for resample - trim: Frame numbers for temporal trimming (start, end) (optional). - dim_deltas: Maximum acceptable deviation from expected video (n, h, w) dims. - Returns: - Tuple of - - parsed: Parsed inputs as `np.ndarray` with type uint8. Shape (n, h, w, c) - if target_size provided, h = target_size[0] and w = target_size[1]. - - fps_in: Frame rate of original inputs - - shape_in: Shape of original inputs in form (n, h, w, c) - - ds_factor: Temporal downsampling factor applied - - idxs: The frame indices returned from original video - """ - # Check if input is array or file name - if isinstance(video, str): - if os.path.isfile(video): - try: - fps_, n, w_, h_, _, _, r, i = probe_video(video) - if fps is None: fps = fps_ - if roi is not None: roi = (int(roi[0]), int(roi[1]), int(roi[2]), int(roi[3])) - if isinstance(target_size, tuple): target_size = (target_size[1], target_size[0]) - if abs(r) == 90: h = w_; w = h_ - else: h = h_; w = w_ - try: - video, ds_factor = read_video_from_path( - path=video, target_fps=target_fps, crop=roi, scale=target_size, trim=trim, - pix_fmt='rgb24', dim_deltas=dim_deltas, scale_algorithm=scale_algorithm) - except: - ValueError(VIDEO_PARSE_ERROR) - expected_n = math.ceil(((trim[1]-trim[0]) if trim is not None else n) / ds_factor) - if video.shape[0] < expected_n: - logging.warning("Less frames received than expected (delta = {}) - this may indicate an issue with the video file. Padding to avoid issues.".format(video.shape[0]-expected_n)) - video = np.concatenate((np.repeat(video[:1], expected_n - video.shape[0], axis=0), video), axis=0) - elif video.shape[0] > expected_n: - logging.warning("More frames received than expected (delta = {}) - this may indicate an issue with the video file. 
Trimming to avoid issues.".format(video.shape[0]-expected_n)) - video = video[:expected_n] - start_idx = max(0, trim[0]) if trim is not None else 0 - end_idx = min(n, trim[1]) if trim is not None else n - idxs = list(range(start_idx, end_idx, ds_factor)) - if video.shape[0] != expected_n or len(idxs) != expected_n: - raise ValueError(VIDEO_PARSE_ERROR) - return video, fps, (n, h, w, 3), ds_factor, idxs - except Exception as e: - raise ValueError("Problem reading video from {}: {}".format(video, e)) - else: - raise ValueError("No file found at {}".format(video)) - elif isinstance(video, np.ndarray): - video_shape_in = video.shape - # Downsample / crop / scale if necessary - ds_factor = 1 - if target_fps is not None: - if target_fps > fps: logging.warning("target_fps should not be greater than fps. Ignoring.") - else: ds_factor = max(round(fps / target_fps), 1) - target_idxs = None if ds_factor == 1 else list(range(video.shape[0])[0::ds_factor]) - if trim is not None: - if target_idxs is None: target_idxs = range(video_shape_in[0]) - target_idxs = [idx for idx in target_idxs if trim[0] <= idx < trim[1]] - if roi is not None or target_size is not None or target_idxs is not None: - if target_size is None and roi is not None: target_size = (int(roi[3]-roi[1]), int(roi[2]-roi[0])) - elif target_size is None: target_size = (video.shape[1], video.shape[2]) - video = crop_slice_resize( - inputs=video, target_size=target_size, roi=roi, target_idxs=target_idxs, - preserve_aspect_ratio=False, library=library, scale_algorithm=scale_algorithm) - if target_idxs is None: target_idxs = list(range(video_shape_in[0])) - return video, fps, video_shape_in, ds_factor, target_idxs - else: - raise ValueError("Invalid video {}, type {}".format(video, type(video))) - def merge_faces(faces: np.ndarray) -> tuple: """Compute the union of all faces.