diff --git a/pypulseq/Sequence/block.py b/pypulseq/Sequence/block.py index 038804c..e20038b 100755 --- a/pypulseq/Sequence/block.py +++ b/pypulseq/Sequence/block.py @@ -1,220 +1,221 @@ from types import SimpleNamespace import numpy as np from pypulseq.calc_duration import calc_duration from pypulseq.compress_shape import compress_shape from pypulseq.decompress_shape import decompress_shape -def add_block(self, block_index, *args): +def add_block(self, block_index: int, *args): """ - Adds supplied list of Holder objects at position specified by block_index. + Inserts list of `SimpleNamespace` event objects into `self.block_events` at position `block_index`. + Also performs gradient checks. Parameters ---------- block_index : int - Index at which Block has to be inserted. + Index at which SimpleNamespace events have to be inserted into `self.block_events`. args : list - List of Holder objects to be added as a Block. + List of `SimpleNamespace` event objects to be added as a Pulseq block. """ block_duration = calc_duration(*args) self.block_events[block_index] = np.zeros(6, dtype=np.int) duration = 0 check_g = {} for event in args: if event.type == 'rf': mag = np.abs(event.signal) amplitude = np.max(mag) mag = np.divide(mag, amplitude) # Following line of code is a workaround for numpy's divide functions returning NaN when mathematical # edge cases are encountered (eg. divide by 0) mag[np.isnan(mag)] = 0 phase = np.angle(event.signal) phase[phase < 0] += 2 * np.pi phase /= 2 * np.pi mag_shape = compress_shape(mag) data = np.insert(mag_shape.data, 0, mag_shape.num_samples) mag_id, found = self.shape_library.find(data) if not found: self.shape_library.insert(mag_id, data) phase_shape = compress_shape(phase) data = np.insert(phase_shape.data, 0, phase_shape.num_samples) phase_id, found = self.shape_library.find(data) if not found: self.shape_library.insert(phase_id, data) use = 0 use_cases = {'excitation': 1, 'refocusing': 2, 'inversion': 3} if hasattr(event, 'use'): use = use_cases[event.use] data = np.array((amplitude, mag_id, phase_id, event.delay, event.freq_offset, event.phase_offset, event.dead_time, event.ringdown_time, use)) data_id, found = self.rf_library.find(data) if not found: self.rf_library.insert(data_id, data) self.block_events[block_index][1] = data_id duration = max(duration, max( mag.shape) * self.rf_raster_time + event.dead_time + event.ringdown_time + event.delay) elif event.type == 'grad': channel_num = ['x', 'y', 'z'].index(event.channel) idx = 2 + channel_num check_g[channel_num] = SimpleNamespace() check_g[channel_num].idx = idx check_g[channel_num].start = np.array((event.delay + min(event.t), event.first)) check_g[channel_num].stop = np.array( (event.delay + max(event.t) + self.system.grad_raster_time, event.last)) amplitude = max(abs(event.waveform)) if amplitude > 0: g = event.waveform / amplitude else: g = event.waveform shape = compress_shape(g) data = np.insert(shape.data, 0, shape.num_samples) shape_id, found = self.shape_library.find(data) if not found: self.shape_library.insert(shape_id, data) data = np.array([amplitude, shape_id, event.delay, event.first, event.last]) grad_id, found = self.grad_library.find(data) if not found: self.grad_library.insert(grad_id, data, 'g') idx = 2 + channel_num self.block_events[block_index][idx] = grad_id duration = max(duration, len(g) * self.grad_raster_time) elif event.type == 'trap': channel_num = ['x', 'y', 'z'].index(event.channel) idx = 2 + channel_num check_g[channel_num] = SimpleNamespace() check_g[channel_num].idx = idx check_g[channel_num].start = np.array((0, 0)) check_g[channel_num].stop = np.array((event.delay + event.rise_time + event.fall_time + event.flat_time, 0)) data = np.array([event.amplitude, event.rise_time, event.flat_time, event.fall_time, event.delay]) trap_id, found = self.grad_library.find(data) if not found: self.grad_library.insert(trap_id, data, 't') self.block_events[block_index][idx] = trap_id duration = max(duration, event.delay + event.rise_time + event.flat_time + event.fall_time) elif event.type == 'adc': data = np.array( [event.num_samples, event.dwell, max(event.delay, event.dead_time), event.freq_offset, event.phase_offset, event.dead_time]) adc_id, found = self.adc_library.find(data) if not found: self.adc_library.insert(adc_id, data) self.block_events[block_index][5] = adc_id duration = max(duration, event.delay + event.num_samples * event.dwell + event.dead_time) elif event.type == 'delay': data = np.array([event.delay]) delay_id, found = self.delay_library.find(data) if not found: self.delay_library.insert(delay_id, data) self.block_events[block_index][0] = delay_id duration = max(duration, event.delay) # ========= # PERFORM GRADIENT CHECKS # ========= for cg_temp in check_g.keys(): cg = check_g[cg_temp] if abs(cg.start[1]) > self.system.max_slew * self.system.grad_raster_time: if cg.start[0] != 0: raise ValueError('No delay allowed for gradients which start with a non-zero amplitude') if block_index > 1: prev_id = self.block_events[block_index - 1][cg.idx] if prev_id != 0: prev_lib = self.grad_library.get(prev_id) prev_dat = prev_lib['data'] prev_type = prev_lib['type'] if prev_type == 't': raise Exception( 'Two consecutive gradients need to have the same amplitude at the connection point') elif prev_type == 'g': last = prev_dat[4] if abs(last - cg.start[1]) > self.system.max_slew * self.system.grad_raster_time: raise Exception( 'Two consecutive gradients need to have the same amplitude at the connection point') else: raise Exception('First gradient in the the first block has to start at 0.') if cg.stop[1] > self.system.max_slew * self.system.grad_raster_time and abs(cg.stop[0] - block_duration) > 1e-7: raise Exception('A gradient that doesnt end at zero needs to be aligned to the block boundary.') -def get_block(self, block_index): +def get_block(self, block_index: int) -> SimpleNamespace: """ - Returns Block at position specified by block_index. + Returns Pulseq block at `block_index` position in `self.block_events`. Parameters ---------- block_index : int - Index of Block to be retrieved. + Index of Pulseq block to be retrieved from `self.block_events`. Returns ------- - block : dict - Block at position specified by block_index. + block : SimpleNamespace + Pulseq block at 'block_index' position in `self.block_events`. """ block = SimpleNamespace() event_ind = self.block_events[block_index] if event_ind[0] > 0: delay = SimpleNamespace() delay.type = 'delay' delay.delay = self.delay_library.data[event_ind[0]][0] block.delay = delay elif event_ind[1] > 0: block.rf = self.rf_from_lib_data(self.rf_library.data[event_ind[1]]) grad_channels = ['gx', 'gy', 'gz'] for i in range(1, len(grad_channels) + 1): if event_ind[2 + (i - 1)] > 0: grad, compressed = SimpleNamespace(), SimpleNamespace() type = self.grad_library.type[event_ind[2 + (i - 1)]] lib_data = self.grad_library.data[event_ind[2 + (i - 1)]] grad.type = 'trap' if type == 't' else 'grad' grad.channel = grad_channels[i - 1][1] if grad.type == 'grad': amplitude = lib_data[0] shape_id = lib_data[1] delay = lib_data[2] shape_data = self.shape_library.data[shape_id] compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] g = decompress_shape(compressed) grad.waveform = amplitude * g grad.t = np.arange(g.size) * self.grad_raster_time grad.delay = delay if len(lib_data) > 4: grad.first = lib_data[3] grad.last = lib_data[4] else: grad.first = grad.waveform[0] grad.last = grad.waveform[-1] else: grad.amplitude, grad.rise_time, grad.flat_time, grad.fall_time, grad.delay = [lib_data[x] for x in range(5)] grad.area = grad.amplitude * (grad.flat_time + grad.rise_time / 2 + grad.fall_time / 2) grad.flat_area = grad.amplitude * grad.flat_time setattr(block, grad_channels[i - 1], grad) if event_ind[5] > 0: lib_data = self.adc_library.data[event_ind[5]] if max(lib_data.shape) < 6: lib_data = np.append(lib_data, 0) adc = SimpleNamespace() adc.num_samples, adc.dwell, adc.delay, adc.freq_offset, adc.phase_offset, adc.dead_time = [lib_data[x] for x in range(6)] adc.num_samples = int(adc.num_samples) adc.type = 'adc' block.adc = adc return block diff --git a/pypulseq/Sequence/read_seq.py b/pypulseq/Sequence/read_seq.py index 679bf13..21ef0a6 100755 --- a/pypulseq/Sequence/read_seq.py +++ b/pypulseq/Sequence/read_seq.py @@ -1,209 +1,267 @@ import numpy as np from pypulseq.event_lib import EventLibrary +from pathlib import Path -def read(self, path, **kwargs): +def read(self, path: Path, **kwargs): """ - Reads .seq file from path, and constructs a Sequence object from the file. + Constructs a `Sequence` object by reading .seq file from `path`. Parameters ---------- - path : str + path : Path Path of .seq file to be read. """ detect_rf_use = True if 'detect_rf_use' in kwargs else False input_file = open(path, 'r') self.shape_library = EventLibrary() self.rf_library = EventLibrary() self.grad_library = EventLibrary() self.adc_library = EventLibrary() self.delay_library = EventLibrary() self.block_events = {} self.rf_raster_time = self.system.rf_raster_time self.grad_raster_time = self.system.grad_raster_time self.definitions = {} while True: - section = skip_comments(input_file) + section = __skip_comments(input_file) if section == -1: break if section == '[DEFINITIONS]': - self.definitions = read_definitions(input_file) + self.definitions = __read_definitions(input_file) elif section == '[VERSION]': - version_major, version_minor, version_revision = read_version(input_file) + version_major, version_minor, version_revision = __read_version(input_file) if version_major != self.version_major or version_minor != self.version_minor or version_revision != self.version_revision: raise Exception( f'Unsupported version: {version_major, version_minor, version_revision}. ' f'Expected: {self.version_major, self.version_minor, self.version_revision}') elif section == '[BLOCKS]': - self.block_events = read_blocks(input_file) + self.block_events = __read_blocks(input_file) elif section == '[RF]': - self.rf_library = read_events(input_file, [1, 1, 1, 1e-6, 1, 1]) + self.rf_library = __read_events(input_file, [1, 1, 1, 1e-6, 1, 1]) elif section == '[GRAD]': - self.grad_library = read_events(input_file, [1, 1, 1e-6], 'g', self.grad_library) + self.grad_library = __read_events(input_file, [1, 1, 1e-6], 'g', self.grad_library) elif section == '[TRAP]': - self.grad_library = read_events(input_file, [1, 1e-6, 1e-6, 1e-6, 1e-6], 't', self.grad_library) + self.grad_library = __read_events(input_file, [1, 1e-6, 1e-6, 1e-6, 1e-6], 't', self.grad_library) elif section == '[ADC]': - self.adc_library = read_events(input_file, [1, 1e-9, 1e-6, 1, 1]) + self.adc_library = __read_events(input_file, [1, 1e-9, 1e-6, 1, 1]) elif section == '[DELAYS]': - self.delay_library = read_events(input_file, 1e-6) + self.delay_library = __read_events(input_file, 1e-6) elif section == '[SHAPES]': - self.shape_library = read_shapes(input_file) + self.shape_library = __read_shapes(input_file) else: raise ValueError(f'Unknown section code: {section}') if detect_rf_use: for k in self.rf_library.keys(): lib_data = self.rf_library.data[k] rf = self.rf_from_lib_data(lib_data) flip_deg = abs(np.sum(rf.signal)) * rf.t[0] * 360 if len(lib_data) < 9: if flip_deg <= 90: lib_data[8] = 0 else: lib_data[8] = 2 self.rf_library.data[k] = lib_data -def read_definitions(input_file): +def __read_definitions(input_file) -> dict: + """ + Read definitions from .seq file. + + Parameters + ---------- + input_file : file object + .seq file + + Returns + ------- + definitions : dict + Dict object containing key value pairs of definitions. + """ definitions = dict() - line = strip_line(input_file) + line = __strip_line(input_file) while line != '' and line[0] != '#': tok = line.split(' ') if not any([x.isalpha() for x in tok[1:]]): definitions[tok[0]] = np.array(tok[1:], dtype=float) else: definitions[tok[0]] = tok[1:] - line = strip_line(input_file) + line = __strip_line(input_file) return definitions -def read_version(input_file): - line = strip_line(input_file) +def __read_version(input_file) -> tuple: + """ + Read version from .seq file. + + Parameters + ---------- + input_file : file object + .seq file + + Returns + ------- + tuple + Tuple of major, minor and revision number. + """ + line = __strip_line(input_file) while line != '' and line[0] != '#': tok = line.split(' ') if tok[0] == 'major': major = np.array(tok[1:], dtype=float) elif tok[0] == 'minor': minor = np.array(tok[1:], dtype=float) elif tok[0] == 'revision': revision = np.array(tok[1:], dtype=float) - line = strip_line(input_file) + line = __strip_line(input_file) return major, minor, revision -def read_blocks(input_file): +def __read_blocks(input_file) -> dict: """ - Read Blocks from .seq file. Blocks are single lines under the '[BLOCKS]' header in the .seq file. + Read Pulseq blocks from .seq file. Parameters ---------- input_file : file - .seq file to be read. + .seq file Returns ------- - block_events : dict - Key-value mapping of Block ID and Event ID. + event_table : dict + Dict object containing key value pairs of Pulseq block ID and block definition. """ - line = strip_line(input_file) + line = __strip_line(input_file) event_table = dict() while line != '' and line != '#': block_events = np.fromstring(line, dtype=int, sep=' ') event_table[block_events[0]] = block_events[1:] - line = strip_line(input_file) + line = __strip_line(input_file) return event_table -def read_events(input_file, scale, type=None, event_library=None): +def __read_events(input_file, scale: int, type: str = None, event_library: EventLibrary = None) -> EventLibrary: + """ + Read Pulseq events from .seq file. + + Parameters + ---------- + input_file : file object + .seq file + scale : int + Scaling factor. + type : str + Type of Pulseq event. + event_library : EventLibrary + EventLibrary + + Returns + ------- + definitions : dict + `EventLibrary` object containing Pulseq event definitions. + """ scale = 1 if scale is None else scale event_library = event_library if event_library is not None else EventLibrary() - line = strip_line(input_file) + line = __strip_line(input_file) while line != '' and line != '#': data = np.fromstring(line, dtype=float, sep=' ') id = data[0] data = data[1:] * scale if type is None: event_library.insert(key_id=id, new_data=data) else: event_library.insert(key_id=id, new_data=data, data_type=type) - line = strip_line(input_file) + line = __strip_line(input_file) return event_library -def read_shapes(input_file): +def __read_shapes(input_file) -> EventLibrary: + """ + Read Pulseq shapes from .seq file. + + Parameters + ---------- + input_file : file + .seq file + + Returns + ------- + shape_library : EventLibrary + `EventLibrary` object containing shape definitions. + """ shape_library = EventLibrary() - line = skip_comments(input_file) + line = __skip_comments(input_file) while line != -1 and (line != '' or line[0:8] == 'shape_id'): tok = line.split(' ') id = int(tok[1]) - line = skip_comments(input_file) + line = __skip_comments(input_file) tok = line.split(' ') num_samples = int(tok[1]) data = [] - line = skip_comments(input_file) + line = __skip_comments(input_file) while line != '' and line != '#': data.append(float(line)) - line = strip_line(input_file) - line = skip_comments(input_file) + line = __strip_line(input_file) + line = __skip_comments(input_file) data.insert(0, num_samples) data = np.asarray(data) shape_library.insert(key_id=id, new_data=data) return shape_library -def skip_comments(input_file): +def __skip_comments(input_file) -> str: """ Skip one '#' comment in .seq file. Parameters ---------- input_file : file - .seq file to be read. + .seq file Returns ------- line : str - First line in input_file after skipping one '#' comment block. - Note: File pointer is remembered, so successive calls work as expected. + First line in `input_file` after skipping one '#' comment block. Note: File pointer is remembered, so successive calls work as expected. """ - line = strip_line(input_file) + line = __strip_line(input_file) while line != -1 and (line == '' or line[0] == '#'): - line = strip_line(input_file) + line = __strip_line(input_file) return line -def strip_line(input_file): +def __strip_line(input_file): """ - Remove spaces, newline whitespace and return line. + Removes spaces and newline whitespaces. Parameters ---------- input_file : file - .seq file to be read. + .seq file Returns ------- line : str - First line in input_file after removing spaces and newline whitespaces. - Note: File pointer is remembered, so successive calls work as expected. + First line in input_file after removing spaces and newline whitespaces. Note: File pointer is remembered, + so successive calls work as expected. Returns -1 for eof. """ line = input_file.readline() # If line is an empty string, end of the file has been reached return line.strip() if line != '' else -1 diff --git a/pypulseq/Sequence/sequence.py b/pypulseq/Sequence/sequence.py index a3d9157..eb84948 100755 --- a/pypulseq/Sequence/sequence.py +++ b/pypulseq/Sequence/sequence.py @@ -1,364 +1,364 @@ import math from types import SimpleNamespace import numpy as np from matplotlib import pyplot as plt import pypulseq.Sequence.test_report import pypulseq.check_timing from pypulseq.Sequence import block from pypulseq.Sequence.read_seq import read from pypulseq.Sequence.write_seq import write from pypulseq.calc_duration import calc_duration from pypulseq.calc_rf_center import calc_rf_center from pypulseq.decompress_shape import decompress_shape from pypulseq.event_lib import EventLibrary from pypulseq.opts import Opts from pypulseq.points_to_waveform import points_to_waveform class Sequence: def __init__(self, system=Opts()): self.version_major = 1 self.version_minor = 2 self.version_revision = 1 self.system = system self.definitions = dict() self.grad_library = EventLibrary() self.shape_library = EventLibrary() self.rf_library = EventLibrary() self.adc_library = EventLibrary() self.delay_library = EventLibrary() self.block_events = {} self.rf_raster_time = self.system.rf_raster_time self.grad_raster_time = self.system.grad_raster_time def __str__(self): s = "Sequence:" s += "\nshape_library: " + str(self.shape_library) s += "\nrf_library: " + str(self.rf_library) s += "\ngrad_library: " + str(self.grad_library) s += "\nadc_library: " + str(self.adc_library) s += "\ndelay_library: " + str(self.delay_library) s += "\nrf_raster_time: " + str(self.rf_raster_time) s += "\ngrad_raster_time: " + str(self.grad_raster_time) s += "\nblock_events: " + str(len(self.block_events)) return s def duration(self): """ Get duration of this sequence. Returns ------- duration : float Duration of this sequence in millis. num_blocks : int Number of blocks in this sequence. event_count : int Number of events in this sequence. """ num_blocks = len(self.block_events) event_count = np.zeros(len(self.block_events[1])) duration = 0 for i in range(num_blocks): block = self.get_block(i + 1) event_count += self.block_events[i + 1] > 0 duration += calc_duration(block) return duration, num_blocks, event_count def check_timing(self): """ Check timing of the events in each block based on grad raster time system limit. Returns ------- is_ok : bool Boolean flag indicating timing errors. error_report : str Error report in case of timing errors. """ num_blocks = len(self.block_events) is_ok = True error_report = [] for i in range(num_blocks): block = self.get_block(i + 1) event_names = ['rf', 'gx', 'gy', 'gz', 'adc', 'delay'] ind = [hasattr(block, 'rf'), hasattr(block, 'gx'), hasattr(block, 'gy'), hasattr(block, 'gz'), hasattr(block, 'adc'), hasattr(block, 'delay')] ev = [getattr(block, event_names[i]) for i in range(len(event_names)) if ind[i] == 1] res, rep = pypulseq.check_timing.check_timing(self, *ev) is_ok = is_ok and res if len(rep) != 0: error_report.append(f'Block: {i} - {rep}\n') return is_ok, error_report def test_report(self): pypulseq.Sequence.test_report.test_report(self) def set_definition(self, key, val): self.definitions[key] = val def get_definition(self, key): if key in self.definitions: return self.definitions[key] else: return None def add_block(self, *args): block.add_block(self, len(self.block_events) + 1, *args) def get_block(self, block_index): return block.get_block(self, block_index) def read(self, file_path): read(self, file_path) def write(self, name): write(self, name) def rf_from_lib_data(self, lib_data): """ Construct RF object from `lib_data`. Parameters ---------- lib_data : list RF data. Returns ------- rf : SimpleNamespace RF object constructed from lib_data. """ rf = SimpleNamespace() rf.type = 'rf' amplitude, mag_shape, phase_shape = lib_data[0], lib_data[1], lib_data[2] shape_data = self.shape_library.data[mag_shape] compressed = SimpleNamespace() compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] mag = decompress_shape(compressed) shape_data = self.shape_library.data[phase_shape] compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] phase = decompress_shape(compressed) rf.signal = 1j * 2 * np.pi * phase rf.signal = amplitude * mag * np.exp(rf.signal) rf.t = np.arange(1, max(mag.shape) + 1) * self.rf_raster_time rf.delay = lib_data[3] rf.freq_offset = lib_data[4] rf.phase_offset = lib_data[5] if max(lib_data.shape) < 6: lib_data = np.append(lib_data, 0) - rf.dead_time = lib_data[6] + rf.dead_time = lib_data[5] if max(lib_data.shape) < 7: lib_data = np.append(lib_data, 0) - rf.ringdown_time = lib_data[7] + rf.ringdown_time = lib_data[6] if max(lib_data.shape) < 8: lib_data = np.append(lib_data, 0) use_cases = {1: 'excitation', 2: 'refocusing', 3: 'inversion'} - if lib_data[8] in use_cases: - rf.use = use_cases[lib_data[8]] + if lib_data[7] in use_cases: + rf.use = use_cases[lib_data[7]] return rf def calculate_kspace(self, trajectory_delay=0): c_excitation = 0 c_refocusing = 0 c_adc_samples = 0 for i in range(len(self.block_events)): block = self.get_block(i + 1) if hasattr(block, 'rf'): if not hasattr(block.rf, 'use') or block.rf.use != 'refocusing': c_excitation += 1 else: c_refocusing += 1 if hasattr(block, 'adc'): c_adc_samples += int(block.adc.num_samples) t_excitation = np.zeros(c_excitation) t_refocusing = np.zeros(c_refocusing) k_time = np.zeros(c_adc_samples) current_dur = 0 c_excitation = 0 c_refocusing = 0 k_counter = 0 traj_recon_delay = trajectory_delay for i in range(len(self.block_events)): block = self.get_block(i + 1) if hasattr(block, 'rf'): rf = block.rf rf_center, _ = calc_rf_center(rf) t = rf.delay + rf_center if not hasattr(block.rf, 'use') or block.rf.use != 'refocusing': t_excitation[c_excitation] = current_dur + t c_excitation += 1 else: t_refocusing[c_refocusing] = current_dur + t c_refocusing += 1 if hasattr(block, 'adc'): k_time[k_counter:k_counter + block.adc.num_samples] = np.arange( block.adc.num_samples) * block.adc.dwell + block.adc.delay + current_dur + traj_recon_delay k_counter += block.adc.num_samples current_dur += calc_duration(block) gw = self.gradient_waveforms() i_excitation = np.round(t_excitation / self.grad_raster_time) i_refocusing = np.round(t_refocusing / self.grad_raster_time) k_traj = np.zeros(gw.shape) k = [0, 0, 0] for i in range(gw.shape[1]): k += gw[:, i] * self.grad_raster_time k_traj[:, i] = k if len(np.where(i_excitation == i + 1)[0]) >= 1: k = 0 k_traj[:, i] = np.nan if len(np.where(i_refocusing == i + 1)[0]) >= 1: k = -k k_traj_adc = [] for i in range(k_traj.shape[0]): k_traj_adc.append(np.interp(k_time, np.arange(1, k_traj.shape[1] + 1) * self.grad_raster_time, k_traj[i])) k_traj_adc = np.asarray(k_traj_adc) t_adc = k_time return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc def gradient_waveforms(self): duration, num_blocks, _ = self.duration() wave_length = math.ceil(duration / self.grad_raster_time) grad_channels = 3 grad_waveforms = np.zeros((grad_channels, wave_length)) grad_channels = ['gx', 'gy', 'gz'] t0 = 0 t0_n = 0 for i in range(num_blocks): block = self.get_block(i + 1) for j in range(len(grad_channels)): if hasattr(block, grad_channels[j]): grad = getattr(block, grad_channels[j]) if grad.type == 'grad': nt_start = round((grad.delay + grad.t[0]) / self.grad_raster_time) waveform = grad.waveform else: nt_start = round(grad.delay / self.grad_raster_time) if abs(grad.flat_time) > np.finfo(float).eps: t = np.cumsum([0, grad.rise_time, grad.flat_time, grad.fall_time]) trap_form = np.multiply([0, 1, 1, 0], grad.amplitude) else: t = np.cumsum([0, grad.rise_time, grad.fall_time]) trap_form = np.multiply([0, 1, 0], grad.amplitude) tn = math.floor(t[-1] / self.grad_raster_time) t = np.append(t, t[-1] + self.grad_raster_time) trap_form = np.append(trap_form, 0) if abs(grad.amplitude) > np.finfo(float).eps: waveform = points_to_waveform(t, trap_form, self.grad_raster_time) else: waveform = np.zeros(tn + 1) if waveform.size != np.sum(np.isfinite(waveform)): raise Warning('Not all elements of the generated waveform are finite') """ Matlab dynamically resizes arrays during slice assignment operation if assignment is out of bounds Numpy does not Following lines are a workaround """ l1, l2 = int(t0_n + nt_start), int(t0_n + nt_start + max(waveform.shape)) if l2 > grad_waveforms.shape[1]: grad_waveforms.resize((len(grad_channels), l2)) grad_waveforms[j, l1:l2] = waveform t0 += calc_duration(block) t0_n = round(t0 / self.grad_raster_time) return grad_waveforms def plot(self, type='Gradient', time_range=(0, np.inf), time_disp='s'): """ Show Matplotlib plot of all Events in the Sequence object. Parameters ---------- time_range : List Time range (x-axis limits) for plot to be shown. Default is 0 to infinity (entire plot shown). """ valid_plot_types = ['Gradient', 'Kspace'] valid_time_units = ['s', 'ms', 'us'] if type not in valid_plot_types: raise Exception() if time_disp not in valid_time_units: raise Exception() fig1, fig2 = plt.figure(1), plt.figure(2) f11, f12, f13 = fig1.add_subplot(311), fig1.add_subplot(312), fig1.add_subplot(313) f2 = [fig2.add_subplot(311), fig2.add_subplot(312), fig2.add_subplot(313)] t_factor_list = [1, 1e3, 1e6] t_factor = t_factor_list[valid_time_units.index(time_disp)] t0 = 0 for iB in range(1, len(self.block_events) + 1): block = self.get_block(iB) is_valid = time_range[0] <= t0 <= time_range[1] if is_valid: if hasattr(block, 'adc'): adc = block.adc t = adc.delay + [(x * adc.dwell) for x in range(0, int(adc.num_samples))] f11.plot((t0 + t), np.zeros(len(t)), 'rx') if hasattr(block, 'rf'): rf = block.rf tc, ic = calc_rf_center(rf) t = rf.t + rf.delay tc = tc + rf.delay f12.plot(t_factor * (t0 + t), abs(rf.signal)) f13.plot(t_factor * (t0 + t), np.angle( rf.signal * np.exp(1j * rf.phase_offset) * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)), t_factor * (t0 + tc), np.angle(rf.signal[ic] * np.exp(1j * rf.phase_offset) * np.exp( 1j * 2 * math.pi * rf.t[ic] * rf.freq_offset)), 'xb') grad_channels = ['gx', 'gy', 'gz'] for x in range(0, len(grad_channels)): if hasattr(block, grad_channels[x]): grad = getattr(block, grad_channels[x]) if grad.type == 'grad': # In place unpacking of grad.t with the starred expression t = grad.delay + [0, *(grad.t + (grad.t[1] - grad.t[0]) / 2), grad.t[-1] + grad.t[1] - grad.t[0]] waveform = np.array([grad.first, grad.last]) waveform = 1e-3 * np.insert(waveform, 1, grad.waveform) else: t = np.cumsum([0, grad.delay, grad.rise_time, grad.flat_time, grad.fall_time]) waveform = 1e-3 * grad.amplitude * np.array([0, 0, 1, 1, 0]) f2[x].plot(t_factor * (t0 + t), waveform) t0 += calc_duration(block) grad_plot_labels = ['x', 'y', 'z'] f11.set_ylabel('ADC') f12.set_ylabel('RF mag (Hz)') f13.set_ylabel('RF phase (rad)') [f2[x].set_ylabel(f'G{grad_plot_labels[x]} (kHz/m)') for x in range(3)] # Setting display limits disp_range = t_factor * np.array([time_range[0], min(t0, time_range[1])]) f11.set_xlim(disp_range) f12.set_xlim(disp_range) f13.set_xlim(disp_range) [x.set_xlim(disp_range) for x in f2] plt.show() diff --git a/pypulseq/Sequence/write_seq.py b/pypulseq/Sequence/write_seq.py index 26a9651..48aa8be 100755 --- a/pypulseq/Sequence/write_seq.py +++ b/pypulseq/Sequence/write_seq.py @@ -1,135 +1,135 @@ import numpy as np def write(self, file_name): """ Writes a .seq file from the Sequence object calling the method. >.0f is used when only decimals have to be displayed. >g is used when insignificant zeros have to be truncated. Parameters ---------- file_name : str File name of .seq file to be written. """ file_name += '.seq' if file_name[-4:] != '.seq' not in file_name else '' output_file = open(file_name, 'w') output_file.write('# Pulseq sequence file\n') output_file.write('# Created by PyPulseq\n\n') output_file.write('[VERSION]\n') output_file.write(f'major {self.version_major}\n') output_file.write(f'minor {self.version_minor}\n') output_file.write(f'revision {self.version_revision}\n') output_file.write('\n') if len(self.definitions) != 0: output_file.write('[DEFINITIONS]\n') keys = list(self.definitions.keys()) values = list(self.definitions.values()) for i in range(len(keys)): output_file.write(f'{keys[i]} ') if isinstance(values[i], str): output_file.write(f'{values[i]} ') else: for t in range(len(values[i])): output_file.write(f'{values[i][t]:.6g} ') output_file.write('\n') output_file.write('\n') output_file.write('# Format of blocks:\n') output_file.write('# # D RF GX GY GZ ADC\n') output_file.write('[BLOCKS]\n') id_format_width = '{:' + str(len(str(len(self.block_events)))) + 'd} ' id_format_str = id_format_width + '{:2d} {:2d} {:3d} {:3d} {:3d} {:2d}\n' for i in range(len(self.block_events)): s = id_format_str.format(*np.insert(self.block_events[i + 1], 0, (i + 1))) output_file.write(s) output_file.write('\n') if len(self.rf_library.keys) != 0: output_file.write('# Format of RF events:\n') output_file.write('# id amplitude mag_id phase_id delay freq phase\n') output_file.write('# .. Hz .... .... us Hz rad\n') output_file.write('[RF]\n') rf_lib_keys = self.rf_library.keys # id_format_str = '{:>1.0f} {:>12g} {:>1.0f} {:>1.0f} {:>g} {:>g}\n' - id_format_str = '{:d} {:12g} {:.0f} {:.0f} {:g} {:g} {:g}\n' + id_format_str = '{:.0f} {:12g} {:.0f} {:.0f} {:g} {:g} {:g}\n' for k in rf_lib_keys.keys(): lib_data1 = self.rf_library.data[k][0:3] lib_data2 = self.rf_library.data[k][4:7] delay = round(self.rf_library.data[k][3] * 1e6) s = id_format_str.format(k, *lib_data1, delay, *lib_data2) output_file.write(s) output_file.write('\n') grad_lib_values = np.array(list(self.grad_library.type.values())) arb_grad_mask = np.where(grad_lib_values == 'g')[0] trap_grad_mask = np.where(grad_lib_values == 't')[0] if len(arb_grad_mask) != 0: output_file.write('# Format of arbitrary gradients:\n') output_file.write('# id amplitude shape_id delay\n') output_file.write('# .. Hz/m .... us\n') output_file.write('[GRADIENTS]\n') id_format_str = '{:.0f} {:12g} {:.0f} {:.0f}\n' keys = np.array(list(self.grad_library.keys.keys())) for k in keys[arb_grad_mask]: s = id_format_str.format(k, *self.grad_library.data[k][:2], round(self.grad_library.data[k][2] * 1e6)) output_file.write(s) output_file.write('\n') if len(trap_grad_mask) != 0: output_file.write('# Format of trapezoid gradients:\n') output_file.write('# id amplitude rise flat fall delay\n') output_file.write('# .. Hz/m us us us us\n') output_file.write('[TRAP]\n') keys = np.array(list(self.grad_library.keys.keys())) id_format_str = '{:2g} {:12g} {:3g} {:4g} {:3g} {:3g}\n' for k in keys[trap_grad_mask]: data = self.grad_library.data[k] data[1:] = np.round(1e6 * data[1:]) s = id_format_str.format(k, *data) output_file.write(s) output_file.write('\n') if len(self.adc_library.keys) != 0: output_file.write('# Format of ADC events:\n') output_file.write('# id num dwell delay freq phase\n') output_file.write('# .. .. ns us Hz rad\n') output_file.write('[ADC]\n') keys = self.adc_library.keys id_format_str = '{:.0f} {:.0f} {:.0f} {:.0f} {:.0f} {:.6g}\n' for k in keys.values(): data = np.multiply(self.adc_library.data[k][0:5], [1, 1e9, 1e6, 1, 1]) s = id_format_str.format(k, *data) output_file.write(s) output_file.write('\n') if len(self.delay_library.keys) != 0: output_file.write('# Format of delays:\n') output_file.write('# id delay (us)\n') output_file.write('[DELAYS]\n') keys = self.delay_library.keys id_format_str = '{:.0f} {:.0f}\n' for k in keys.values(): s = id_format_str.format(k, *np.round(1e6 * self.delay_library.data[k])) output_file.write(s) output_file.write('\n') if len(self.shape_library.keys) != 0: output_file.write('# Sequence Shapes\n') output_file.write('[SHAPES]\n\n') keys = self.shape_library.keys for k in keys.values(): shape_data = self.shape_library.data[k] s = 'shape_id {:.0f}\n' s = s.format(k) output_file.write(s) s = 'num_samples {:.0f}\n' s = s.format(shape_data[0]) output_file.write(s) s = '{:.9g}\n' * len(shape_data[1:]) s = s.format(*shape_data[1:]) output_file.write(s) output_file.write('\n') diff --git a/pypulseq/make_block_pulse.py b/pypulseq/make_block_pulse.py index b0ce40b..da47daa 100644 --- a/pypulseq/make_block_pulse.py +++ b/pypulseq/make_block_pulse.py @@ -1,92 +1,91 @@ import math from types import SimpleNamespace import numpy as np from pypulseq.make_trap_pulse import make_trapezoid from pypulseq.opts import Opts def make_block_pulse(flip_angle, system=Opts(), duration=0, freq_offset=0, phase_offset=0, time_bw_product=0, bandwidth=0, max_grad=0, max_slew=0, slice_thickness=0, delay=0, use=None): """ Makes a Holder object for an RF pulse Event. Parameters ---------- kwargs : dict Key value mappings of RF Event parameters_params and values. nargout: int Number of output arguments to be returned. Default is 1, only RF Event is returned. Passing any number greater than 1 will return the Gz Event along with the RF Event. Returns ------- Tuple consisting of: rf : Holder RF Event configured based on supplied kwargs. gz : Holder Slice select trapezoidal gradient Event. """ valid_use_pulses = ['excitation', 'refocusing', 'inversion'] if use is not None and use not in valid_use_pulses: raise Exception() if duration == 0: if time_bw_product > 0: duration = time_bw_product / bandwidth elif bandwidth > 0: duration = 1 / (4 * bandwidth) else: raise ValueError('Either bandwidth or duration must be defined') BW = 1 / (4 * duration) N = round(duration / 1e-6) t = np.arange(N) * system.rf_raster_time signal = flip_angle / (2 * np.pi) / duration * np.ones(len(t)) rf = SimpleNamespace() rf.type = 'rf' rf.signal = signal rf.t = t rf.freq_offset = freq_offset rf.phase_offset = phase_offset rf.dead_time = system.rf_dead_time rf.ringdown_time = system.rf_ringdown_time rf.delay = delay if use is not None: rf.use = use if rf.dead_time > rf.delay: rf.delay = rf.dead_time try: if slice_thickness < 0: raise ValueError('Slice thickness must be provided') if max_grad > 0: system.max_grad = max_grad if max_slew > 0: system.max_slew = max_slew amplitude = BW / slice_thickness area = amplitude * duration - kwargs_for_trap = {'channel': 'z', 'system': system, 'flat_time': duration, 'flat_area': area} - gz = make_trapezoid(kwargs_for_trap) + gz = make_trapezoid(channel='z', system=system, flat_time=duration, flat_area=area) if rf.delay > gz.rise_time: gz.delay = math.ceil((rf.delay - gz.rise_time) / system.grad_raster_time) * system.grad_raster_time if rf.delay < (gz.rise_time + gz.delay): rf.delay = gz.rise_time + gz.delay except: gz = None if rf.ringdown_time > 0: t_fill = np.arange(1, round(rf.ringdown_time / 1e-6) + 1) * 1e-6 rf.t = np.concatenate((rf.t, (rf.t[-1] + t_fill))) rf.signal = np.concatenate((rf.signal, np.zeros(len(t_fill)))) return rf, gz