diff --git a/tests/test_processing.py b/tests/test_processing.py index e92179e..dd0a434 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -4,7 +4,7 @@ import pytest import numpy as np -from lupy import Sampler, BlockProcessor +from lupy import Meter from lupy.types import FloatArray from conftest import gen_1k_sine @@ -31,65 +31,59 @@ def build_samples( samples[np.array(sine_channels),...] = sig return samples -def iter_process(sampler: Sampler, processor: BlockProcessor, src_data: FloatArray): +def iter_process(meter: Meter, src_data: FloatArray, process_all: bool = False): assert src_data.ndim == 3 num_channels, num_blocks, block_size = src_data.shape - assert num_channels == sampler.num_channels - assert block_size == sampler.block_size + assert num_channels == meter.num_channels + assert block_size == meter.block_size write_index = 0 while write_index < num_blocks: - while sampler.can_write() and write_index < num_blocks: - sampler.write(src_data[:,write_index,:]) + while meter.can_write() and write_index < num_blocks: + block_index = meter.processor.block_index + meter.write(src_data[:,write_index,:], process=True, process_all=process_all) write_index += 1 + if meter.processor.block_index > block_index: + yield from range(block_index, meter.processor.block_index) - while sampler.can_read(): - blk_samples = sampler.read() - assert blk_samples.shape == (num_channels, sampler.gate_size) - block_index = processor.block_index - processor.process_block(blk_samples) - yield block_index -def process_all(sampler: Sampler, processor: BlockProcessor, src_data: FloatArray): - for _ in iter_process(sampler, processor, src_data): +def process_all(meter: Meter, src_data: FloatArray): + for _ in iter_process(meter, src_data, process_all=True): pass def test_integrated_lkfs(block_size, all_channels, is_silent, reset_process): num_channels, sine_channel = all_channels - sampler = Sampler(block_size=block_size, num_channels=num_channels) + meter = Meter(block_size=block_size, num_channels=num_channels) - N, Fs = sampler.total_samples, int(sampler.sample_rate) - num_blocks, gate_size = sampler.num_blocks, sampler.gate_size - - processor = BlockProcessor(num_channels=num_channels, gate_size=gate_size) + N, Fs = meter.sampler.total_samples, int(meter.sample_rate) + num_blocks, gate_size = meter.sampler.num_blocks, meter.sampler.gate_size _sine_channels = None if is_silent else [sine_channel] src_data = build_samples(N, num_channels, Fs, _sine_channels, 1) src_data = src_data.reshape((num_channels, num_blocks, block_size)) if reset_process: - for block_index in iter_process(sampler, processor, src_data): + for block_index in iter_process(meter, src_data): if block_index >= num_blocks // 2: break - processor.reset() + meter.processor.reset() - for block_index in iter_process(sampler, processor, src_data): - print(f'{block_index=}, {processor._rel_threshold=}, {processor.integrated_lkfs=}') + for block_index in iter_process(meter, src_data): + print(f'{block_index=}, {meter.processor._rel_threshold=}, {meter.integrated_lkfs=}') - print(f'{N=}, {N / gate_size}, {len(processor)=}') + print(f'{N=}, {N / gate_size}, {len(meter.processor)=}') - lkfs = round(processor.integrated_lkfs, 2) if is_silent: - assert processor.integrated_lkfs <= 120 + assert meter.integrated_lkfs <= 120 elif sine_channel < 3: - assert round(processor.integrated_lkfs, 2) == -3.01 + assert round(meter.integrated_lkfs, 2) == -3.01 # assert -3.02 <= lkfs <= -3.00 else: - assert round(processor.integrated_lkfs, 2) == -1.52 + assert round(meter.integrated_lkfs, 2) == -1.52 # assert -1.53 <= lkfs <= -1.51 @@ -97,36 +91,31 @@ def test_integrated_lkfs(block_size, all_channels, is_silent, reset_process): # Stereo 1k (997 Hz) sine at -18 dBFS should read -18 LUFS def test_integrated_lkfs_neg18(block_size): num_channels = 2 - sampler = Sampler(block_size=block_size, num_channels=num_channels) - - N, Fs = sampler.total_samples, int(sampler.sample_rate) - num_blocks, gate_size = sampler.num_blocks, sampler.gate_size + meter = Meter(block_size=block_size, num_channels=num_channels) - processor = BlockProcessor(num_channels=num_channels, gate_size=gate_size) + N, Fs = meter.sampler.total_samples, int(meter.sample_rate) + num_blocks, gate_size = meter.sampler.num_blocks, meter.sampler.gate_size sine_channels = [0, 1] amp = 10 ** (-18/20) src_data = build_samples(N, num_channels, Fs, sine_channels, amp) src_data = src_data.reshape((num_channels, num_blocks, block_size)) - for block_index in iter_process(sampler, processor, src_data): - print(f'{block_index=}, {processor._rel_threshold=}, {processor.integrated_lkfs=}') + for block_index in iter_process(meter, src_data): + print(f'{block_index=}, {meter.processor._rel_threshold=}, {meter.integrated_lkfs=}') - print(f'{N=}, {N / gate_size}, {len(processor)=}') + print(f'{N=}, {N / gate_size}, {len(meter.processor)=}') - assert round(processor.integrated_lkfs, 2) == -18 + assert round(meter.integrated_lkfs, 2) == -18 def test_compliance_cases(compliance_case): block_size = 128 num_channels = 5 - sampler = Sampler(block_size=block_size, num_channels=num_channels) + meter = Meter(block_size=block_size, num_channels=num_channels) - processor = BlockProcessor( - num_channels=num_channels, gate_size=sampler.gate_size - ) print('generating samples...') - src_data = compliance_case.generate_samples(int(sampler.sample_rate)) + src_data = compliance_case.generate_samples(int(meter.sample_rate)) N = src_data.shape[1] # assert N % block_size == 0 # num_blocks = N // block_size @@ -142,13 +131,13 @@ def test_compliance_cases(compliance_case): # for block_index in iter_process(sampler, processor, src_data): # # print(f'{block_index=}, {processor.integrated_lkfs=}, {sampler.samples_available=}') # pass - process_all(sampler, processor, src_data) + process_all(meter, src_data) - print(f'{processor.t[-1]=}') - integrated = processor.integrated_lkfs - momentary, short_term = processor.momentary_lkfs[-1], processor.short_term_lkfs[-1] + print(f'{meter.t[-1]=}') + integrated = meter.integrated_lkfs + momentary, short_term = meter.momentary_lkfs[-1], meter.short_term_lkfs[-1] print(f'{integrated=}, {momentary=}, {short_term=}') - print(f'{processor._rel_threshold=}') + print(f'{meter.processor._rel_threshold=}') integrated_target = compliance_case.result.integrated momentary_target = compliance_case.result.momentary @@ -166,4 +155,4 @@ def test_compliance_cases(compliance_case): assert lufs - tol <= integrated <= lufs + tol if lra_target is not None: lra_lu, tol = lra_target - assert lra_lu - tol <= processor.lra <= lra_lu + tol + assert lra_lu - tol <= meter.lra <= lra_lu + tol