Page MenuHomec4science

drives.py
No OneTemporary

File Metadata

Created
Tue, May 21, 20:14

drives.py

# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Email: theo.lemaire@epfl.ch
# @Date: 2020-01-30 11:46:47
# @Last Modified by: Theo Lemaire
# @Last Modified time: 2021-06-07 23:52:02
import abc
import numpy as np
from .stimobj import StimObject
from ..constants import *
from .batches import Batch
class Drive(StimObject):
    ''' Base interface shared by all drive objects.

        Subclasses provide the actual ``compute`` implementation.
    '''

    @abc.abstractmethod
    def compute(self, t):
        ''' Evaluate the input drive at a given time point.

            :param t: time (s)
            :return: specific input drive
        '''
        raise NotImplementedError

    @classmethod
    def createQueue(cls, *args):
        ''' Build a list of drive objects from one or several input sequences.

            With a single positional argument, one object is constructed per
            element of that argument. With several arguments, they are forwarded
            to ``Batch.createQueue`` and every returned item is unpacked as the
            constructor arguments (presumably one item per combination of
            inputs -- confirm against the Batch implementation).

            :param args: one or more sequences of constructor input values
            :return: list of objects of the calling class
        '''
        if len(args) != 1:
            combinations = Batch.createQueue(*args)
            return [cls(*combination) for combination in combinations]
        return [cls(single_value) for single_value in args[0]]

    @property
    def is_searchable(self):
        ''' Boolean stating whether the drive exposes a variable to search on
            (always False at this level).
        '''
        return False
class XDrive(Drive):
    ''' Drive whose "x-variable" (one of its inputs) can be titrated in order
        to find a threshold value.

        Concrete subclasses must implement the ``xvar`` property (getter and setter).
    '''

    # Class-level search settings, overridden by concrete subclasses.
    # NOTE(review): exact meaning is defined by the titration code outside this
    # file -- presumably initial value, relative / absolute convergence
    # thresholds and a "pre-check" flag; confirm against the caller.
    xvar_initial = None
    xvar_rel_thr = None
    xvar_thr = None
    xvar_precheck = False

    @property
    @abc.abstractmethod
    def xvar(self):
        ''' Current value of the searchable variable. '''
        raise NotImplementedError

    @xvar.setter
    @abc.abstractmethod
    def xvar(self, value):
        ''' Assign a new value to the searchable variable. '''
        raise NotImplementedError

    def updatedX(self, value):
        ''' Return a copy of the drive carrying a new x-variable value.

            :param value: new value of the searchable variable
            :return: copied drive object with updated x-variable (self is left untouched)
        '''
        clone = self.copy()
        clone.xvar = value
        return clone

    @property
    def is_searchable(self):
        ''' Boolean stating whether the drive exposes a variable to search on
            (always True for this class).
        '''
        return True

    @property
    def is_resolved(self):
        ''' Boolean stating whether the x-variable has been assigned a value
            (i.e. is not None).
        '''
        return self.xvar is not None

    def nullCopy(self):
        ''' Return a copy of the drive with its x-variable set to zero. '''
        duplicate = self.copy()
        return duplicate.updatedX(0.)
class ElectricDrive(XDrive):
    ''' Electrical drive defined by a constant current density amplitude. '''

    # Name of the searchable input and associated search settings
    # (constants come from the package-level constants module).
    xkey = 'I'
    xvar_initial = ESTIM_AMP_INITIAL
    xvar_rel_thr = ESTIM_REL_CONV_THR
    xvar_range = (0., ESTIM_AMP_UPPER_BOUND)

    def __init__(self, I):
        ''' Constructor.

            :param I: current density (mA/m2), or None if not yet determined
        '''
        self.I = I

    @property
    def I(self):
        ''' Current density amplitude (mA/m2). '''
        return self._I

    @I.setter
    def I(self, value):
        # None is stored as is (unresolved amplitude); any other value goes
        # through the inherited checkFloat helper first.
        self._I = None if value is None else self.checkFloat('I', value)

    @property
    def xvar(self):
        ''' Searchable variable: the current density amplitude. '''
        return self.I

    @xvar.setter
    def xvar(self, value):
        self.I = value

    def copy(self):
        ''' Return a new object of the same class with the same amplitude. '''
        same_class = self.__class__
        return same_class(self.I)

    @staticmethod
    def inputs():
        ''' Return a dictionary describing the constructor inputs
            (description, label, unit, scaling factor and display precision).
        '''
        amplitude_info = dict(
            desc='current density amplitude',
            label='I',
            unit='A/m2',
            factor=1e-3,
            precision=1,
        )
        return {'I': amplitude_info}

    def compute(self, t):
        ''' Return the drive value at a given time.

            :param t: time (s), unused since the amplitude is constant
            :return: current density amplitude
        '''
        return self.I
