diff --git a/examples/calculate_invariants_position_longtrial.py b/examples/calculate_invariants_position_longtrial.py index 6a7c5a8..4568b78 100644 --- a/examples/calculate_invariants_position_longtrial.py +++ b/examples/calculate_invariants_position_longtrial.py @@ -48,8 +48,7 @@ #/*********************************************************************************************************************/ #/* Option 1: Calculate invariants using discretized analytical formulas #/*********************************************************************************************************************/ - -progress_step = np.mean(np.diff(timestamps))#.reshape(-1,1) +progress_step = np.mean(np.diff(timestamps)) from invariants_py import discretized_vector_invariants as dvi invariants_init, trajectory_init, moving_frames_init = dvi.calculate_discretized_invariants(trajectory, progress_step) diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py index db6de78..0f5454f 100644 --- a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py @@ -99,8 +99,8 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error def calculate_invariants(self,trajectory_meas,stepsize, choice_initialization=2): - from invariants_py.ocp_initialization import calculate_velocity_from_discrete_rotations, calculate_vector_invariants - from invariants_py.discretized_vector_invariants import calculate_moving_frames + from invariants_py.ocp_initialization import calculate_velocity_from_discrete_rotations + from invariants_py.discretized_vector_invariants import calculate_moving_frames, calculate_vector_invariants from invariants_py.dynamics_vector_invariants import reconstruct_rotation_traj from invariants_py.kinematics.screw_kinematics import average_vector_orientation_frame diff --git a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py index b816760..f756854 100644 --- a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py +++ b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py @@ -22,11 +22,12 @@ import rockit from invariants_py import ocp_helper, ocp_initialization from invariants_py.ocp_helper import check_solver -from invariants_py.dynamics_vector_invariants import integrate_vector_invariants_position +from invariants_py.dynamics_vector_invariants import integrate_vector_invariants_position, integrate_vector_invariants_position_seq +from invariants_py import discretized_vector_invariants as dvi class OCP_calc_pos: - def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, solver_options = {}, geometric=False): + def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, solver_options = {}, geometric=False, periodic=False): """ Initializes an instance of the RockitCalculateVectorInvariantsPosition class. It specifies the optimal control problem (OCP) for calculating the invariants of a trajectory in a symbolic way. @@ -76,15 +77,20 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b # Lower bounds on controls if bool_unsigned_invariants: - ocp.subject_to(invars[0,:]>=0) # velocity always positive - #ocp.subject_to(invars[1,:]>=0) # curvature rate always positive + ocp.subject_to(invars[0]>=0) # velocity always positive + #ocp.subject_to(invars[1]>=0) # curvature rate always positive + + #ocp.subject_to(-np.pi <= invars[1,:]*h) # curvature rate bounded by 1 + #ocp.subject_to(-np.pi <= invars[2,:]*h) # curvature rate bounded by 1 + #ocp.subject_to(invars[1,:]*h <= np.pi) # curvature rate bounded by 1 + #ocp.subject_to(invars[2,:]*h <= np.pi) # curvature rate bounded by 1 # Measurement fitting constraint # TODO: what about specifying the tolerance per sample instead of the sum? ek = cas.dot(p_obj - p_obj_m, p_obj - p_obj_m) # squared position error if not fatrop_solver: # sum of squared position errors in the window should be less than the specified tolerance rms_error_traj - total_ek = ocp.sum(ek,grid='control',include_last=True) + total_ek = ocp.sum(ek,grid='control',include_last=True) ocp.subject_to(total_ek < (N*rms_error_traj**2)) #ocp.subject_to(ek < 0.01**2) @@ -103,11 +109,16 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b #ocp.subject_to(p_obj - p_obj_m < np.array((0.0001,0.0001,0.0001))) # squared position error at each sample should be less than the specified tolerance rms_error_traj - # TODO this is still needed because last sample is not included in the sum now - #total_ek = ocp.state() # total sum of squared error - #ocp.set_next(total_ek, total_ek) - #ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek)) - # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error + #ocp.subject_to(ek < rms_error_traj**2) # squared position error should be less than the specified tolerance rms_error_traj + + # # TODO this is still needed because last sample is not included in the sum now + # #total_ek = ocp.state() # total sum of squared error + # #ocp.set_next(total_ek, total_ek) + # #ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek)) + # # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error + + #total_ek = ocp.sum(ek,grid='control',include_last=True) + #ocp.add_objective(total_ek/N) #ocp.subject_to(total_ek/N < rms_error_traj**2) #total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error @@ -128,6 +139,12 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b L = ocp.state() # introduce extra state L for the speed ocp.set_next(L, L) # enforce constant speed ocp.subject_to(invars[0] - L == 0, include_last=False) # relate to first invariant + + #if periodic: + # ocp.subject_to(ocp.at_tf(p_obj) - ocp.at_t0(p_obj) == 0) + # # Periodic boundary conditions (optional) + # ocp.subject_to(ocp.at_tf(R_t) == ocp.at_t0(R_t)) + """ Objective function """ @@ -232,7 +249,7 @@ def calculate_invariants(self, measured_positions, stepsize, use_previous_soluti # print(measured_positions.T) # print(stepsize) - # print(*self.values_variables) + # print(*self.values_variables)False self.values_variables = self.ocp_function(measured_positions.T, stepsize, *self.values_variables) diff --git a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_cte.py b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_cte.py index 8567e98..60ee08b 100644 --- a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_cte.py +++ b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_cte.py @@ -9,14 +9,11 @@ class OCP_calc_pos: def __init__(self, window_len=100, - w_pos = 1, w_rot = 1, - w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]), - w_abs = (10**-10)*np.array([1.0, 1.0]), bool_unsigned_invariants = False, planar_task = False, - geometric = False, fatrop_solver = True, solver_options = {}): + # TODO change "planar_task" to "planar_trajectory" # TODO change bool_unsigned_invariants to positive_velocity @@ -46,7 +43,7 @@ def __init__(self, # System dynamics (integrate current states + controls to obtain next states) # this relates the states/controls over the whole window - (R_t_plus1, p_obj_plus1) = integrate_vector_invariants_position_seq(R_t, p_obj, invars, h) + (R_t_plus1, p_obj_plus1) = integrate_vector_invariants_position(R_t, p_obj, invars, h) ocp.set_next(p_obj,p_obj_plus1) ocp.set_next(R_t,R_t_plus1) @@ -120,9 +117,9 @@ def __init__(self, """ Objective function """ # Minimize moving frame invariants to deal with singularities and noise - objective_reg = ocp.sum(cas.dot(invars[:3],invars[:3])) - objective = 10e-10*objective_reg/(N-1) - ocp.add_objective(objective) + # objective_reg = ocp.sum(cas.dot(invars[:3],invars[:3])) + # objective = 10e-10*objective_reg/(N-1) + # ocp.add_objective(objective) # objective_reg2 = ocp.sum(cas.dot(invars_deriv,invars_deriv)) # objective_reg2_scaled = 0.000001*objective_reg2/(N-1) diff --git a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py index 8de95c7..22cfb32 100644 --- a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py +++ b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py @@ -1,22 +1,3 @@ -''' -This class is used to calculate the invariants of a measured position trajectory using the Rockit optimal control problem (OCP) framework. - -The invariants are calculated by finding the invariant control inputs that reconstruct a trajectory such that it lies within a specified tolerance from the measured trajectory, while minimizing the curvature and torsion rate of the trajectory to deal with noise and singularities. - -Usage: - # Example data (helix) - N = 100 - t = np.linspace(0, 4, N) - measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) - stepsize = t[1]-t[0] - - # Specify optimal control problem (OCP) - OCP = OCP_calc_pos(window_len=N, rms_error_traj=10**-3) - - # Calculate invariants - invariants,trajectory,moving-frames = OCP.calculate_invariants(measured_positions, stepsize) -''' - import numpy as np import casadi as cas import rockit @@ -26,18 +7,17 @@ class OCP_calc_pos: - def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, geometric = False, solver_options = {}): - """ - Initializes an instance of the RockitCalculateVectorInvariantsPosition class. - It specifies the optimal control problem (OCP) for calculating the invariants of a trajectory in a symbolic way. + def __init__(self, + window_len=100, + w_pos = 1, w_rot = 1, + w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]), + w_abs = (10**-10)*np.array([1.0, 1.0]), + fatrop_solver=False, + bool_unsigned_invariants=False, + planar_task=False, + geometric = False, + solver_options = {}): - Args: - window_len (int, optional): The length of the window of trajectory measurmeents in the optimization problem. Defaults to 100. - rms_error_traj (float, optional): The tolerance for the squared RMS error of the trajectory. Defaults to 10**-2. - fatrop_solver (bool, optional): Flag indicating whether to use the Fatrop solver. Defaults to False. - bool_unsigned_invariants (bool, optional): Flag indicating whether to enforce unsigned invariants. Defaults to False. - planar_task (bool, optional): Flag indicating whether the task is planar. Defaults to False. - """ # TODO change "planar_task" to "planar_trajectory" # TODO change bool_unsigned_invariants to positive_velocity @@ -84,29 +64,9 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b # Measurement fitting constraint # TODO: what about specifying the tolerance per sample instead of the sum? - ek = cas.dot(p_obj - p_obj_m, p_obj - p_obj_m) # squared position error - if not fatrop_solver: - # sum of squared position errors in the window should be less than the specified tolerance rms_error_traj - total_ek = ocp.sum(ek,grid='control',include_last=True) - ocp.subject_to(total_ek/N < rms_error_traj**2) - #else: - # # Fatrop does not support summing over grid points inside the constraint, so we implement it differently to achieve the same result - # running_ek = ocp.state() # running sum of squared error - # ocp.subject_to(ocp.at_t0(running_ek == 0)) - # ocp.set_next(running_ek, running_ek + ek) - # ocp.subject_to(ocp.at_tf(1000*running_ek/N < 1000*rms_error_traj**2)) # scaling to avoid numerical issues in fatrop - - # # TODO this is still needed because last sample is not included in the sum now - # total_ek = ocp.state() # total sum of squared error - # ocp.set_next(total_ek, total_ek) - # ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek)) - # # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error - # ocp.subject_to(1000*total_ek/N < 1000*rms_error_traj**2) - # #total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error - # #ocp.subject_to(ocp.at_tf(total_ek_scaled < 1)) - - #ocp.subject_to(ek < rms_error_traj**2) + + #total_ek = ocp.sum(ek,grid='control',include_last=True # Boundary conditions (optional, but may help to avoid straight line fits) #ocp.subject_to(ocp.at_t0(p_obj == p_obj_m)) # fix first position to measurement #ocp.subject_to(ocp.at_tf(p_obj == p_obj_m)) # fix last position to measurement @@ -126,16 +86,17 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b """ Objective function """ # Minimize moving frame invariants to deal with singularities and noise - objective_reg = ocp.sum(cas.dot(invars[1:3],invars[1:3])) - objective = 0.000001*objective_reg/(N-1) - ocp.add_objective(objective) - - objective_reg2 = ocp.sum(cas.dot(invars_deriv,invars_deriv)) - objective_reg2_scaled = 0.000001*objective_reg2/(N-1) - ocp.add_objective(objective_reg2_scaled) + deriv_invariants_weighted = w_deriv**(0.5)*invars_deriv + cost_deriv_invariants = ocp.sum(cas.dot(deriv_invariants_weighted,deriv_invariants_weighted))/(N-1) + ocp.add_objective(cost_deriv_invariants) + + abs_invariants_weighted = w_abs**(0.5)*invars[1:3] + cost_absolute_invariants = ocp.sum(cas.dot(abs_invariants_weighted,abs_invariants_weighted))/(N-1) + ocp.add_objective(cost_absolute_invariants) - objective_fit = ocp.sum(ek, include_last=True) - ocp.add_objective(objective_fit) + pos_error_weighted = w_pos**(0.5)*(p_obj_m - p_obj) + cost_fitting = ocp.sum(cas.dot(pos_error_weighted, pos_error_weighted),grid='control',include_last=True)/N + ocp.add_objective(cost_fitting) """ Solver definition """ @@ -289,18 +250,18 @@ def calculate_invariants_OLD(self, measured_positions, stepsize): # Example data for measured positions and stepsize N = 100 t = np.linspace(0, 4, N) - measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) + measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) + np.random.normal(0, 0.001, (N, 3)) stepsize = t[1]-t[0] # Test the functionalities of the class - OCP = OCP_calc_pos(window_len=N, rms_error_traj=10**-3, fatrop_solver=True) + OCP = OCP_calc_pos(window_len=N, fatrop_solver=True) # Call the calculate_invariants function and measure the elapsed time #start_time = time.time() - calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants_OLD(measured_positions, stepsize) + calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants(measured_positions, stepsize) #elapsed_time = time.time() - start_time - ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = 10**-3) + #ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = 10**-3) fig = plt.figure() ax = fig.add_subplot(111, projection='3d') diff --git a/invariants_py/discretized_vector_invariants.py b/invariants_py/discretized_vector_invariants.py index 360834b..7779b85 100644 --- a/invariants_py/discretized_vector_invariants.py +++ b/invariants_py/discretized_vector_invariants.py @@ -141,15 +141,15 @@ def calculate_binormal(vector_traj,tangent, tolerance_singularity, reference_vec return binormal -def calculate_moving_frames(vector_traj, tolerance_singularity = 1e-2): +def calculate_moving_frames(vector_traj, tolerance_singularity_vel = 1e-2, tolerance_singularity_curv = 1e-2): N = np.size(vector_traj, 0) # Calculate first axis - e_tangent = calculate_tangent(vector_traj, tolerance_singularity) + e_tangent = calculate_tangent(vector_traj, tolerance_singularity_vel) # Calculate third axis - e_binormal = calculate_binormal(vector_traj,e_tangent,tolerance_singularity,reference_vector=np.array([0,0,1])) + e_binormal = calculate_binormal(vector_traj,e_tangent,tolerance_singularity_curv,reference_vector=np.array([0,0,1])) # Calculate second axis as cross product of third and first axis e_normal = np.array([ np.cross(e_binormal[i,:],e_tangent[i,:]) for i in range(N) ]) @@ -214,7 +214,7 @@ def calculate_vector_invariants(R_mf_traj,vector_traj,progress_step): return invariants -def calculate_discretized_invariants(measured_positions, progress_step, tolerance_singularity = 1e0): +def calculate_discretized_invariants(measured_positions, progress_step, tolerance_singularity_vel = 1e-1, tolerance_singularity_curv = 1e-2): """ Calculate the vector invariants of a measured position trajectory based on a discrete approximation of the moving frame. @@ -236,7 +236,7 @@ def calculate_discretized_invariants(measured_positions, progress_step, toleranc #print(vel_vector) # Calculate the moving frame - R_mf_traj = calculate_moving_frames(vel_vector, tolerance_singularity) + R_mf_traj = calculate_moving_frames(vel_vector, tolerance_singularity_vel, tolerance_singularity_curv) #print(R_mf_traj) # Calculate the invariants diff --git a/invariants_py/ocp_initialization.py b/invariants_py/ocp_initialization.py index e74e4b2..5a7941a 100644 --- a/invariants_py/ocp_initialization.py +++ b/invariants_py/ocp_initialization.py @@ -1,7 +1,7 @@ import numpy as np import invariants_py.kinematics.orientation_kinematics as SO3 from invariants_py.reparameterization import interpR -from invariants_py.discretized_vector_invariants import calculate_vector_invariants, calculate_velocity_from_discrete_rotations +from invariants_py.discretized_vector_invariants import calculate_discretized_invariants, calculate_velocity_from_discrete_rotations def initial_trajectory_movingframe_rotation(R_obj_start,R_obj_end,N=100): @@ -204,26 +204,31 @@ def estimate_initial_frames(vector_traj): def initialize_VI_pos2(measured_positions,stepsize): N = np.size(measured_positions,0) - Pdiff = np.diff(measured_positions, axis=0) - Pdiff = np.vstack((Pdiff, Pdiff[-1])) + invariants, meas_pos, mf = calculate_discretized_invariants(measured_positions,stepsize) + + #Pdiff = np.diff(measured_positions, axis=0) + #Pdiff = np.vstack((Pdiff, Pdiff[-1])) - [ex,ey,ez] = estimate_initial_frames(Pdiff) + #[ex,ey,ez] = estimate_initial_frames(Pdiff) - R_t_init2 = np.zeros((N,3,3)) - for i in range(N): - R_t_init2[i,:,:] = np.column_stack((ex[i,:],ey[i,:],ez[i,:])) + #R_t_init2 = np.zeros((N,3,3)) + #for i in range(N): + # R_t_init2[i,:,:] = np.column_stack((ex[i,:],ey[i,:],ez[i,:])) #print(R_t_init2) - invars = calculate_vector_invariants(R_t_init2,Pdiff,stepsize) + 1e-12*np.ones((N,3)) + #invars = calculate_vector_invariants(R_t_init2,Pdiff,stepsize) + 1e-12*np.ones((N,3)) #print(invars) R_t_init = np.zeros((9,N)) for i in range(N): - R_t_init[:,i] = np.hstack([ex[i,:],ey[i,:],ez[i,:]]) + R_t_init[:,i] = np.hstack([mf[i,:,0],mf[i,:,1],mf[i,:,2]]) + #print(R_t_init) + p_obj_sol = measured_positions.T + #invars = np.vstack((1e0*np.ones((1,N-1)),1e-1*np.ones((1,N-1)), 1e-12*np.ones((1,N-1)))) - return [invars[:-1,:].T, p_obj_sol, R_t_init] + return [invariants[:-1,:].T, p_obj_sol, R_t_init] def initialize_VI_rot(input_trajectory):