Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F101356537
AE_Channel Scan vectors.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Feb 8, 16:07
Size
6 KB
Mime Type
text/x-python
Expires
Mon, Feb 10, 16:07 (1 d, 20 h)
Engine
blob
Format
Raw Data
Handle
24143090
Attached To
R13109 LPBF-Sinergia- SNF
AE_Channel Scan vectors.py
View Options
# -*- coding: utf-8 -*-
"""
Created on Sun Aug 29 23:08:34 2021
@author: srpv
contact: vigneashwara.solairajapandiyan@empa.ch
The code in this script is used for the publication of the following work:
"Dynamics of in-situ alloying of Ti6Al4V-Fe by means of acoustic emission monitoring
supported by operando synchrotron X-ray diffraction"
@any reuse of this code should be authorized by the first owner, code author
"""
# %%
# Libraries to import
from
operator
import
itemgetter
from
itertools
import
groupby
from
scipy
import
signal
import
os
import
ntpath
import
pandas
as
pd
import
glob
import
numpy
as
np
import
matplotlib.pyplot
as
mpl
from
Utils_AE_Channel
import
*
# Matplotlib configuration applied once, right after the imports: sets the
# 'agg.path.chunksize' rcParam.
# NOTE(review): presumably raised so that very long AE traces can be drawn
# with the Agg backend -- confirm against the plotting helpers.
mpl.rcParams['agg.path.chunksize'] = 1000000
# %% Inputs to be set by the user
# Folder details --> make sure all files of the layer are present in this
# folder, in order.
path_ = r'C:\Users\srpv\Desktop\Sinergia Offline\Reza\Ti-Fe\CM\Ti64\CM_1'

# Acquisition settings
sample_rate = 1500000  # rate at which the acquisition happened
window_size = 5000  # window size we want
lowpass = sample_rate // 2.1  # cut-off used by the low-pass filter below

# Important assumption: the scan vectors are of equal length across each
# layer, and there is one folder per layer.
layer_no, vector = 0, 1

# Data collection: filled with one entry per extracted AE window.
AE_window = list()
# %%
# channel_0 is trigger from the RTC card
# channel_1 is trigger from the AE signal
# Helper functions
def addlabels(k):
    """Return ``k`` unchanged; the value itself is used as the label."""
    return k
def path_leaf(path):
    """Return the last component (file or folder name) of ``path``.

    Both ``\\`` and ``/`` are accepted as separators because the split is
    done with ``ntpath``.

    Parameters
    ----------
    path : str
        Path whose final component is wanted.

    Returns
    -------
    str
        The final path component. For a path that ends with a separator
        (e.g. ``'a/b/'``) this is the component before it (``'b'``) rather
        than an empty string.
    """
    head, tail = ntpath.split(path)
    # ``ntpath.split`` yields an empty tail when the path ends with a
    # separator; fall back to the last component of the head in that case.
    return tail or ntpath.basename(head)
def filter_signal(signal_window):
    """Low-pass filter one signal window and return the filtered samples.

    A 5th-order Butterworth low-pass is designed with the module-level
    ``lowpass`` cut-off, normalised by the Nyquist frequency
    (``sample_rate / 2``), and applied with ``scipy.signal.filtfilt``
    (forward-backward filtering).

    Parameters
    ----------
    signal_window : array-like
        Samples to filter.

    Returns
    -------
    numpy.ndarray
        The filtered window.
    """
    nyquist = sample_rate / 2
    normalised_cutoff = lowpass / nyquist  # butter() expects a fraction of Nyquist
    numerator, denominator = signal.butter(5, normalised_cutoff, 'low')
    return signal.filtfilt(numerator, denominator, signal_window)