class VoltageDrive(Drive):
    ''' Voltage drive defined by a held potential and a step potential. '''

    def __init__(self, Vhold, Vstep):
        ''' Constructor.

            :param Vhold: held voltage (mV)
            :param Vstep: step voltage (mV)
        '''
        self.Vhold = Vhold
        self.Vstep = Vstep

    @property
    def Vhold(self):
        ''' Held voltage (mV). '''
        return self._Vhold

    @Vhold.setter
    def Vhold(self, value):
        # Value is passed through the inherited checkFloat helper before storage.
        self._Vhold = self.checkFloat('Vhold', value)

    @property
    def Vstep(self):
        ''' Step voltage (mV). '''
        return self._Vstep

    @Vstep.setter
    def Vstep(self, value):
        # Value is passed through the inherited checkFloat helper before storage.
        self._Vstep = self.checkFloat('Vstep', value)

    def copy(self):
        ''' Return a new object of the same class with identical voltages. '''
        same_class = self.__class__
        return same_class(self.Vhold, self.Vstep)

    @staticmethod
    def inputs():
        ''' Return a dictionary describing the constructor inputs
            (description, label, unit, display precision and scaling factor).
        '''
        hold_info = dict(
            desc='held voltage', label='V_{hold}', unit='V', precision=0, factor=1e-3)
        step_info = dict(
            desc='step voltage', label='V_{step}', unit='V', precision=0, factor=1e-3)
        return {'Vhold': hold_info, 'Vstep': step_info}

    @property
    def filecodes(self):
        ''' Dictionary of formatted voltage strings (one decimal, "mV" suffix),
            keyed by input name.
        '''
        codes = {}
        codes['Vhold'] = f'{self.Vhold:.1f}mV'
        codes['Vstep'] = f'{self.Vstep:.1f}mV'
        return codes

    def compute(self, t):
        ''' Return the drive value at a given time.

            :param t: time (s), unused
            :return: step voltage
        '''
        return self.Vstep
class AcousticDrive(XDrive):
    ''' Acoustic drive object with intrinsic frequency and amplitude. '''

    # Name of the searchable input and associated search settings
    # (constants come from the package-level constants module).
    xkey = 'A'
    xvar_initial = ASTIM_AMP_INITIAL
    xvar_rel_thr = ASTIM_REL_CONV_THR
    xvar_thr = ASTIM_ABS_CONV_THR
    xvar_precheck = True

    def __init__(self, f, A=None, phi=np.pi):
        ''' Constructor.

            :param f: carrier frequency (Hz)
            :param A: peak pressure amplitude (Pa), or None if not yet determined
            :param phi: phase (rad)
        '''
        self.f = f
        self.A = A
        self.phi = phi

    @property
    def f(self):
        ''' Carrier frequency (Hz). '''
        return self._f

    @f.setter
    def f(self, value):
        # Both checks are inherited helpers defined outside this file.
        value = self.checkFloat('f', value)
        self.checkStrictlyPositive('f', value)
        self._f = value

    @property
    def A(self):
        ''' Peak pressure amplitude (Pa), or None if unresolved. '''
        return self._A

    @A.setter
    def A(self, value):
        # None is accepted and stored as is (unresolved amplitude);
        # any other value is validated first.
        if value is not None:
            value = self.checkFloat('A', value)
            self.checkPositiveOrNull('A', value)
        self._A = value

    @property
    def phi(self):
        ''' Drive phase (rad). '''
        return self._phi

    @phi.setter
    def phi(self, value):
        value = self.checkFloat('phi', value)
        self._phi = value

    def pdict(self, **kwargs):
        ''' Return the parent-class parameters dictionary, without its 'phi'
            entry when the phase equals the default value (pi).

            :param kwargs: keyword arguments forwarded to the parent pdict method
            :return: parameters dictionary
        '''
        d = super().pdict(**kwargs)
        if self.phi == np.pi:
            del d['phi']
        return d

    @property
    def xvar(self):
        ''' Searchable variable: the pressure amplitude. '''
        return self.A

    @xvar.setter
    def xvar(self, value):
        self.A = value

    def copy(self):
        ''' Return a new object of the same class with identical parameters. '''
        return self.__class__(self.f, self.A, phi=self.phi)

    @staticmethod
    def inputs():
        ''' Return a dictionary describing the constructor inputs
            (description, label, unit and display precision).
        '''
        return {
            'f': {
                'desc': 'US drive frequency',
                'label': 'f',
                'unit': 'Hz',
                'precision': 0
            },
            'A': {
                'desc': 'US pressure amplitude',
                'label': 'A',
                'unit': 'Pa',
                'precision': 2
            },
            'phi': {
                'desc': 'US drive phase',
                # Backslash is escaped explicitly: '\P' is not a valid escape
                # sequence and triggers a warning on recent Python versions.
                # The runtime string (backslash followed by "Phi") is unchanged.
                'label': '\\Phi',
                'unit': 'rad',
                'precision': 2
            }
        }

    @property
    def dt(self):
        ''' Determine integration time step: one acoustic period divided by NPC_DENSE. '''
        return 1 / (NPC_DENSE * self.f)

    @property
    def dt_sparse(self):
        ''' Determine sparse time step: one acoustic period divided by NPC_SPARSE. '''
        return 1 / (NPC_SPARSE * self.f)

    @property
    def periodicity(self):
        ''' Determine drive periodicity (s), i.e. the inverse of the carrier frequency. '''
        return 1. / self.f

    @property
    def nPerCycle(self):
        ''' Number of samples per acoustic cycle (the NPC_DENSE constant). '''
        return NPC_DENSE

    @property
    def modulationFrequency(self):
        ''' Modulation frequency (Hz): the carrier frequency for a single drive. '''
        return self.f

    def compute(self, t):
        ''' Compute the sinusoidal acoustic pressure at a specific time.

            :param t: time (s)
            :return: A * sin(2 * pi * f * t - phi)

            Note: the amplitude must be resolved beforehand (the expression
            fails with a TypeError if A is None).
        '''
        return self.A * np.sin(2 * np.pi * self.f * t - self.phi)
