diff --git a/EventDetection.py b/EventDetection.py index d71e97c..009cb64 100644 --- a/EventDetection.py +++ b/EventDetection.py @@ -1,316 +1,340 @@ import NanoporeClasses as NC import shelve import os import glob from time import sleep import Functions from pprint import pprint import matplotlib.pyplot as plt from matplotlib.ticker import EngFormatter import numpy as np import sys import argparse import datetime from tkinter.filedialog import askopenfilenames,askdirectory import timeit import LoadData timeInSec= EngFormatter(unit='s', places=2) #Default parameters extension = '*.log' coefficients = {'a': 0.999, 'E': 0, 'S': 5, 'maxEventLength': 200e-3, 'minEventLength': 500e-6, 'hbook': 1, 'delta': 0.2e-9, 'ChimeraLowPass': 10e3} def GetParameters(): print("Usage:") print("run(inputData, newExtension=None, newCoefficients={}, outputFile=None, force=False, cut=False, verbose=False)") print() print("Default extension:") print(extension) print() print("Default coefficients:") pprint(coefficients) def batcheventdetection(folder,extension,coefficients, verbose=True, forceRun=False, CutTraces=False): #Create new class that contains all events AllEvents=NC.AllEvents() print('found ' + str(len(glob.glob(os.path.join(folder,extension)))) + ' files.') #Loop over all files in folder for fullfilename in glob.glob(os.path.join(folder,extension)): #Extract filename and generate filename to save located events if verbose: print('analysing '+fullfilename) filename, file_extension = os.path.splitext(os.path.basename(fullfilename)) directory=os.path.dirname(fullfilename) + os.sep + 'analysisfiles' if not os.path.exists(directory): os.makedirs(directory,exist_ok=True) savefilename=directory + os.sep + filename + 'data' shelfFile = shelve.open(savefilename) if ~forceRun: try: #Check if file can be loaded coefficientsloaded=shelfFile['coefficients'] tEL=shelfFile['translocationEvents'] # If coefficients before are not identical, analysis needs to run again if (coefficientsloaded==coefficients): if verbose: print('loaded from file') else: forceRun = True except: #Except if cannot be loaded, analysis needs to run forceRun=True pass if forceRun: #Extract list of events for this file tEL=eventdetection(fullfilename,coefficients,verbose, CutTraces) if verbose: print('Saved {} events'.format(len(tEL.events))) #Open savefile and save events for this file shelfFile['translocationEvents']=tEL shelfFile['coefficients']=coefficients if verbose: print('saved to file') shelfFile.close() #Add events to the initially created class that contains all events AllEvents.AddEvent(tEL) return AllEvents def eventdetection(fullfilename, coefficients, verbose=True, CutTraces=False, showFigures = False): + """ + Function used to find the events of TranslocationEvents class in the raw data in file 'filename'. + It calls the function RecursiveLowPassFast to approximatively locate rough events in the data. + If a short TranslocationEvent object is detected its type attribute will be changed to 'Impulse' and the + meanTrace attribute will take the value of the minimal current value within the event. + + Then the CUSUM function will be called to build the CUSUM-fit and assign values to the different + attributes of the TranslocationEvent objects. + + Depending on how the CUSUM was able to fit the trace inside and around the event, the type attribute of the TransocationEvent will be set to 'Real' + (if the CUSUM fit went well) or 'Rough' (if the CUSUM was not able to fit the trace). + + Returns the list of TranslocationEvent objects. + + """ + + if 'ChimeraLowPass' in coefficients: ChimeraLowPass=coefficients['ChimeraLowPass'] else: ChimeraLowPass=None loadedData=LoadData.OpenFile(fullfilename, ChimeraLowPass, True, CutTraces) minTime=coefficients['minEventLength'] IncludedBaseline = int(1e-2 * loadedData['samplerate']) delta=coefficients['delta'] hbook=coefficients['hbook'] if verbose: print('Recursive lowpass...', end='') #Call RecursiveLowPassFast to detect events in current trace start_time = timeit.default_timer() if 'i1Cut' in loadedData: events=[] for cutTrace in loadedData['i1Cut']: events.extend(Functions.RecursiveLowPassFast(cutTrace,coefficients,loadedData['samplerate'])) else: events=Functions.RecursiveLowPassFast(loadedData['i1'],coefficients,loadedData['samplerate']) if verbose: print('done. Calculation took {}'.format(timeInSec.format_data(timeit.default_timer() - start_time))) print('nr events: {}'.format(len(events))) #CusumParameters = [delta sigma Normalize ImpulsionLimit IncludedBaseline TooMuchLevels IncludedBaselinePlots 1 # SamplingFrequency]; #[EventDatabase] = event_detection(RawSignal, CusumParameters, RoughEventLocations, lowpasscutoff); #Make a new class translocationEventList that contains all the found events in this file translocationEventList=NC.AllEvents() #Plot current file if showFigures: plt.figure(1) plt.cla() timeVals=np.linspace(0, len(loadedData['i1'])/loadedData['samplerate'], num=len(loadedData['i1'])) plt.plot(timeVals,loadedData['i1']) plt.draw() plt.pause(0.001) if verbose: print('CUSUM fitting...', end='') #Call RecursiveLowPassFast to detect events in current trace start_time = timeit.default_timer() cusumEvents = 0 #Loop over all detected events for event in events: beginEvent = event[0] endEvent = event[1] localBaseline = event[2] stdEvent = event[3] lengthevent=endEvent-beginEvent # Check if the start of the event is > IncludedBaseline then compare eventtime to minimal and maxima; if beginEvent > IncludedBaseline: # Extract trace and extract baseline Trace = loadedData['i1'][int(beginEvent):int(endEvent)] traceBefore = loadedData['i1'][int(beginEvent) - IncludedBaseline:int(beginEvent)] traceAfter = loadedData['i1'][int(endEvent):int(endEvent) + IncludedBaseline] if lengthevent<=(minTime*loadedData['samplerate']) and lengthevent>3: newEvent = NC.TranslocationEvent(fullfilename,'Impulse') # Add Trace, mean of the event,the samplerate, coefficients and baseline to the New Event class newEvent.SetEvent(Trace, beginEvent,localBaseline, loadedData['samplerate']) if 'blockLength' in loadedData: voltI = int(beginEvent/loadedData['blockLength']) else: voltI = int(0) newEvent.SetCoefficients(coefficients, loadedData['v1'][voltI]) newEvent.SetBaselineTrace(traceBefore, traceAfter) # Add event to TranslocationList translocationEventList.AddEvent(newEvent) elif lengthevent>(minTime*loadedData['samplerate']) and lengthevent<(coefficients['maxEventLength']*loadedData['samplerate']): #Make new event class #CUSUM fit sigma = np.sqrt(stdEvent) h = hbook * delta / sigma (mc, kd, krmv)= Functions.CUSUM(loadedData['i1'][int(beginEvent) - IncludedBaseline: int(endEvent) + IncludedBaseline], delta, h) krmv = [krmvVal + int(beginEvent) - IncludedBaseline + 1 for krmvVal in krmv] #Add Trace, mean of the event,the samplerate, coefficients and baseline to the New Event class if len(krmv)>2: newEvent = NC.TranslocationEvent(fullfilename, 'Real') Trace=loadedData['i1'][int(krmv[0]):int(krmv[-1])] traceBefore = loadedData['i1'][int(krmv[0]) - IncludedBaseline:int(krmv[0])] traceAfter = loadedData['i1'][int(krmv[-1]):int(krmv[-1]) + IncludedBaseline] beginEvent=krmv[0] else: newEvent = NC.TranslocationEvent(fullfilename, 'Rough') newEvent.SetEvent(Trace,beginEvent,localBaseline,loadedData['samplerate']) newEvent.SetBaselineTrace(traceBefore, traceAfter) newEvent.SetCUSUMVariables(mc, kd, krmv) cusumEvents+=1 if 'blockLength' in loadedData: voltI = int(beginEvent/loadedData['blockLength']) else: voltI = int(0) newEvent.SetCoefficients(coefficients,loadedData['v1'][voltI]) #Add event to TranslocationList translocationEventList.AddEvent(newEvent) if verbose: print('done. Calculation took {}'.format(timeInSec.format_data(timeit.default_timer() - start_time))) print('{} events fitted'.format(cusumEvents)) #Plot events if True if showFigures: minVal=np.max(loadedData['i1'])#-1.05e-9 #np.min(loadedData['i1']) maxVal=np.max(loadedData['i1'])+0.05e-9#-1.05e-9 #np.min(loadedData['i1']) for i in range(len(events)): beginEvent=events[i][0] endEvent = events[i][1] if beginEvent>100 and (endEvent - beginEvent) >= (minTime * loadedData['samplerate']) and (endEvent - beginEvent) < ( coefficients['maxEventLength'] * loadedData['samplerate']): plt.plot([beginEvent/loadedData['samplerate'], beginEvent/loadedData['samplerate']], [minVal, maxVal], 'y-', lw=1) #plt.draw() plt.show() #plt.pause(1) return translocationEventList def LoadEvents(loadname): #Check if loadname is a directory or not if os.path.isdir(loadname): #create standard filename for loading loadFile = os.path.join(loadname,os.path.basename(loadname)+'_Events') else: loadFile, file_extension = os.path.splitext(loadname) #Open file and extract TranslocationEvents shelfFile=shelve.open(loadFile) TranslocationEvents=shelfFile['TranslocationEvents'] shelfFile.close() AllEvents=NC.AllEvents() AllEvents.AddEvent(TranslocationEvents) AllEvents.SetFolder(loadname) return AllEvents def run(inputData, newExtension=None, newCoefficients={}, outputFile=None, force=False, cut=False, verbose=False): - + """ + Function used to call all the other functions in the module + needed to find the events in raw nanopore experiment data. + + Returns an object of class AllEvents with a list of TranslocationEvents as an argument. + + """ + if newExtension is None: newExtension = extension for key in newCoefficients: coefficients[key]=newCoefficients[key] if os.path.isdir(inputData): TranslocationEvents=batcheventdetection(inputData, newExtension, coefficients, verbose, force, cut) else: TranslocationEvents=eventdetection(inputData,coefficients, verbose, cut) #Check if list is empty if outputFile is not None and TranslocationEvents.events: if os.path.isdir(inputData): outputData = inputData + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") LoadData.SaveVariables(outputFile,TranslocationEvents=TranslocationEvents) return True else: return TranslocationEvents if __name__=='__main__': parser = argparse.ArgumentParser() parser.add_argument('-i', '--input', help='Input directory or file') parser.add_argument('-e', '--ext', help='Extension for input directory') parser.add_argument('-o', '--output', help='Output file for saving') parser.add_argument('-c', '--coeff', help='Coefficients for selecting events [-C filter E standarddev maxlength minlength', nargs = '+') parser.add_argument('-u', '--cut', help='Cut Traces before detecting event, prevent detecting appended chunks as event', action='store_true') parser.add_argument('-f', '--force', help='Force analysis to run (don''t load from file', action='store_true') args = parser.parse_args() inputData=args.input if inputData==None: inputData=askdirectory() outputData=args.output if outputData==None: if os.path.isdir(inputData): outputData = inputData + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") else: outputData=os.path.dirname(inputData) + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") if args.coeff is not None: if len(args.coeff) % 2 == 0: for i in range(0, len(args.coeff), 2): if i <= len(args.coeff): coefficients[str(args.coeff[i])]=float(args.coeff[i+1]) if args.ext is not None: extension = args.ext print('Loading from: ' + inputData) print('Saving to: ' + outputData) print('Coefficients are: ', end='') pprint(coefficients) if os.path.isdir(inputData): print('extension is: ' + extension +'\nStarting.... \n') TranslocationEvents=batcheventdetection(inputData, extension, coefficients, args.force, args.cut) else: print('Starting.... \n') TranslocationEvents=eventdetection(inputData,coefficients, args.cut) #Check if list is empty if TranslocationEvents.events: LoadData.SaveVariables(outputData,TranslocationEvents=TranslocationEvents) diff --git a/Functions.py b/Functions.py index 187545b..a2b0a5b 100644 --- a/Functions.py +++ b/Functions.py @@ -1,895 +1,978 @@ # -*- coding: utf-8 -*- import math import os import pickle as pkl from scipy import signal import matplotlib.pyplot as plt import numpy as np import pandas as pd import scipy import scipy.signal as sig from matplotlib.ticker import EngFormatter from numpy import linalg as lin from scipy import constants as cst from scipy.optimize import curve_fit def LowPass(data, samplerate, lowPass): + """ + Function used to filter data with a digital Bessel filter of the 4th order. + + Specially useful for high bandwidth recordings. + + Returns the filtered data. + + """ + Wn = round(2 * lowPass / samplerate, 4) # [0,1] nyquist frequency b, a = signal.bessel(4, Wn, btype='low', analog=False) # 4-th order digital filter z, p, k = signal.tf2zpk(b, a) eps = 1e-9 r = np.max(np.abs(p)) approx_impulse_len = int(np.ceil(np.log(eps) / np.log(r))) Filt_sig = (signal.filtfilt(b, a, data, method='gust', irlen=approx_impulse_len)) ds_factor = np.ceil(samplerate / (2 * lowPass)) output = {} output['data'] = scipy.signal.resample(Filt_sig, int(len(data) / ds_factor)) output['samplerate'] = samplerate / ds_factor return output def GetKClConductivity(Conc, Temp): p = pkl.load(open('KCl_ConductivityValues.p', 'rb')) if Conc == 1.0: Conc = np.uint(1) return np.polyval(p[str(Conc)], Temp) def GetTempFromKClConductivity(Conc, Cond): p = pkl.load(open('KCl_ConductivityValues.p', 'rb')) if Conc == 1.0: Conc = np.uint(1) return (Cond - p[str(Conc)][1]) / p[str(Conc)][0] def RecursiveLowPassFast(signal, coeff, samplerate): + """ + Function used to find where roughly where event are in a noisy signal using a first order recursive + low pass filter defined as : + u[k] = au[k-1]+(1-a)i[k] + with u the mean value at sample k, i the input signal and a < 1, a parameter. + + Returns a numpy array with the rough event locations in the input signal. + + """ + padlen = np.uint64(samplerate) prepadded = np.ones(padlen) * np.mean(signal[0:1000]) signaltofilter = np.concatenate((prepadded, signal)) mltemp = scipy.signal.lfilter([1 - coeff['a'], 0], [1, -coeff['a']], signaltofilter) vltemp = scipy.signal.lfilter([1 - coeff['a'], 0], [1, -coeff['a']], np.square(signaltofilter - mltemp)) ml = np.delete(mltemp, np.arange(padlen)) vl = np.delete(vltemp, np.arange(padlen)) sl = ml - coeff['S'] * np.sqrt(vl) Ni = len(signal) points = np.array(np.where(signal <= sl)[0]) to_pop = np.array([]) for i in range(1, len(points)): if points[i] - points[i - 1] == 1: to_pop = np.append(to_pop, i) points = np.unique(np.delete(points, to_pop)) RoughEventLocations = [] NumberOfEvents = 0 for i in points: if NumberOfEvents is not 0: if i >= RoughEventLocations[NumberOfEvents - 1][0] and i <= RoughEventLocations[NumberOfEvents - 1][1]: continue NumberOfEvents += 1 start = i El = ml[i] + coeff['E'] * np.sqrt(vl[i]) Mm = ml[i] Vv = vl[i] duration = 0 endp = start if (endp + 1) < len(signal): while signal[endp + 1] < El and endp < (Ni - 2): # and duration < coeff['maxEventLength']*samplerate: duration += 1 endp += 1 if duration >= coeff['maxEventLength'] * samplerate or endp > ( Ni - 10): # or duration <= coeff['minEventLength'] * samplerate: NumberOfEvents -= 1 continue else: k = start while signal[k] < Mm and k > 1: k -= 1 start = k - 1 k2 = i + 1 # while signal[k2] > Mm: # k2 -= 1 # endp = k2 if start < 0: start = 0 RoughEventLocations.append((start, endp, ml[start], vl[start])) return np.array(RoughEventLocations) # , ml, vl, sl def RecursiveLowPassFastUp(signal, coeff, samplerate): + """ + RecursiveLowPassFast function specially written for the detection of events characterized + not by a current drop, but by a current increase. + + Returns a numpy array with the rough event locations in the input signal. + + """ + ml = scipy.signal.lfilter([1 - coeff['a'], 0], [1, -coeff['a']], signal) vl = scipy.signal.lfilter([1 - coeff['a'], 0], [1, -coeff['a']], np.square(signal - ml)) sl = ml + coeff['S'] * np.sqrt(vl) Ni = len(signal) points = np.array(np.where(signal >= sl)[0]) to_pop = np.array([]) for i in range(1, len(points)): if points[i] - points[i - 1] == 1: to_pop = np.append(to_pop, i) points = np.delete(points, to_pop) points = np.delete(points, np.array(np.where(points == 0)[0])) RoughEventLocations = [] NumberOfEvents = 0 for i in points: if NumberOfEvents is not 0: if i >= RoughEventLocations[NumberOfEvents - 1][0] and i <= RoughEventLocations[NumberOfEvents - 1][1]: continue NumberOfEvents += 1 start = i El = ml[i] + coeff['E'] * np.sqrt(vl[i]) Mm = ml[i] duration = 0 while signal[i + 1] > El and i < (Ni - 2) and duration < coeff['maxEventLength'] * samplerate: duration += 1 i += 1 if duration >= coeff['maxEventLength'] * samplerate or i > (Ni - 10): NumberOfEvents -= 1 else: k = start while signal[k] > Mm and k > 2: k -= 1 start = k - 1 k2 = i + 1 while signal[k2] > Mm: k2 -= 1 endp = k2 RoughEventLocations.append((start, endp, ml[start], vl[start])) return np.array(RoughEventLocations) def Reshape1DTo2D(inputarray, buffersize): npieces = int(len(inputarray) / buffersize) voltages = np.array([], dtype=np.float64) currents = np.array([], dtype=np.float64) for i in range(1, npieces + 1): if i % 2 == 1: currents = np.append(currents, inputarray[(i - 1) * buffersize:i * buffersize - 1], axis=0) # print('Length Currents: {}'.format(len(currents))) else: voltages = np.append(voltages, inputarray[(i - 1) * buffersize:i * buffersize - 1], axis=0) # print('Length Voltages: {}'.format(len(voltages))) v1 = np.ones((len(voltages)), dtype=np.float64) i1 = np.ones((len(currents)), dtype=np.float64) v1[:] = voltages i1[:] = currents out = {'v1': v1, 'i1': i1} print('Currents:' + str(v1.shape)) print('Voltages:' + str(i1.shape)) return out def MakeIVData(output, approach='mean', delay=0.7, UseVoltageRange=0, verbose=False): + """ + Function used to build and analyse a dataset containing the applied voltages + and measured currents during a voltage sweep experiment. + Used to assess the linearity and drift of the resulting signal through a Nanopore. + + Returns All a dictionary with all the values necessary to build an IV plot for nanopore calibration. + + """ + (ChangePoints, sweepedChannel) = CutDataIntoVoltageSegments(output, verbose) if ChangePoints is 0: return 0 if output['graphene']: currents = ['i1', 'i2'] else: currents = ['i1'] Values = output[sweepedChannel][ChangePoints] Values = np.append(Values, output[sweepedChannel][::-1][0]) delayinpoints = np.int64(delay * output['samplerate']) if delayinpoints > ChangePoints[0]: raise ValueError("Delay is longer than Changepoint") # Store All Data All = {} for current in currents: Item = {} ItemExp = {} l = len(Values) Item['Voltage'] = np.zeros(l) Item['StartPoint'] = np.zeros(l, dtype=np.uint64) Item['EndPoint'] = np.zeros(l, dtype=np.uint64) Item['Mean'] = np.zeros(l) Item['STD'] = np.zeros(l) Item['SE'] = np.zeros(l) Item['SweepedChannel'] = sweepedChannel ItemExp['SweepedChannel'] = sweepedChannel Item['YorkFitValues'] = {} ItemExp['YorkFitValues'] = {} ### MEAN METHOD### # First item Item['Voltage'][0] = Values[0] trace = output[current][0 + delayinpoints:ChangePoints[0]] Item['StartPoint'][0] = 0 + delayinpoints Item['EndPoint'][0] = ChangePoints[0] Item['Mean'][0] = np.mean(trace) isProblematic = math.isclose(Values[0], 0, rel_tol=1e-3) or math.isclose(Values[0], np.min(Values), rel_tol=1e-3) or math.isclose( Values[0], np.max(Values), rel_tol=1e-3) if sweepedChannel is 'v2' and isProblematic: print('In Problematic If for Values: {}, index {}'.format(Values[0], 0)) Item['STD'][0] = np.std(trace[-np.int64(output['samplerate']):]) else: Item['STD'][0] = np.std(trace) Item['SE'][0] = Item['STD'][0] / np.sqrt(len(trace)) for i in range(1, len(Values) - 1): trace = output[current][ChangePoints[i - 1] + delayinpoints: ChangePoints[i]] Item['Voltage'][i] = Values[i] Item['StartPoint'][i] = ChangePoints[i - 1] + delayinpoints Item['EndPoint'][i] = ChangePoints[i] if len(trace) > 0: Item['Mean'][i] = np.mean(trace) isProblematic = math.isclose(Values[i], 0, rel_tol=1e-3) or math.isclose(Values[i], np.min(Values), rel_tol=1e-3) or math.isclose( Values[i], np.max(Values), rel_tol=1e-3) if sweepedChannel is 'v2' and isProblematic: print('In Problematic If for Values: {}, index {}'.format(Values[i], i)) Item['STD'][i] = np.std(trace[-np.int64(output['samplerate']):]) else: Item['STD'][i] = np.std(trace) Item['SE'][i] = Item['STD'][i] / np.sqrt(len(trace)) if verbose: print('{}, {},{}'.format(i, ChangePoints[i - 1] + delayinpoints, ChangePoints[i])) else: Item['Mean'][i] = np.NaN Item['STD'][i] = np.NaN # Last if 1: trace = output[current][ChangePoints[len(ChangePoints) - 1] + delayinpoints: len(output[current]) - 1] Item['Voltage'][-1:] = Values[len(Values) - 1] Item['StartPoint'][-1:] = ChangePoints[len(ChangePoints) - 1] + delayinpoints Item['EndPoint'][-1:] = len(output[current]) - 1 if len(trace) > 0: Item['Mean'][-1:] = np.mean(trace) isProblematic = math.isclose(Values[-1:], 0, rel_tol=1e-3) or math.isclose(Values[-1:], np.min(Values), rel_tol=1e-3) or math.isclose( Values[-1:], np.max(Values), rel_tol=1e-3) if sweepedChannel is 'v2' and isProblematic: print('In Problematic If for Values: {}, index {}'.format(Values[-1:], i)) Item['STD'][-1:] = np.std(trace[-np.int64(output['samplerate']):]) else: Item['STD'][-1:] = np.std(trace) else: Item['Mean'][-1:] = np.NaN Item['STD'][-1:] = np.NaN Item['SE'][-1:] = Item['STD'][-1:] / np.sqrt(len(trace)) ## GET RECTIFICATION FACTOR parts = {'pos': Item['Voltage'] > 0, 'neg': Item['Voltage'] < 0} a = {} b = {} sigma_a = {} sigma_b = {} b_save = {} x_values = {} for part in parts: (a[part], b[part], sigma_a[part], sigma_b[part], b_save[part]) = YorkFit( Item['Voltage'][parts[part]].flatten(), Item['Mean'][parts[part]].flatten(), 1e-12 * np.ones(len(Item['Voltage'][parts[part]].flatten())), Item['STD'][parts[part]].flatten()) factor = b['neg'] / b['pos'] Item['Rectification'] = factor ItemExp['Rectification'] = factor ###EXPONENTIAL METHOD### if approach is not 'mean': l = 1000 ItemExp['StartPoint'] = np.zeros(l, dtype=np.uint64) ItemExp['EndPoint'] = np.zeros(l, dtype=np.uint64) baselinestd = np.std(output[current][0:np.int64(1 * output['samplerate'])]) dataset = pd.Series(output[current]) movingstd = dataset.rolling(10).std() # Extract The Desired Parts ind = (np.abs(movingstd - baselinestd) < 1e-9) & (np.abs(output[current]) < np.max(output[current]) / 2) # How Many Parts? restart = True counter = 0 for i, value in enumerate(ind): if value and restart: ItemExp['StartPoint'][counter] = i print('Start: {}\t'.format(i)) restart = False elif not value and not restart: if ((i - 1) - ItemExp['StartPoint'][counter]) > 1 * output['samplerate']: ItemExp['EndPoint'][counter] = i - 1 print('End: {}'.format(i - 1)) restart = True counter += 1 else: restart = True if not restart: counter += 1 ItemExp['EndPoint'][counter] = len(ind) ItemExp['StartPoint'] = ItemExp['StartPoint'][np.where(ItemExp['StartPoint'])] ItemExp['EndPoint'] = ItemExp['EndPoint'][np.where(ItemExp['EndPoint'])] NumberOfPieces = len(ItemExp['StartPoint']) ItemExp['Voltage'] = np.zeros(NumberOfPieces) ItemExp['ExpValues'] = np.zeros((3, NumberOfPieces)) ItemExp['STD'] = np.zeros(NumberOfPieces) ItemExp['Mean'] = np.zeros(NumberOfPieces) # Make It Piecewise for i in np.arange(0, NumberOfPieces): trace = output[current][ItemExp['StartPoint'][i]:ItemExp['EndPoint'][i]] popt, pcov = MakeExponentialFit(np.arange(len(trace)) / output['samplerate'], trace) ItemExp['Voltage'][i] = output[sweepedChannel][ItemExp['StartPoint'][i]] if popt[0]: ItemExp['ExpValues'][:, i] = popt ItemExp['Mean'][i] = popt[2] try: ItemExp['STD'][i] = np.sqrt(np.diag(pcov))[2] except RuntimeWarning: ItemExp['STD'][i] = np.std(trace) / np.sqrt(len(trace)) print('ExpFit: STD of voltage {} failed calculating...'.format(ItemExp['Voltage'][i])) else: print('Exponential Fit on for ' + current + ' failed at V=' + str(ItemExp['Voltage'][0])) ItemExp['Mean'][i] = np.mean(trace) ItemExp['STD'][i] = np.std(trace) ## FIT THE EXP CALCULATED VALUES ### (a, b, sigma_a, sigma_b, b_save) = YorkFit(ItemExp['Voltage'], ItemExp['Mean'], 1e-12 * np.ones(len(ItemExp['Voltage'])), ItemExp['STD']) x_fit = np.linspace(min(Item['Voltage']), max(Item['Voltage']), 1000) y_fit = scipy.polyval([b, a], x_fit) ItemExp['YorkFitValues'] = {'x_fit': x_fit, 'y_fit': y_fit, 'Yintercept': a, 'Slope': b, 'Sigma_Yintercept': sigma_a, 'Sigma_Slope': sigma_b, 'Parameter': b_save} All[current + '_Exp'] = ItemExp ##Remove NaNs nans = np.isnan(Item['Mean'][:]) nans = np.logical_not(nans) Item['Voltage'] = Item['Voltage'][nans] Item['StartPoint'] = Item['StartPoint'][nans] Item['EndPoint'] = Item['EndPoint'][nans] Item['Mean'] = Item['Mean'][nans] Item['STD'] = Item['STD'][nans] Item['SE'] = Item['SE'][nans] ## Restrict to set voltage Range if UseVoltageRange is not 0: print('Voltages Cut') ind = np.argwhere((Item['Voltage'] >= UseVoltageRange[0]) & (Item['Voltage'] <= UseVoltageRange[1])) Item['Voltage'] = Item['Voltage'][ind].flatten() Item['StartPoint'] = Item['StartPoint'][ind].flatten() Item['EndPoint'] = Item['EndPoint'][ind].flatten() Item['Mean'] = Item['Mean'][ind].flatten() Item['STD'] = Item['STD'][ind].flatten() Item['SE'] = Item['SE'][ind].flatten() ## FIT THE MEAN CALCULATED VALUES ### # Yorkfit (a, b, sigma_a, sigma_b, b_save) = YorkFit(Item['Voltage'].flatten(), Item['Mean'].flatten(), 1e-9 * np.ones(len(Item['Voltage'])), Item['STD'].flatten()) x_fit = np.linspace(min(Item['Voltage']), max(Item['Voltage']), 1000) y_fit = scipy.polyval([b, a], x_fit) Item['YorkFit'] = {'x_fit': x_fit, 'y_fit': y_fit, 'Yintercept': a, 'Slope': b, 'Sigma_Yintercept': sigma_a, 'Sigma_Slope': sigma_b, 'Parameter': b_save} # Polyfit p = np.polyfit(Item['Voltage'].flatten(), Item['Mean'].flatten(), 1) x_fit2 = np.linspace(min(Item['Voltage']), max(Item['Voltage']), 1000) y_fit2 = scipy.polyval(p, x_fit) Item['PolyFit'] = {'x_fit': x_fit2, 'y_fit': y_fit2, 'Yintercept': p[1], 'Slope': p[0]} All[current] = Item All['Currents'] = currents return All def ExpFunc(x, a, b, c): + """ + Function used to create an exponential function used MakeIVData to fit the signal around the capacitive drift and therefore + predict the stabilised current signal at each voltage jump in the MakeIV function. + + Returns y=a*exp(-b*x)+c + + """ + return a * np.exp(-b * x) + c def YorkFit(X, Y, sigma_X, sigma_Y, r=0): + """ + Function used to do the linear regression taking the error into account as the standard deviation. + + It is used in MakeIV, for the estimation of the linear regression curve of voltage versus current, + estimated by a voltage sweep experiment. + + Returns a tuple with the coefficients of the linear regression y=ax+b and their variances. + + """ + N_itermax = 10 # maximum number of interations tol = 1e-15 # relative tolerance to stop at N = len(X) temp = np.matrix([X, np.ones(N)]) # make initial guess at b using linear squares tmp = np.matrix(Y) * lin.pinv(temp) b_lse = np.array(tmp)[0][0] # a_lse=tmp(2); b = b_lse # initial guess omega_X = np.true_divide(1, np.power(sigma_X, 2)) omega_Y = np.true_divide(1, np.power(sigma_Y, 2)) alpha = np.sqrt(omega_X * omega_Y) b_save = np.zeros(N_itermax + 1) # vector to save b iterations in b_save[0] = b for i in np.arange(N_itermax): W = omega_X * omega_Y / (omega_X + b * b * omega_Y - 2 * b * r * alpha) X_bar = np.sum(W * X) / np.sum(W) Y_bar = np.sum(W * Y) / np.sum(W) U = X - X_bar V = Y - Y_bar beta = W * (U / omega_Y + b * V / omega_X - (b * U + V) * r / alpha) b = sum(W * beta * V) / sum(W * beta * U) b_save[i + 1] = b if np.abs((b_save[i + 1] - b_save[i]) / b_save[i + 1]) < tol: break a = Y_bar - b * X_bar x = X_bar + beta y = Y_bar + b * beta x_bar = sum(W * x) / sum(W) y_bar = sum(W * y) / sum(W) u = x - x_bar # %v=y-y_bar sigma_b = np.sqrt(1 / sum(W * u * u)) sigma_a = np.sqrt(1. / sum(W) + x_bar * x_bar * sigma_b * sigma_b) return (a, b, sigma_a, sigma_b, b_save) def DoubleExpFunc(x, Y0, percentFast, plateau, Kfast, Kslow): SpanFast = (Y0 - plateau) * percentFast * 0.01 SpanSlow = (Y0 - plateau) * (100 - percentFast) * 0.01 return plateau + ((Y0 - plateau) * percentFast * 0.01) * np.exp(-Kfast * x) + ( (Y0 - plateau) * (100 - percentFast) * 0.01) * np.exp(-Kslow * x) def MakeExponentialFit(xdata, ydata): try: popt, pcov = curve_fit(ExpFunc, xdata, ydata, method='lm', xtol=1e-20, ftol=1e-12, gtol=1e-20) # popt, pcov = curve_fit(DoubleExpFunc, xdata, ydata, p0 = (ydata[0], 50, ydata[-1], 1 / 15, 1 / 60)) return (popt, pcov) except RuntimeError: popt = (0, 0, 0) pcov = 0 return (popt, pcov) def CutDataIntoVoltageSegments(output, verbose=False): sweepedChannel = '' if output['type'] == 'ChimeraNotRaw' or (output['type'] == 'Axopatch' and not output['graphene']): ChangePoints = np.where(np.diff(output['v1']))[0] sweepedChannel = 'v1' if len(ChangePoints) is 0: print('No voltage sweeps in this file:\n{}'.format(output['filename'])) return (0, 0) elif (output['type'] == 'Axopatch' and output['graphene']): ChangePoints = np.where(np.diff(output['v1']))[0] sweepedChannel = 'v1' if len(ChangePoints) is 0: ChangePoints = np.where(np.diff(output['v2']))[0] if len(ChangePoints) is 0: print('No voltage sweeps in this file') return (0, 0) else: sweepedChannel = 'v2' if verbose: print('Cutting into Segments...\n{} change points detected in channel {}...'.format(len(ChangePoints), sweepedChannel)) return (ChangePoints, sweepedChannel) def CalculatePoreSize(G, L, s): # https://doi.org/10.1088/0957-4484/22/31/315101 return (G + np.sqrt(G * (G + 16 * L * s / np.pi))) / (2 * s) def CalculateCapillarySize(G, D=0.3e-3, t=3.3e-3, s=10.5): # DOI: 10.1021/nn405029j # s=conductance (10.5 S/m at 1M KCl) # t=taper length (3.3 mm) # D=diameter nanocapillary at the shaft (0.3 mm) return G * (4 * t / np.pi + 0.5 * D) / (s * D - 0.5 * G) def CalculateShrunkenCapillarySize(G, D=0.3e-3, t=3.3e-3, s=10.5, ts=500e-9, Ds=500e-9): # DOI: 10.1021/nn405029j # s=conductance (10.5 S/m at 1M KCl) # t=taper length (3.3 mm) # D=diameter nanocapillary at the shaft (0.3 mm) # ts=taper length of the shrunken part (543nm fit) # Ds=diameter nanocapillary at the shrunken part (514nm fit) return G * D * (8 * ts / np.pi + Ds) / ((2 * s * D * Ds) - (G * (8 * t / np.pi + Ds))) def CalculateConductance(L, s, d): return s * 1 / (4 * L / (np.pi * d ** 2) + 1 / d) def GetSurfaceChargeFromLeeEquation(s, c, D, G, L, A, B, version=1): lDu = 1 / (8 * np.pi * B * D * s) * ( version * np.sqrt( (4 * np.pi * A * D ** 2 * s + np.pi * B * D ** 2 * s - 4 * B * G * L - 8 * np.pi * D * G) ** 2 - 16 * np.pi * B * D * s * (np.pi * A * D ** 3 * s - 4 * A * D * G * L - 2 * np.pi * D ** 2 * G)) - 4 * np.pi * A * D ** 2 * s - np.pi * B * D ** 2 * s + 4 * B * G * L + 8 * np.pi * D * G ) e = cst.elementary_charge Na = cst.Avogadro return lDu * (2 * c * Na * 1e3 * e) def ConductivityFromConductanceModel(L, d, G): return G * (4 * L / (np.pi * d ** 2) + 1 / d) def RefinedEventDetection(out, AnalysisResults, signals, limit): for sig in signals: # Choose correct reference if sig is 'i1_Up': sig1 = 'i1' elif sig is 'i2_Up': sig1 = 'i2' else: sig1 = sig if sig is 'i1' or 'i1_Up': volt = 'v1' elif sig is 'i2' or 'i2_Up': volt = 'v2' if len(AnalysisResults[sig]['RoughEventLocations']) > 1: startpoints = np.uint64(AnalysisResults[sig]['RoughEventLocations'][:, 0]) endpoints = np.uint64(AnalysisResults[sig]['RoughEventLocations'][:, 1]) localBaseline = AnalysisResults[sig]['RoughEventLocations'][:, 2] localVariance = AnalysisResults[sig]['RoughEventLocations'][:, 3] CusumBaseline = 500 numberofevents = len(startpoints) AnalysisResults[sig]['StartPoints'] = startpoints AnalysisResults[sig]['EndPoints'] = endpoints AnalysisResults[sig]['LocalBaseline'] = localBaseline AnalysisResults[sig]['LocalVariance'] = localVariance AnalysisResults[sig]['NumberOfEvents'] = len(startpoints) deli = np.zeros(numberofevents) dwell = np.zeros(numberofevents) fitvalue = np.zeros(numberofevents) AllEvents = [] #####FIX THE UP AND DOWN STORY!! ALL THE PROBLEMS ARE IN HERE... for i in range(numberofevents): length = endpoints[i] - startpoints[i] if out[sig1][startpoints[i]] < 0 and (sig == 'i1_Up' or sig == 'i2_Up'): if length <= limit and length > 3: # Impulsion Fit to minimal value fitvalue[i] = np.max(out[sig1][startpoints[i] + np.uint(1):endpoints[i] - np.uint(1)]) elif length > limit: fitvalue[i] = np.mean(out[sig1][startpoints[i] + np.uint(5):endpoints[i] - np.uint(5)]) else: fitvalue[i] = np.max(out[sig1][startpoints[i]:endpoints[i]]) deli[i] = -(localBaseline[i] - fitvalue[i]) elif out[sig1][startpoints[i]] < 0 and (sig == 'i1' or sig == 'i2'): if length <= limit and length > 3: # Impulsion Fit to minimal value fitvalue[i] = np.min(out[sig1][startpoints[i] + np.uint(1):endpoints[i] - np.uint(1)]) elif length > limit: fitvalue[i] = np.mean(out[sig1][startpoints[i] + np.uint(5):endpoints[i] - np.uint(5)]) else: fitvalue[i] = np.min(out[sig1][startpoints[i]:endpoints[i]]) deli[i] = -(localBaseline[i] - fitvalue[i]) elif out[sig1][startpoints[i]] > 0 and (sig == 'i1_Up' or sig == 'i2_Up'): if length <= limit and length > 3: # Impulsion Fit to minimal value fitvalue[i] = np.max(out[sig1][startpoints[i] + np.uint(1):endpoints[i] - np.uint(1)]) elif length > limit: fitvalue[i] = np.mean(out[sig1][startpoints[i] + np.uint(5):endpoints[i] - np.uint(5)]) else: fitvalue[i] = np.max(out[sig1][startpoints[i]:endpoints[i]]) deli[i] = (localBaseline[i] - fitvalue[i]) elif out[sig1][startpoints[i]] > 0 and (sig == 'i1' or sig == 'i2'): if length <= limit and length > 3: # Impulsion Fit to minimal value fitvalue[i] = np.min(out[sig1][startpoints[i] + np.uint(1):endpoints[i] - np.uint(1)]) elif length > limit: fitvalue[i] = np.mean(out[sig1][startpoints[i] + np.uint(5):endpoints[i] - np.uint(5)]) else: fitvalue[i] = np.min(out[sig1][startpoints[i]:endpoints[i]]) deli[i] = (localBaseline[i] - fitvalue[i]) else: print( 'Strange event that has to come from a neighboring universe...Computer will self-destruct in 3s!') dwell[i] = (endpoints[i] - startpoints[i]) / out['samplerate'] AllEvents.append(out[sig1][startpoints[i]:endpoints[i]]) frac = deli / localBaseline dt = np.array(0) dt = np.append(dt, np.diff(startpoints) / out['samplerate']) numberofevents = len(dt) # AnalysisResults[sig]['CusumFits'] = AllFits AnalysisResults[sig]['FractionalCurrentDrop'] = frac AnalysisResults[sig]['DeltaI'] = deli AnalysisResults[sig]['DwellTime'] = dwell AnalysisResults[sig]['Frequency'] = dt AnalysisResults[sig]['LocalVoltage'] = out[volt][startpoints] AnalysisResults[sig]['AllEvents'] = AllEvents AnalysisResults[sig]['FitLevel'] = fitvalue else: AnalysisResults[sig]['NumberOfEvents'] = 0 return AnalysisResults def MakeIV(filenames, directory, conductance=10, title=''): + """ + Function used to make the IV plot for the voltage sweep experiment. + To do so, it calls the MakeIVData function seen earlier and plots its output. + + """ + if len(conductance) is 1: conductance = np.ones(len(filenames)) * conductance for idx, filename in enumerate(filenames): print(filename) # Make Dir to save images output = LoadData.OpenFile(filename) if not os.path.exists(directory): os.makedirs(directory) AllData = MakeIVData(output, delay=2) if AllData == 0: print('!!!! No Sweep in: ' + filename) continue # Plot Considered Part # figExtracteParts = plt.figure(1) # ax1 = figExtracteParts.add_subplot(211) # ax2 = figExtracteParts.add_subplot(212, sharex=ax1) # (ax1, ax2) = uf.PlotExtractedPart(output, AllData, current = 'i1', unit=1e9, axis = ax1, axis2=ax2) # plt.show() # figExtracteParts.savefig(directory + os.sep + 'PlotExtracted_' + str(os.path.split(filename)[1])[:-4] + '.eps') # figExtracteParts.savefig(directory + os.sep + 'PlotExtracted_' + str(os.path.split(filename)[1])[:-4] + '.png', dpi=150) # Plot IV if output['graphene']: figIV2 = plt.figure(3) ax2IV = figIV2.add_subplot(111) ax2IV = PlotIV(output, AllData, current='i2', unit=1e9, axis=ax2IV, WithFit=1, title=title[idx]) figIV2.tight_layout() figIV2.savefig(directory + os.sep + str(os.path.split(filename)[1]) + '_IV_i2.png', dpi=300) figIV2.savefig(directory + os.sep + str(os.path.split(filename)[1]) + '_IV_i2.eps') figIV2.clear() figIV = plt.figure(2) ax1IV = figIV.add_subplot(111) ax1IV = PlotIV(output, AllData, current='i1', unit=1e9, axis=ax1IV, WithFit=1, PoreSize=[conductance[idx], 20e-9], title=title[idx]) figIV.tight_layout() # Save Figures figIV.savefig(directory + os.sep + str(os.path.split(filename)[1]) + 'IV_i1.png', dpi=300) figIV.savefig(directory + os.sep + str(os.path.split(filename)[1]) + 'IV_i1.eps') figIV.clear() def TwoChannelAnalysis(filenames, outdir, UpwardsOn=0): + """ + Function used to analyse two channel recordings with in one channel the current and voltage values across the nanopore and + in the other of the transverse current and voltage values just before the nanopore. + + Calls the RecursiveLowPassFast function to detect the events (and if needed the RecursiveLowPassFastUp for the argument UpwardsOn = 1) + to detect the events. Then calls RefinedEventDetection to get more precise informations on the events. + It correlates the 2 channels to seek the common and uncommon events. + + Finally the TranslocationEvents detection will be plotted. + + """ + for filename in filenames: print(filename) inp = LoadData.OpenFile(filename) file = os.sep + str(os.path.split(filename)[1][:-4]) if not os.path.exists(outdir): os.makedirs(outdir) # Low Pass Event Detection AnalysisResults = {} if inp['graphene']: chan = ['i1', 'i2'] # For looping over the channels else: chan = ['i1'] for sig in chan: AnalysisResults[sig] = {} AnalysisResults[sig]['RoughEventLocations'] = RecursiveLowPassFast(inp[sig], pm.coefficients[sig], inp['samplerate']) if UpwardsOn: # Upwards detection can be turned on or off AnalysisResults[sig + '_Up'] = {} AnalysisResults[sig + '_Up']['RoughEventLocations'] = RecursiveLowPassFastUp(inp[sig], pm.coefficients[sig], inp['samplerate']) # f.PlotRecursiveLPResults(AnalysisResults['i2'], inp, directory, 1e-3, channel='i2') # f.PlotRecursiveLPResults(AnalysisResults['i2_Up'], inp, directory+'UP_', 1e-3, channel='i2') # Refine the Rough Event Detection done by the LP filter and Add event infos AnalysisResults = RefinedEventDetection(inp, AnalysisResults, signals=chan, limit=pm.MinimalFittingLimit * inp['samplerate']) # Correlate the two channels if inp['graphene']: (CommonIndexes, OnlyIndexes) = f.CorrelateTheTwoChannels(AnalysisResults, 10e-3 * inp['samplerate']) print('\n\nAnalysis Done...\nThere are {} common events\n{} Events on i1 only\n{} Events on i2 only'.format( len(CommonIndexes['i1']), len(OnlyIndexes['i1']), len(OnlyIndexes['i2']))) # Plot The Events f.SaveAllPlots(CommonIndexes, OnlyIndexes, AnalysisResults, directory, inp, pm.PlotBuffer) else: # Plot The Events f.SaveAllAxopatchEvents(AnalysisResults, directory, inp, pm.PlotBuffer) def PlotExtractedPart(output, AllData, current='i1', unit=1e9, axis='', axis2=''): time = np.arange(0, len(output[current])) / output['samplerate'] axis.plot(time, output[current] * unit, 'b', label=str(os.path.split(output['filename'])[1])[:-4]) axis.set_ylabel('Current ' + current + ' [nA]') axis.set_title('Time Trace') for i in range(0, len(AllData[current]['StartPoint'])): axis.plot(time[AllData[current]['StartPoint'][i]:AllData[current]['EndPoint'][i]], output[current][AllData[current]['StartPoint'][i]:AllData[current]['EndPoint'][i]] * unit, 'r') axis2.plot(time, output[AllData[current]['SweepedChannel']], 'b', label=str(os.path.split(output['filename'])[1])[:-4]) axis2.set_ylabel('Voltage ' + AllData[current]['SweepedChannel'] + ' [V]') axis2.set_xlabel('Time') return (axis, axis2) def xcorr(x, y, k, normalize=True): n = x.shape[0] # initialize the output array out = np.empty((2 * k) + 1, dtype=np.double) lags = np.arange(-k, k + 1) # pre-compute E(x), E(y) mu_x = x.mean() mu_y = y.mean() # loop over lags for ii, lag in enumerate(lags): # use slice indexing to get 'shifted' views of the two input signals if lag < 0: xi = x[:lag] yi = y[-lag:] elif lag > 0: xi = x[:-lag] yi = y[lag:] else: xi = x yi = y # x - mu_x; y - mu_y xdiff = xi - mu_x ydiff = yi - mu_y # E[(x - mu_x) * (y - mu_y)] out[ii] = xdiff.dot(ydiff) / n # NB: xdiff.dot(ydiff) == (xdiff * ydiff).sum() if normalize: # E[(x - mu_x) * (y - mu_y)] / (sigma_x * sigma_y) out /= np.std(x) * np.std(y) return lags, out def CUSUM(input, delta, h): + """ + Function used in the detection of abrupt changes in mean current; optimal for Gaussian signals. + CUSUM is based on the cummulative sum algorithm (ADD CITATION.) + This function will define new start and end points more precisely than just + the RecursiveLowPassFast and will fit levels inside the TransolocationEvent objects. + + Returns mc the piecewise constant segmented signal, kd a list of detection times (in samples) + krmv a list of estimated change times (in samples). + + """ + # initialization Nd = k0 = 0 kd = [] krmv = [] k = 1 l = len(input) m = np.zeros(l) m[k0] = input[k0] v = np.zeros(l) sp = np.zeros(l) Sp = np.zeros(l) gp = np.zeros(l) sn = np.zeros(l) Sn = np.zeros(l) gn = np.zeros(l) while k < l: m[k] = np.mean(input[k0:k + 1]) v[k] = np.var(input[k0:k + 1]) sp[k] = delta / v[k] * (input[k] - m[k] - delta / 2) sn[k] = -delta / v[k] * (input[k] - m[k] + delta / 2) Sp[k] = Sp[k - 1] + sp[k] Sn[k] = Sn[k - 1] + sn[k] gp[k] = np.max([gp[k - 1] + sp[k], 0]) gn[k] = np.max([gn[k - 1] + sn[k], 0]) if gp[k] > h or gn[k] > h: kd.append(k) if gp[k] > h: kmin = np.argmin(Sp[k0:k + 1]) krmv.append(kmin + k0) else: kmin = np.argmin(Sn[k0:k + 1]) krmv.append(kmin + k0) # Re-initialize k0 = k m[k0] = input[k0] v[k0] = sp[k0] = Sp[k0] = gp[k0] = sn[k0] = Sn[k0] = gn[k0] = 0 Nd = Nd + 1 k += 1 if Nd == 0: mc = np.mean(input) * np.ones(k) elif Nd == 1: mc = np.append(m[krmv[0]] * np.ones(krmv[0]), m[k - 1] * np.ones(k - krmv[0])) else: mc = m[krmv[0]] * np.ones(krmv[0]) for ii in range(1, Nd): mc = np.append(mc, m[krmv[ii]] * np.ones(krmv[ii] - krmv[ii - 1])) mc = np.append(mc, m[k - 1] * np.ones(k - krmv[Nd - 1])) return (mc, kd, krmv) def GetPSD(input): Fulltrace = input['i1'] samplerate = input['samplerate'] f, Pxx_den = sig.welch(Fulltrace, samplerate) return f, Pxx_den def CondPoreSizeTick(x, pos): formCond = EngFormatter(unit='S') formPoreSize = EngFormatter(unit='m') outstring = '{}, {}'.format(formCond.format_eng(x), formPoreSize.format_eng(CalculatePoreSize(x, 1e-9, 10))) return outstring def DebyeHuckelParam(c, T=20): ## Valid Only for same valency and c1 = c2 epsilon_r = 87.740 - 0.40008 * T + 9.398e-4 * T ** 2 - 1.410e-6 * T ** 3 print('Dielectric of Water is {}'.format(epsilon_r)) n = c * 1e3 * cst.Avogadro formCond = EngFormatter(unit='m') k = np.sqrt((cst.e ** 2 * 2 * n) / (cst.k * (T + 273.15) * cst.epsilon_0 * epsilon_r)) return [k, 1 / k] # , '{}'.format(formCond.format_eng(1/k))] def ElectrostaticPotential(T, sigma, Tau=5e18, pK=7.9, pH=11): return cst.k * T / cst.e * (np.log(-sigma / (cst.e * Tau + sigma)) + (pK - pH) * np.log(10)) def GetTempRedoxPotential(Cdiff=1e-3 / 1e-1, T=293.15): return cst.R * T / cst.physical_constants['Faraday constant'][0] * np.log(Cdiff) def GetRedoxError(cmax, cmin, ErrorPercent=0.1, T=293.15): cmaxErr = cmax * ErrorPercent cminErr = cmin * ErrorPercent return cst.R * T / cst.physical_constants['Faraday constant'][0] * np.sqrt( (1 / cmax ** 2 * cmaxErr ** 2 + 1 / cmin ** 2 * cminErr ** 2)) ##Result: Negative ion divided by positive ion # Goldman–Hodgkin–Katz voltage equation def GetIonSelectivityWithoutPotential(c_trans, c_cis, Erev, T): n_trans = c_trans # *1e3#*cst.Avogadro n_cis = c_cis # *1e3#*cst.Avogadro A = Erev * cst.physical_constants['Faraday constant'][0] / (cst.R * T) return -(n_trans - n_cis * np.exp(-A)) * (1 - np.exp(A)) / ((n_trans - n_cis * np.exp(A)) * (1 - np.exp(-A))) def GetIonSelectivityWithPotential(c_trans, c_cis, Erev, T, phi): sel1 = GetIonSelectivityWithoutPotential(c_trans, c_cis, Erev, T) return sel1 * np.exp((-2 * phi * cst.physical_constants['Faraday constant'][0]) / (cst.R * T)) def Selectivity(ConcGradient, V): return V * cst.physical_constants['Faraday constant'][0] / ((cst.R * 293.15) * np.log(ConcGradient)) def GetRedox(cmax, cmin, T=295.15): return cst.R * T / cst.physical_constants['Faraday constant'][0] * np.log(cmax / cmin) diff --git a/NanoporeClasses.py b/NanoporeClasses.py index eb30094..d534eed 100644 --- a/NanoporeClasses.py +++ b/NanoporeClasses.py @@ -1,138 +1,196 @@ import numpy as np from pprint import pprint import shelve import os import math from Plotting.EventPlots import PlotCurrentTrace,PlotCurrentTraceBaseline import matplotlib.pyplot as plt from matplotlib.ticker import EngFormatter Amp = EngFormatter(unit='A', places=2) Time = EngFormatter(unit='s', places=2) Volt = EngFormatter(unit='V', places=2) Cond = EngFormatter(unit='S', places=2) class TranslocationEvent: + """ + Class used to represent an event. + + Attributes + ---------- + filename : str + directory of the source file from which the event was extracted + type: str + classification of the event 'Real' (CUSUM-fitted), 'Rough' (non-CUSUM fitted), 'Impulse' (short events) + evenTrace: lst(int) + list of data points within the event + baseline:float + mean current value in the basline around the event + samplerate:float + sampling frequency + beginEvent: int + start coordinate of the event in the signal + endEvent: int + end coordinate of the event in the signal + meanTrace: float + mean current in the event + minTrace: float + minimum current in the event + eventLength: float + lenght of the event in seconds + currentDrop: float + current drop defined as the difference of the baseline and meanTrace or minTrace dependeng on the event type + before: lst(float) + list of data points before the event, used for plotting + after: lst(float) + list of data points after the event, used for plotting + changeTimes: lst(int) + list of coordinates where the current level changes in the event, only for multilevel events + kd: **** + segmentedSignal: lst(float) + CUSUM fit + beginEventCUSUM: + more precise start coordinate of the event detected with the CUSUM algorithm + currentDropCUSUM: + more precise start coordinate of the event detected with the CUSUM algorithm + coefficients: + coefficients used in the CUSUM algorithm + voltage: + voltage applied across the nanopore during the data recording + + """ + def __init__(self, filename,type='roughEvent'): self.filename = filename self.type=type def SetEvent(self,eventTrace,beginEvent,baseline,samplerate): self.eventTrace=eventTrace self.baseline=baseline self.samplerate=samplerate self.beginEvent=beginEvent self.endEvent=beginEvent+len(eventTrace) self.meanTrace=np.mean(eventTrace) self.minTrace = np.min(eventTrace) self.eventLength=len(eventTrace)/samplerate if self.type=='Rough': self.currentDrop=baseline-self.meanTrace else: self.currentDrop = baseline - self.minTrace def SetCoefficients(self,coefficients,voltage): self.coefficients=coefficients self.voltage=voltage def SetBaselineTrace(self, before,after): self.before=before self.after=after self.baseline=np.mean(np.append(before,after)) if self.type=='Rough': self.currentDrop = self.baseline - self.meanTrace else: self.currentDrop = self.baseline - self.minTrace def SetCUSUMVariables(self, segmentedSignal, kd, changeTimes): self.changeTimes=changeTimes self.kd=kd self.segmentedSignal=segmentedSignal self.changeTimes=changeTimes if len(changeTimes)>1: self.beginEventCUSUM=changeTimes[0] self.currentDropCUSUM=max(segmentedSignal)-min(segmentedSignal) if len(changeTimes)>2: self.endEventCUSUM = changeTimes[-1] self.eventLengthCUSUM=(changeTimes[-1]-changeTimes[0])/self.samplerate if hasattr(self,'before') and hasattr(self,'after') and hasattr(self,'eventTrace'): self.mcbefore=np.mean(self.before)*np.ones(len(self.before)) self.mcafter = np.mean(self.after) * np.ones(len(self.after)) self.mctrace=np.array([]) for ii in range(1,len(changeTimes)): self.mctrace=np.append(self.mctrace,np.mean(self.eventTrace[changeTimes[ii-1]-changeTimes[0]:changeTimes[ii]-changeTimes[0]])*np.ones(changeTimes[ii]-changeTimes[ii-1])) class AllEvents: + """" + Class used to represent all the events in a nanopore experiment output as a list of events. + + Attributes + ---------- + + events : lst(events) + list of events containing all the events detected by the eventdetection function + + """ + def __init__(self): self.events=[] def AddEvent(self, translocationEvent): if isinstance(translocationEvent,AllEvents): self.events.extend(translocationEvent.events) elif isinstance(translocationEvent,list): self.events.extend(translocationEvent) else: self.events.append(translocationEvent) def GetAllLengths(self): Lengths=[event.lengthEvents for event in self.events] return Lengths def GetAllIdrops(self): currentDrops=[event.currentDrop for event in self.events] return currentDrops def GetAllIdropsNorm(self): currentDrops = [event.currentDrop/ event.baseline for event in self.events] return currentDrops def SetFolder(self,loadname): self.savefile=loadname def GetEventsMinCondition(self,minCurrent=-math.inf,maxCurrent=math.inf,minLength=0,maxLength=math.inf): minCurrent = -math.inf if not minCurrent else minCurrent maxCurrent = math.inf if not maxCurrent else maxCurrent minLength = 0 if not minLength else minLength maxLength = math.inf if not maxLength else maxLength newEvents=AllEvents() for event in self.events: if minCurrent