Page MenuHomec4science

Triplet.py
No OneTemporary

File Metadata

Created
Thu, Feb 13, 00:28

Triplet.py

# -*- coding: utf-8 -*-
"""
Created on Tue Dec 26 07:14:15 2023
@author: srpv
contact: vigneashwara.solairajapandiyan@empa.ch
contact: vigneashwara.pandiyan@tii.ae
The code in this script will be used for the publication of the following work:
"Dynamics of in-situ alloying of Ti6Al4V-Fe by means of acoustic emission monitoring
supported by operando synchrotron X-ray diffraction"
@any reuse of this code should be authorized by the first owner, code author
"""
# %%
# Libraries to import
import os
import torch
import random
import numpy as np
import pandas as pd
import torch.optim as optim
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from torchvision import transforms
from sklearn.model_selection import train_test_split # implementing train-test-split
from Utils.utils import *
from Utils.Network import *
from Utils.Dataloader import *
from Utils.Loss import *
from Utils.Classifier import *
from matplotlib import animation
''' This script is based on contrastive learning, with the Triplet loss as its backbone.
--> The idea is to avoid manual feature extraction in the data space and to use the Triplet loss to compute discrete latent spaces across the data spaces.'''
# %%
# Root folder under which latent spaces, graphs and figures are stored.
# The original code joined the working directory with its first directory
# entry and then took the dirname again, which always yields the working
# directory itself but raises IndexError when that directory is empty.
# Using os.getcwd() directly gives the same path without that failure mode.
total_path = os.getcwd()
print(total_path)
# %% Inputs to be added
embedding_dims = 16  # The latent embedding that will be computed
batch_size = 300
epochs = 100
dataset = 'CM'  # for KM dataset change to KM
path = r'C:\Users\srpv\Desktop\Sinergia Offline\Reza\Code\Pre_processed Dataset'

# Output folder for this dataset's figures and saved arrays.
folder = os.path.join(total_path, dataset + '_Figures')
print(folder)

# Check for the folder *before* creating it so the message is truthful:
# with exist_ok=True, os.makedirs does not raise when the folder already
# exists, so an `except OSError` branch cannot be used to detect that case.
# Genuine failures (e.g. missing permissions) are left to propagate instead
# of being reported as "already exists".
folder_already_present = os.path.isdir(folder)
os.makedirs(folder, exist_ok=True)
if folder_already_present:
    print("Directory already exists....")
else:
    print("Directory created....")
# %% Loading of the dataset
# Raw signals: <path>/<dataset>_Rawspace.npy, cast to float64.
dataset_name = f"{path}/{dataset}_Rawspace.npy"
print(dataset_name)
rawspace = np.load(dataset_name).astype(np.float64)

# Class labels: <path>/<dataset>_Class_label.npy, cast to int64 and given a
# trailing axis so they can later be concatenated column-wise with the signals.
dataset_label = f"{path}/{dataset}_Class_label.npy"
print(dataset_label)
classspace = np.expand_dims(np.load(dataset_label).astype(np.int64), axis=1)
# %%
# Some initialization and random seeding for the torch, numpy and random libraries.
torch.manual_seed(2020)
np.random.seed(2020)
random.seed(2020)

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    # The original call discarded the returned name; print it so the log
    # shows which GPU is actually in use.
    print('GPU name:', torch.cuda.get_device_name())
print('Using device:', device)  # To check whether there is a GPU
# %%
# Split the raw signals and their labels into train (80 %) and test (20 %) sets.
X_train, X_test, y_train, y_test = train_test_split(
    rawspace, classspace, test_size=0.20, random_state=66)

# Put the label in the first column, followed by the signal samples, and wrap
# each split in a DataFrame for the Triplet_loader dataset.
Training = np.concatenate((y_train, X_train), axis=1)
Testing = np.concatenate((y_test, X_test), axis=1)
train_df = pd.DataFrame(Training)
test_df = pd.DataFrame(Testing)

# Quick preview of both splits (only displayed when run as a notebook cell).
# The original previewed train_df twice and never looked at test_df.
train_df.head()
test_df.head()
# %%
# Data loader block
def _to_tensor_pipeline():
    """Build a fresh transform pipeline consisting only of ToTensor."""
    return transforms.Compose([transforms.ToTensor()])


