diff --git a/python/tamaas/dumpers/__init__.py b/python/tamaas/dumpers/__init__.py
index 33381fb..b7c7d7f 100644
--- a/python/tamaas/dumpers/__init__.py
+++ b/python/tamaas/dumpers/__init__.py
@@ -1,579 +1,580 @@
# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""Dumpers for the class :py:class:`Model `."""
from pathlib import Path
import glob
import json
import io
import typing as ts
from collections.abc import Collection
import numpy as np
from .. import __version__, type_traits
from .._tamaas import (
ModelDumper,
model_type,
mpi,
ModelFactory,
Model,
)
from ._helper import (
step_dump,
directory_dump,
local_slice,
_is_surface_field,
_basic_types,
file_handler,
FileType,
NameType,
)
__all__ = [
"JSONDumper",
"FieldDumper",
"NumpyDumper",
]
_reverse_trait_map = {
'model_type.' + t.__name__: mtype
for mtype, t in type_traits.items()
}
def _get_attributes(model: Model):
"""Get model attributes."""
return {
'model_type': str(model.type),
'system_size': model.system_size,
'discretization': model.global_shape,
'boundary_fields': model.boundary_fields,
'program': f"Tamaas {__version__}, DOI:10.21105/joss.02121",
}
def _create_model(attrs: ts.MutableMapping):
"""Create a model from attribute dictionary."""
mtype = _reverse_trait_map[attrs['model_type']]
# netcdf4 converts 1-lists attributes to numbers
for attr in ['system_size', 'discretization']:
if not isinstance(attrs[attr], Collection):
attrs[attr] = [attrs[attr]]
return ModelFactory.createModel(mtype, attrs['system_size'],
attrs['discretization'])
class MPIIncompatibilityError(RuntimeError):
"""Raised when code is not meant to be executed in MPI environment."""
class ModelError(ValueError):
"""Raised when unexpected model is passed to a dumper with a state."""
class ComponentsError(ValueError):
"""Raised when an unexpected number of components is encountred."""
class _ModelJSONEncoder(json.JSONEncoder):
"""Encode a model to JSON."""
def default(self, obj):
"""Encode model."""
if isinstance(obj, Model):
model = obj
attrs = _get_attributes(model)
model_dict = {
'attrs': attrs,
'fields': {},
'operators': [],
}
for field in model:
model_dict['fields'][field] = model[field].tolist()
for op in model.operators:
model_dict['operators'].append(op)
return model_dict
return json.JSONEncoder.default(self, obj)
class JSONDumper(ModelDumper):
"""Dumper to JSON."""
def __init__(self, file_descriptor: FileType):
"""Construct with file handle."""
super(JSONDumper, self).__init__()
self.fd = file_descriptor
@file_handler('w')
def _dump_to_file(self, fd: ts.IO[str], model: Model):
json.dump(model, fd, cls=_ModelJSONEncoder)
def dump(self, model: Model):
"""Dump model."""
self._dump_to_file(self.fd, model)
@classmethod
@file_handler('r')
def read(cls, fd: ts.IO[str]):
"""Read model from file."""
properties = json.load(fd)
model = _create_model(properties['attrs'])
for name, field in properties['fields'].items():
v = np.asarray(field)
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[name] = v
return model
class FieldDumper(ModelDumper):
"""Abstract dumper for python classes using fields."""
postfix = ""
extension = ""
name_format = "{basename}{postfix}.{extension}"
def __init__(self, basename: NameType, *fields, **kwargs):
"""Construct with desired fields."""
super(FieldDumper, self).__init__()
self.basename = basename
self.fields: ts.List[str] = list(fields)
self.all_fields: bool = kwargs.get('all_fields', False)
def add_field(self, field: str):
"""Add another field to the dump."""
if field not in self.fields:
self.fields.append(field)
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Dump to a file (path-like or file handle)."""
raise NotImplementedError()
def get_fields(self, model: Model):
"""Get the desired fields."""
if not self.all_fields:
requested_fields = self.fields
else:
requested_fields = list(model)
return {field: model[field] for field in requested_fields}
def dump(self, model: Model):
"""Dump model."""
self._dump_to_file(self.file_path, model)
@classmethod
def read(cls, file_descriptor: FileType):
"""Read model from file."""
raise NotImplementedError(
f'read() method not implemented in {cls.__name__}')
@classmethod
def read_sequence(cls, glob_pattern):
"""Read models from a file sequence."""
return map(cls.read, glob.iglob(glob_pattern))
@property
def file_path(self):
"""Get the default filename."""
return self.name_format.format(basename=self.basename,
postfix=self.postfix,
extension=self.extension)
@directory_dump('numpys')
@step_dump
class NumpyDumper(FieldDumper):
"""Dumper to compressed numpy files."""
extension = 'npz'
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Save to compressed multi-field Numpy format."""
if mpi.size() > 1:
raise MPIIncompatibilityError("NumpyDumper does not function "
"at all in parallel")
np.savez_compressed(file_descriptor,
attrs=json.dumps(_get_attributes(model)),
**self.get_fields(model))
@classmethod
def read(cls, file_descriptor: FileType):
"""Create model from Numpy file."""
data = np.load(file_descriptor, mmap_mode='r')
model = _create_model(json.loads(str(data['attrs'])))
for k, v in filter(lambda k: k[0] != 'attrs', data.items()):
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[k] = v
return model
try:
import h5py
__all__.append("H5Dumper")
@directory_dump('hdf5')
@step_dump
class H5Dumper(FieldDumper):
"""Dumper to HDF5 file format."""
extension = 'h5'
@staticmethod
def _hdf5_args():
if mpi.size() > 1:
from mpi4py import MPI # noqa
mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD)
comp_args = {} # compression does not work in parallel
else:
mpi_args = {}
comp_args = dict(compression='gzip', compression_opts=7)
return mpi_args, comp_args
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Save to HDF5 with metadata about the model."""
# Setup for MPI
if not h5py.get_config().mpi and mpi.size() > 1:
raise MPIIncompatibilityError("HDF5 does not have MPI support")
mpi_args, comp_args = self._hdf5_args()
with h5py.File(file_descriptor, 'w', **mpi_args) as handle:
# Writing data
for name, field in self.get_fields(model).items():
shape = list(field.shape)
if mpi.size() > 1:
xdim = 0 if _is_surface_field(field, model) else 1
shape[xdim] = mpi_args['comm'].allreduce(shape[xdim])
dset = handle.create_dataset(name, shape, field.dtype,
**comp_args)
s = local_slice(field, model, name in model.boundary_fields)
dset[s] = field
# Writing metadata
for name, attr in _get_attributes(model).items():
handle.attrs[name] = attr
@classmethod
def read(cls, file_descriptor: FileType):
"""Create model from HDF5 file."""
mpi_args, _ = cls._hdf5_args()
with h5py.File(file_descriptor, 'r', **mpi_args) as handle:
model = _create_model(handle.attrs)
for k, v in handle.items():
v = np.asanyarray(v)
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
surface_field = \
k in handle.attrs.get('boundary_fields', {}) \
or _is_surface_field(v, model)
s = local_slice(v, model, surface_field)
if (surface_field and v.ndim == len(model.boundary_shape)) \
or (not surface_field and v.ndim == len(model.shape)):
s = s + (np.newaxis, )
model[k] = v[s].copy()
return model
except ImportError:
pass
try:
import uvw # noqa
__all__ += [
"UVWDumper",
"UVWGroupDumper",
]
@directory_dump('paraview')
@step_dump
class UVWDumper(FieldDumper):
"""Dumper to VTK files for elasto-plastic calculations."""
extension = 'vtr'
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Dump displacements, plastic deformations and stresses."""
if mpi.size() > 1:
raise MPIIncompatibilityError("UVWDumper does not function "
"properly in parallel")
bdim = len(model.boundary_shape)
# Local MPI size
lsize = model.shape
gsize = mpi.global_shape(model.boundary_shape)
gshape = gsize
if len(lsize) > bdim:
gshape = [model.shape[0]] + gshape
# Space coordinates
coordinates = [
np.linspace(0, L, N, endpoint=False)
for L, N in zip(model.system_size, gshape)
]
# If model has subsurfce domain, z-coordinate is always first
dimension_indices = np.arange(bdim)
if len(lsize) > bdim:
dimension_indices += 1
- dimension_indices = np.concatenate((dimension_indices, [0]))
+ dimension_indices = np.concatenate((dimension_indices,
+ np.asarray([0])))
coordinates[0] = \
np.linspace(0, model.system_size[0], gshape[0])
offset = np.zeros_like(dimension_indices)
offset[0] = mpi.local_offset(gsize)
rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \
else uvw.parallel.PRectilinearGrid
# Creating rectilinear grid with correct order for components
coordlist = [
coordinates[i][o:o + lsize[i]]
for i, o in zip(dimension_indices, offset)
]
grid = rectgrid(
file_descriptor,
coordlist,
compression=True,
offsets=offset,
)
fields = self.get_fields(model).items()
# Iterator over fields we want to dump on system geometry
if model.type in {model_type.volume_1d, model_type.volume_2d}:
fields_it = filter(lambda t: not t[0] in model.boundary_fields,
fields)
else:
fields_it = iter(fields)
for name, field in fields_it:
array = uvw.DataArray(field, dimension_indices, name)
grid.addPointData(array)
grid.write()
@directory_dump('paraview')
class UVWGroupDumper(FieldDumper):
"""Dumper to ParaViewData files."""
extension = 'pvd'
def __init__(self, basename: NameType, *fields, **kwargs):
"""Construct with desired fields."""
super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs)
subdir = Path('paraview') / f'{basename}-VTR'
subdir.mkdir(parents=True, exist_ok=True)
self.uvw_dumper = UVWDumper(
Path(f'{basename}-VTR') / basename, *fields, **kwargs)
self.group = uvw.ParaViewData(self.file_path, compression=True)
def _dump_to_file(self, file_descriptor, model):
self.group.addFile(
self.uvw_dumper.file_path.replace('paraview/', ''),
timestep=self.uvw_dumper.count,
)
self.group.write()
self.uvw_dumper.dump(model)
except ImportError:
pass
try:
from netCDF4 import Dataset
__all__.append("cls")
@directory_dump('netcdf')
class NetCDFDumper(FieldDumper):
"""Dumper to netCDF4 files."""
extension = "nc"
time_dim = 'frame'
format = 'NETCDF4'
def _file_setup(self, grp, model: Model):
grp.createDimension(self.time_dim, None)
# Attibutes
for k, v in _get_attributes(model).items():
grp.setncattr(k, v)
# Local dimensions
voigt_dim = type_traits[model.type].voigt
components = type_traits[model.type].components
self._vec = grp.createDimension('spatial', components)
self._tens = grp.createDimension('Voigt', voigt_dim)
self.model_info = model.global_shape, model.type
global_boundary_shape = mpi.global_shape(model.boundary_shape)
# Create boundary dimensions
for label, size, length in zip("xy", global_boundary_shape,
model.boundary_system_size):
grp.createDimension(label, size)
coord = grp.createVariable(label, 'f8', (label, ))
coord[:] = np.linspace(0, length, size, endpoint=False)
self._create_variables(grp, model,
lambda f: _is_surface_field(f[1], model),
global_boundary_shape, "xy")
# Create volume dimension
if model.type in {model_type.volume_1d, model_type.volume_2d}:
size = model.shape[0]
grp.createDimension("z", size)
coord = grp.createVariable("z", 'f8', ("z", ))
coord[:] = np.linspace(0, model.system_size[0], size)
self._create_variables(
grp, model, lambda f: not _is_surface_field(f[1], model),
model.global_shape, "zxy")
self.has_setup = True
def _set_collective(self, rootgrp):
if mpi.size() == 1:
return
for v in rootgrp.variables.values():
if self.time_dim in v.dimensions:
v.set_collective(True)
def _dump_to_file(self, file_descriptor: FileType, model: Model):
if not isinstance(file_descriptor, io.IOBase):
mode = 'a' if Path(file_descriptor).is_file() \
and getattr(self, 'has_setup', False) else 'w'
try:
with Dataset(file_descriptor,
mode,
format=self.format,
parallel=mpi.size() > 1) as rootgrp:
if rootgrp.dimensions == {}:
self._file_setup(rootgrp, model)
self._set_collective(rootgrp)
if self.model_info != (model.global_shape, model.type):
raise ModelError(f"Unexpected model {mode}")
self._dump_generic(rootgrp, model)
except ValueError:
raise MPIIncompatibilityError("NetCDF4 has no MPI support")
def _create_variables(self, grp, model, predicate, shape, dimensions):
field_dim = len(shape)
fields = list(filter(predicate, self.get_fields(model).items()))
dim_labels = list(dimensions[:field_dim])
for label, data in fields:
local_dim = []
# If we have an extra component
if data.ndim > field_dim:
if data.shape[-1] == self._tens.size:
local_dim = [self._tens.name]
elif data.shape[-1] == self._vec.size:
local_dim = [self._vec.name]
else:
raise ComponentsError(
f"{label} has unexpected number of components "
f"({data.shape[-1]})")
# Downcasting in case of 128 bit float
dtype = data.dtype if data.dtype.str[1:] != 'f16' else 'f8'
grp.createVariable(label,
dtype,
[self.time_dim] + dim_labels + local_dim,
zlib=mpi.size() == 0)
def _dump_generic(self, grp, model):
fields = self.get_fields(model).items()
new_frame = len(grp.dimensions[self.time_dim])
for label, data in fields:
var = grp[label]
slice_in_global = (new_frame, ) + local_slice(data, model)
var[slice_in_global] = np.array(data, dtype=var.dtype)
@classmethod
def _open_read(cls, fd):
return Dataset(fd, 'r', format=cls.format, parallel=mpi.size() > 1)
@staticmethod
def _create_model(rootgrp):
attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()}
return _create_model(attrs)
@staticmethod
def _set_model_fields(rootgrp, model, frame):
dims = rootgrp.dimensions.keys()
for k, v in filter(lambda k: k[0] not in dims,
rootgrp.variables.items()):
v = v[frame, :]
if model.type in _basic_types:
v = np.asarray(v).reshape(list(v.shape) + [1])
model[k] = v[local_slice(v, model)].copy()
@classmethod
def read(cls, file_descriptor: FileType):
"""Create model with last frame."""
with cls._open_read(file_descriptor) as rootgrp:
model = cls._create_model(rootgrp)
cls._set_model_fields(rootgrp, model, -1)
return model
@classmethod
def read_sequence(cls, file_descriptor: FileType):
with cls._open_read(file_descriptor) as rootgrp:
model = cls._create_model(rootgrp)
for frame in range(len(rootgrp.dimensions[cls.time_dim])):
cls._set_model_fields(rootgrp, model, frame)
yield model
except ImportError:
pass
diff --git a/python/tamaas/dumpers/_helper.py b/python/tamaas/dumpers/_helper.py
index 5c393a3..38f8e9a 100644
--- a/python/tamaas/dumpers/_helper.py
+++ b/python/tamaas/dumpers/_helper.py
@@ -1,178 +1,178 @@
# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""
Helper functions for dumpers
"""
from os import PathLike
from functools import wraps
from pathlib import Path
import io
import typing as ts
import numpy as np
from .. import model_type, type_traits
from .._tamaas import mpi
__all__ = ["step_dump", "directory_dump", "local_slice"]
_basic_types = [t for t, trait in type_traits.items() if trait.components == 1]
-FileType = ts.Union[str, PathLike, io.TextIOBase]
-NameType = ts.Union[str, PathLike]
+FileType = ts.Union[str, PathLike[str], io.IOBase]
+NameType = ts.Union[str, PathLike[str]]
def _is_surface_field(field, model):
def _to_global(shape):
if len(shape) == len(model.boundary_shape) + 1:
return mpi.global_shape(model.boundary_shape) + [shape[-1]]
return mpi.global_shape(shape)
def _compare_shape(a, b):
return a == b
b_shapes = [list(model[name].shape) for name in model.boundary_fields]
shape = list(field.shape)
return any(_compare_shape(shape, s) for s in b_shapes) \
or any(_compare_shape(shape, _to_global(s)) for s in b_shapes)
def local_slice(field, model, surface_field=None):
n = model.shape
bn = model.boundary_shape
gshape = mpi.global_shape(bn)
offsets = np.zeros_like(gshape)
offsets[0] = mpi.local_offset(gshape)
if not surface_field:
surface_field = _is_surface_field(field, model)
if not surface_field and len(n) > len(bn):
gshape = [n[0]] + gshape
offsets = np.concatenate(([0], offsets))
shape = bn if surface_field else n
if len(field.shape) > len(shape):
shape += field.shape[len(shape):]
def sgen(offset, size):
return slice(offset, offset + size, None)
def sgen_basic(offset, size):
return slice(offset, offset + size)
slice_gen = sgen_basic if model_type in _basic_types else sgen
return tuple(map(slice_gen, offsets, shape))
def step_dump(cls):
"""
Decorator for dumper with counter for steps
"""
orig_init = cls.__init__
orig_dump = cls.dump
@wraps(cls.__init__)
def __init__(obj, *args, **kwargs):
orig_init(obj, *args, **kwargs)
obj.count = 0
def postfix(obj):
return "_{:04d}".format(obj.count)
@wraps(cls.dump)
def dump(obj, *args, **kwargs):
orig_dump(obj, *args, **kwargs)
obj.count += 1
cls.__init__ = __init__
cls.dump = dump
cls.postfix = property(postfix)
return cls
def directory_dump(directory=""):
"Decorator for dumper in a directory"
directory = Path(directory)
def actual_decorator(cls):
orig_dump = cls.dump
orig_filepath = cls.file_path.fget
orig_init = cls.__init__
@wraps(cls.__init__)
def init(obj, *args, **kwargs):
orig_init(obj, *args, **kwargs)
obj.mkdir = kwargs.get('mkdir', True)
@wraps(cls.dump)
def dump(obj, *args, **kwargs):
if mpi.rank() == 0 and getattr(obj, 'mkdir'):
directory.mkdir(parents=True, exist_ok=True)
orig_dump(obj, *args, **kwargs)
@wraps(cls.file_path.fget)
def file_path(obj):
if getattr(obj, 'mkdir'):
return str(directory / orig_filepath(obj))
return orig_filepath(obj)
cls.__init__ = init
cls.dump = dump
cls.file_path = property(file_path)
return cls
return actual_decorator
def hdf5toVTK(inpath: PathLike, outname: str):
"""Convert HDF5 dump of a model to VTK."""
from . import UVWDumper, H5Dumper # noqa
UVWDumper(outname, all_fields=True) << H5Dumper.read(inpath)
def netCDFtoParaview(inpath: PathLike, outname: str):
"""Convert NetCDF dump of model sequence to Paraview."""
from . import UVWGroupDumper, NetCDFDumper # noqa
dumper = UVWGroupDumper(outname, all_fields=True)
for model in NetCDFDumper.read_sequence(inpath):
dumper << model
def file_handler(mode: str):
"""Decorate a function to accept path-like or file handles."""
def _handler(func):
@wraps(func)
def _wrapped(self, fd: FileType, *args, **kwargs):
if isinstance(fd, (str, PathLike)):
with open(fd, mode) as fh:
return _wrapped(self, fh, *args, **kwargs)
elif isinstance(fd, io.TextIOBase):
return func(self, fd, *args, **kwargs)
raise TypeError(
f"Expected a path-like or file handle, got {type(fd)}")
return _wrapped
return _handler
diff --git a/python/tamaas/utils.py b/python/tamaas/utils.py
index feb9af9..180b847 100644
--- a/python/tamaas/utils.py
+++ b/python/tamaas/utils.py
@@ -1,322 +1,321 @@
# -*- mode: python; coding: utf-8 -*-
# vim: set ft=python:
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""Convenience utilities."""
import inspect
from collections import namedtuple
from contextlib import contextmanager
from typing import Iterable, Union
from functools import partial
from operator import contains
import numpy as np
from ._tamaas import (
ContactSolver,
Model,
SurfaceGenerator1D,
SurfaceGenerator2D,
dtype,
set_log_level,
get_log_level,
LogLevel,
Logger,
)
__all__ = [
"log_context",
"publications",
"load_path",
"seeded_surfaces",
"hertz_surface",
"radial_average",
]
class NoConvergenceError(RuntimeError):
"""Convergence not reached exception."""
@contextmanager
def log_context(log_level: LogLevel):
"""Context manager to easily control Tamaas' logging level."""
current = get_log_level()
set_log_level(log_level)
try:
yield
finally:
set_log_level(current)
def publications(format_str="{pub.citation}\n\t{pub.doi}"):
"""Print publications associated with objects in use."""
Publication = namedtuple("Publication", ["citation", "doi"])
joss = Publication(
citation=(
"Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S. & Molinari, J.-F."
" Tamaas: a library for elastic-plastic contact of periodic rough"
" surfaces. Journal of Open Source Software 5, 2121 (2020)."),
doi="10.21105/joss.02121",
)
zenodo = Publication(
citation=(
"Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S., "
"& Molinari, J.-F. Tamaas, a high-performance "
"library for periodic rough surface contact. Zenodo (2019)."),
doi="10.5281/zenodo.3479236",
)
_publications = {
k: v
for keys, v in [
(
(
"SurfaceGeneratorRandomPhase1D",
"SurfaceGeneratorRandomPhase2D",
),
[
Publication(
citation=(
"Wu, J.-J. Simulation of rough surfaces with FFT."
" Tribology International 33, 47–58 (2000)."),
doi="10.1016/S0301-679X(00)00016-5",
),
],
),
(
("SurfaceGeneratorFilter1D", "SurfaceGeneratorFilter2D"),
[
Publication(
citation=(
"Hu, Y. Z. & Tonder, K. Simulation of 3-D random"
" rough surface by 2-D digital filter and fourier"
" analysis. International Journal of Machine Tools"
" and Manufacture 32, 83–90 (1992)."),
doi="10.1016/0890-6955(92)90064-N",
),
],
),
(
("PolonskyKeerRey", ),
[
Publication(
citation=(
"Polonsky, I. A. & Keer, L. M. A numerical method"
" for solving rough contact problems based on the"
" multi-level multi-summation and conjugate"
" gradient techniques. Wear 231, 206–219 (1999)."),
doi="10.1016/S0043-1648(99)00113-1",
),
Publication(
citation=(
"Rey, V., Anciaux, G. & Molinari, J.-F. Normal"
" adhesive contact on rough surfaces: efficient"
" algorithm for FFT-based BEM resolution. Comput"
" Mech 1–13 (2017)."),
doi="10.1007/s00466-017-1392-5",
),
],
),
(
("DFSANESolver", "DFSANECXXSolver"),
[
Publication(
citation=(
"La Cruz, W., Martínez, J. & Raydan, M. Spectral"
" residual method without gradient information for"
" solving large-scale nonlinear systems of"
" equations. Math. Comp. 75, 1429–1448 (2006)."),
doi="10.1090/S0025-5718-06-01840-0",
),
],
),
(
("Residual", ),
[
Publication(
citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &"
" Anciaux, G. A Fourier-accelerated volume"
" integral method for elastoplastic contact."
" Computer Methods in Applied Mechanics and"
" Engineering 351, 951–976 (2019)."),
doi="10.1016/j.cma.2019.04.006",
),
],
),
(
("EPICSolver", ),
[
Publication(
citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &"
" Anciaux, G. A Fourier-accelerated volume"
" integral method for elastoplastic contact."
" Computer Methods in Applied Mechanics and"
" Engineering 351, 951–976 (2019)."),
doi="10.1016/j.cma.2019.04.006",
),
Publication(
citation=(
"Jacq, C., Nélias, D., Lormand, G. & Girodin, D."
" Development of a Three-Dimensional"
" Semi-Analytical Elastic-Plastic Contact Code."
" Journal of Tribology 124, 653 (2002)."),
doi="10.1115/1.1467920",
),
],
),
(
("Condat", ),
[
Publication(
citation=(
"Condat, L. A Primal–Dual Splitting Method for"
" Convex Optimization Involving Lipschitzian,"
" Proximable and Linear Composite Terms. J Optim"
" Theory Appl 158, 460–479 (2012)."),
doi="10.1007/s10957-012-0245-9",
),
],
),
(
("KatoSaturated", ),
[
Publication(
citation=(
"Stanley, H. M. & Kato, T. An FFT-Based Method for"
" Rough Surface Contact. J. Tribol 119, 481–485"
" (1997)."),
doi="10.1115/1.2833523",
),
Publication(
citation=(
"Almqvist, A., Sahlin, F., Larsson, R. &"
" Glavatskih, S. On the dry elasto-plastic contact"
" of nominally flat surfaces. Tribology"
" International 40, 574–579 (2007)."),
doi="10.1016/j.triboint.2005.11.008",
),
],
),
] for k in keys
}
frame = inspect.stack()[1][0]
caller_vars = (frame.f_globals | frame.f_locals)
citable = filter(
partial(contains, _publications.keys()),
map(lambda x: type(x).__name__, caller_vars.values()),
)
citable = [joss, zenodo] \
+ list({pub for k in citable for pub in _publications[k]})
msg = "Please cite the following publications:\n\n"
msg += "\n".join(format_str.format(pub=pub) for pub in citable)
Logger().get(LogLevel.info) << msg
return citable
def load_path(
solver: ContactSolver,
loads: Iterable[Union[float, np.ndarray]],
verbose: bool = False,
callback=None,
) -> Iterable[Model]:
"""
Generate model objects solutions for a sequence of applied loads.
:param solver: a contact solver object
:param loads: an iterable sequence of loads
:param verbose: print info output of solver
:param callback: a callback executed after the yield
"""
log_level = LogLevel.info if verbose else LogLevel.warning
with log_context(log_level):
for load in loads:
if solver.solve(load) > solver.tolerance:
raise NoConvergenceError("Solver error exceeded tolerance")
yield solver.model
if callback is not None:
callback()
def seeded_surfaces(
generator: Union[SurfaceGenerator1D, SurfaceGenerator2D],
seeds: Iterable[int],
) -> Iterable[np.ndarray]:
"""
Generate rough surfaces with a prescribed seed sequence.
:param generator: surface generator object
:param seeds: random seed sequence
"""
for seed in seeds:
generator.random_seed = seed
yield generator.buildSurface()
def hertz_surface(system_size: Iterable[float], shape: Iterable[int],
radius: float) -> np.ndarray:
"""
Construct a parabolic surface.
:param system_size: size of the domain in each direction
:param shape: number of points in each direction
:param radius: radius of surface
"""
- coords = map(
- lambda L, N: np.linspace(0, L, N, endpoint=False, dtype=dtype),
- system_size,
- shape,
- )
+ coords = [
+ np.linspace(0, L, N, endpoint=False, dtype=dtype)
+ for L, N in zip(system_size, shape)
+ ]
coords = np.meshgrid(*coords, "ij", sparse=True)
surface = (-1 / (2 * radius)) * sum(
(x - L / 2)**2 for x, L in zip(coords, system_size))
return np.asanyarray(surface)
def radial_average(x: np.ndarray,
y: np.ndarray,
values: np.ndarray,
r: np.ndarray,
theta: np.ndarray,
method: str = 'linear',
endpoint: bool = False) -> np.ndarray:
"""Compute the radial average of a 2D field.
Averages radially for specified r values. See
:py:class:`scipy.interpolate.RegularGridInterpolator` for more details.
"""
try:
from scipy.interpolate import RegularGridInterpolator
except ImportError:
raise ImportError("Install scipy to use tamaas.utils.radial_average")
interpolator = RegularGridInterpolator((x, y), values, method=method)
rr, tt = np.meshgrid(r, theta, indexing='ij', sparse=True)
x, y = rr * np.cos(tt), rr * np.sin(tt)
X = np.vstack((x.flatten(), y.flatten())).T
return interpolator(X).reshape(x.shape).sum(axis=1) \
/ (theta.size - int(not endpoint))