Page MenuHomec4science

AE_Channel Scan vectors.py
No OneTemporary

File Metadata

Created
Sat, Feb 8, 16:07

AE_Channel Scan vectors.py

# -*- coding: utf-8 -*-
"""
Created on Sun Aug 29 23:08:34 2021
@author: srpv
contact: vigneashwara.solairajapandiyan@empa.ch
The codes in this following script will be used for the publication of the following work
"Dynamics of in-situ alloying of Ti6Al4V-Fe by means of acoustic emission monitoring
supported by operando synchrotron X-ray diffraction"
@any reuse of this code should be authorized by the first owner, code author
"""
# %%
# Librariies to import
from operator import itemgetter
from itertools import groupby
from scipy import signal
import os
import ntpath
import pandas as pd
import glob
import numpy as np
import matplotlib.pyplot as mpl
from Utils_AE_Channel import *
mpl.rcParams['agg.path.chunksize'] = 1000000
# %% Inputs to be added
# Folder details --> Ensure all files are present in the folder in order
path_ = r'C:\Users\srpv\Desktop\Sinergia Offline\Reza\Ti-Fe\CM\Ti64\CM_1'
sample_rate = 1500000 # rate at which the acquistion happend.
window_size = 5000 # window size we want
lowpass = sample_rate//2.1
layer_no = 0
''' The important assumption here is we are building scan vectors of equal length ...
across each layer
--> Also we have one folder for each layer'''
# initialize the data collection
AE_window = []
vector = 1
# %%
# channel_0 is trigger from the RTC card
# channel_1 is trigger from the AE signal
# Helper functions
def addlabels(k):
label = k
return label
def path_leaf(path):
head, tail = ntpath.split(path)
return tail
# return tail or ntpath.basename(head)
def filter_signal(signal_window):
lowpass_freq = lowpass / (sample_rate / 2) # Normalize the frequency
b, a = signal.butter(5, lowpass_freq, 'low')
signal_window = signal.filtfilt(b, a, signal_window)
return signal_window
def extract_window(b, a, data2, window_size, data_file_name):
print(a, b)
c = b-a # calculate the difference for windowing
'''
There are offsets of few data points in the array length
'''
c_ = round(c, -3)
_, d = divmod(c, c_) # addwindow #check reminder
a = a+d # pass the reminder to the starting (lower limit)
'''Now the length of the scan vectors are to be equal'''
data2 = data2.to_numpy()
''' Mapping with the AE array'''
AE_signal_chop = data2[a:b]
window = round(AE_signal_chop.shape[0]/c_)
AE_signal_chop = np.split(AE_signal_chop, window)
window = len(AE_signal_chop)
print("Windows compute in the scan vector: ", window)
for i in range(window):
scan_vector = AE_signal_chop[i]
scan_vector = np.squeeze(scan_vector)
scan_vector = filter_signal(scan_vector)
scan_vector = np.expand_dims(scan_vector, axis=1)
print("Shape: ", scan_vector.shape)
''' Prefer to have AE_window as global as its a simple script'''
AE_window.append(scan_vector)
def visualization(AE_window, vector, sample_rate, data_file_name, path):
scan_vector = AE_window[vector]
length = len(scan_vector)
N = length
t0 = 0
dt = 1/sample_rate
time = np.arange(0, N) * dt + t0
lowpass = sample_rate//2.1
STFT(scan_vector, vector, time, path, lowpass)
MEL(scan_vector, vector, sample_rate, time, path, lowpass)
# --> Use wavelet of your choice...Ensure the Wavelet function is compatible in help[er function]
waveletname = 'morl'
Wavelet(scan_vector, vector, time, path, waveletname)
Frequencyplot(scan_vector, sample_rate, vector, path)
# %% The out put of this block is the AE signals corresponding to each scan vector in One layer
path = os.path.join(path_)
isDirectory = os.path.isdir(path) # To check if its a directory
if isDirectory:
print(path)
print(isDirectory)
Channel0 = (os.path.join(path, 'channel_0_decoded')) # use channel_0 or channel_0_decoded
print(Channel0)
Channel1 = (os.path.join(path, 'channel_1_decoded'))
print(Channel1)
all_files = glob.glob(Channel0 + "/*.csv")
for filename in all_files:
tail = path_leaf(filename)
A = tail.split('.')
data_file_name = A[0]
print(data_file_name) # Name extracted from the .csv file
Channel_0 = (os.path.join(Channel0, tail))
print(Channel0)
Channel_1 = (os.path.join(Channel1, tail))
print(Channel1)
data1 = pd.read_csv(Channel_0)
data1 = data1.to_numpy()
data1 = np.ravel(data1)
data2 = pd.read_csv(Channel_1)
data2 = data2.to_numpy()
data2 = np.ravel(data2)
'''To look at the useful part of the data try to truncate...
...the data stream --> used 800000 here''' # the value can be change iteratively
data1 = data1[0:800000]
data2 = data2[0:800000]
''' Lets dynamically create a Time stamp as we know the sampling rate'''
length = len(data1)
N = length
t0 = 0
dt = 1/sample_rate
time = np.arange(0, N) * dt + t0
''' Lets Use pandas libraries and associated functions to create custom windows '''
data1 = pd.DataFrame(data1)
data2 = pd.DataFrame(data2)
data1.columns = ['Value']
'''The Value 1.5 ensures that we set a condition in such a way that all the index laser trigger continous points above 1.5 can be extracted..
Since both the channels are synchronized we can use the same index on the AE channel and...
the AE signals corresponding to each scan vector could be extracted
---> instead of 1.5 any value less than 4.5 can be used (Our system some times doesnt reach 5V as per the specs)'''
reqd_Index = data1[data1["Value"] >= 1.5].index.tolist()
reqd_Index = np.array(reqd_Index)
ranges = []
data = reqd_Index
'''Continous indexes from the array are grouped and considered as each scan vector '''
for k, g in groupby(enumerate(data), lambda x: x[0]-x[1]):
group = (map(itemgetter(1), g))
group = list(map(int, group))
ranges.append((group[0], group[-1]))
# [ranges] will have the index number for first and the last point satisfying the criteria of laser trigger value set
# in the previous section in this case 1.5
'''Looping over the list i.e scan vectors but mapping to the AE channel'''
for i in ranges:
if i[1]-i[0] >= window_size:
extract_window(i[1], i[0], data2, window_size, data_file_name)
print("Windows created")
else:
print("Window not greater")
# AE_window = np.asarray(AE_window)
AE_window = np.squeeze(AE_window)
rawfile_1 = str(data_file_name)+''+str(layer_no)+'.npy'
AE_window = AE_window.astype(np.float64)
np.save(os.path.join(path_, rawfile_1), AE_window, allow_pickle=True)
# %%
# Lets work on visualization
visualization(AE_window, vector, sample_rate, data_file_name, path_)

Event Timeline