Skip to content

Commit

Permalink
Use Meter class for processing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nocarryr committed Jun 21, 2024
1 parent 0ef3ee9 commit 8ee65cc
Showing 1 changed file with 37 additions and 48 deletions.
85 changes: 37 additions & 48 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
import numpy as np

from lupy import Sampler, BlockProcessor
from lupy import Meter
from lupy.types import FloatArray

from conftest import gen_1k_sine
Expand All @@ -31,102 +31,91 @@ def build_samples(
samples[np.array(sine_channels),...] = sig
return samples

def iter_process(sampler: Sampler, processor: BlockProcessor, src_data: FloatArray):
def iter_process(meter: Meter, src_data: FloatArray, process_all: bool = False):
assert src_data.ndim == 3

num_channels, num_blocks, block_size = src_data.shape
assert num_channels == sampler.num_channels
assert block_size == sampler.block_size
assert num_channels == meter.num_channels
assert block_size == meter.block_size

write_index = 0

while write_index < num_blocks:
while sampler.can_write() and write_index < num_blocks:
sampler.write(src_data[:,write_index,:])
while meter.can_write() and write_index < num_blocks:
block_index = meter.processor.block_index
meter.write(src_data[:,write_index,:], process=True, process_all=process_all)
write_index += 1
if meter.processor.block_index > block_index:
yield from range(block_index, meter.processor.block_index)

while sampler.can_read():
blk_samples = sampler.read()
assert blk_samples.shape == (num_channels, sampler.gate_size)
block_index = processor.block_index
processor.process_block(blk_samples)
yield block_index

def process_all(sampler: Sampler, processor: BlockProcessor, src_data: FloatArray):
for _ in iter_process(sampler, processor, src_data):
def process_all(meter: Meter, src_data: FloatArray):
for _ in iter_process(meter, src_data, process_all=True):
pass


def test_integrated_lkfs(block_size, all_channels, is_silent, reset_process):
num_channels, sine_channel = all_channels
sampler = Sampler(block_size=block_size, num_channels=num_channels)
meter = Meter(block_size=block_size, num_channels=num_channels)

N, Fs = sampler.total_samples, int(sampler.sample_rate)
num_blocks, gate_size = sampler.num_blocks, sampler.gate_size

processor = BlockProcessor(num_channels=num_channels, gate_size=gate_size)
N, Fs = meter.sampler.total_samples, int(meter.sample_rate)
num_blocks, gate_size = meter.sampler.num_blocks, meter.sampler.gate_size

_sine_channels = None if is_silent else [sine_channel]
src_data = build_samples(N, num_channels, Fs, _sine_channels, 1)
src_data = src_data.reshape((num_channels, num_blocks, block_size))

if reset_process:
for block_index in iter_process(sampler, processor, src_data):
for block_index in iter_process(meter, src_data):
if block_index >= num_blocks // 2:
break
processor.reset()
meter.processor.reset()

for block_index in iter_process(sampler, processor, src_data):
print(f'{block_index=}, {processor._rel_threshold=}, {processor.integrated_lkfs=}')
for block_index in iter_process(meter, src_data):
print(f'{block_index=}, {meter.processor._rel_threshold=}, {meter.integrated_lkfs=}')


print(f'{N=}, {N / gate_size}, {len(processor)=}')
print(f'{N=}, {N / gate_size}, {len(meter.processor)=}')

lkfs = round(processor.integrated_lkfs, 2)
if is_silent:
assert processor.integrated_lkfs <= 120
assert meter.integrated_lkfs <= 120
elif sine_channel < 3:
assert round(processor.integrated_lkfs, 2) == -3.01
assert round(meter.integrated_lkfs, 2) == -3.01
# assert -3.02 <= lkfs <= -3.00
else:
assert round(processor.integrated_lkfs, 2) == -1.52
assert round(meter.integrated_lkfs, 2) == -1.52
# assert -1.53 <= lkfs <= -1.51


# https://tech.ebu.ch/docs/tech/tech3341.pdf Section 2.9
# Stereo 1k (997 Hz) sine at -18 dBFS should read -18 LUFS
def test_integrated_lkfs_neg18(block_size):
num_channels = 2
sampler = Sampler(block_size=block_size, num_channels=num_channels)

N, Fs = sampler.total_samples, int(sampler.sample_rate)
num_blocks, gate_size = sampler.num_blocks, sampler.gate_size
meter = Meter(block_size=block_size, num_channels=num_channels)

processor = BlockProcessor(num_channels=num_channels, gate_size=gate_size)
N, Fs = meter.sampler.total_samples, int(meter.sample_rate)
num_blocks, gate_size = meter.sampler.num_blocks, meter.sampler.gate_size

sine_channels = [0, 1]
amp = 10 ** (-18/20)
src_data = build_samples(N, num_channels, Fs, sine_channels, amp)
src_data = src_data.reshape((num_channels, num_blocks, block_size))

for block_index in iter_process(sampler, processor, src_data):
print(f'{block_index=}, {processor._rel_threshold=}, {processor.integrated_lkfs=}')
for block_index in iter_process(meter, src_data):
print(f'{block_index=}, {meter.processor._rel_threshold=}, {meter.integrated_lkfs=}')

print(f'{N=}, {N / gate_size}, {len(processor)=}')
print(f'{N=}, {N / gate_size}, {len(meter.processor)=}')

assert round(processor.integrated_lkfs, 2) == -18
assert round(meter.integrated_lkfs, 2) == -18


def test_compliance_cases(compliance_case):
block_size = 128
num_channels = 5
sampler = Sampler(block_size=block_size, num_channels=num_channels)
meter = Meter(block_size=block_size, num_channels=num_channels)

processor = BlockProcessor(
num_channels=num_channels, gate_size=sampler.gate_size
)
print('generating samples...')
src_data = compliance_case.generate_samples(int(sampler.sample_rate))
src_data = compliance_case.generate_samples(int(meter.sample_rate))
N = src_data.shape[1]
# assert N % block_size == 0
# num_blocks = N // block_size
Expand All @@ -142,13 +131,13 @@ def test_compliance_cases(compliance_case):
# for block_index in iter_process(sampler, processor, src_data):
# # print(f'{block_index=}, {processor.integrated_lkfs=}, {sampler.samples_available=}')
# pass
process_all(sampler, processor, src_data)
process_all(meter, src_data)

print(f'{processor.t[-1]=}')
integrated = processor.integrated_lkfs
momentary, short_term = processor.momentary_lkfs[-1], processor.short_term_lkfs[-1]
print(f'{meter.t[-1]=}')
integrated = meter.integrated_lkfs
momentary, short_term = meter.momentary_lkfs[-1], meter.short_term_lkfs[-1]
print(f'{integrated=}, {momentary=}, {short_term=}')
print(f'{processor._rel_threshold=}')
print(f'{meter.processor._rel_threshold=}')

integrated_target = compliance_case.result.integrated
momentary_target = compliance_case.result.momentary
Expand All @@ -166,4 +155,4 @@ def test_compliance_cases(compliance_case):
assert lufs - tol <= integrated <= lufs + tol
if lra_target is not None:
lra_lu, tol = lra_target
assert lra_lu - tol <= processor.lra <= lra_lu + tol
assert lra_lu - tol <= meter.lra <= lra_lu + tol

0 comments on commit 8ee65cc

Please sign in to comment.