Skip to content

Commit

Permalink
Bug Fixes and Documentation Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
dprohe committed Sep 16, 2024
1 parent 3aca53a commit c86e926
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 73 deletions.
4 changes: 0 additions & 4 deletions components/data_physics_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""


import ctypes
from enum import Enum
import numpy as np
from numpy.ctypeslib import ndpointer
from time import sleep,time
import matplotlib.pyplot as plt

plt.close('all')

DEBUG = False

Expand Down
16 changes: 15 additions & 1 deletion components/nidaqmx_hardware_multitask.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def set_parameters(self,test_data : DataAcquisitionParameters):
self.reader = ni_read.AnalogMultiChannelReader(self.task.in_stream)
self.read_data = np.zeros((len(self.task.ai_channels),test_data.samples_per_read))
self.acquisition_delay = BUFFER_SIZE_FACTOR*test_data.samples_per_write
print('Actual Acquisition Sample Rate: {:}'.format(self.task.timing.samp_clk_rate))

def start(self):
"""Start acquiring data"""
Expand Down Expand Up @@ -339,12 +340,24 @@ def create_sources(self,channel_data : List[Channel]):
channel_data : List[Channel] :
A list of ``Channel`` objects defining the channels in the test
"""
self.write_trigger = '/'+channel_data[0].physical_device+'/ai/StartTrigger'
# Get the physical devices
physical_devices = list(set([ni.system.device.Device(channel.feedback_device).product_type
for channel in channel_data
if not (channel.feedback_device is None)
and not (channel.feedback_device.strip() == '')]))
# Check if it's a CDAQ device
try:
devices = [ni.system.device.Device(channel.feedback_device)
for channel in channel_data
if not (channel.feedback_device is None)
and not (channel.feedback_device.strip() == '')]
if len(devices) == 0:
self.write_trigger = None # No output device
else:
chassis_device = devices[0].compact_daq_chassis_device
self.write_trigger = [trigger for trigger in chassis_device.terminals if 'ai/StartTrigger' in trigger][0]
except ni.DaqError:
self.write_trigger = '/'+channel_data[0].physical_device+'/ai/StartTrigger'
print('Output Devices: {:}'.format(physical_devices))
self.tasks = [ni.Task() for device in physical_devices]
index = 0
Expand Down Expand Up @@ -383,6 +396,7 @@ def set_parameters(self,test_data : DataAcquisitionParameters):
task.triggers.start_trigger.trig_type = ni.constants.TriggerType.DIGITAL_EDGE
task.out_stream.output_buf_size = self.buffer_size_factor*test_data.samples_per_write
self.writers.append(ni_write.AnalogMultiChannelWriter(task.out_stream,auto_start=False))
print('Actual Output Sample Rate: {:}'.format(task.timing.samp_clk_rate))

def start(self):
"""Method to start acquiring data"""
Expand Down
4 changes: 4 additions & 0 deletions components/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ def output_signal(self,data):
self.queue_container.input_output_sync_queue.put((environment,write_data[...,::self.output_oversample].copy()))
self.environment_first_data[environment] = False
self.hardware.write(write_data)
else:
if self.environment_first_data[environment]:
self.queue_container.input_output_sync_queue.put((environment,0))
self.environment_first_data[environment] = False
# np.savez('test_data/output_data_check.npz',output_data = write_data)
# Now check and see if we are starting up and start the hardare if so
if self.startup:
Expand Down
8 changes: 5 additions & 3 deletions components/random_vibration_sys_id_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,8 @@ def enable_control(self,enabled):
widget.setEnabled(enabled)
for widget in [self.run_widget.stop_test_button]:
widget.setEnabled(not enabled)
if enabled:
self.run_timer.stop()

def update_run_time(self):
"""Updates the time that the control has been running on the GUI"""
Expand Down Expand Up @@ -1229,11 +1231,11 @@ def save_spectral_data(self):
self.environment_parameters.store_to_netcdf(group_handle)
# Create Variables for Spectral Data
group_handle.createDimension('drive_channels',self.last_transfer_function.shape[2])
var = group_handle.createVariable('frf_data_real','f8',('fft_lines','control_channels','drive_channels'))
var = group_handle.createVariable('frf_data_real','f8',('fft_lines','specification_channels','drive_channels'))
var[...] = self.last_transfer_function.real
var = group_handle.createVariable('frf_data_imag','f8',('fft_lines','control_channels','drive_channels'))
var = group_handle.createVariable('frf_data_imag','f8',('fft_lines','specification_channels','drive_channels'))
var[...] = self.last_transfer_function.imag
var = group_handle.createVariable('frf_coherence','f8',('fft_lines','control_channels'))
var = group_handle.createVariable('frf_coherence','f8',('fft_lines','specification_channels'))
var[...] = self.last_coherence.real
var = group_handle.createVariable('response_cpsd_real','f8',('fft_lines','specification_channels','specification_channels'))
var[...] = self.last_response_cpsd.real
Expand Down
1 change: 0 additions & 1 deletion components/random_vibration_sys_id_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""


import os
import numpy as np
from scipy.io import loadmat
Expand Down
19 changes: 10 additions & 9 deletions components/transient_sys_id_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ def store_to_netcdf(self,netcdf_group_handle : nc4._netCDF4.Group):
netcdf_group_handle.control_python_function_parameters = self.control_python_function_parameters
# Save the output signal
netcdf_group_handle.createDimension('control_channels',len(self.control_channel_indices))
netcdf_group_handle.createDimension('specification_channels',len(self.control_channel_indices))
netcdf_group_handle.createDimension('signal_samples',self.signal_samples)
var = netcdf_group_handle.createVariable('control_signal','f8',('control_channels','signal_samples'))
var = netcdf_group_handle.createVariable('control_signal','f8',('specification_channels','signal_samples'))
var[...] = self.control_signal
# Control Channels
var = netcdf_group_handle.createVariable('control_channel_indices','i4',('control_channels'))
Expand Down Expand Up @@ -887,29 +888,29 @@ def save_control_data(self):
# Create Variables for Spectral Data
group_handle.createDimension('drive_channels',self.last_transfer_function.shape[2])
group_handle.createDimension('fft_lines',self.environment_parameters.sysid_frame_size//2 + 1)
var = group_handle.createVariable('frf_data_real','f8',('fft_lines','control_channels','drive_channels'))
var = group_handle.createVariable('frf_data_real','f8',('fft_lines','specification_channels','drive_channels'))
var[...] = self.last_transfer_function.real
var = group_handle.createVariable('frf_data_imag','f8',('fft_lines','control_channels','drive_channels'))
var = group_handle.createVariable('frf_data_imag','f8',('fft_lines','specification_channels','drive_channels'))
var[...] = self.last_transfer_function.imag
var = group_handle.createVariable('frf_coherence','f8',('fft_lines','control_channels'))
var = group_handle.createVariable('frf_coherence','f8',('fft_lines','specification_channels'))
var[...] = self.last_coherence.real
var = group_handle.createVariable('response_cpsd_real','f8',('fft_lines','control_channels','control_channels'))
var = group_handle.createVariable('response_cpsd_real','f8',('fft_lines','specification_channels','specification_channels'))
var[...] = self.last_response_cpsd.real
var = group_handle.createVariable('response_cpsd_imag','f8',('fft_lines','control_channels','control_channels'))
var = group_handle.createVariable('response_cpsd_imag','f8',('fft_lines','specification_channels','specification_channels'))
var[...] = self.last_response_cpsd.imag
var = group_handle.createVariable('drive_cpsd_real','f8',('fft_lines','drive_channels','drive_channels'))
var[...] = self.last_reference_cpsd.real
var = group_handle.createVariable('drive_cpsd_imag','f8',('fft_lines','drive_channels','drive_channels'))
var[...] = self.last_reference_cpsd.imag
var = group_handle.createVariable('response_noise_cpsd_real','f8',('fft_lines','control_channels','control_channels'))
var = group_handle.createVariable('response_noise_cpsd_real','f8',('fft_lines','specification_channels','specification_channels'))
var[...] = self.last_response_noise.real
var = group_handle.createVariable('response_noise_cpsd_imag','f8',('fft_lines','control_channels','control_channels'))
var = group_handle.createVariable('response_noise_cpsd_imag','f8',('fft_lines','specification_channels','specification_channels'))
var[...] = self.last_response_noise.imag
var = group_handle.createVariable('drive_noise_cpsd_real','f8',('fft_lines','drive_channels','drive_channels'))
var[...] = self.last_reference_noise.real
var = group_handle.createVariable('drive_noise_cpsd_imag','f8',('fft_lines','drive_channels','drive_channels'))
var[...] = self.last_reference_noise.imag
var = group_handle.createVariable('control_response','f8',('control_channels','signal_samples'))
var = group_handle.createVariable('control_response','f8',('specification_channels','signal_samples'))
var[...] = self.last_control_data
var = group_handle.createVariable('control_drives','f8',('drive_channels','signal_samples'))
var[...] = self.last_output_data
Expand Down
94 changes: 39 additions & 55 deletions control_laws/transient_control_laws.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,71 @@ def pseudoinverse_control(
last_excitation_signals = None, # Last excitation signal for drive-based control
last_response_signals = None, # Last response signal for error correction
):
# Get a tolerance if specified
# Parse the input arguments in extra_parameters
rcond = 1e-15
zero_impulse_after = None
# Split it up into lines
for entry in extra_parameters.split('\n'):
try:
# For each entry, split the key from the value using the colon
field,value = entry.split(':')
# Strip any whitespace
field = field.strip()
# Check the field to figure out which value to assign
if field == 'rcond':
rcond = float(value)
elif field == 'zero_impulse_after':
zero_impulse_after = float(value)
else:
# Report if we cannot understand the parameter
print('Unrecognized Parameter: {:}, skipping...'.format(field))
except ValueError:
# Report if we cannot parse the line
print('Unable to Parse Line {:}, skipping...'.format(entry))

# Compute impulse responses
# Compute impulse responses using the IFFT of the transfer function
# We will zero pad the IFFT to do interpolation in the frequency domain
# to match the length of the required signal
impulse_response = np.fft.irfft(transfer_function,axis=0)

# The impulse response should be going to zero at the end of the frame,
# but practically there may be some gibbs phenomenon effects that make the
# impulse response noncausal. If we zero pad, this might be wrong. We
# therefore give the use the ability to zero out this non-causal poriton of
# the impulse response.
if zero_impulse_after is not None:
# Remove noncausal portion
impulse_response_abscissa = np.arange(impulse_response.shape[0])/sample_rate
zero_indices = impulse_response_abscissa > zero_impulse_after
impulse_response[zero_indices] = 0

# Zero pad the impulse response to create a signal that is long enough
added_zeros = np.zeros((specification_signals.shape[-1]-impulse_response.shape[0],) + impulse_response.shape[1:])
# Zero pad the impulse response to create a signal that is long enough for
# the specification signal
added_zeros = np.zeros((specification_signals.shape[-1]-impulse_response.shape[0],)
+ impulse_response.shape[1:])
full_impulse_response = np.concatenate((impulse_response,added_zeros),axis=0)

# Compute FRFs
# Compute FRFs using the FFT from the impulse response. This is now
# interpolated such that it matches the frequency spacing of the specification
# signal
interpolated_transfer_function = np.fft.rfft(full_impulse_response,axis=0)

# Perform convolution in frequency domain
# Perform convolution by frequency domain multiplication
signal_fft = np.fft.rfft(specification_signals,axis=-1)
# Invert the FRF matrix using the specified rcond parameter
inverted_frf = np.linalg.pinv(interpolated_transfer_function,rcond=rcond)
# Multiply the inverted FRFs by the response spectra to get the drive spectra
drive_signals_fft = np.einsum('ijk,ki->ij',inverted_frf,signal_fft)

# Zero pad the FFT to oversample
# Zero pad the drive FFT to oversample to the output_oversample_factor
drive_signals_fft_zero_padded = np.concatenate((drive_signals_fft[:-1],
np.zeros((drive_signals_fft[:-1].shape[0]*(output_oversample_factor-1)+1,)+drive_signals_fft.shape[1:])),axis=0)
np.zeros((drive_signals_fft[:-1].shape[0]*(output_oversample_factor-1)+1,)
+drive_signals_fft.shape[1:])),axis=0)

drive_signals_oversampled = np.fft.irfft(drive_signals_fft_zero_padded.T,axis=-1)*output_oversample_factor
# Finally, take the IFFT to get the time domain signal. We need to scale
# by the output_oversample_factor due to how the IFFT is normalized.
drive_signals_oversampled = np.fft.irfft(
drive_signals_fft_zero_padded.T,axis=-1)*output_oversample_factor
return drive_signals_oversampled

def pseudoinverse_control_generator():
Expand Down Expand Up @@ -153,17 +176,6 @@ def __init__(self,
last_excitation_signals = None, # Last excitation signal for drive-based control
last_response_signals = None, # Last response signal for error correction
):
"""
Initializes the control law
Parameters
----------
Returns
-------
None.
"""
self.rcond = 1e-15
self.zero_impulse_after = None
for entry in extra_parameters.split('\n'):
Expand Down Expand Up @@ -202,17 +214,6 @@ def system_id_update(self,
frames, # Number of frames in the CPSD and FRF matrices
total_frames, # Total frames that could be in the CPSD and FRF matrices
):
"""
Updates the control law with the data from the system identification
Parameters
----------
transfer_function : np.ndarray
A complex 3d numpy ndarray with dimensions frequency lines x control
channels x excitation sources representing the FRF matrix measured
by the system identification process between drive voltages and
control response
"""
# Compute impulse responses
impulse_response = np.fft.irfft(transfer_function,axis=0)

Expand All @@ -223,7 +224,8 @@ def system_id_update(self,
impulse_response[zero_indices] = 0

# Zero pad the impulse response to create a signal that is long enough
added_zeros = np.zeros((self.specification_signals.shape[-1]-impulse_response.shape[0],) + impulse_response.shape[1:])
added_zeros = np.zeros((self.specification_signals.shape[-1]-impulse_response.shape[0],)
+ impulse_response.shape[1:])
full_impulse_response = np.concatenate((impulse_response,added_zeros),axis=0)

# Compute FRFs
Expand All @@ -236,34 +238,16 @@ def system_id_update(self,

# Zero pad the FFT to oversample
drive_signals_fft_zero_padded = np.concatenate((drive_signals_fft[:-1],
np.zeros((drive_signals_fft[:-1].shape[0]*(self.output_oversample_factor-1)+1,)+drive_signals_fft.shape[1:])),axis=0)
np.zeros((drive_signals_fft[:-1].shape[0]*(self.output_oversample_factor-1)+1,)
+drive_signals_fft.shape[1:])),axis=0)

self.drive_signals_oversampled = np.fft.irfft(drive_signals_fft_zero_padded.T,axis=-1)*self.output_oversample_factor
self.drive_signals_oversampled = np.fft.irfft(
drive_signals_fft_zero_padded.T,axis=-1)*self.output_oversample_factor

def control(self,
last_excitation_signals = None, # Last excitation signal for drive-based control
last_response_signals = None, # Last response signal for error correction
) -> np.ndarray:
"""
Perform the control operations
Parameters
----------
last_excitation_signals : np.ndarray, optional
The most recent output signal, which can be used for error-based
control. The default is None.
last_response_signals : np.ndarray, optional
The most recent responses to the last output signals, which can be
used for error-based control. The default is None.
Returns
-------
output_signal : np.ndarray
A 2D numpy array consisting of number of outputs x signal samples *
output_oversample_factor. This signal will be played directly to the
shakers.
"""

) -> np.ndarray:
# We could modify the output signal based on new data that we obtained
# Otherwise just output the same

Expand Down

0 comments on commit c86e926

Please sign in to comment.