# Training split: dataset built in train mode, batches drawn in shuffled order.
train_ds = Triplet_loader(train_df, train=True, transform=_to_tensor_pipeline())
train_loader = DataLoader(
    train_ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)

# Test split: dataset built with train=False, batches kept in their original order.
test_ds = Triplet_loader(test_df, train=False, transform=_to_tensor_pipeline())
test_loader = DataLoader(
    test_ds,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0,
)
# %%
# Block for training the network
model = Network(embedding_dims)
model.apply(init_weights)
model = torch.jit.script(model).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = torch.jit.script(TripletLoss())
model.train()

running_loss = []        # loss of every optimisation step, over all epochs
Training_loss = []       # per epoch: sum of batch losses / number of batches
Training_loss_mean = []  # per epoch: mean of the batch losses
Training_loss_std = []   # per epoch: standard deviation of the batch losses


def _to_model_input(batch):
    """Move a batch to `device` as float32 and insert a singleton axis at dim 1.

    NOTE(review): the extra axis is presumably the channel dimension expected
    by Network -- confirm against Utils.Network.
    """
    return batch.to(device, dtype=torch.float).unsqueeze(1)


for epoch in tqdm(range(epochs), desc="Epochs"):
    total_loss = 0.0
    epoch_smoothing = []
    # The anchor label yielded by the loader is not needed for the triplet loss.
    for anchor_img, positive_img, negative_img, _ in tqdm(
            train_loader, desc="Training", leave=False):
        anchor_img = _to_model_input(anchor_img)
        positive_img = _to_model_input(positive_img)
        negative_img = _to_model_input(negative_img)

        optimizer.zero_grad()
        anchor_out = model(anchor_img)
        positive_out = model(positive_img)
        negative_out = model(negative_img)
        loss = criterion(anchor_out, positive_out, negative_out)
        loss.backward()
        optimizer.step()

        # Convert to a plain Python float once. Accumulating the loss tensor
        # itself (as the original did) keeps autograd history attached to the
        # running total for the whole epoch.
        loss_value = loss.item()
        running_loss.append(loss_value)
        epoch_smoothing.append(loss_value)
        total_loss += loss_value

    loss_train = total_loss / len(train_loader)
    Training_loss.append(loss_train)
    Training_loss_mean.append(np.mean(epoch_smoothing))
    Training_loss_std.append(np.std(epoch_smoothing))
    print("Epoch: {}/{} - Loss: {:.4f}".format(epoch+1, epochs, loss_train))

# Saving the model once training has finished.
# NOTE: the key "optimzier_state_dict" is misspelled, but it is kept unchanged
# because any code that loads this checkpoint may look the key up by this name.
torch.save({"model_state_dict": model.state_dict(),
            "optimzier_state_dict": optimizer.state_dict()
            }, folder+'/'+dataset+"_trained_model.pth")

# The network training ends here.
# %%
# Visualization of the learning curves
Training_loss = np.asarray(Training_loss)
Training_loss_mean = np.asarray(Training_loss_mean)
Training_loss_std = np.asarray(Training_loss_std)
plt.rcParams.update({'font.size': 15})

# Figure 1: loss of every optimisation step.
plt.figure(1)
plt.plot(running_loss, 'b--', linewidth=2.0)
plt.title('Iteration vs Loss value')
plt.xlabel('Iteration')
plt.ylabel('Loss value')
plt.savefig(os.path.join(folder, '_Training loss.png'), dpi=600, bbox_inches='tight')
plt.show()

# Figure 2: per-epoch loss with a +/- one standard deviation band.
# The band must share the x-axis of the curve, i.e. the epoch index. The
# original passed the loss values themselves as x-coordinates to
# fill_between, which placed the band at the wrong horizontal positions.
epoch_axis = np.arange(len(Training_loss))
fig, ax = plt.subplots()
ax.plot(epoch_axis, Training_loss, 'g', linewidth=2.0)
ax.fill_between(epoch_axis,
                Training_loss_mean - Training_loss_std,
                Training_loss_mean + Training_loss_std,
                alpha=0.9)
