diff --git a/python/tamaas/dumpers/__init__.py b/python/tamaas/dumpers/__init__.py
index b76c2a3..33381fb 100644
--- a/python/tamaas/dumpers/__init__.py
+++ b/python/tamaas/dumpers/__init__.py
@@ -1,581 +1,579 @@
 # -*- mode:python; coding: utf-8 -*-
 #
 # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
 # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
 # Copyright (©) 2020-2023 Lucas Frérot
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published
 # by the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Affero General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 """Dumpers for the class :py:class:`Model <tamaas._tamaas.Model>`."""

 from pathlib import Path
-from os import PathLike

 import glob
 import json
 import io
 import typing as ts

 from collections.abc import Collection

 import numpy as np

-from .. import (
+from .. import __version__, type_traits
+from .._tamaas import (
     ModelDumper,
     model_type,
     mpi,
-    type_traits,
     ModelFactory,
     Model,
-    __version__,
 )

 from ._helper import (
     step_dump,
     directory_dump,
     local_slice,
     _is_surface_field,
     _basic_types,
     file_handler,
+    FileType,
+    NameType,
 )

 __all__ = [
     "JSONDumper",
     "FieldDumper",
     "NumpyDumper",
 ]

-FileType = ts.Union[str, PathLike, io.TextIOBase]
-NameType = ts.Union[str, PathLike]
-
 _reverse_trait_map = {
     'model_type.' + t.__name__: mtype for mtype, t in type_traits.items()
 }


 def _get_attributes(model: Model):
     """Get model attributes."""
     return {
         'model_type': str(model.type),
         'system_size': model.system_size,
         'discretization': model.global_shape,
         'boundary_fields': model.boundary_fields,
         'program': f"Tamaas {__version__}, DOI:10.21105/joss.02121",
     }


 def _create_model(attrs: ts.MutableMapping):
     """Create a model from attribute dictionary."""
     mtype = _reverse_trait_map[attrs['model_type']]

     # netcdf4 converts 1-lists attributes to numbers
     for attr in ['system_size', 'discretization']:
         if not isinstance(attrs[attr], Collection):
             attrs[attr] = [attrs[attr]]

     return ModelFactory.createModel(mtype, attrs['system_size'],
                                     attrs['discretization'])


 class MPIIncompatibilityError(RuntimeError):
     """Raised when code is not meant to be executed in MPI environment."""


 class ModelError(ValueError):
     """Raised when unexpected model is passed to a dumper with a state."""


 class ComponentsError(ValueError):
     """Raised when an unexpected number of components is encountered."""


 class _ModelJSONEncoder(json.JSONEncoder):
     """Encode a model to JSON."""

     def default(self, obj):
         """Encode model."""
         if isinstance(obj, Model):
             model = obj
             attrs = _get_attributes(model)
             model_dict = {
                 'attrs': attrs,
                 'fields': {},
                 'operators': [],
             }

             for field in model:
                 model_dict['fields'][field] = model[field].tolist()

             for op in model.operators:
                 model_dict['operators'].append(op)

             return model_dict

         return json.JSONEncoder.default(self, obj)


 class JSONDumper(ModelDumper):
     """Dumper to JSON."""

     def __init__(self, file_descriptor: FileType):
         """Construct with file handle."""
         super(JSONDumper, self).__init__()
         self.fd = file_descriptor

     @file_handler('w')
-    def _dump_to_file(self, fd: FileType, model: Model):
+    def _dump_to_file(self, fd: ts.IO[str], model: Model):
         json.dump(model, fd, cls=_ModelJSONEncoder)

     def dump(self, model: Model):
         """Dump model."""
         self._dump_to_file(self.fd, model)

     @classmethod
     @file_handler('r')
-    def read(cls, fd: FileType):
+    def read(cls, fd: ts.IO[str]):
         """Read model from file."""
         properties = json.load(fd)
         model = _create_model(properties['attrs'])

         for name, field in properties['fields'].items():
             v = np.asarray(field)
             if model.type in _basic_types:
                 v = v.reshape(list(v.shape) + [1])
             model[name] = v

         return model


 class FieldDumper(ModelDumper):
     """Abstract dumper for python classes using fields."""

     postfix = ""
     extension = ""
     name_format = "{basename}{postfix}.{extension}"

     def __init__(self, basename: NameType, *fields, **kwargs):
         """Construct with desired fields."""
         super(FieldDumper, self).__init__()
         self.basename = basename
         self.fields: ts.List[str] = list(fields)
         self.all_fields: bool = kwargs.get('all_fields', False)

     def add_field(self, field: str):
         """Add another field to the dump."""
         if field not in self.fields:
             self.fields.append(field)

     def _dump_to_file(self, file_descriptor: FileType, model: Model):
         """Dump to a file (path-like or file handle)."""
         raise NotImplementedError()

     def get_fields(self, model: Model):
         """Get the desired fields."""
         if not self.all_fields:
             requested_fields = self.fields
         else:
             requested_fields = list(model)

         return {field: model[field] for field in requested_fields}

     def dump(self, model: Model):
         """Dump model."""
         self._dump_to_file(self.file_path, model)

     @classmethod
     def read(cls, file_descriptor: FileType):
         """Read model from file."""
         raise NotImplementedError(
             f'read() method not implemented in {cls.__name__}')

     @classmethod
     def read_sequence(cls, glob_pattern):
         """Read models from a file sequence."""
         return map(cls.read, glob.iglob(glob_pattern))

     @property
     def file_path(self):
         """Get the default filename."""
         return self.name_format.format(basename=self.basename,
                                        postfix=self.postfix,
                                        extension=self.extension)


 @directory_dump('numpys')
 @step_dump
 class NumpyDumper(FieldDumper):
     """Dumper to compressed numpy files."""

     extension = 'npz'

     def _dump_to_file(self, file_descriptor: FileType, model: Model):
         """Save to compressed multi-field Numpy format."""
         if mpi.size() > 1:
             raise MPIIncompatibilityError("NumpyDumper does not function "
                                           "at all in parallel")

         np.savez_compressed(file_descriptor,
                             attrs=json.dumps(_get_attributes(model)),
                             **self.get_fields(model))

     @classmethod
     def read(cls, file_descriptor: FileType):
         """Create model from Numpy file."""
         data = np.load(file_descriptor, mmap_mode='r')
         model = _create_model(json.loads(str(data['attrs'])))

         for k, v in filter(lambda k: k[0] != 'attrs', data.items()):
             if model.type in _basic_types:
                 v = v.reshape(list(v.shape) + [1])
             model[k] = v

         return model


 try:
     import h5py

     __all__.append("H5Dumper")

     @directory_dump('hdf5')
     @step_dump
     class H5Dumper(FieldDumper):
         """Dumper to HDF5 file format."""

         extension = 'h5'

         @staticmethod
         def _hdf5_args():
             if mpi.size() > 1:
                 from mpi4py import MPI  # noqa
                 mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD)
                 comp_args = {}  # compression does not work in parallel
             else:
                 mpi_args = {}
                 comp_args = dict(compression='gzip', compression_opts=7)
             return mpi_args, comp_args

         def _dump_to_file(self, file_descriptor: FileType, model: Model):
             """Save to HDF5 with metadata about the model."""
             # Setup for MPI
             if not h5py.get_config().mpi and mpi.size() > 1:
                 raise MPIIncompatibilityError(
                     "HDF5 does not have MPI support")

             mpi_args, comp_args = self._hdf5_args()
             with h5py.File(file_descriptor, 'w', **mpi_args) as handle:
                 # Writing data
                 for name, field in self.get_fields(model).items():
                     shape = list(field.shape)
                     if mpi.size() > 1:
                         xdim = 0 if _is_surface_field(field, model) else 1
                         shape[xdim] = mpi_args['comm'].allreduce(shape[xdim])

                     dset = handle.create_dataset(name, shape, field.dtype,
                                                  **comp_args)
                     s = local_slice(field, model,
                                     name in model.boundary_fields)
                     dset[s] = field

                 # Writing metadata
                 for name, attr in _get_attributes(model).items():
                     handle.attrs[name] = attr

         @classmethod
         def read(cls, file_descriptor: FileType):
             """Create model from HDF5 file."""
             mpi_args, _ = cls._hdf5_args()

             with h5py.File(file_descriptor, 'r', **mpi_args) as handle:
                 model = _create_model(handle.attrs)

                 for k, v in handle.items():
                     v = np.asanyarray(v)
                     if model.type in _basic_types:
                         v = v.reshape(list(v.shape) + [1])

                     surface_field = \
                         k in handle.attrs.get('boundary_fields', {}) \
                         or _is_surface_field(v, model)
                     s = local_slice(v, model, surface_field)

                     if (surface_field
                             and v.ndim == len(model.boundary_shape)) \
                             or (not surface_field
                                 and v.ndim == len(model.shape)):
                         s = s + (np.newaxis, )

                     model[k] = v[s].copy()

             return model

 except ImportError:
     pass

 try:
     import uvw  # noqa

     __all__ += [
         "UVWDumper",
         "UVWGroupDumper",
     ]

     @directory_dump('paraview')
     @step_dump
     class UVWDumper(FieldDumper):
         """Dumper to VTK files for elasto-plastic calculations."""

         extension = 'vtr'

         def _dump_to_file(self, file_descriptor: FileType, model: Model):
             """Dump displacements, plastic deformations and stresses."""
             if mpi.size() > 1:
                 raise MPIIncompatibilityError("UVWDumper does not function "
                                               "properly in parallel")

             bdim = len(model.boundary_shape)

             # Local MPI size
             lsize = model.shape
             gsize = mpi.global_shape(model.boundary_shape)
             gshape = gsize

             if len(lsize) > bdim:
                 gshape = [model.shape[0]] + gshape

             # Space coordinates
             coordinates = [
                 np.linspace(0, L, N, endpoint=False)
                 for L, N in zip(model.system_size, gshape)
             ]

             # If model has subsurface domain, z-coordinate is always first
             dimension_indices = np.arange(bdim)
             if len(lsize) > bdim:
                 dimension_indices += 1
                 dimension_indices = np.concatenate((dimension_indices, [0]))
                 coordinates[0] = \
                     np.linspace(0, model.system_size[0], gshape[0])

             offset = np.zeros_like(dimension_indices)
             offset[0] = mpi.local_offset(gsize)

             rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \
                 else uvw.parallel.PRectilinearGrid

             # Creating rectilinear grid with correct order for components
             coordlist = [
                 coordinates[i][o:o + lsize[i]]
                 for i, o in zip(dimension_indices, offset)
             ]

             grid = rectgrid(
                 file_descriptor,
                 coordlist,
                 compression=True,
                 offsets=offset,
             )

             fields = self.get_fields(model).items()

             # Iterator over fields we want to dump on system geometry
             if model.type in {model_type.volume_1d, model_type.volume_2d}:
                 fields_it = filter(
                     lambda t: not t[0] in model.boundary_fields, fields)
             else:
                 fields_it = iter(fields)

             for name, field in fields_it:
                 array = uvw.DataArray(field, dimension_indices, name)
                 grid.addPointData(array)

             grid.write()

     @directory_dump('paraview')
     class UVWGroupDumper(FieldDumper):
         """Dumper to ParaViewData files."""

         extension = 'pvd'

         def __init__(self, basename: NameType, *fields, **kwargs):
             """Construct with desired fields."""
             super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs)

             subdir = Path('paraview') / f'{basename}-VTR'
             subdir.mkdir(parents=True, exist_ok=True)

             self.uvw_dumper = UVWDumper(
                 Path(f'{basename}-VTR') / basename, *fields, **kwargs)

             self.group = uvw.ParaViewData(self.file_path, compression=True)

         def _dump_to_file(self, file_descriptor, model):
             self.group.addFile(
                 self.uvw_dumper.file_path.replace('paraview/', ''),
                 timestep=self.uvw_dumper.count,
             )
             self.group.write()
             self.uvw_dumper.dump(model)

 except ImportError:
     pass

 try:
     from netCDF4 import Dataset

     __all__.append("NetCDFDumper")

     @directory_dump('netcdf')
     class NetCDFDumper(FieldDumper):
         """Dumper to netCDF4 files."""

         extension = "nc"
         time_dim = 'frame'
         format = 'NETCDF4'

         def _file_setup(self, grp, model: Model):
             grp.createDimension(self.time_dim, None)

             # Attributes
             for k, v in _get_attributes(model).items():
                 grp.setncattr(k, v)

             # Local dimensions
             voigt_dim = type_traits[model.type].voigt
             components = type_traits[model.type].components
             self._vec = grp.createDimension('spatial', components)
             self._tens = grp.createDimension('Voigt', voigt_dim)
             self.model_info = model.global_shape, model.type
             global_boundary_shape = mpi.global_shape(model.boundary_shape)

             # Create boundary dimensions
             for label, size, length in zip(
                     "xy", global_boundary_shape,
                     model.boundary_system_size):
                 grp.createDimension(label, size)
                 coord = grp.createVariable(label, 'f8', (label, ))
                 coord[:] = np.linspace(0, length, size, endpoint=False)

             self._create_variables(
                 grp, model,
                 lambda f: _is_surface_field(f[1], model),
                 global_boundary_shape, "xy")

             # Create volume dimension
             if model.type in {model_type.volume_1d, model_type.volume_2d}:
                 size = model.shape[0]
                 grp.createDimension("z", size)
                 coord = grp.createVariable("z", 'f8', ("z", ))
                 coord[:] = np.linspace(0, model.system_size[0], size)

                 self._create_variables(
                     grp, model,
                     lambda f: not _is_surface_field(f[1], model),
                     model.global_shape, "zxy")

             self.has_setup = True

         def _set_collective(self, rootgrp):
             if mpi.size() == 1:
                 return

             for v in rootgrp.variables.values():
                 if self.time_dim in v.dimensions:
                     v.set_collective(True)

-        def _dump_to_file(self, file_descriptor: NameType, model: Model):
+        def _dump_to_file(self, file_descriptor: FileType, model: Model):
-            mode = 'a' if Path(file_descriptor).is_file() \
-                and getattr(self, 'has_setup', False) else 'w'
+            if not isinstance(file_descriptor, io.IOBase):
+                mode = 'a' if Path(file_descriptor).is_file() \
+                    and getattr(self, 'has_setup', False) else 'w'
+            else:
+                mode = 'w'

             try:
                 with Dataset(file_descriptor, mode,
                              format=self.format,
                              parallel=mpi.size() > 1) as rootgrp:
                     if rootgrp.dimensions == {}:
                         self._file_setup(rootgrp, model)

                     self._set_collective(rootgrp)

                     if self.model_info != (model.global_shape, model.type):
                         raise ModelError(f"Unexpected model {model}")

                     self._dump_generic(rootgrp, model)
             except ValueError:
                 raise MPIIncompatibilityError("NetCDF4 has no MPI support")

         def _create_variables(self, grp, model, predicate, shape,
                               dimensions):
             field_dim = len(shape)
             fields = list(filter(predicate, self.get_fields(model).items()))
             dim_labels = list(dimensions[:field_dim])

             for label, data in fields:
                 local_dim = []

                 # If we have an extra component
                 if data.ndim > field_dim:
                     if data.shape[-1] == self._tens.size:
                         local_dim = [self._tens.name]
                     elif data.shape[-1] == self._vec.size:
                         local_dim = [self._vec.name]
                     else:
                         raise ComponentsError(
                             f"{label} has unexpected number of components "
                             f"({data.shape[-1]})")

                 # Downcasting in case of 128 bit float
                 dtype = data.dtype if data.dtype.str[1:] != 'f16' else 'f8'
                 grp.createVariable(label, dtype,
                                    [self.time_dim] + dim_labels + local_dim,
                                    zlib=mpi.size() == 1)

         def _dump_generic(self, grp, model):
             fields = self.get_fields(model).items()

             new_frame = len(grp.dimensions[self.time_dim])

             for label, data in fields:
                 var = grp[label]
                 slice_in_global = (new_frame, ) + local_slice(data, model)
                 var[slice_in_global] = np.array(data, dtype=var.dtype)
         @classmethod
         def _open_read(cls, fd):
             return Dataset(fd, 'r', format=cls.format,
                            parallel=mpi.size() > 1)

         @staticmethod
         def _create_model(rootgrp):
             attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()}
             return _create_model(attrs)

         @staticmethod
         def _set_model_fields(rootgrp, model, frame):
             dims = rootgrp.dimensions.keys()
             for k, v in filter(lambda k: k[0] not in dims,
                                rootgrp.variables.items()):
                 v = v[frame, :]
                 if model.type in _basic_types:
                     v = np.asarray(v).reshape(list(v.shape) + [1])
                 model[k] = v[local_slice(v, model)].copy()

         @classmethod
-        def read(cls, file_descriptor: NameType):
+        def read(cls, file_descriptor: FileType):
             """Create model with last frame."""
             with cls._open_read(file_descriptor) as rootgrp:
                 model = cls._create_model(rootgrp)
                 cls._set_model_fields(rootgrp, model, -1)
                 return model

         @classmethod
-        def read_sequence(cls, file_descriptor: NameType):
+        def read_sequence(cls, file_descriptor: FileType):
             with cls._open_read(file_descriptor) as rootgrp:
                 model = cls._create_model(rootgrp)
                 for frame in range(len(rootgrp.dimensions[cls.time_dim])):
                     cls._set_model_fields(rootgrp, model, frame)
                     yield model

 except ImportError:
     pass
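Note (not part of the patch): a minimal round-trip sketch for the dumpers above. The model parameters and field names are illustrative assumptions; paths follow the `directory_dump`/`step_dump` decorators, which prepend a directory and append a 4-digit step counter.

```python
import tamaas as tm
from tamaas.dumpers import NumpyDumper

# Hypothetical model; 'traction' and 'displacement' are standard model fields
model = tm.ModelFactory.createModel(tm.model_type.basic_2d,
                                    [1., 1.], [64, 64])

dumper = NumpyDumper('example', 'traction', 'displacement')
dumper.dump(model)  # -> numpys/example_0000.npz (step counter appended)

restored = NumpyDumper.read('numpys/example_0000.npz')
```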
diff --git a/python/tamaas/dumpers/_helper.py b/python/tamaas/dumpers/_helper.py
index c36e499..5c393a3 100644
--- a/python/tamaas/dumpers/_helper.py
+++ b/python/tamaas/dumpers/_helper.py
@@ -1,174 +1,178 @@
 # -*- mode:python; coding: utf-8 -*-
 #
 # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
 # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
 # Copyright (©) 2020-2023 Lucas Frérot
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published
 # by the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Affero General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 """
 Helper functions for dumpers
 """

 from os import PathLike
 from functools import wraps
 from pathlib import Path

 import io
+import typing as ts

 import numpy as np

-from .. import model_type, type_traits, mpi
+from .. import model_type, type_traits
+from .._tamaas import mpi

 __all__ = ["step_dump", "directory_dump", "local_slice"]

 _basic_types = [t for t, trait in type_traits.items()
                 if trait.components == 1]

+FileType = ts.Union[str, PathLike, io.TextIOBase]
+NameType = ts.Union[str, PathLike]
+

 def _is_surface_field(field, model):
     def _to_global(shape):
         if len(shape) == len(model.boundary_shape) + 1:
             return mpi.global_shape(model.boundary_shape) + [shape[-1]]
         return mpi.global_shape(shape)

     def _compare_shape(a, b):
         return a == b

     b_shapes = [list(model[name].shape) for name in model.boundary_fields]
     shape = list(field.shape)

     return any(_compare_shape(shape, s) for s in b_shapes) \
         or any(_compare_shape(shape, _to_global(s)) for s in b_shapes)


 def local_slice(field, model, surface_field=None):
     n = model.shape
     bn = model.boundary_shape

     gshape = mpi.global_shape(bn)
     offsets = np.zeros_like(gshape)
     offsets[0] = mpi.local_offset(gshape)

     if not surface_field:
         surface_field = _is_surface_field(field, model)

     if not surface_field and len(n) > len(bn):
         gshape = [n[0]] + gshape
         offsets = np.concatenate(([0], offsets))

     shape = bn if surface_field else n

     if len(field.shape) > len(shape):
         shape += field.shape[len(shape):]

     def sgen(offset, size):
         return slice(offset, offset + size, None)

     def sgen_basic(offset, size):
         return slice(offset, offset + size)

     slice_gen = sgen_basic if model.type in _basic_types else sgen
     return tuple(map(slice_gen, offsets, shape))


 def step_dump(cls):
     """
     Decorator for dumper with counter for steps
     """
     orig_init = cls.__init__
     orig_dump = cls.dump

     @wraps(cls.__init__)
     def __init__(obj, *args, **kwargs):
         orig_init(obj, *args, **kwargs)
         obj.count = 0

     def postfix(obj):
         return "_{:04d}".format(obj.count)

     @wraps(cls.dump)
     def dump(obj, *args, **kwargs):
         orig_dump(obj, *args, **kwargs)
         obj.count += 1

     cls.__init__ = __init__
     cls.dump = dump
     cls.postfix = property(postfix)

     return cls


 def directory_dump(directory=""):
     "Decorator for dumper in a directory"
     directory = Path(directory)

     def actual_decorator(cls):
         orig_dump = cls.dump
         orig_filepath = cls.file_path.fget
         orig_init = cls.__init__

         @wraps(cls.__init__)
         def init(obj, *args, **kwargs):
             orig_init(obj, *args, **kwargs)
             obj.mkdir = kwargs.get('mkdir', True)

         @wraps(cls.dump)
         def dump(obj, *args, **kwargs):
             if mpi.rank() == 0 and getattr(obj, 'mkdir'):
                 directory.mkdir(parents=True, exist_ok=True)

             orig_dump(obj, *args, **kwargs)

         @wraps(cls.file_path.fget)
         def file_path(obj):
             if getattr(obj, 'mkdir'):
                 return str(directory / orig_filepath(obj))
             return orig_filepath(obj)

         cls.__init__ = init
         cls.dump = dump
         cls.file_path = property(file_path)

         return cls

     return actual_decorator


 def hdf5toVTK(inpath: PathLike, outname: str):
     """Convert HDF5 dump of a model to VTK."""
     from . import UVWDumper, H5Dumper  # noqa

     UVWDumper(outname, all_fields=True) << H5Dumper.read(inpath)


 def netCDFtoParaview(inpath: PathLike, outname: str):
     """Convert NetCDF dump of model sequence to Paraview."""
     from . import UVWGroupDumper, NetCDFDumper  # noqa

     dumper = UVWGroupDumper(outname, all_fields=True)
     for model in NetCDFDumper.read_sequence(inpath):
         dumper << model


-def file_handler(mode):
+def file_handler(mode: str):
     """Decorate a function to accept path-like or file handles."""
     def _handler(func):
         @wraps(func)
-        def _wrapped(self, fd, *args, **kwargs):
+        def _wrapped(self, fd: FileType, *args, **kwargs):
             if isinstance(fd, (str, PathLike)):
-                with open(fd, mode) as fd:
-                    return _wrapped(self, fd, *args, **kwargs)
+                with open(fd, mode) as fh:
+                    return _wrapped(self, fh, *args, **kwargs)
             elif isinstance(fd, io.TextIOBase):
                 return func(self, fd, *args, **kwargs)
             raise TypeError(
                 f"Expected a path-like or file handle, got {type(fd)}")

         return _wrapped

     return _handler
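Note (not part of the patch): how the reworked `file_handler` behaves with the two accepted argument kinds. The `Writer` class and file name here are made up for the example.

```python
import io
from tamaas.dumpers._helper import file_handler

class Writer:
    @file_handler('w')
    def write_greeting(self, fd: io.TextIOBase, name: str):
        fd.write(f"Hello {name}\n")

w = Writer()
w.write_greeting('greeting.txt', 'world')  # path-like: opened, then closed
with open('greeting.txt', 'a') as fh:      # text handle: passed through
    w.write_greeting(fh, 'again')
# w.write_greeting(42, 'nope')             # anything else raises TypeError
```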
diff --git a/python/tamaas/nonlinear_solvers/__init__.py b/python/tamaas/nonlinear_solvers/__init__.py
index 1e28a69..eaf949e 100644
--- a/python/tamaas/nonlinear_solvers/__init__.py
+++ b/python/tamaas/nonlinear_solvers/__init__.py
@@ -1,171 +1,176 @@
 # -*- mode:python; coding: utf-8 -*-
 #
 # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
 # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
 # Copyright (©) 2020-2023 Lucas Frérot
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published
 # by the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Affero General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 """Nonlinear solvers for plasticity problems.

 Solvers in this module use :py:mod:`scipy.optimize` to solve the implicit
 non-linear equation for plastic deformations with fixed contact pressures.
 """

 from functools import wraps

 from scipy.optimize import newton_krylov, root
 from scipy.optimize.nonlin import NoConvergence

-from .. import EPSolver, Logger, LogLevel, mpi
-from .._tamaas import _tolerance_manager
-from .._tamaas import _DFSANESolver as DFSANECXXSolver
+from .._tamaas import (
+    EPSolver,
+    Logger,
+    LogLevel,
+    mpi,
+    _tolerance_manager,
+    _DFSANESolver as DFSANECXXSolver,
+)

 __all__ = ['NLNoConvergence',
            'DFSANESolver',
            'DFSANECXXSolver',
            'NewtonKrylovSolver',
            'ToleranceManager']


 class NLNoConvergence(Exception):
     """Convergence not reached exception."""


 class ScipySolver(EPSolver):
     """Base class for solvers wrapping SciPy routines."""

     def __init__(self, residual, model=None, callback=None):
         """Construct nonlinear solver with residual.

         :param residual: plasticity residual object
         :param model: Deprecated
         :param callback: passed on to the Scipy solver
         """
         super(ScipySolver, self).__init__(residual)

         if mpi.size() > 1:
             raise RuntimeError("Scipy solvers cannot be used with MPI; "
                                "DFSANECXXSolver can be used instead")

         self.callback = callback
         self._x = self.getStrainIncrement()
         self._residual = self.getResidual()
         self.options = {'ftol': 0, 'fatol': 1e-9}

     def solve(self):
         """Solve R(δε) = 0 using a Scipy function."""
         # For initial guess, compute the strain due to boundary tractions
         # self._residual.computeResidual(self._x)
         # self._x[...] = self._residual.getVector()
         EPSolver.beforeSolve(self)

         # Scipy root callback
         def compute_residual(vec):
             self._residual.computeResidual(vec)
             return self._residual.getVector().copy()

         # Solve
         Logger().get(LogLevel.debug) << \
             "Entering non-linear solve\n"
         self._x[...] = self._scipy_solve(compute_residual)
         Logger().get(LogLevel.debug) << \
             "Non-linear solve returned"

         # Computing displacements
         self._residual.computeResidualDisplacement(self._x)

     def reset(self):
         """Set solution vector to zero."""
         self._x[...] = 0

     def _scipy_solve(self, compute_residual):
         """Actually call the scipy solver.

         :param compute_residual: function returning residual for given
                                  variable
         """
         raise NotImplementedError()


 class NewtonKrylovSolver(ScipySolver):
     """Solve using a finite-difference Newton-Krylov method."""

     def _scipy_solve(self, compute_residual):
         try:
             return newton_krylov(compute_residual, self._x,
                                  f_tol=self.tolerance,
                                  verbose=True, callback=self.callback)
         except NoConvergence:
             raise NLNoConvergence("Newton-Krylov did not converge")


 class DFSANESolver(ScipySolver):
     """Solve using a spectral residual jacobianless method.

     See :doi:`10.1090/S0025-5718-06-01840-0` for details on the method, and
     the relevant Scipy `documentation
     <https://docs.scipy.org/doc/scipy/reference/optimize.root-dfsane.html>`_
     for details on parameters.
     """

     def _scipy_solve(self, compute_residual):
         solution = root(compute_residual,
                         self._x,
                         method='df-sane',
                         options={'ftol': 0, 'fatol': self.tolerance},
                         callback=self.callback)

         Logger().get(LogLevel.info) << \
             "DF-SANE/Scipy: {} ({} iterations, {})".format(
                 solution.message, solution.nit, self.tolerance)

         if not solution.success:
             raise NLNoConvergence("DF-SANE/Scipy did not converge")

         return solution.x.copy()


 def ToleranceManager(start, end, rate):
     """Decorate solver to manage tolerance of non-linear solve step."""

     def actual_decorator(cls):
         orig_init = cls.__init__
         orig_solve = cls.solve
         orig_update_state = cls.updateState

         @wraps(cls.__init__)
         def __init__(obj, *args, **kwargs):
             orig_init(obj, *args, **kwargs)
             obj.setToleranceManager(_tolerance_manager(start, end, rate))

         @wraps(cls.solve)
         def new_solve(obj, *args, **kwargs):
             ftol = obj.tolerance
             ftol *= rate
             obj.tolerance = max(ftol, end)
             return orig_solve(obj, *args, **kwargs)

         @wraps(cls.updateState)
         def updateState(obj, *args, **kwargs):
             obj.tolerance = start
             return orig_update_state(obj, *args, **kwargs)

         cls.__init__ = __init__
         # cls.solve = new_solve
         # cls.updateState = updateState
         return cls

     return actual_decorator
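Note (not part of the patch): a usage sketch for `ToleranceManager`. The `residual` object is assumed to be a valid `tamaas` plasticity residual built elsewhere, and the tolerance values are arbitrary.

```python
from tamaas.nonlinear_solvers import DFSANECXXSolver, ToleranceManager

# Tolerance starts loose, decays by `rate` after each solve, floors at `end`;
# the decay itself is handled by the C++ _tolerance_manager registered
# in the decorated __init__.
@ToleranceManager(start=1e-2, end=1e-9, rate=1 / 4)
class RelaxedDFSANE(DFSANECXXSolver):
    pass

solver = RelaxedDFSANE(residual)  # `residual` assumed defined elsewhere
```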
"""Convenience utilities.""" import inspect from collections import namedtuple from contextlib import contextmanager from typing import Iterable, Union from functools import partial from operator import contains import numpy as np -from . import ( +from ._tamaas import ( ContactSolver, Model, SurfaceGenerator1D, SurfaceGenerator2D, dtype, set_log_level, get_log_level, LogLevel, Logger, ) __all__ = [ "log_context", "publications", "load_path", "seeded_surfaces", "hertz_surface", "radial_average", ] class NoConvergenceError(RuntimeError): """Convergence not reached exception.""" @contextmanager def log_context(log_level: LogLevel): """Context manager to easily control Tamaas' logging level.""" current = get_log_level() set_log_level(log_level) try: yield finally: set_log_level(current) def publications(format_str="{pub.citation}\n\t{pub.doi}"): """Print publications associated with objects in use.""" Publication = namedtuple("Publication", ["citation", "doi"]) joss = Publication( citation=( "Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S. & Molinari, J.-F." " Tamaas: a library for elastic-plastic contact of periodic rough" " surfaces. Journal of Open Source Software 5, 2121 (2020)."), doi="10.21105/joss.02121", ) zenodo = Publication( citation=( "Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S., " "& Molinari, J.-F. Tamaas, a high-performance " "library for periodic rough surface contact. Zenodo (2019)."), doi="10.5281/zenodo.3479236", ) _publications = { k: v for keys, v in [ ( ( "SurfaceGeneratorRandomPhase1D", "SurfaceGeneratorRandomPhase2D", ), [ Publication( citation=( "Wu, J.-J. Simulation of rough surfaces with FFT." " Tribology International 33, 47–58 (2000)."), doi="10.1016/S0301-679X(00)00016-5", ), ], ), ( ("SurfaceGeneratorFilter1D", "SurfaceGeneratorFilter2D"), [ Publication( citation=( "Hu, Y. Z. & Tonder, K. Simulation of 3-D random" " rough surface by 2-D digital filter and fourier" " analysis. International Journal of Machine Tools" " and Manufacture 32, 83–90 (1992)."), doi="10.1016/0890-6955(92)90064-N", ), ], ), ( ("PolonskyKeerRey", ), [ Publication( citation=( "Polonsky, I. A. & Keer, L. M. A numerical method" " for solving rough contact problems based on the" " multi-level multi-summation and conjugate" " gradient techniques. Wear 231, 206–219 (1999)."), doi="10.1016/S0043-1648(99)00113-1", ), Publication( citation=( "Rey, V., Anciaux, G. & Molinari, J.-F. Normal" " adhesive contact on rough surfaces: efficient" " algorithm for FFT-based BEM resolution. Comput" " Mech 1–13 (2017)."), doi="10.1007/s00466-017-1392-5", ), ], ), ( ("DFSANESolver", "DFSANECXXSolver"), [ Publication( citation=( "La Cruz, W., Martínez, J. & Raydan, M. Spectral" " residual method without gradient information for" " solving large-scale nonlinear systems of" " equations. Math. Comp. 75, 1429–1448 (2006)."), doi="10.1090/S0025-5718-06-01840-0", ), ], ), ( ("Residual", ), [ Publication( citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &" " Anciaux, G. A Fourier-accelerated volume" " integral method for elastoplastic contact." " Computer Methods in Applied Mechanics and" " Engineering 351, 951–976 (2019)."), doi="10.1016/j.cma.2019.04.006", ), ], ), ( ("EPICSolver", ), [ Publication( citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &" " Anciaux, G. A Fourier-accelerated volume" " integral method for elastoplastic contact." " Computer Methods in Applied Mechanics and" " Engineering 351, 951–976 (2019)."), doi="10.1016/j.cma.2019.04.006", ), Publication( citation=( "Jacq, C., Nélias, D., Lormand, G. 
& Girodin, D." " Development of a Three-Dimensional" " Semi-Analytical Elastic-Plastic Contact Code." " Journal of Tribology 124, 653 (2002)."), doi="10.1115/1.1467920", ), ], ), ( ("Condat", ), [ Publication( citation=( "Condat, L. A Primal–Dual Splitting Method for" " Convex Optimization Involving Lipschitzian," " Proximable and Linear Composite Terms. J Optim" " Theory Appl 158, 460–479 (2012)."), doi="10.1007/s10957-012-0245-9", ), ], ), ( ("KatoSaturated", ), [ Publication( citation=( "Stanley, H. M. & Kato, T. An FFT-Based Method for" " Rough Surface Contact. J. Tribol 119, 481–485" " (1997)."), doi="10.1115/1.2833523", ), Publication( citation=( "Almqvist, A., Sahlin, F., Larsson, R. &" " Glavatskih, S. On the dry elasto-plastic contact" " of nominally flat surfaces. Tribology" " International 40, 574–579 (2007)."), doi="10.1016/j.triboint.2005.11.008", ), ], ), ] for k in keys } frame = inspect.stack()[1][0] caller_vars = (frame.f_globals | frame.f_locals) citable = filter( partial(contains, _publications.keys()), map(lambda x: type(x).__name__, caller_vars.values()), ) citable = [joss, zenodo] \ + list({pub for k in citable for pub in _publications[k]}) msg = "Please cite the following publications:\n\n" msg += "\n".join(format_str.format(pub=pub) for pub in citable) Logger().get(LogLevel.info) << msg return citable def load_path( solver: ContactSolver, loads: Iterable[Union[float, np.ndarray]], verbose: bool = False, callback=None, ) -> Iterable[Model]: """ Generate model objects solutions for a sequence of applied loads. :param solver: a contact solver object :param loads: an iterable sequence of loads :param verbose: print info output of solver :param callback: a callback executed after the yield """ log_level = LogLevel.info if verbose else LogLevel.warning with log_context(log_level): for load in loads: if solver.solve(load) > solver.tolerance: raise NoConvergenceError("Solver error exceeded tolerance") yield solver.model if callback is not None: callback() def seeded_surfaces( generator: Union[SurfaceGenerator1D, SurfaceGenerator2D], seeds: Iterable[int], ) -> Iterable[np.ndarray]: """ Generate rough surfaces with a prescribed seed sequence. :param generator: surface generator object :param seeds: random seed sequence """ for seed in seeds: generator.random_seed = seed yield generator.buildSurface() def hertz_surface(system_size: Iterable[float], shape: Iterable[int], radius: float) -> np.ndarray: """ Construct a parabolic surface. :param system_size: size of the domain in each direction :param shape: number of points in each direction :param radius: radius of surface """ coords = map( lambda L, N: np.linspace(0, L, N, endpoint=False, dtype=dtype), system_size, shape, ) coords = np.meshgrid(*coords, "ij", sparse=True) surface = (-1 / (2 * radius)) * sum( (x - L / 2)**2 for x, L in zip(coords, system_size)) - return surface + return np.asanyarray(surface) def radial_average(x: np.ndarray, y: np.ndarray, values: np.ndarray, r: np.ndarray, theta: np.ndarray, method: str = 'linear', endpoint: bool = False) -> np.ndarray: """Compute the radial average of a 2D field. Averages radially for specified r values. See :py:class:`scipy.interpolate.RegularGridInterpolator` for more details. 
""" try: from scipy.interpolate import RegularGridInterpolator except ImportError: raise ImportError("Install scipy to use tamaas.utils.radial_average") interpolator = RegularGridInterpolator((x, y), values, method=method) rr, tt = np.meshgrid(r, theta, indexing='ij', sparse=True) x, y = rr * np.cos(tt), rr * np.sin(tt) X = np.vstack((x.flatten(), y.flatten())).T return interpolator(X).reshape(x.shape).sum(axis=1) \ / (theta.size - int(not endpoint))