class DriveIterator:
    ''' Simple forward iterator over a sequence of drives. '''

    def __init__(self, drives):
        ''' Constructor.

            :param drives: indexable, sized collection of drive objects
        '''
        self._drives = drives
        self._index = 0

    def __iter__(self):
        ''' Return the iterator itself, as required by the iterator protocol
            (allows calling iter() on the iterator or using it directly in a for loop).
        '''
        return self

    def __next__(self):
        ''' Return the next drive in the collection.

            :return: drive object at the current position
            :raises StopIteration: once all drives have been returned
        '''
        if self._index >= len(self._drives):
            raise StopIteration
        result = self._drives[self._index]
        self._index += 1
        return result
class DriveArray(Drive):
    ''' Collection of drive objects behaving as a single (summed) drive. '''

    def __init__(self, drives):
        ''' Constructor.

            :param drives: iterable of drive objects, stored in a dictionary
                under the keys 'drive 1', 'drive 2', ... (in iteration order)
        '''
        self.drives = {f'drive {i + 1}': s for i, s in enumerate(drives)}

    def __eq__(self, other):
        ''' Two arrays are equal if they are of compatible class, hold the same
            number of drives under the same keys, and all drives compare equal.
        '''
        if not isinstance(other, self.__class__):
            return False
        if self.ndrives != other.ndrives:
            return False
        if list(self.drives.keys()) != list(other.drives.keys()):
            return False
        for k, v in self.drives.items():
            if other.drives[k] != v:
                return False
        return True

    def toArray(self, dlist, skey='=', jkey=', ', wraplist=True):
        ''' Merge a list of per-drive string dictionaries into a single dictionary.

            Each input value is expected to be a string of the form
            "<key><skey><value>"; only the part following the split key is retained.

            :param dlist: list of dictionaries (one per drive) of formatted strings
            :param skey: separator between the key and the value inside each string
            :param jkey: separator used to join the distinct values of a given key
            :param wraplist: whether to wrap multi-valued entries in square brackets
            :return: dictionary of "<key><skey><joined values>" strings

            NOTE(review): keys are taken from the first dictionary only, so a key
            missing from it is ignored even if present in subsequent dictionaries.
        '''
        # Arrange parameters as key: list of values dictionary (with right-hand sides of split key)
        d = {}
        for k in dlist[0].keys():
            if k == 'phi':
                # Dictionaries lacking a 'phi' entry fall back on a "3.14rad" value
                d[k] = [x.get(k, f'phi{skey}3.14rad').split(skey)[1] for x in dlist]
            else:
                d[k] = [x[k].split(skey)[1] for x in dlist]
        # Discard duplicates in each list (while retaining element order):
        # np.unique returns first-occurrence indexes, which are sorted to restore order
        d = {k: [v[i] for i in sorted(np.unique(v, return_index=True)[1])] for k, v in d.items()}
        # Format each list element as a string
        dstr = {k: jkey.join(v) for k, v in d.items()}
        # Wrap multi-values elements if specified
        if wraplist:
            dstr = {k: f'[{v}]' if len(d[k]) > 1 else v for k, v in dstr.items()}
        # Re-add splitkey formulation and return dictionary
        return {k: f"{k}{skey}{v}" for k, v in dstr.items()}

    def __repr__(self):
        pdict = self.toArray([x.pdict() for x in self.drives.values()], skey='=')
        return f'{self.__class__.__name__}({", ".join(pdict.values())})'

    def __getitem__(self, i):
        ''' Return the drive(s) at a given position (integer index or slice). '''
        return list(self.drives.values())[i]

    def __len__(self):
        return len(self.drives)

    def __iter__(self):
        return DriveIterator(list(self.drives.values()))

    def inputs(self):
        ''' Return the inputs dictionary of the first drive of the array.

            This was previously declared as a static method while referring to
            ``self`` and indexing a dictionary view, so it could never succeed.
            It is now a regular instance method.

            :return: inputs dictionary of the first drive
            :raises IndexError: if the array is empty
        '''
        return self[0].inputs()

    def copy(self):
        ''' Return a new array made of copies of each drive. '''
        return self.__class__([x.copy() for x in self.drives.values()])

    @property
    def ndrives(self):
        ''' Number of drives in the array. '''
        return len(self.drives)

    @property
    def meta(self):
        ''' Dictionary of per-drive meta information, keyed by drive key. '''
        return {k: s.meta for k, s in self.drives.items()}

    @property
    def desc(self):
        ''' Comma-separated description string built from the drives parameters. '''
        pdict = self.toArray([x.pdict() for x in self.drives.values()], skey='=')
        return ', '.join(pdict.values())

    @property
    def filecodes(self):
        ''' Merged file codes of all drives (underscore-separated, no list wrapping). '''
        return self.toArray(
            [x.filecodes for x in self.drives.values()], skey='_', jkey='_', wraplist=False)

    def compute(self, t):
        ''' Compute the total drive at a specific time, as the sum of all drives.

            :param t: time (s)
            :return: summed drive value
        '''
        return sum(s.compute(t) for s in self.drives.values())

    def updatedX(self, value):
        ''' Return a new array in which every drive has its x-variable set to a new value.

            :param value: new value of the searchable variable (applied to all drives)
            :return: new array object
        '''
        return self.__class__([d.updatedX(value) for d in self.drives.values()])

    def nullCopy(self):
        ''' Return a copy of the array with all x-variables set to zero. '''
        return self.copy().updatedX(0.)