ax.legend(['Training loss'])
plt.title('Training loss')
plt.xlabel('Epoch')
plt.ylabel('Loss value')
plt.savefig(os.path.join(folder, '_Training loss average.png'), dpi=600, bbox_inches='tight')
plt.show()
# %%
# Computing of the embeddings for the train dataset and the test dataset.
def _collect_embeddings(loader):
    """Run every batch of `loader` through `model` and gather the outputs.

    Each batch is expected to be a sequence whose first element is the signal
    to embed and whose last element is its label. This covers both the
    4-element batches of the training loader and the 2-element batches of the
    test loader, as unpacked by the original loops.

    Returns:
        (embeddings, labels): two numpy arrays, concatenated over all batches.
    """
    embedded = []
    targets = []
    with torch.no_grad():
        for batch in tqdm(loader):
            signals, batch_labels = batch[0], batch[-1]
            # Same singleton axis at dim 1 as used during training.
            signals = signals.unsqueeze(1)
            outputs = model(signals.to(device, dtype=torch.float))
            embedded.append(outputs.cpu().numpy())
            targets.append(batch_labels)
    return np.concatenate(embedded), np.concatenate(targets)


model.eval()

# Training split
train_results, train_labels = _collect_embeddings(train_loader)
train_embeddings = dataset+'_train_embeddings'+'_' + '.npy'
train_labelsname = dataset+'_train_labels'+'_'+'.npy'
np.save(os.path.join(folder, train_embeddings), train_results, allow_pickle=True)
np.save(os.path.join(folder, train_labelsname), train_labels, allow_pickle=True)

# Test split
test_results, test_labels = _collect_embeddings(test_loader)
test_embeddings = dataset+'_test_embeddings'+'_' + '.npy'
test_labelsname = dataset+'_test_labels'+'_'+'.npy'
np.save(os.path.join(folder, test_embeddings), test_results, allow_pickle=True)
np.save(os.path.join(folder, test_labelsname), test_labels, allow_pickle=True)
# %%
# Plotting the latent spaces in 2D
# One figure per split, saved as <dataset><Split>_Feature_2D_.png in `folder`.
for split_tag, split_embeddings, split_labels in (
        ('Training', train_results, train_labels),
        ('Testing', test_results, test_labels)):
    graph_name_2D = os.path.join(folder, f"{dataset}{split_tag}_Feature_2D_.png")
    plot_embeddings(split_embeddings, split_labels, graph_name_2D)
# %%
# Plotting the latent spaces in 3D
def _save_rotating_gif(figure, axes, gif_path, step=3):
    """Save a GIF in which `axes` is rotated in azimuth from 0 to 360 degrees.

    One frame is rendered every `step` degrees; frames are 50 ms apart in the
    animation and the GIF is written with Pillow at 20 fps.
    """
    def rotate(azimuth):
        axes.view_init(azim=azimuth)

    spin = animation.FuncAnimation(
        figure, rotate, frames=np.arange(0, 360, step), interval=50)
    spin.save(gif_path, writer=animation.PillowWriter(fps=20))


# For each split: save the static 3D figure, then the rotating GIF of the same axes.
for split_tag, split_embeddings, split_labels in (
        ('Training', train_results, train_labels),
        ('Testing', test_results, test_labels)):
    graph_name = os.path.join(folder, f"{dataset}{split_tag}_Feature_3D_.png")
    ax, fig = Three_embeddings(split_embeddings, split_labels, graph_name, ang=35)
    gif1_name = os.path.join(folder, f"{dataset}{split_tag}_Feature.gif")
    _save_rotating_gif(fig, ax, gif1_name)
# %%
# Get more info on the model parameters.
count_parameters(model)

# Hand the train/test embeddings and their labels to the LR helper from Utils;
# its two return values are kept under the names used by the original script.
lr_outputs = LR(
    folder,
    train_results,
    test_results,
    train_labels,
    test_labels,
)
y_pred_prob, pred_prob = lr_outputs

Event Timeline