Page MenuHomec4science

custom_functions.py
No OneTemporary

File Metadata

Created
Tue, May 14, 23:19

custom_functions.py

import gym
import numpy as np
import pandas as pd
import torch
from functools import partial
import matplotlib.pyplot as plt
def angle_normalize_2pi(x):
return (((x+2*np.pi) % (4*np.pi)) - 2*np.pi) # To keep the angles between -2pi and 2pi
def angle_normalize_pi(x):
return (((x+np.pi) % (2*np.pi)) - np.pi) # To keep the angles between -pi and pi
def env_fn(env_name = 'gyroscopeenv-v0', reward_type = None, reward_args = None, ep_len = None):
if reward_type is not None:
env = gym.make(env_name)
env.args_init(reward_type, reward_args, ep_len)
return env
else: return gym.make(env_name)
def curr_from_seq(obs,i,x1_ref_seq,x3_ref_seq, w_seq):
# Change to current from sequence if needed
if x1_ref_seq is not None:
obs[4] = x1_ref_seq[i]
if x3_ref_seq is not None:
obs[5] = x3_ref_seq[i]
if w_seq is not None:
obs[6] = w_seq[i]
return obs
def step_info(time,x,x_ref,ss_bound):
# Starting from beginning, first to reach 0.9 of steady state value
idx_rise = np.nan
for i in range(0,len(x)-1):
if abs(angle_normalize_pi(x[i])-angle_normalize_pi(x[0]))/abs(angle_normalize_pi(x[-1])-angle_normalize_pi(x[0]))>0.9:
idx_rise = i
break
if idx_rise is np.nan:
t_rise = np.nan
else:
t_rise = time[idx_rise]-time[0]
# Settling time: starting from the end, first to be out of bounds
idx_set = np.nan
for i in range(2,len(x)-1):
if abs(angle_normalize_pi(x[-i]-x_ref))>ss_bound:
idx_set = len(x)-i
break
if idx_set is np.nan:
t_set = np.nan
else:
t_set = time[idx_set]-time[0]
return t_rise,t_set
def test_agent(env_name,reward_type,reward_args,seed,agent_path,state,t_end,ep_len,x1_ref_seq = None ,x3_ref_seq = None, w_seq = None,param = None, is_noise=False):
# Workaround to still use trick of passing args to gym env
def env_fn(env_name = 'gyroscopeenv-v0', reward_type = None, reward_args = None, ep_len = None, is_noise=False):
if reward_type is not None:
env = gym.make(env_name)
env.args_init(reward_type, reward_args, ep_len, is_noise)
return env
else: return gym.make(env_name)
# Create environment
env_fn = partial(env_fn,env_name,reward_type,reward_args,ep_len, is_noise)
env = env_fn()
env.seed(seed)
dt,eval_per_dt,maxVoltage = env.access_members()
if env_name == 'gyroscoperobustenv-v0':
env.init_param(param)
# Create agent
if agent_path == 'linearized controller':
agent = 'linearized controller'
else:
agent_fullpath = agent_path + '/pyt_save/model.pt'
agent = torch.load(agent_fullpath)
# Run trial
r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act = run_trial(env,dt,eval_per_dt,agent,state,t_end,ep_len,x1_ref_seq,x3_ref_seq, w_seq)
print("Total cumulative reward: {}".format(score))
return r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act
def run_trial(env,dt,eval_per_dt,agent,state,t_end,ep_len,x1_ref_seq = None,x3_ref_seq = None, w_seq = None):
# Env params
dt,eval_per_dt,maxVoltage = env.access_members()
# Initial state and obs
if state is None:
obs = env.reset()
state = env.access_state()
x1,x2,x3,x4,x1_ref,x3_ref = state[0:6]
else:
x1,x2,x3,x4,x1_ref,x3_ref,w = state
obs = env.reset2state(state)
# Start storing
x1_eval = [x1]
x2_eval = [x2]
x3_eval = [x3]
x4_eval = [x4]
x1_ref_eval = [x1_ref]
x3_ref_eval = [x3_ref]
act = []
r = []
score = 0
# Start simulation
time = np.arange(0, t_end, dt)
for i in range(len(time)):
# Change state if references or w are not constant (i.e. a sequence)
state = curr_from_seq(state,i,x1_ref_seq,x3_ref_seq, w_seq)
env.reset2state(state)
# Get action from agent
if agent == 'linearized controller':
action = lin_control(state,3,3,3) # With poles at 5 like in Agram's report
else:
action = agent.act(torch.as_tensor(obs, dtype=torch.float32))
# Perform step in environment
obs, reward, done, info= env.step(action)
state = info['state']
print(obs)
# Store results
if i is 0:
act = np.full((eval_per_dt,2),action)
else:
act = np.append(act,np.full((eval_per_dt-1,2),maxVoltage*action),0)
x1_eval = np.append(x1_eval,info['x1_eval'][1:])
x2_eval = np.append(x2_eval,info['x2_eval'][1:])
x3_eval = np.append(x3_eval,info['x3_eval'][1:])
x4_eval = np.append(x4_eval,info['x4_eval'][1:])
x1_ref_eval = np.append(x1_ref_eval,info['x1_ref_eval'][1:])
x3_ref_eval = np.append(x3_ref_eval,info['x3_ref_eval'][1:])
r.append(reward)
score += reward
if done:
break
# Close env and return
env.close()
return r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act
def plot_test(x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act,t_end):
time = np.linspace(0, t_end, len(x1_eval))
f, axs = plt.subplots(4,2,figsize=(30,30))
plt.subplot(4,2,1)
plt.title('Red gimbal angle',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\theta$ (deg)',fontsize=16)
plt.grid()
plt.plot(time,180*angle_normalize_pi(x1_eval)/np.pi,'r-')
plt.plot(time, 180*x1_ref_eval/np.pi, color='green', linestyle='dashed')
#plt.plot(time, np.full(len(x1_eval),180), '-', color='black')
#plt.plot(time, np.full(len(x1_eval),-180), '-', color='black')
plt.subplot(4,2,2)
plt.title('Blue gimbal angle',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\phi$ (deg)',fontsize=16)
plt.grid()
plt.plot(time,180*angle_normalize_pi(x3_eval)/np.pi,'b-')
plt.plot(time, 180*x3_ref_eval/np.pi, color='green', linestyle='dashed')
#plt.plot(time, np.full(len(x1_eval),180), '-', color='black')
#plt.plot(time, np.full(len(x1_eval),-180), '-', color='black')
plt.subplot(4,2,3)
plt.title('Red gimbal speed',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\dot \theta$ (rad/s)',fontsize=16)
plt.grid()
plt.plot(time,x2_eval,'r-')
plt.subplot(4,2,4)
plt.title('Blue gimbal speed',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\dot \phi$ (rad/s)',fontsize=16)
plt.grid()
plt.plot(time,x4_eval,'b-')
plt.subplot(4,2,5)
plt.title('Red gimbal tracking error',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\theta$ error (deg)',fontsize=16)
plt.grid()
plt.plot(time,180*angle_normalize_pi(x1_eval- x1_ref_eval)/np.pi,'r-')
#plt.plot(time, np.full(len(x1_eval),180), '-', color='black')
#plt.plot(time, np.full(len(x1_eval),-180), '-', color='black')
plt.subplot(4,2,6)
plt.title('Blue gimbal tracking error',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'$\phi$ error (deg)',fontsize=16)
plt.grid()
plt.plot(time,180*angle_normalize_pi(x3_eval- x3_ref_eval)/np.pi,'b-')
#plt.plot(time, np.full(len(x1_eval),180), '-', color='black')
#plt.plot(time, np.full(len(x1_eval),-180), '-', color='black')
plt.subplot(4,2,7)
plt.title('Red gimbal input',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'u1 (V)',fontsize=16)
plt.grid()
plt.plot(time,act[:,0],'r-')
plt.subplot(4,2,8)
plt.title('Blue gimbal input',fontsize=20)
plt.xlabel('time (s)',fontsize=16)
plt.ylabel(r'u2 (V)',fontsize=16)
plt.grid()
plt.plot(time,act[:,1],'b-')
plt.show()
return None
def evaluate_control(env_name,agent_path,ss_bound):
# Creat environment
seed = 0
env = gym.make(env_name) # default env
env.seed(seed)
dt,eval_per_dt,maxVoltage = env.access_members()
ep_len = 110
t_end = dt*ep_len
# Create agent
# Create agent
if agent_path == 'linearized controller':
agent = 'linearized controller'
else:
agent_fullpath = agent_path + '/pyt_save/model.pt'
agent = torch.load(agent_fullpath)
#agent_fullpath = agent_path + '/pyt_save/model.pt'
#agent_logpath = agent_path + '/progress.txt'
# Experiments setup
num_exp = 200
df_header = ['Config.','$\theta$ MAE (rad)','$\phi$ MAE (rad)','$\theta$ MSSE (rad)','$\phi$ MSSE (rad)','$\theta$ in bounds (%)','$\phi$ in bounds (%)','$\theta$ unsteady (%)','$\phi$ unsteady (%)','$\theta$ rise time (s)','$\phi$ rise time (s)','$\theta$ settling time (s)','$\phi$ settling time (s)','u1 (V)','u2 (V)','u1 variation (V)','u2 variation (V)','Convergence time (min)']
num_metrics = len(df_header)-1
# Initialize mean metrics
all_metrics = np.zeros((num_metrics,num_exp))
# Experiments rollout
for i in range(num_exp):
# Generate random init state
seed = np.random.randint(0,20)
env.seed(seed)
env.reset()
state = None #env.access_state()
#state[0] =0
#state[1] =0
#state[2] =0
#state[3] =0
state0 = state
# Run trial
r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act = run_trial(env,dt,eval_per_dt,agent,state,t_end,ep_len)
# Evaluate control metrics for that run
metrics = control_metrics(r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act,ss_bound,t_end)
# Put in all_metrics table
for idx in range(len(metrics)):
all_metrics[idx,i] = metrics[idx]
# Extract metrics from logger
#log = pd.read_csv(agent_logpath, sep="\t")
conv_time = 0#log['Time'].iloc[-1]/60
# Compute mean of metrics arrays
mean_metrics = {df_header[0]: [agent_path],
df_header[1]:[np.mean(all_metrics[0,:])],
df_header[2]:[np.mean(all_metrics[1,:])],
df_header[3]:[np.nanmean(all_metrics[2,:])],
df_header[4]:[np.nanmean(all_metrics[3,:])],
df_header[5]:[(100*np.mean(all_metrics[4,:]))],
df_header[6]:[(100*np.mean(all_metrics[5,:]))],
df_header[7]:[(100*np.mean(all_metrics[6,:]))],
df_header[8]:[(100*np.mean(all_metrics[7,:]))],
df_header[9]:[np.nanmean(all_metrics[8,:])],
df_header[10]:[np.nanmean(all_metrics[9,:])],
df_header[11]:[np.nanmean(all_metrics[10,:])],
df_header[12]:[np.nanmean(all_metrics[11,:])],
df_header[13]:[np.mean(all_metrics[12,:])],
df_header[14]:[np.mean(all_metrics[13,:])],
df_header[15]:[np.mean(all_metrics[14,:])],
df_header[16]:[np.mean(all_metrics[15,:])],
df_header[17]:[conv_time]}
# Put in panda dataframe
df = pd.DataFrame (mean_metrics, columns = df_header)
df.set_index('Config.', inplace=True)
return df
def control_metrics(r,score,x1_eval,x2_eval,x3_eval,x4_eval,x1_ref_eval,x3_ref_eval,act,ss_bound,t_end):
# Time vector
time = np.linspace(0, t_end, len(x1_eval))
# Check convergence to steady state
conv_thresh = 0.025
x1_arr_diff = np.abs(np.diff(x1_eval))
x3_arr_diff = np.abs(np.diff(x3_eval))
x1_conv = np.all(x1_arr_diff[int(0.75*len(x1_arr_diff)):] <= conv_thresh)
x3_conv = np.all(x3_arr_diff[int(0.75*len(x3_arr_diff)):] <= conv_thresh)
# Independent of convergence to steady state
abs_err_x1 = np.mean(np.abs(angle_normalize_pi(x1_eval-x1_ref_eval)))
abs_err_x3 = np.mean(np.abs(angle_normalize_pi(x3_eval-x3_ref_eval)))
control_mag_u1 = np.mean(np.abs(act[:,0]))
control_mag_u2 = np.mean(np.abs(act[:,1]))
control_diff_u1 = np.mean(np.abs(np.diff(act[:,0])))
control_diff_u2 = np.mean(np.abs(np.diff(act[:,1])))
# Dependent of convergence to steady state
x1_in_bound,x3_in_bound = 0,0
ss_err_x1,ss_err_x3 = np.nan,np.nan # use nans and then np.nanmean to ignore the values not in bounds
x1_us,x3_us = 1,1
rise_time_x1,settling_time_x1,rise_time_x3,settling_time_x3 = np.nan,np.nan,np.nan,np.nan
# Check convergence on x1
if x1_conv:
# Since it does converge, replace true x1 ss error and put unsteady to 0
ss_err_x1 = np.abs(angle_normalize_pi(x1_eval[-1]-x1_ref_eval[-1]))
x1_us = 0
# Check if x1 ss error is in bounds, otherwise no use in computing the metrics below
if ss_err_x1 <= ss_bound:
x1_in_bound = 1
rise_time_x1,settling_time_x1 = step_info(time,x1_eval,x1_ref_eval[0],ss_bound)
# Check convergence on x3
if x3_conv:
# Since it does converge, replace true x3 ss error and put unsteady to 0
ss_err_x3 = np.abs(angle_normalize_pi(x3_eval[-1]-x3_ref_eval[-1]))
x3_us = 0
# Check if x3 ss error is in bounds, otherwise no use in computing the metrics below
if ss_err_x3 <= ss_bound:
x3_in_bound = 1
rise_time_x3,settling_time_x3 = step_info(time,x3_eval,x3_ref_eval[0],ss_bound)
metrics = [abs_err_x1,abs_err_x3,ss_err_x1,ss_err_x3,x1_in_bound,x3_in_bound,x1_us,x3_us,rise_time_x1,rise_time_x3,settling_time_x1,settling_time_x3,control_mag_u1,control_mag_u2,control_diff_u1,control_diff_u2]
return metrics
def lin_control(x,sig_th,sig_phi,sig_psid):
# Constants from the controller
fvr = 0.002679
fvb = 0.005308
Jbx1 = 0.0019
Jbx2 = 0.0008
Jbx3 = 0.0012
Jrx1 = 0.0179
Jdx1 = 0.0028
Jdx3 = 0.0056
Kamp = 0.5
Ktorque = 0.0704
eff = 0.86
nRed = 1.5
nBlue = 1
KtotRed = Kamp*Ktorque*eff*nRed
KtotBlue = Kamp*Ktorque*eff*nBlue
# Extract states
x1,x2,x3,x4,x1_ref,x3_ref,x6 = x
# Speed refs at 0
x2_ref,x4_ref,x6_ref = 0,0,x6
# Controls
u1 = 0.5*((Jbx1+Jbx3+Jdx1+Jdx3+2*Jrx1+Jbx1*np.cos(2*x3)-Jbx3*np.cos(2*x3)+Jdx1*np.cos(2*x3)-Jdx3*np.cos(2*x3))*sig_th**2*x1_ref
+2*x6_ref*Jdx3*sig_psid*np.sin(x3)-sig_th**2*(Jbx1+Jbx3+Jdx1+Jdx3+2*Jrx1+(Jbx1-Jbx3+Jdx1-Jdx3)*np.cos(2*x3))*x1
+2*x2*(fvr-Jbx1*sig_th-Jbx3*sig_th-Jdx1*sig_th-Jdx3*sig_th-2*Jrx1*sig_th-(Jbx1-Jbx3+Jdx1-Jdx3)*sig_th*np.cos(2*x3)-(Jbx1-Jbx3+Jdx1-Jdx3)*np.sin(2*x3)*x4)
-2*Jdx3*sig_psid*np.sin(x3)*x6+2*Jdx3*np.cos(x3)*x4*x6)
u2 = -(Jbx2+Jdx1)*(-sig_phi*(x3_ref*sig_phi-sig_phi*x3-2*x4)+((-Jbx1+Jbx3-Jdx1+Jdx3)*np.cos(x3)*np.sin(x3)*x2*x2-fvb*x4+Jdx3*np.cos(x3)*x2*x6)/(Jbx2+Jdx1))
# Action
action = np.array([u1/(10*KtotRed),u2/(10*KtotBlue)])
return action

Event Timeline