def extract_window(b, a, data2, window_size, data_file_name):
    """Cut one scan vector out of the AE channel, filter it and store it.

    The segment ``data2[a:b]`` is shortened at its start so that its length
    is a whole multiple of 1000 samples (laser-on runs differ by a few data
    points, while the stored scan vectors are meant to be equally long).
    Each resulting window is low-pass filtered with ``filter_signal`` and
    appended, as a column vector, to the module-level ``AE_window`` list.

    Parameters
    ----------
    b : int
        Upper slice bound on the AE channel (the sample at ``b`` itself is
        not included).
    a : int
        Lower slice bound on the AE channel, before trimming.
    data2 : object with ``to_numpy()``
        AE channel samples; the caller in this file passes a single-column
        pandas DataFrame.
    window_size : int
        Not used inside the function; kept so existing calls keep working.
    data_file_name : str
        Not used inside the function; kept so existing calls keep working.

    Raises
    ------
    ValueError
        If the segment holds fewer than 1000 samples, i.e. no whole block
        of 1000 samples can be extracted from it.
    """
    print(a, b)
    span = b - a  # number of samples between the two bounds

    # Trim to whole thousands by rounding DOWN. ``round(span, -3)`` rounds
    # up whenever the remainder is 500 or more; the target length then
    # exceeds the segment, the remainder equals the whole span and the
    # slice below ends up empty, which crashes ``np.split``.
    target_len = (span // 1000) * 1000
    if target_len <= 0:
        raise ValueError(
            "Scan vector between indices {} and {} holds fewer than 1000 "
            "samples; no window can be extracted".format(a, b)
        )

    # Pass the remainder to the starting (lower) limit.
    a = a + (span - target_len)

    # Map the trimmed index range onto the AE array.
    samples = data2.to_numpy()
    AE_signal_chop = samples[a:b]

    window = round(AE_signal_chop.shape[0] / target_len)
    AE_signal_chop = np.split(AE_signal_chop, window)
    window = len(AE_signal_chop)
    print("Windows compute in the scan vector: ", window)

    for piece in AE_signal_chop:
        scan_vector = np.squeeze(piece)
        scan_vector = filter_signal(scan_vector)
        scan_vector = np.expand_dims(scan_vector, axis=1)
        print("Shape: ", scan_vector.shape)
        # AE_window is deliberately a module-level list, as this is a
        # simple script.
        AE_window.append(scan_vector)
def visualization(AE_window, vector, sample_rate, data_file_name, path):
    """Produce the plots for one stored scan vector.

    The scan vector ``AE_window[vector]`` is handed, together with a time
    axis rebuilt from ``sample_rate``, to the plotting helpers ``STFT``,
    ``MEL``, ``Wavelet`` and ``Frequencyplot``.
    NOTE(review): these helpers are not defined in this file; presumably
    they come from ``Utils_AE_Channel`` and write their figures under
    ``path`` -- confirm there.

    Parameters
    ----------
    AE_window : sequence
        Collection of scan vectors, indexable by ``vector``.
    vector : int
        Index of the scan vector to plot.
    sample_rate : int or float
        Sampling rate used to build the time axis.
    data_file_name : str
        Not used inside the function.
    path : str
        Passed through to every plotting helper.
    """
    scan_vector = AE_window[vector]

    # Time axis: one stamp per sample, starting at zero.
    start_time = 0
    sample_period = 1 / sample_rate
    time = np.arange(0, len(scan_vector)) * sample_period + start_time

    # Local cut-off, computed the same way as the module-level ``lowpass``.
    cutoff = sample_rate // 2.1

    STFT(scan_vector, vector, time, path, cutoff)
    MEL(scan_vector, vector, sample_rate, time, path, cutoff)

    # --> Use a wavelet of your choice; make sure the Wavelet helper
    # function supports it.
    Wavelet(scan_vector, vector, time, path, 'morl')

    Frequencyplot(scan_vector, sample_rate, vector, path)
# %% The output of this block is the AE signal corresponding to each scan
# vector in one layer.
# For every CSV in the trigger-channel folder, the file with the same name
# is read from the AE-channel folder; runs of consecutive samples where the
# trigger is >= 1.5 are located and the matching AE samples are passed to
# extract_window(), which fills the module-level AE_window list.
path = os.path.join(path_)
isDirectory = os.path.isdir(path)  # To check if it is a directory

if isDirectory:
    print(path)
    print(isDirectory)
    Channel0 = os.path.join(path, 'channel_0_decoded')  # use channel_0 or channel_0_decoded
    print(Channel0)
    Channel1 = os.path.join(path, 'channel_1_decoded')
    print(Channel1)

    # glob.glob() gives no ordering guarantee, while the script relies on
    # the files being handled in order, so sort the names explicitly.
    # NOTE(review): this is plain lexicographic order ('f10' < 'f2');
    # confirm that it matches the acquisition file naming.
    all_files = sorted(glob.glob(Channel0 + "/*.csv"))

    for filename in all_files:
        tail = path_leaf(filename)
        A = tail.split('.')
        data_file_name = A[0]  # Name extracted from the .csv file
        print(data_file_name)

        # Same file name in both channel folders; print the actual file
        # paths (the folders were already printed above).
        Channel_0 = os.path.join(Channel0, tail)
        print(Channel_0)
        Channel_1 = os.path.join(Channel1, tail)
        print(Channel_1)

        data1 = pd.read_csv(Channel_0)
        data1 = data1.to_numpy()
        data1 = np.ravel(data1)

        data2 = pd.read_csv(Channel_1)
        data2 = data2.to_numpy()
        data2 = np.ravel(data2)

        # To look at the useful part of the data, truncate the data
        # stream --> 800000 samples are used here; the value can be
        # changed iteratively.
        data1 = data1[0:800000]
        data2 = data2[0:800000]

        # Dynamically create a time stamp, as the sampling rate is known.
        # (Not used further down in this file.)
        length = len(data1)
        N = length
        t0 = 0
        dt = 1 / sample_rate
        time = np.arange(0, N) * dt + t0

        # Use pandas to create the custom windows.
        data1 = pd.DataFrame(data1)
        data2 = pd.DataFrame(data2)
        data1.columns = ['Value']

        # The threshold 1.5 selects every index at which the laser trigger
        # is on. Since both channels are synchronized, the same indices can
        # be used on the AE channel to extract the AE signal of each scan
        # vector. Instead of 1.5 any value below 4.5 can be used (the
        # system sometimes does not reach 5 V as per the specs).
        reqd_Index = data1[data1["Value"] >= 1.5].index.tolist()
        reqd_Index = np.array(reqd_Index)

        ranges = []
        data = reqd_Index
        # Continuous indices are grouped; each group is one scan vector.
        # (index position - index value) is constant within a run of
        # consecutive indices, so it serves as the grouping key.
        for k, g in groupby(enumerate(data), lambda x: x[0] - x[1]):
            group = map(itemgetter(1), g)
            group = list(map(int, group))
            ranges.append((group[0], group[-1]))
        # [ranges] holds the first and the last index of every run that
        # satisfies the laser trigger threshold set above (1.5 here).

        # Loop over the scan vectors, mapping each onto the AE channel.
        for i in ranges:
            if i[1] - i[0] >= window_size:
                extract_window(i[1], i[0], data2, window_size, data_file_name)
                print("Windows created")
            else:
                print("Window not greater")
# Turn the collected windows into one array without singleton axes.
AE_window = np.squeeze(AE_window)

# Output file name: <name of the last processed csv><layer number>.npy
rawfile_1 = ''.join([str(data_file_name), str(layer_no), '.npy'])

# Store the windows as float64 in the input folder.
AE_window = AE_window.astype(np.float64)
output_file = os.path.join(path_, rawfile_1)
np.save(output_file, AE_window, allow_pickle=True)

# %%
# Visualization of the selected scan vector
visualization(AE_window, vector, sample_rate, data_file_name, path_)
Event Timeline
Log In to Comment