diff --git a/EventDetection.py b/EventDetection.py index 19923e9..480d59b 100644 --- a/EventDetection.py +++ b/EventDetection.py @@ -1,480 +1,484 @@ import NanoporeClasses as NC import shelve import os import glob from time import sleep import Functions from pprint import pprint import matplotlib.pyplot as plt from matplotlib.ticker import EngFormatter import numpy as np import sys import argparse import datetime from tkinter.filedialog import askopenfilenames,askdirectory import timeit import LoadData timeInSec = EngFormatter(unit='s', places=2) Amp = EngFormatter(unit='A', places=2) #Default parameters extension = '*.log' coefficients = {'a': 0.999, 'E': 0, 'S': 5, 'maxEventLength': 200e-3, # maximal event length to be considered an event 'minEventLength': 600e-6, # maximal event length to be considered an impulse 'fitLength': 3e-3, # minimal event length to be fitted for impulses 'dt': 25, # go back dt points for fitting of impulses 'hbook': 1, 'delta': 0.2e-9, 'deltaRel': None, # If set overrides delta, calculates delta relative to baseline 'ChimeraLowPass': 10e3} def GetParameters(): print("Usage:") print("run(inputData, newExtension=None, newCoefficients={}, outputFile=None, " "force=False, cut=False, verboseLevel=0)") print() print("Default extension:") print(extension) print() print("Default coefficients:") pprint(coefficients) def eventdetectionwithfilegeneration(file, coefficients, verboseLevel=1, forceRun=False, CutTraces=False): # Create new class that contains all events AllEvents = NC.AllEvents() # Loop over all files in folder # Extract filename and generate filename to save located events if verboseLevel >= 1: print('analysing ' + file) directory = os.path.dirname(file) + os.sep + 'analysisfiles' if not os.path.exists(directory): os.makedirs(directory, exist_ok=True) filename, file_extension = os.path.splitext(os.path.basename(file)) savefilename = directory + os.sep + filename + 'data' shelfFile = shelve.open(savefilename) if ~forceRun: try: #Check if file can be loaded coefficientsloaded=shelfFile['coefficients'] tEL = shelfFile['TranslocationEvents'] # If coefficients before are not identical, analysis needs to run again if (coefficientsloaded == coefficients): if verboseLevel >= 1: print('loaded from file') else: forceRun = True except: # Except if cannot be loaded, analysis needs to run forceRun = True pass if forceRun: # Extract list of events for this file tEL = eventdetection(file, coefficients, verboseLevel, CutTraces) if verboseLevel >= 1: print('Saved {} events'.format(len(tEL.events))) # Open savefile and save events for this file shelfFile['TranslocationEvents'] = tEL shelfFile['coefficients'] = coefficients if verboseLevel >= 1: print('saved to file') shelfFile.close() # Add events to the initially created class that contains all events AllEvents.AddEvent(tEL) return AllEvents def batcheventdetection(folder, extension, coefficients, verboseLevel=1, forceRun=False, CutTraces=False): # Create new class that contains all events AllEvents = NC.AllEvents() print('found ' + str(len(glob.glob(os.path.join(folder, extension)))) + ' files.') # Loop over all files in folder for fullfilename in glob.glob(os.path.join(folder, extension)): # Extract filename and generate filename to save located events if verboseLevel >= 1: print('analysing '+fullfilename) filename, file_extension = os.path.splitext(os.path.basename(fullfilename)) directory = os.path.dirname(fullfilename) + os.sep + 'analysisfiles' if not os.path.exists(directory): os.makedirs(directory,exist_ok=True) savefilename = directory + os.sep + filename + 'data' shelfFile = shelve.open(savefilename) if ~forceRun: try: #Check if file can be loaded coefficientsloaded=shelfFile['coefficients'] tEL = shelfFile['TranslocationEvents'] # If coefficients before are not identical, analysis needs to run again if (coefficientsloaded == coefficients): if verboseLevel >= 1: print('loaded from file') else: forceRun = True except: # Except if cannot be loaded, analysis needs to run forceRun = True pass if forceRun: # Extract list of events for this file tEL = eventdetection(fullfilename, coefficients, verboseLevel, CutTraces) if verboseLevel >= 1: print('Saved {} events'.format(len(tEL.events))) # Open savefile and save events for this file shelfFile['TranslocationEvents'] = tEL shelfFile['coefficients'] = coefficients if verboseLevel >= 1: print('saved to file') shelfFile.close() # Add events to the initially created class that contains all events AllEvents.AddEvent(tEL) return AllEvents def eventdetection(fullfilename, coefficients, verboseLevel=1, CutTraces=False, showFigures=False): """ Function used to find the events of TranslocationEvents class in the raw data in file 'filename'. It calls the function RecursiveLowPassFast to approximatively locate rough events in the data. If a short TranslocationEvent object is detected its type attribute will be changed to 'Impulse' and the meanTrace attribute will take the value of the minimal current value within the event. Then the CUSUM function will be called to build the CUSUM-fit and assign values to the different attributes of the TranslocationEvent objects. Depending on how the CUSUM was able to fit the trace inside and around the event, the type attribute of the TransocationEvent will be set to 'Real' (if the CUSUM fit went well) or 'Rough' (if the CUSUM was not able to fit the trace). Parameters ---------- fullfilename : str Full path to data file. coefficients : dict Contains the parameters for the analysis. verboseLevel : bool, optional 1 by default. It will print strings indicating the progress of the function in the console with different levels of depth. CutTraces : bool, optional False by default. If True, will cut the signal traces around the events to avoid having appended chunks detected as events. showFigures : bool , optional False by default. If True, it will display a simple figure with the shape of the signal. Returns ------- list of TranslocationEvent All the events in the signal. """ + if os.path.getsize(fullfilename) <= 8: + print('File incorrect size') + return -1 + if 'ChimeraLowPass' in coefficients: ChimeraLowPass = coefficients['ChimeraLowPass'] else: ChimeraLowPass = None loadedData = LoadData.OpenFile(fullfilename, ChimeraLowPass, True, CutTraces) maxTime = coefficients['maxEventLength'] minTime = coefficients['minEventLength'] fitTime = coefficients['fitLength'] IncludedBaseline = int(1e-2 * loadedData['samplerate']) delta = coefficients['delta'] hbook = coefficients['hbook'] dt = coefficients['dt'] deltaRel = coefficients['deltaRel'] samplerate = loadedData['samplerate'] if verboseLevel >= 1: print('\n Recursive lowpass...', end='') # Call RecursiveLowPassFast to detect events in current trace start_time = timeit.default_timer() if 'i1Cut' in loadedData: events = [] for cutTrace in loadedData['i1Cut']: events.extend(Functions.RecursiveLowPassFast(cutTrace, coefficients, loadedData['samplerate'])) else: events = Functions.RecursiveLowPassFast(loadedData['i1'], coefficients, loadedData['samplerate']) if verboseLevel >= 1: print('done. Calculation took {}'.format(timeInSec.format_data(timeit.default_timer() - start_time))) print('Roughly detected events: {}'.format(len(events))) # Make a new class translocationEventList that contains all the found events in this file translocationEventList = NC.AllEvents() # Plot current file if showFigures: plt.figure(1) plt.cla() timeVals=np.linspace(0, len(loadedData['i1'])/loadedData['samplerate'], num=len(loadedData['i1'])) plt.plot(timeVals,loadedData['i1']) plt.draw() plt.pause(0.001) if verboseLevel >= 1: print('Fine tuning...', end='') # Call RecursiveLowPassFast to detect events in current trace start_time = timeit.default_timer() cusumEvents = 0 # Loop over all roughly detected events for event in events: beginEvent = event[0] endEvent = event[1] localBaseline = event[2] stdEvent = event[3] if 'blockLength' in loadedData: voltI = int(beginEvent / loadedData['blockLength']) else: voltI = int(0) # Check if the start of the event is later than the first few ms in IncludedBaseline if beginEvent > IncludedBaseline: # Extract trace and extract baseline traceforfitting = loadedData['i1'][int(beginEvent - dt):int(endEvent + dt)] # If event is shorter than fitTime if 3 < (endEvent-beginEvent) <= (fitTime * samplerate): # Try to fit and estimate real output = Functions.ImpulseFitting(traceforfitting, localBaseline, samplerate, verbose=(verboseLevel >= 3)) elif (endEvent-beginEvent) < (maxTime * samplerate): output = 2 else: if verboseLevel >= 2: print('Too long event {t:4.0f} ms\n'.format(t=1000*(endEvent-beginEvent)/samplerate)) output = -1 # Don't include in rest of analysis # if output -1 fit failed, if output -2 differential failed if not isinstance(output, int) and len(output) > 1: (ps1, pe1, pe2, rsquared_event, Idrop) = output if verboseLevel >= 3: print('ps1: {}, pe1: {}, pe2: {}, rsquared: {}'.format(ps1, pe1, pe2, rsquared_event)) # length small enough and r squared big enough if (pe2-ps1)/samplerate < minTime and rsquared_event > 0.7: newEvent = NC.TranslocationEvent(fullfilename, 'Impulse') trace = traceforfitting[ps1:pe2] ps1c = beginEvent - 25 + ps1 pe2c = beginEvent - 25 + pe2 traceBefore = loadedData['i1'][int(ps1c) - IncludedBaseline:int(ps1c)] traceAfter = loadedData['i1'][int(pe2c):int(pe2c) + IncludedBaseline] newEvent.SetEvent(trace, ps1c, localBaseline, samplerate, currentDrop=Idrop) newEvent.SetCoefficients(coefficients, loadedData['v1'][voltI]) newEvent.SetBaselineTrace(traceBefore, traceAfter) newEvent.eventLength = (pe1-ps1)/samplerate if verboseLevel >= 2: print('Fitted impulse of {t:1.3f} ms and {i:2.2f} nA and {r:1.2f} R-squared\n'.format( t=newEvent.eventLength*1e3, i=newEvent.currentDrop*1e9, r=rsquared_event)) elif (pe2-ps1)/samplerate < 3 * minTime and rsquared_event < 0.5: if verboseLevel >= 2: print('Bad fit {r:1.2f} R-squared, event ignored.\n'.format(r=rsquared_event)) output = -1 # Don't include in rest of analysis else: if verboseLevel >= 3: print('Too long event for impulse') output = 2 # Good enough for CUSUM fitting if isinstance(output, int) and abs(output) == 2: # CUSUM fit sigma = np.sqrt(stdEvent) # If deltaRel is set calculate delta based on relative value to baseline if deltaRel is not None: delta = deltaRel * localBaseline if verboseLevel >= 2: print('Using relative delta for CUSUM fitting: {}'.format(Amp.format_data(delta))) h = hbook * delta / sigma (mc, kd, krmv) = Functions.CUSUM(traceforfitting, delta, h, verbose=(verboseLevel >= 3)) krmv = [krmvVal + int(beginEvent) - dt + 1 for krmvVal in krmv] if len(krmv) > 1: trace = loadedData['i1'][int(krmv[0]):int(krmv[-1])] traceBefore = loadedData['i1'][int(krmv[0]) - IncludedBaseline:int(krmv[0])] traceAfter = loadedData['i1'][int(krmv[-1]):int(krmv[-1]) + IncludedBaseline] beginEvent = krmv[0] # Adding CUSUM fitted event newEvent = NC.TranslocationEvent(fullfilename, 'CUSUM') newEvent.SetEvent(trace, beginEvent, localBaseline, samplerate) newEvent.SetBaselineTrace(traceBefore, traceAfter) newEvent.SetCoefficients(coefficients, loadedData['v1'][voltI]) newEvent.SetCUSUMVariables(mc, kd, krmv) cusumEvents += 1 if verboseLevel >= 2: print('Fitted CUSUM of {t:1.3f} ms and {i:2.2f} nA\n'.format( t=newEvent.eventLengthCUSUM * 1e3, i=newEvent.currentDropCUSUM * 1e9)) else: trace = loadedData['i1'][int(beginEvent):int(endEvent)] traceBefore = loadedData['i1'][int(beginEvent) - IncludedBaseline:int(beginEvent)] traceAfter = loadedData['i1'][int(endEvent):int(endEvent) + IncludedBaseline] # Adding roughly detected event newEvent = NC.TranslocationEvent(fullfilename, 'Rough') newEvent.SetEvent(trace, beginEvent, localBaseline, samplerate) newEvent.SetBaselineTrace(traceBefore, traceAfter) newEvent.SetCoefficients(coefficients, loadedData['v1'][voltI]) if verboseLevel >= 2: print('CUSUM failed. Adding only roughly located event of {t:1.3f} ms and {i:2.2f} nA\n'.format( t=len(trace) * 1e3/samplerate, i=(np.mean(np.append(traceBefore,traceAfter)) - np.mean(trace) )* 1e9)) if output != -1: # Add event to TranslocationList translocationEventList.AddEvent(newEvent) if verboseLevel >= 1: print('done. Total fitting took {}'.format(timeInSec.format_data(timeit.default_timer() - start_time))) print('{} events fitted with CUSUM\n'.format(cusumEvents)) #Plot events if True if showFigures: minVal = np.max(loadedData['i1'])#-1.05e-9 #np.min(loadedData['i1']) maxVal = np.max(loadedData['i1'])+0.05e-9#-1.05e-9 #np.min(loadedData['i1']) for i in range(len(events)): beginEvent=events[i][0] endEvent = events[i][1] if beginEvent>100 and (endEvent - beginEvent) >= (minTime * loadedData['samplerate']) and (endEvent - beginEvent) < ( coefficients['maxEventLength'] * loadedData['samplerate']): plt.plot([beginEvent/loadedData['samplerate'], beginEvent/loadedData['samplerate']], [minVal, maxVal], 'y-', lw=1) plt.show() return translocationEventList def LoadEvents(loadname): # Check if loadname is a directory or not if os.path.isdir(loadname): # create standard filename for loading loadFile = os.path.join(loadname,os.path.basename(loadname)+'_Events') else: loadFile, file_extension = os.path.splitext(loadname) # Open file and extract TranslocationEvents shelfFile = shelve.open(loadFile) TranslocationEvents = shelfFile['TranslocationEvents'] shelfFile.close() AllEvents = NC.AllEvents() AllEvents.AddEvent(TranslocationEvents) AllEvents.SetFolder(loadname) return AllEvents def run(inputData, newExtension=None, newCoefficients={}, outputFile=None, force=False, cut=False, verboseLevel=0): """ Function used to call all the other functions in the module needed to find the events in raw nanopore experiment data. Parameters ----------- inputData : str Full path to data file. newExtension : str, optional None by default. NewExtension for input directory. newCoefficients : dict Contains the default parameters for the analysis. outputFile : str, optional None by default. Full path to output file. force : bool, optional False by default. cut : bool, optional False by default. False by default. If True, will cut the signal traces around the events to avoid having appended chunks detected as events. verboseLevel : int, optional 0 by default. 0 by default. If higher, it will print various outputs during running Returns ------- AllEvents object All the events. """ if newExtension is None: newExtension = extension for key in newCoefficients: coefficients[key]=newCoefficients[key] if os.path.isdir(inputData): TranslocationEvents = batcheventdetection(inputData, newExtension, coefficients, verboseLevel, force, cut) else: TranslocationEvents = eventdetection(inputData,coefficients, verboseLevel, cut) #Check if list is empty if outputFile is not None and TranslocationEvents.events: if os.path.isdir(inputData): outputData = inputData + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") LoadData.SaveVariables(outputFile, TranslocationEvents=TranslocationEvents) return True else: return TranslocationEvents if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('-i', '--input', help='Input directory or file') parser.add_argument('-e', '--ext', help='Extension for input directory') parser.add_argument('-o', '--output', help='Output file for saving') parser.add_argument('-c', '--coeff', help='Coefficients for selecting events [-C filter E standarddev maxlength minlength', nargs='+') parser.add_argument('-u', '--cut', help='Cut Traces before detecting event, prevent detecting appended chunks as event', action='store_true') parser.add_argument('-f', '--force', help='Force analysis to run (don''t load from file', action='store_true') args = parser.parse_args() inputData = args.input if inputData is None: inputData = askdirectory() outputData = args.output if outputData is None: if os.path.isdir(inputData): outputData = inputData + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") else: outputData = os.path.dirname(inputData) + os.sep + 'Data' + os.sep + 'Data' + datetime.date.today().strftime("%Y%m%d") if args.coeff is not None: if len(args.coeff) % 2 == 0: for i in range(0, len(args.coeff), 2): if i <= len(args.coeff): coefficients[str(args.coeff[i])]=float(args.coeff[i+1]) if args.ext is not None: extension = args.ext print('Loading from: ' + inputData) print('Saving to: ' + outputData) print('Coefficients are: ', end='') pprint(coefficients) if os.path.isdir(inputData): print('extension is: ' + extension +'\nStarting.... \n') TranslocationEvents = batcheventdetection(inputData, extension, newcoefficients=coefficients, force=args.force, cut=args.cut) else: print('Starting.... \n') TranslocationEvents = eventdetection(inputData,coefficients, args.cut) #Check if list is empty if TranslocationEvents.events: LoadData.SaveVariables(outputData, TranslocationEvents=TranslocationEvents)