diff --git a/python/tamaas/dumpers/__init__.py b/python/tamaas/dumpers/__init__.py index 33381fb..b7c7d7f 100644 --- a/python/tamaas/dumpers/__init__.py +++ b/python/tamaas/dumpers/__init__.py @@ -1,579 +1,580 @@ # -*- mode:python; coding: utf-8 -*- # # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne), # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) # Copyright (©) 2020-2023 Lucas Frérot # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Dumpers for the class :py:class:`Model `.""" from pathlib import Path import glob import json import io import typing as ts from collections.abc import Collection import numpy as np from .. import __version__, type_traits from .._tamaas import ( ModelDumper, model_type, mpi, ModelFactory, Model, ) from ._helper import ( step_dump, directory_dump, local_slice, _is_surface_field, _basic_types, file_handler, FileType, NameType, ) __all__ = [ "JSONDumper", "FieldDumper", "NumpyDumper", ] _reverse_trait_map = { 'model_type.' + t.__name__: mtype for mtype, t in type_traits.items() } def _get_attributes(model: Model): """Get model attributes.""" return { 'model_type': str(model.type), 'system_size': model.system_size, 'discretization': model.global_shape, 'boundary_fields': model.boundary_fields, 'program': f"Tamaas {__version__}, DOI:10.21105/joss.02121", } def _create_model(attrs: ts.MutableMapping): """Create a model from attribute dictionary.""" mtype = _reverse_trait_map[attrs['model_type']] # netcdf4 converts 1-lists attributes to numbers for attr in ['system_size', 'discretization']: if not isinstance(attrs[attr], Collection): attrs[attr] = [attrs[attr]] return ModelFactory.createModel(mtype, attrs['system_size'], attrs['discretization']) class MPIIncompatibilityError(RuntimeError): """Raised when code is not meant to be executed in MPI environment.""" class ModelError(ValueError): """Raised when unexpected model is passed to a dumper with a state.""" class ComponentsError(ValueError): """Raised when an unexpected number of components is encountred.""" class _ModelJSONEncoder(json.JSONEncoder): """Encode a model to JSON.""" def default(self, obj): """Encode model.""" if isinstance(obj, Model): model = obj attrs = _get_attributes(model) model_dict = { 'attrs': attrs, 'fields': {}, 'operators': [], } for field in model: model_dict['fields'][field] = model[field].tolist() for op in model.operators: model_dict['operators'].append(op) return model_dict return json.JSONEncoder.default(self, obj) class JSONDumper(ModelDumper): """Dumper to JSON.""" def __init__(self, file_descriptor: FileType): """Construct with file handle.""" super(JSONDumper, self).__init__() self.fd = file_descriptor @file_handler('w') def _dump_to_file(self, fd: ts.IO[str], model: Model): json.dump(model, fd, cls=_ModelJSONEncoder) def dump(self, model: Model): """Dump model.""" self._dump_to_file(self.fd, model) @classmethod @file_handler('r') def read(cls, fd: ts.IO[str]): """Read model from file.""" properties = json.load(fd) model = _create_model(properties['attrs']) for name, field in properties['fields'].items(): v = np.asarray(field) if model.type in _basic_types: v = v.reshape(list(v.shape) + [1]) model[name] = v return model class FieldDumper(ModelDumper): """Abstract dumper for python classes using fields.""" postfix = "" extension = "" name_format = "{basename}{postfix}.{extension}" def __init__(self, basename: NameType, *fields, **kwargs): """Construct with desired fields.""" super(FieldDumper, self).__init__() self.basename = basename self.fields: ts.List[str] = list(fields) self.all_fields: bool = kwargs.get('all_fields', False) def add_field(self, field: str): """Add another field to the dump.""" if field not in self.fields: self.fields.append(field) def _dump_to_file(self, file_descriptor: FileType, model: Model): """Dump to a file (path-like or file handle).""" raise NotImplementedError() def get_fields(self, model: Model): """Get the desired fields.""" if not self.all_fields: requested_fields = self.fields else: requested_fields = list(model) return {field: model[field] for field in requested_fields} def dump(self, model: Model): """Dump model.""" self._dump_to_file(self.file_path, model) @classmethod def read(cls, file_descriptor: FileType): """Read model from file.""" raise NotImplementedError( f'read() method not implemented in {cls.__name__}') @classmethod def read_sequence(cls, glob_pattern): """Read models from a file sequence.""" return map(cls.read, glob.iglob(glob_pattern)) @property def file_path(self): """Get the default filename.""" return self.name_format.format(basename=self.basename, postfix=self.postfix, extension=self.extension) @directory_dump('numpys') @step_dump class NumpyDumper(FieldDumper): """Dumper to compressed numpy files.""" extension = 'npz' def _dump_to_file(self, file_descriptor: FileType, model: Model): """Save to compressed multi-field Numpy format.""" if mpi.size() > 1: raise MPIIncompatibilityError("NumpyDumper does not function " "at all in parallel") np.savez_compressed(file_descriptor, attrs=json.dumps(_get_attributes(model)), **self.get_fields(model)) @classmethod def read(cls, file_descriptor: FileType): """Create model from Numpy file.""" data = np.load(file_descriptor, mmap_mode='r') model = _create_model(json.loads(str(data['attrs']))) for k, v in filter(lambda k: k[0] != 'attrs', data.items()): if model.type in _basic_types: v = v.reshape(list(v.shape) + [1]) model[k] = v return model try: import h5py __all__.append("H5Dumper") @directory_dump('hdf5') @step_dump class H5Dumper(FieldDumper): """Dumper to HDF5 file format.""" extension = 'h5' @staticmethod def _hdf5_args(): if mpi.size() > 1: from mpi4py import MPI # noqa mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD) comp_args = {} # compression does not work in parallel else: mpi_args = {} comp_args = dict(compression='gzip', compression_opts=7) return mpi_args, comp_args def _dump_to_file(self, file_descriptor: FileType, model: Model): """Save to HDF5 with metadata about the model.""" # Setup for MPI if not h5py.get_config().mpi and mpi.size() > 1: raise MPIIncompatibilityError("HDF5 does not have MPI support") mpi_args, comp_args = self._hdf5_args() with h5py.File(file_descriptor, 'w', **mpi_args) as handle: # Writing data for name, field in self.get_fields(model).items(): shape = list(field.shape) if mpi.size() > 1: xdim = 0 if _is_surface_field(field, model) else 1 shape[xdim] = mpi_args['comm'].allreduce(shape[xdim]) dset = handle.create_dataset(name, shape, field.dtype, **comp_args) s = local_slice(field, model, name in model.boundary_fields) dset[s] = field # Writing metadata for name, attr in _get_attributes(model).items(): handle.attrs[name] = attr @classmethod def read(cls, file_descriptor: FileType): """Create model from HDF5 file.""" mpi_args, _ = cls._hdf5_args() with h5py.File(file_descriptor, 'r', **mpi_args) as handle: model = _create_model(handle.attrs) for k, v in handle.items(): v = np.asanyarray(v) if model.type in _basic_types: v = v.reshape(list(v.shape) + [1]) surface_field = \ k in handle.attrs.get('boundary_fields', {}) \ or _is_surface_field(v, model) s = local_slice(v, model, surface_field) if (surface_field and v.ndim == len(model.boundary_shape)) \ or (not surface_field and v.ndim == len(model.shape)): s = s + (np.newaxis, ) model[k] = v[s].copy() return model except ImportError: pass try: import uvw # noqa __all__ += [ "UVWDumper", "UVWGroupDumper", ] @directory_dump('paraview') @step_dump class UVWDumper(FieldDumper): """Dumper to VTK files for elasto-plastic calculations.""" extension = 'vtr' def _dump_to_file(self, file_descriptor: FileType, model: Model): """Dump displacements, plastic deformations and stresses.""" if mpi.size() > 1: raise MPIIncompatibilityError("UVWDumper does not function " "properly in parallel") bdim = len(model.boundary_shape) # Local MPI size lsize = model.shape gsize = mpi.global_shape(model.boundary_shape) gshape = gsize if len(lsize) > bdim: gshape = [model.shape[0]] + gshape # Space coordinates coordinates = [ np.linspace(0, L, N, endpoint=False) for L, N in zip(model.system_size, gshape) ] # If model has subsurfce domain, z-coordinate is always first dimension_indices = np.arange(bdim) if len(lsize) > bdim: dimension_indices += 1 - dimension_indices = np.concatenate((dimension_indices, [0])) + dimension_indices = np.concatenate((dimension_indices, + np.asarray([0]))) coordinates[0] = \ np.linspace(0, model.system_size[0], gshape[0]) offset = np.zeros_like(dimension_indices) offset[0] = mpi.local_offset(gsize) rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \ else uvw.parallel.PRectilinearGrid # Creating rectilinear grid with correct order for components coordlist = [ coordinates[i][o:o + lsize[i]] for i, o in zip(dimension_indices, offset) ] grid = rectgrid( file_descriptor, coordlist, compression=True, offsets=offset, ) fields = self.get_fields(model).items() # Iterator over fields we want to dump on system geometry if model.type in {model_type.volume_1d, model_type.volume_2d}: fields_it = filter(lambda t: not t[0] in model.boundary_fields, fields) else: fields_it = iter(fields) for name, field in fields_it: array = uvw.DataArray(field, dimension_indices, name) grid.addPointData(array) grid.write() @directory_dump('paraview') class UVWGroupDumper(FieldDumper): """Dumper to ParaViewData files.""" extension = 'pvd' def __init__(self, basename: NameType, *fields, **kwargs): """Construct with desired fields.""" super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs) subdir = Path('paraview') / f'{basename}-VTR' subdir.mkdir(parents=True, exist_ok=True) self.uvw_dumper = UVWDumper( Path(f'{basename}-VTR') / basename, *fields, **kwargs) self.group = uvw.ParaViewData(self.file_path, compression=True) def _dump_to_file(self, file_descriptor, model): self.group.addFile( self.uvw_dumper.file_path.replace('paraview/', ''), timestep=self.uvw_dumper.count, ) self.group.write() self.uvw_dumper.dump(model) except ImportError: pass try: from netCDF4 import Dataset __all__.append("cls") @directory_dump('netcdf') class NetCDFDumper(FieldDumper): """Dumper to netCDF4 files.""" extension = "nc" time_dim = 'frame' format = 'NETCDF4' def _file_setup(self, grp, model: Model): grp.createDimension(self.time_dim, None) # Attibutes for k, v in _get_attributes(model).items(): grp.setncattr(k, v) # Local dimensions voigt_dim = type_traits[model.type].voigt components = type_traits[model.type].components self._vec = grp.createDimension('spatial', components) self._tens = grp.createDimension('Voigt', voigt_dim) self.model_info = model.global_shape, model.type global_boundary_shape = mpi.global_shape(model.boundary_shape) # Create boundary dimensions for label, size, length in zip("xy", global_boundary_shape, model.boundary_system_size): grp.createDimension(label, size) coord = grp.createVariable(label, 'f8', (label, )) coord[:] = np.linspace(0, length, size, endpoint=False) self._create_variables(grp, model, lambda f: _is_surface_field(f[1], model), global_boundary_shape, "xy") # Create volume dimension if model.type in {model_type.volume_1d, model_type.volume_2d}: size = model.shape[0] grp.createDimension("z", size) coord = grp.createVariable("z", 'f8', ("z", )) coord[:] = np.linspace(0, model.system_size[0], size) self._create_variables( grp, model, lambda f: not _is_surface_field(f[1], model), model.global_shape, "zxy") self.has_setup = True def _set_collective(self, rootgrp): if mpi.size() == 1: return for v in rootgrp.variables.values(): if self.time_dim in v.dimensions: v.set_collective(True) def _dump_to_file(self, file_descriptor: FileType, model: Model): if not isinstance(file_descriptor, io.IOBase): mode = 'a' if Path(file_descriptor).is_file() \ and getattr(self, 'has_setup', False) else 'w' try: with Dataset(file_descriptor, mode, format=self.format, parallel=mpi.size() > 1) as rootgrp: if rootgrp.dimensions == {}: self._file_setup(rootgrp, model) self._set_collective(rootgrp) if self.model_info != (model.global_shape, model.type): raise ModelError(f"Unexpected model {mode}") self._dump_generic(rootgrp, model) except ValueError: raise MPIIncompatibilityError("NetCDF4 has no MPI support") def _create_variables(self, grp, model, predicate, shape, dimensions): field_dim = len(shape) fields = list(filter(predicate, self.get_fields(model).items())) dim_labels = list(dimensions[:field_dim]) for label, data in fields: local_dim = [] # If we have an extra component if data.ndim > field_dim: if data.shape[-1] == self._tens.size: local_dim = [self._tens.name] elif data.shape[-1] == self._vec.size: local_dim = [self._vec.name] else: raise ComponentsError( f"{label} has unexpected number of components " f"({data.shape[-1]})") # Downcasting in case of 128 bit float dtype = data.dtype if data.dtype.str[1:] != 'f16' else 'f8' grp.createVariable(label, dtype, [self.time_dim] + dim_labels + local_dim, zlib=mpi.size() == 0) def _dump_generic(self, grp, model): fields = self.get_fields(model).items() new_frame = len(grp.dimensions[self.time_dim]) for label, data in fields: var = grp[label] slice_in_global = (new_frame, ) + local_slice(data, model) var[slice_in_global] = np.array(data, dtype=var.dtype) @classmethod def _open_read(cls, fd): return Dataset(fd, 'r', format=cls.format, parallel=mpi.size() > 1) @staticmethod def _create_model(rootgrp): attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()} return _create_model(attrs) @staticmethod def _set_model_fields(rootgrp, model, frame): dims = rootgrp.dimensions.keys() for k, v in filter(lambda k: k[0] not in dims, rootgrp.variables.items()): v = v[frame, :] if model.type in _basic_types: v = np.asarray(v).reshape(list(v.shape) + [1]) model[k] = v[local_slice(v, model)].copy() @classmethod def read(cls, file_descriptor: FileType): """Create model with last frame.""" with cls._open_read(file_descriptor) as rootgrp: model = cls._create_model(rootgrp) cls._set_model_fields(rootgrp, model, -1) return model @classmethod def read_sequence(cls, file_descriptor: FileType): with cls._open_read(file_descriptor) as rootgrp: model = cls._create_model(rootgrp) for frame in range(len(rootgrp.dimensions[cls.time_dim])): cls._set_model_fields(rootgrp, model, frame) yield model except ImportError: pass diff --git a/python/tamaas/dumpers/_helper.py b/python/tamaas/dumpers/_helper.py index 5c393a3..38f8e9a 100644 --- a/python/tamaas/dumpers/_helper.py +++ b/python/tamaas/dumpers/_helper.py @@ -1,178 +1,178 @@ # -*- mode:python; coding: utf-8 -*- # # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne), # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) # Copyright (©) 2020-2023 Lucas Frérot # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ Helper functions for dumpers """ from os import PathLike from functools import wraps from pathlib import Path import io import typing as ts import numpy as np from .. import model_type, type_traits from .._tamaas import mpi __all__ = ["step_dump", "directory_dump", "local_slice"] _basic_types = [t for t, trait in type_traits.items() if trait.components == 1] -FileType = ts.Union[str, PathLike, io.TextIOBase] -NameType = ts.Union[str, PathLike] +FileType = ts.Union[str, PathLike[str], io.IOBase] +NameType = ts.Union[str, PathLike[str]] def _is_surface_field(field, model): def _to_global(shape): if len(shape) == len(model.boundary_shape) + 1: return mpi.global_shape(model.boundary_shape) + [shape[-1]] return mpi.global_shape(shape) def _compare_shape(a, b): return a == b b_shapes = [list(model[name].shape) for name in model.boundary_fields] shape = list(field.shape) return any(_compare_shape(shape, s) for s in b_shapes) \ or any(_compare_shape(shape, _to_global(s)) for s in b_shapes) def local_slice(field, model, surface_field=None): n = model.shape bn = model.boundary_shape gshape = mpi.global_shape(bn) offsets = np.zeros_like(gshape) offsets[0] = mpi.local_offset(gshape) if not surface_field: surface_field = _is_surface_field(field, model) if not surface_field and len(n) > len(bn): gshape = [n[0]] + gshape offsets = np.concatenate(([0], offsets)) shape = bn if surface_field else n if len(field.shape) > len(shape): shape += field.shape[len(shape):] def sgen(offset, size): return slice(offset, offset + size, None) def sgen_basic(offset, size): return slice(offset, offset + size) slice_gen = sgen_basic if model_type in _basic_types else sgen return tuple(map(slice_gen, offsets, shape)) def step_dump(cls): """ Decorator for dumper with counter for steps """ orig_init = cls.__init__ orig_dump = cls.dump @wraps(cls.__init__) def __init__(obj, *args, **kwargs): orig_init(obj, *args, **kwargs) obj.count = 0 def postfix(obj): return "_{:04d}".format(obj.count) @wraps(cls.dump) def dump(obj, *args, **kwargs): orig_dump(obj, *args, **kwargs) obj.count += 1 cls.__init__ = __init__ cls.dump = dump cls.postfix = property(postfix) return cls def directory_dump(directory=""): "Decorator for dumper in a directory" directory = Path(directory) def actual_decorator(cls): orig_dump = cls.dump orig_filepath = cls.file_path.fget orig_init = cls.__init__ @wraps(cls.__init__) def init(obj, *args, **kwargs): orig_init(obj, *args, **kwargs) obj.mkdir = kwargs.get('mkdir', True) @wraps(cls.dump) def dump(obj, *args, **kwargs): if mpi.rank() == 0 and getattr(obj, 'mkdir'): directory.mkdir(parents=True, exist_ok=True) orig_dump(obj, *args, **kwargs) @wraps(cls.file_path.fget) def file_path(obj): if getattr(obj, 'mkdir'): return str(directory / orig_filepath(obj)) return orig_filepath(obj) cls.__init__ = init cls.dump = dump cls.file_path = property(file_path) return cls return actual_decorator def hdf5toVTK(inpath: PathLike, outname: str): """Convert HDF5 dump of a model to VTK.""" from . import UVWDumper, H5Dumper # noqa UVWDumper(outname, all_fields=True) << H5Dumper.read(inpath) def netCDFtoParaview(inpath: PathLike, outname: str): """Convert NetCDF dump of model sequence to Paraview.""" from . import UVWGroupDumper, NetCDFDumper # noqa dumper = UVWGroupDumper(outname, all_fields=True) for model in NetCDFDumper.read_sequence(inpath): dumper << model def file_handler(mode: str): """Decorate a function to accept path-like or file handles.""" def _handler(func): @wraps(func) def _wrapped(self, fd: FileType, *args, **kwargs): if isinstance(fd, (str, PathLike)): with open(fd, mode) as fh: return _wrapped(self, fh, *args, **kwargs) elif isinstance(fd, io.TextIOBase): return func(self, fd, *args, **kwargs) raise TypeError( f"Expected a path-like or file handle, got {type(fd)}") return _wrapped return _handler diff --git a/python/tamaas/utils.py b/python/tamaas/utils.py index feb9af9..180b847 100644 --- a/python/tamaas/utils.py +++ b/python/tamaas/utils.py @@ -1,322 +1,321 @@ # -*- mode: python; coding: utf-8 -*- # vim: set ft=python: # # Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne), # Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) # Copyright (©) 2020-2023 Lucas Frérot # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Convenience utilities.""" import inspect from collections import namedtuple from contextlib import contextmanager from typing import Iterable, Union from functools import partial from operator import contains import numpy as np from ._tamaas import ( ContactSolver, Model, SurfaceGenerator1D, SurfaceGenerator2D, dtype, set_log_level, get_log_level, LogLevel, Logger, ) __all__ = [ "log_context", "publications", "load_path", "seeded_surfaces", "hertz_surface", "radial_average", ] class NoConvergenceError(RuntimeError): """Convergence not reached exception.""" @contextmanager def log_context(log_level: LogLevel): """Context manager to easily control Tamaas' logging level.""" current = get_log_level() set_log_level(log_level) try: yield finally: set_log_level(current) def publications(format_str="{pub.citation}\n\t{pub.doi}"): """Print publications associated with objects in use.""" Publication = namedtuple("Publication", ["citation", "doi"]) joss = Publication( citation=( "Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S. & Molinari, J.-F." " Tamaas: a library for elastic-plastic contact of periodic rough" " surfaces. Journal of Open Source Software 5, 2121 (2020)."), doi="10.21105/joss.02121", ) zenodo = Publication( citation=( "Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S., " "& Molinari, J.-F. Tamaas, a high-performance " "library for periodic rough surface contact. Zenodo (2019)."), doi="10.5281/zenodo.3479236", ) _publications = { k: v for keys, v in [ ( ( "SurfaceGeneratorRandomPhase1D", "SurfaceGeneratorRandomPhase2D", ), [ Publication( citation=( "Wu, J.-J. Simulation of rough surfaces with FFT." " Tribology International 33, 47–58 (2000)."), doi="10.1016/S0301-679X(00)00016-5", ), ], ), ( ("SurfaceGeneratorFilter1D", "SurfaceGeneratorFilter2D"), [ Publication( citation=( "Hu, Y. Z. & Tonder, K. Simulation of 3-D random" " rough surface by 2-D digital filter and fourier" " analysis. International Journal of Machine Tools" " and Manufacture 32, 83–90 (1992)."), doi="10.1016/0890-6955(92)90064-N", ), ], ), ( ("PolonskyKeerRey", ), [ Publication( citation=( "Polonsky, I. A. & Keer, L. M. A numerical method" " for solving rough contact problems based on the" " multi-level multi-summation and conjugate" " gradient techniques. Wear 231, 206–219 (1999)."), doi="10.1016/S0043-1648(99)00113-1", ), Publication( citation=( "Rey, V., Anciaux, G. & Molinari, J.-F. Normal" " adhesive contact on rough surfaces: efficient" " algorithm for FFT-based BEM resolution. Comput" " Mech 1–13 (2017)."), doi="10.1007/s00466-017-1392-5", ), ], ), ( ("DFSANESolver", "DFSANECXXSolver"), [ Publication( citation=( "La Cruz, W., Martínez, J. & Raydan, M. Spectral" " residual method without gradient information for" " solving large-scale nonlinear systems of" " equations. Math. Comp. 75, 1429–1448 (2006)."), doi="10.1090/S0025-5718-06-01840-0", ), ], ), ( ("Residual", ), [ Publication( citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &" " Anciaux, G. A Fourier-accelerated volume" " integral method for elastoplastic contact." " Computer Methods in Applied Mechanics and" " Engineering 351, 951–976 (2019)."), doi="10.1016/j.cma.2019.04.006", ), ], ), ( ("EPICSolver", ), [ Publication( citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &" " Anciaux, G. A Fourier-accelerated volume" " integral method for elastoplastic contact." " Computer Methods in Applied Mechanics and" " Engineering 351, 951–976 (2019)."), doi="10.1016/j.cma.2019.04.006", ), Publication( citation=( "Jacq, C., Nélias, D., Lormand, G. & Girodin, D." " Development of a Three-Dimensional" " Semi-Analytical Elastic-Plastic Contact Code." " Journal of Tribology 124, 653 (2002)."), doi="10.1115/1.1467920", ), ], ), ( ("Condat", ), [ Publication( citation=( "Condat, L. A Primal–Dual Splitting Method for" " Convex Optimization Involving Lipschitzian," " Proximable and Linear Composite Terms. J Optim" " Theory Appl 158, 460–479 (2012)."), doi="10.1007/s10957-012-0245-9", ), ], ), ( ("KatoSaturated", ), [ Publication( citation=( "Stanley, H. M. & Kato, T. An FFT-Based Method for" " Rough Surface Contact. J. Tribol 119, 481–485" " (1997)."), doi="10.1115/1.2833523", ), Publication( citation=( "Almqvist, A., Sahlin, F., Larsson, R. &" " Glavatskih, S. On the dry elasto-plastic contact" " of nominally flat surfaces. Tribology" " International 40, 574–579 (2007)."), doi="10.1016/j.triboint.2005.11.008", ), ], ), ] for k in keys } frame = inspect.stack()[1][0] caller_vars = (frame.f_globals | frame.f_locals) citable = filter( partial(contains, _publications.keys()), map(lambda x: type(x).__name__, caller_vars.values()), ) citable = [joss, zenodo] \ + list({pub for k in citable for pub in _publications[k]}) msg = "Please cite the following publications:\n\n" msg += "\n".join(format_str.format(pub=pub) for pub in citable) Logger().get(LogLevel.info) << msg return citable def load_path( solver: ContactSolver, loads: Iterable[Union[float, np.ndarray]], verbose: bool = False, callback=None, ) -> Iterable[Model]: """ Generate model objects solutions for a sequence of applied loads. :param solver: a contact solver object :param loads: an iterable sequence of loads :param verbose: print info output of solver :param callback: a callback executed after the yield """ log_level = LogLevel.info if verbose else LogLevel.warning with log_context(log_level): for load in loads: if solver.solve(load) > solver.tolerance: raise NoConvergenceError("Solver error exceeded tolerance") yield solver.model if callback is not None: callback() def seeded_surfaces( generator: Union[SurfaceGenerator1D, SurfaceGenerator2D], seeds: Iterable[int], ) -> Iterable[np.ndarray]: """ Generate rough surfaces with a prescribed seed sequence. :param generator: surface generator object :param seeds: random seed sequence """ for seed in seeds: generator.random_seed = seed yield generator.buildSurface() def hertz_surface(system_size: Iterable[float], shape: Iterable[int], radius: float) -> np.ndarray: """ Construct a parabolic surface. :param system_size: size of the domain in each direction :param shape: number of points in each direction :param radius: radius of surface """ - coords = map( - lambda L, N: np.linspace(0, L, N, endpoint=False, dtype=dtype), - system_size, - shape, - ) + coords = [ + np.linspace(0, L, N, endpoint=False, dtype=dtype) + for L, N in zip(system_size, shape) + ] coords = np.meshgrid(*coords, "ij", sparse=True) surface = (-1 / (2 * radius)) * sum( (x - L / 2)**2 for x, L in zip(coords, system_size)) return np.asanyarray(surface) def radial_average(x: np.ndarray, y: np.ndarray, values: np.ndarray, r: np.ndarray, theta: np.ndarray, method: str = 'linear', endpoint: bool = False) -> np.ndarray: """Compute the radial average of a 2D field. Averages radially for specified r values. See :py:class:`scipy.interpolate.RegularGridInterpolator` for more details. """ try: from scipy.interpolate import RegularGridInterpolator except ImportError: raise ImportError("Install scipy to use tamaas.utils.radial_average") interpolator = RegularGridInterpolator((x, y), values, method=method) rr, tt = np.meshgrid(r, theta, indexing='ij', sparse=True) x, y = rr * np.cos(tt), rr * np.sin(tt) X = np.vstack((x.flatten(), y.flatten())).T return interpolator(X).reshape(x.shape).sum(axis=1) \ / (theta.size - int(not endpoint))