class AcousticDriveArray(DriveArray):
    ''' Array of acoustic drives, exposing frequency-dependent properties. '''

    def is_monofrequency(self):
        ''' Return whether every drive has the same frequency as the first one. '''
        return all(drive.f == self[0].f for drive in self)

    @property
    def fmax(self):
        ''' Highest drive frequency of the array. '''
        return max(drive.f for drive in self.drives.values())

    @property
    def fmin(self):
        ''' Lowest drive frequency of the array. '''
        return min(drive.f for drive in self.drives.values())

    @property
    def dt(self):
        ''' Time step: period of the highest frequency divided by NPC_DENSE. '''
        return 1 / (NPC_DENSE * self.fmax)

    @property
    def dt_sparse(self):
        ''' Sparse time step: period of the highest frequency divided by NPC_SPARSE. '''
        return 1 / (NPC_SPARSE * self.fmax)

    @property
    def periodicity(self):
        ''' Periodicity of the combined drive (s).

            For a mono-frequency array this is the periodicity of the first
            drive. Otherwise it is the inverse of the difference between the
            highest and lowest frequencies, which is only supported for up to
            two drives.

            :raises ValueError: for a multi-frequency array of more than two drives
        '''
        if not self.is_monofrequency():
            if self.ndrives > 2:
                raise ValueError('cannot compute periodicity for more than two drives')
            return 1 / (self.fmax - self.fmin)
        return self[0].periodicity  # s

    @property
    def nPerCycle(self):
        ''' Number of whole time steps fitting in one period of the combined drive. '''
        period = self.periodicity
        step = self.dt
        return int(period // step)

    @property
    def modulationFrequency(self):
        ''' Mean of the drive frequencies. '''
        frequencies = [drive.f for drive in self.drives.values()]
        return np.mean(frequencies)

Event Timeline