diff --git a/python/tamaas/dumpers/__init__.py b/python/tamaas/dumpers/__init__.py
index b76c2a3..33381fb 100644
--- a/python/tamaas/dumpers/__init__.py
+++ b/python/tamaas/dumpers/__init__.py
@@ -1,581 +1,579 @@
# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""Dumpers for the class :py:class:`Model `."""
from pathlib import Path
-from os import PathLike
import glob
import json
import io
import typing as ts
from collections.abc import Collection
import numpy as np
-from .. import (
+from .. import __version__, type_traits
+from .._tamaas import (
ModelDumper,
model_type,
mpi,
- type_traits,
ModelFactory,
Model,
- __version__,
)
from ._helper import (
step_dump,
directory_dump,
local_slice,
_is_surface_field,
_basic_types,
file_handler,
+ FileType,
+ NameType,
)
__all__ = [
"JSONDumper",
"FieldDumper",
"NumpyDumper",
]
-FileType = ts.Union[str, PathLike, io.TextIOBase]
-NameType = ts.Union[str, PathLike]
-
_reverse_trait_map = {
'model_type.' + t.__name__: mtype
for mtype, t in type_traits.items()
}
def _get_attributes(model: Model):
"""Get model attributes."""
return {
'model_type': str(model.type),
'system_size': model.system_size,
'discretization': model.global_shape,
'boundary_fields': model.boundary_fields,
'program': f"Tamaas {__version__}, DOI:10.21105/joss.02121",
}
def _create_model(attrs: ts.MutableMapping):
"""Create a model from attribute dictionary."""
mtype = _reverse_trait_map[attrs['model_type']]
# netcdf4 converts 1-lists attributes to numbers
for attr in ['system_size', 'discretization']:
if not isinstance(attrs[attr], Collection):
attrs[attr] = [attrs[attr]]
return ModelFactory.createModel(mtype, attrs['system_size'],
attrs['discretization'])
class MPIIncompatibilityError(RuntimeError):
"""Raised when code is not meant to be executed in MPI environment."""
class ModelError(ValueError):
"""Raised when unexpected model is passed to a dumper with a state."""
class ComponentsError(ValueError):
"""Raised when an unexpected number of components is encountred."""
class _ModelJSONEncoder(json.JSONEncoder):
"""Encode a model to JSON."""
def default(self, obj):
"""Encode model."""
if isinstance(obj, Model):
model = obj
attrs = _get_attributes(model)
model_dict = {
'attrs': attrs,
'fields': {},
'operators': [],
}
for field in model:
model_dict['fields'][field] = model[field].tolist()
for op in model.operators:
model_dict['operators'].append(op)
return model_dict
return json.JSONEncoder.default(self, obj)
class JSONDumper(ModelDumper):
"""Dumper to JSON."""
def __init__(self, file_descriptor: FileType):
"""Construct with file handle."""
super(JSONDumper, self).__init__()
self.fd = file_descriptor
@file_handler('w')
- def _dump_to_file(self, fd: FileType, model: Model):
+ def _dump_to_file(self, fd: ts.IO[str], model: Model):
json.dump(model, fd, cls=_ModelJSONEncoder)
def dump(self, model: Model):
"""Dump model."""
self._dump_to_file(self.fd, model)
@classmethod
@file_handler('r')
- def read(cls, fd: FileType):
+ def read(cls, fd: ts.IO[str]):
"""Read model from file."""
properties = json.load(fd)
model = _create_model(properties['attrs'])
for name, field in properties['fields'].items():
v = np.asarray(field)
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[name] = v
return model
class FieldDumper(ModelDumper):
"""Abstract dumper for python classes using fields."""
postfix = ""
extension = ""
name_format = "{basename}{postfix}.{extension}"
def __init__(self, basename: NameType, *fields, **kwargs):
"""Construct with desired fields."""
super(FieldDumper, self).__init__()
self.basename = basename
self.fields: ts.List[str] = list(fields)
self.all_fields: bool = kwargs.get('all_fields', False)
def add_field(self, field: str):
"""Add another field to the dump."""
if field not in self.fields:
self.fields.append(field)
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Dump to a file (path-like or file handle)."""
raise NotImplementedError()
def get_fields(self, model: Model):
"""Get the desired fields."""
if not self.all_fields:
requested_fields = self.fields
else:
requested_fields = list(model)
return {field: model[field] for field in requested_fields}
def dump(self, model: Model):
"""Dump model."""
self._dump_to_file(self.file_path, model)
@classmethod
def read(cls, file_descriptor: FileType):
"""Read model from file."""
raise NotImplementedError(
f'read() method not implemented in {cls.__name__}')
@classmethod
def read_sequence(cls, glob_pattern):
"""Read models from a file sequence."""
return map(cls.read, glob.iglob(glob_pattern))
@property
def file_path(self):
"""Get the default filename."""
return self.name_format.format(basename=self.basename,
postfix=self.postfix,
extension=self.extension)
@directory_dump('numpys')
@step_dump
class NumpyDumper(FieldDumper):
"""Dumper to compressed numpy files."""
extension = 'npz'
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Save to compressed multi-field Numpy format."""
if mpi.size() > 1:
raise MPIIncompatibilityError("NumpyDumper does not function "
"at all in parallel")
np.savez_compressed(file_descriptor,
attrs=json.dumps(_get_attributes(model)),
**self.get_fields(model))
@classmethod
def read(cls, file_descriptor: FileType):
"""Create model from Numpy file."""
data = np.load(file_descriptor, mmap_mode='r')
model = _create_model(json.loads(str(data['attrs'])))
for k, v in filter(lambda k: k[0] != 'attrs', data.items()):
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[k] = v
return model
try:
import h5py
__all__.append("H5Dumper")
@directory_dump('hdf5')
@step_dump
class H5Dumper(FieldDumper):
"""Dumper to HDF5 file format."""
extension = 'h5'
@staticmethod
def _hdf5_args():
if mpi.size() > 1:
from mpi4py import MPI # noqa
mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD)
comp_args = {} # compression does not work in parallel
else:
mpi_args = {}
comp_args = dict(compression='gzip', compression_opts=7)
return mpi_args, comp_args
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Save to HDF5 with metadata about the model."""
# Setup for MPI
if not h5py.get_config().mpi and mpi.size() > 1:
raise MPIIncompatibilityError("HDF5 does not have MPI support")
mpi_args, comp_args = self._hdf5_args()
with h5py.File(file_descriptor, 'w', **mpi_args) as handle:
# Writing data
for name, field in self.get_fields(model).items():
shape = list(field.shape)
if mpi.size() > 1:
xdim = 0 if _is_surface_field(field, model) else 1
shape[xdim] = mpi_args['comm'].allreduce(shape[xdim])
dset = handle.create_dataset(name, shape, field.dtype,
**comp_args)
s = local_slice(field, model, name in model.boundary_fields)
dset[s] = field
# Writing metadata
for name, attr in _get_attributes(model).items():
handle.attrs[name] = attr
@classmethod
def read(cls, file_descriptor: FileType):
"""Create model from HDF5 file."""
mpi_args, _ = cls._hdf5_args()
with h5py.File(file_descriptor, 'r', **mpi_args) as handle:
model = _create_model(handle.attrs)
for k, v in handle.items():
v = np.asanyarray(v)
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
surface_field = \
k in handle.attrs.get('boundary_fields', {}) \
or _is_surface_field(v, model)
s = local_slice(v, model, surface_field)
if (surface_field and v.ndim == len(model.boundary_shape)) \
or (not surface_field and v.ndim == len(model.shape)):
s = s + (np.newaxis, )
model[k] = v[s].copy()
return model
except ImportError:
pass
try:
import uvw # noqa
__all__ += [
"UVWDumper",
"UVWGroupDumper",
]
@directory_dump('paraview')
@step_dump
class UVWDumper(FieldDumper):
"""Dumper to VTK files for elasto-plastic calculations."""
extension = 'vtr'
def _dump_to_file(self, file_descriptor: FileType, model: Model):
"""Dump displacements, plastic deformations and stresses."""
if mpi.size() > 1:
raise MPIIncompatibilityError("UVWDumper does not function "
"properly in parallel")
bdim = len(model.boundary_shape)
# Local MPI size
lsize = model.shape
gsize = mpi.global_shape(model.boundary_shape)
gshape = gsize
if len(lsize) > bdim:
gshape = [model.shape[0]] + gshape
# Space coordinates
coordinates = [
np.linspace(0, L, N, endpoint=False)
for L, N in zip(model.system_size, gshape)
]
# If model has subsurfce domain, z-coordinate is always first
dimension_indices = np.arange(bdim)
if len(lsize) > bdim:
dimension_indices += 1
dimension_indices = np.concatenate((dimension_indices, [0]))
coordinates[0] = \
np.linspace(0, model.system_size[0], gshape[0])
offset = np.zeros_like(dimension_indices)
offset[0] = mpi.local_offset(gsize)
rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \
else uvw.parallel.PRectilinearGrid
# Creating rectilinear grid with correct order for components
coordlist = [
coordinates[i][o:o + lsize[i]]
for i, o in zip(dimension_indices, offset)
]
grid = rectgrid(
file_descriptor,
coordlist,
compression=True,
offsets=offset,
)
fields = self.get_fields(model).items()
# Iterator over fields we want to dump on system geometry
if model.type in {model_type.volume_1d, model_type.volume_2d}:
fields_it = filter(lambda t: not t[0] in model.boundary_fields,
fields)
else:
fields_it = iter(fields)
for name, field in fields_it:
array = uvw.DataArray(field, dimension_indices, name)
grid.addPointData(array)
grid.write()
@directory_dump('paraview')
class UVWGroupDumper(FieldDumper):
"""Dumper to ParaViewData files."""
extension = 'pvd'
def __init__(self, basename: NameType, *fields, **kwargs):
"""Construct with desired fields."""
super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs)
subdir = Path('paraview') / f'{basename}-VTR'
subdir.mkdir(parents=True, exist_ok=True)
self.uvw_dumper = UVWDumper(
Path(f'{basename}-VTR') / basename, *fields, **kwargs)
self.group = uvw.ParaViewData(self.file_path, compression=True)
def _dump_to_file(self, file_descriptor, model):
self.group.addFile(
self.uvw_dumper.file_path.replace('paraview/', ''),
timestep=self.uvw_dumper.count,
)
self.group.write()
self.uvw_dumper.dump(model)
except ImportError:
pass
try:
from netCDF4 import Dataset
__all__.append("cls")
@directory_dump('netcdf')
class NetCDFDumper(FieldDumper):
"""Dumper to netCDF4 files."""
extension = "nc"
time_dim = 'frame'
format = 'NETCDF4'
def _file_setup(self, grp, model: Model):
grp.createDimension(self.time_dim, None)
# Attibutes
for k, v in _get_attributes(model).items():
grp.setncattr(k, v)
# Local dimensions
voigt_dim = type_traits[model.type].voigt
components = type_traits[model.type].components
self._vec = grp.createDimension('spatial', components)
self._tens = grp.createDimension('Voigt', voigt_dim)
self.model_info = model.global_shape, model.type
global_boundary_shape = mpi.global_shape(model.boundary_shape)
# Create boundary dimensions
for label, size, length in zip("xy", global_boundary_shape,
model.boundary_system_size):
grp.createDimension(label, size)
coord = grp.createVariable(label, 'f8', (label, ))
coord[:] = np.linspace(0, length, size, endpoint=False)
self._create_variables(grp, model,
lambda f: _is_surface_field(f[1], model),
global_boundary_shape, "xy")
# Create volume dimension
if model.type in {model_type.volume_1d, model_type.volume_2d}:
size = model.shape[0]
grp.createDimension("z", size)
coord = grp.createVariable("z", 'f8', ("z", ))
coord[:] = np.linspace(0, model.system_size[0], size)
self._create_variables(
grp, model, lambda f: not _is_surface_field(f[1], model),
model.global_shape, "zxy")
self.has_setup = True
def _set_collective(self, rootgrp):
if mpi.size() == 1:
return
for v in rootgrp.variables.values():
if self.time_dim in v.dimensions:
v.set_collective(True)
- def _dump_to_file(self, file_descriptor: NameType, model: Model):
+ def _dump_to_file(self, file_descriptor: FileType, model: Model):
- mode = 'a' if Path(file_descriptor).is_file() \
- and getattr(self, 'has_setup', False) else 'w'
+ if not isinstance(file_descriptor, io.IOBase):
+ mode = 'a' if Path(file_descriptor).is_file() \
+ and getattr(self, 'has_setup', False) else 'w'
try:
with Dataset(file_descriptor,
mode,
format=self.format,
parallel=mpi.size() > 1) as rootgrp:
if rootgrp.dimensions == {}:
self._file_setup(rootgrp, model)
self._set_collective(rootgrp)
if self.model_info != (model.global_shape, model.type):
raise ModelError(f"Unexpected model {mode}")
self._dump_generic(rootgrp, model)
except ValueError:
raise MPIIncompatibilityError("NetCDF4 has no MPI support")
def _create_variables(self, grp, model, predicate, shape, dimensions):
field_dim = len(shape)
fields = list(filter(predicate, self.get_fields(model).items()))
dim_labels = list(dimensions[:field_dim])
for label, data in fields:
local_dim = []
# If we have an extra component
if data.ndim > field_dim:
if data.shape[-1] == self._tens.size:
local_dim = [self._tens.name]
elif data.shape[-1] == self._vec.size:
local_dim = [self._vec.name]
else:
raise ComponentsError(
f"{label} has unexpected number of components "
f"({data.shape[-1]})")
# Downcasting in case of 128 bit float
dtype = data.dtype if data.dtype.str[1:] != 'f16' else 'f8'
grp.createVariable(label,
dtype,
[self.time_dim] + dim_labels + local_dim,
zlib=mpi.size() == 0)
def _dump_generic(self, grp, model):
fields = self.get_fields(model).items()
new_frame = len(grp.dimensions[self.time_dim])
for label, data in fields:
var = grp[label]
slice_in_global = (new_frame, ) + local_slice(data, model)
var[slice_in_global] = np.array(data, dtype=var.dtype)
@classmethod
def _open_read(cls, fd):
return Dataset(fd, 'r', format=cls.format, parallel=mpi.size() > 1)
@staticmethod
def _create_model(rootgrp):
attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()}
return _create_model(attrs)
@staticmethod
def _set_model_fields(rootgrp, model, frame):
dims = rootgrp.dimensions.keys()
for k, v in filter(lambda k: k[0] not in dims,
rootgrp.variables.items()):
v = v[frame, :]
if model.type in _basic_types:
v = np.asarray(v).reshape(list(v.shape) + [1])
model[k] = v[local_slice(v, model)].copy()
@classmethod
- def read(cls, file_descriptor: NameType):
+ def read(cls, file_descriptor: FileType):
"""Create model with last frame."""
with cls._open_read(file_descriptor) as rootgrp:
model = cls._create_model(rootgrp)
cls._set_model_fields(rootgrp, model, -1)
return model
@classmethod
- def read_sequence(cls, file_descriptor: NameType):
+ def read_sequence(cls, file_descriptor: FileType):
with cls._open_read(file_descriptor) as rootgrp:
model = cls._create_model(rootgrp)
for frame in range(len(rootgrp.dimensions[cls.time_dim])):
cls._set_model_fields(rootgrp, model, frame)
yield model
except ImportError:
pass
diff --git a/python/tamaas/dumpers/_helper.py b/python/tamaas/dumpers/_helper.py
index c36e499..5c393a3 100644
--- a/python/tamaas/dumpers/_helper.py
+++ b/python/tamaas/dumpers/_helper.py
@@ -1,174 +1,178 @@
# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""
Helper functions for dumpers
"""
from os import PathLike
from functools import wraps
from pathlib import Path
import io
+import typing as ts
import numpy as np
-from .. import model_type, type_traits, mpi
+from .. import model_type, type_traits
+from .._tamaas import mpi
__all__ = ["step_dump", "directory_dump", "local_slice"]
_basic_types = [t for t, trait in type_traits.items() if trait.components == 1]
+FileType = ts.Union[str, PathLike, io.TextIOBase]
+NameType = ts.Union[str, PathLike]
def _is_surface_field(field, model):
def _to_global(shape):
if len(shape) == len(model.boundary_shape) + 1:
return mpi.global_shape(model.boundary_shape) + [shape[-1]]
return mpi.global_shape(shape)
def _compare_shape(a, b):
return a == b
b_shapes = [list(model[name].shape) for name in model.boundary_fields]
shape = list(field.shape)
return any(_compare_shape(shape, s) for s in b_shapes) \
or any(_compare_shape(shape, _to_global(s)) for s in b_shapes)
def local_slice(field, model, surface_field=None):
n = model.shape
bn = model.boundary_shape
gshape = mpi.global_shape(bn)
offsets = np.zeros_like(gshape)
offsets[0] = mpi.local_offset(gshape)
if not surface_field:
surface_field = _is_surface_field(field, model)
if not surface_field and len(n) > len(bn):
gshape = [n[0]] + gshape
offsets = np.concatenate(([0], offsets))
shape = bn if surface_field else n
if len(field.shape) > len(shape):
shape += field.shape[len(shape):]
def sgen(offset, size):
return slice(offset, offset + size, None)
def sgen_basic(offset, size):
return slice(offset, offset + size)
slice_gen = sgen_basic if model_type in _basic_types else sgen
return tuple(map(slice_gen, offsets, shape))
def step_dump(cls):
"""
Decorator for dumper with counter for steps
"""
orig_init = cls.__init__
orig_dump = cls.dump
@wraps(cls.__init__)
def __init__(obj, *args, **kwargs):
orig_init(obj, *args, **kwargs)
obj.count = 0
def postfix(obj):
return "_{:04d}".format(obj.count)
@wraps(cls.dump)
def dump(obj, *args, **kwargs):
orig_dump(obj, *args, **kwargs)
obj.count += 1
cls.__init__ = __init__
cls.dump = dump
cls.postfix = property(postfix)
return cls
def directory_dump(directory=""):
"Decorator for dumper in a directory"
directory = Path(directory)
def actual_decorator(cls):
orig_dump = cls.dump
orig_filepath = cls.file_path.fget
orig_init = cls.__init__
@wraps(cls.__init__)
def init(obj, *args, **kwargs):
orig_init(obj, *args, **kwargs)
obj.mkdir = kwargs.get('mkdir', True)
@wraps(cls.dump)
def dump(obj, *args, **kwargs):
if mpi.rank() == 0 and getattr(obj, 'mkdir'):
directory.mkdir(parents=True, exist_ok=True)
orig_dump(obj, *args, **kwargs)
@wraps(cls.file_path.fget)
def file_path(obj):
if getattr(obj, 'mkdir'):
return str(directory / orig_filepath(obj))
return orig_filepath(obj)
cls.__init__ = init
cls.dump = dump
cls.file_path = property(file_path)
return cls
return actual_decorator
def hdf5toVTK(inpath: PathLike, outname: str):
"""Convert HDF5 dump of a model to VTK."""
from . import UVWDumper, H5Dumper # noqa
UVWDumper(outname, all_fields=True) << H5Dumper.read(inpath)
def netCDFtoParaview(inpath: PathLike, outname: str):
"""Convert NetCDF dump of model sequence to Paraview."""
from . import UVWGroupDumper, NetCDFDumper # noqa
dumper = UVWGroupDumper(outname, all_fields=True)
for model in NetCDFDumper.read_sequence(inpath):
dumper << model
-def file_handler(mode):
+def file_handler(mode: str):
"""Decorate a function to accept path-like or file handles."""
def _handler(func):
@wraps(func)
- def _wrapped(self, fd, *args, **kwargs):
+ def _wrapped(self, fd: FileType, *args, **kwargs):
if isinstance(fd, (str, PathLike)):
- with open(fd, mode) as fd:
- return _wrapped(self, fd, *args, **kwargs)
+ with open(fd, mode) as fh:
+ return _wrapped(self, fh, *args, **kwargs)
elif isinstance(fd, io.TextIOBase):
return func(self, fd, *args, **kwargs)
raise TypeError(
f"Expected a path-like or file handle, got {type(fd)}")
return _wrapped
return _handler
diff --git a/python/tamaas/nonlinear_solvers/__init__.py b/python/tamaas/nonlinear_solvers/__init__.py
index 1e28a69..eaf949e 100644
--- a/python/tamaas/nonlinear_solvers/__init__.py
+++ b/python/tamaas/nonlinear_solvers/__init__.py
@@ -1,171 +1,176 @@
# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""Nonlinear solvers for plasticity problems.
Solvers in this module use :py:mod:`scipy.optimize` to solve the implicit
non-linear equation for plastic deformations with fixed contact pressures.
"""
from functools import wraps
from scipy.optimize import newton_krylov, root
from scipy.optimize.nonlin import NoConvergence
-from .. import EPSolver, Logger, LogLevel, mpi
-from .._tamaas import _tolerance_manager
-from .._tamaas import _DFSANESolver as DFSANECXXSolver
+from .._tamaas import (
+ EPSolver,
+ Logger,
+ LogLevel,
+ mpi,
+ _tolerance_manager,
+ _DFSANESolver as DFSANECXXSolver,
+)
__all__ = ['NLNoConvergence',
'DFSANESolver',
'DFSANECXXSolver',
'NewtonKrylovSolver',
'ToleranceManager']
class NLNoConvergence(Exception):
"""Convergence not reached exception."""
class ScipySolver(EPSolver):
"""Base class for solvers wrapping SciPy routines."""
def __init__(self, residual, model=None, callback=None):
"""Construct nonlinear solver with residual.
:param residual: plasticity residual object
:param model: Deprecated
:param callback: passed on to the Scipy solver
"""
super(ScipySolver, self).__init__(residual)
if mpi.size() > 1:
raise RuntimeError("Scipy solvers cannot be used with MPI; "
"DFSANECXXSolver can be used instead")
self.callback = callback
self._x = self.getStrainIncrement()
self._residual = self.getResidual()
self.options = {'ftol': 0, 'fatol': 1e-9}
def solve(self):
"""Solve R(δε) = 0 using a Scipy function."""
# For initial guess, compute the strain due to boundary tractions
# self._residual.computeResidual(self._x)
# self._x[...] = self._residual.getVector()
EPSolver.beforeSolve(self)
# Scipy root callback
def compute_residual(vec):
self._residual.computeResidual(vec)
return self._residual.getVector().copy()
# Solve
Logger().get(LogLevel.debug) << \
"Entering non-linear solve\n"
self._x[...] = self._scipy_solve(compute_residual)
Logger().get(LogLevel.debug) << \
"Non-linear solve returned"
# Computing displacements
self._residual.computeResidualDisplacement(self._x)
def reset(self):
"""Set solution vector to zero."""
self._x[...] = 0
def _scipy_solve(self, compute_residual):
"""Actually call the scipy solver.
:param compute_residual: function returning residual for given variable
"""
raise NotImplementedError()
class NewtonKrylovSolver(ScipySolver):
"""Solve using a finite-difference Newton-Krylov method."""
def _scipy_solve(self, compute_residual):
try:
return newton_krylov(compute_residual, self._x,
f_tol=self.tolerance,
verbose=True, callback=self.callback)
except NoConvergence:
raise NLNoConvergence("Newton-Krylov did not converge")
class DFSANESolver(ScipySolver):
"""Solve using a spectral residual jacobianless method.
See :doi:`10.1090/S0025-5718-06-01840-0` for details on method and
the relevant Scipy `documentation
`_ for
details on parameters.
"""
def _scipy_solve(self, compute_residual):
solution = root(compute_residual,
self._x,
method='df-sane',
options={'ftol': 0, 'fatol': self.tolerance},
callback=self.callback)
Logger().get(LogLevel.info) << \
"DF-SANE/Scipy: {} ({} iterations, {})".format(
solution.message,
solution.nit,
self.tolerance)
if not solution.success:
raise NLNoConvergence("DF-SANE/Scipy did not converge")
return solution.x.copy()
def ToleranceManager(start, end, rate):
"""Decorate solver to manage tolerance of non-linear solve step."""
def actual_decorator(cls):
orig_init = cls.__init__
orig_solve = cls.solve
orig_update_state = cls.updateState
@wraps(cls.__init__)
def __init__(obj, *args, **kwargs):
orig_init(obj, *args, **kwargs)
obj.setToleranceManager(_tolerance_manager(start, end, rate))
@wraps(cls.solve)
def new_solve(obj, *args, **kwargs):
ftol = obj.tolerance
ftol *= rate
obj.tolerance = max(ftol, end)
return orig_solve(obj, *args, **kwargs)
@wraps(cls.updateState)
def updateState(obj, *args, **kwargs):
obj.tolerance = start
return orig_update_state(obj, *args, **kwargs)
cls.__init__ = __init__
# cls.solve = new_solve
# cls.updateState = updateState
return cls
return actual_decorator
diff --git a/python/tamaas/utils.py b/python/tamaas/utils.py
index f71bb16..feb9af9 100644
--- a/python/tamaas/utils.py
+++ b/python/tamaas/utils.py
@@ -1,322 +1,322 @@
# -*- mode: python; coding: utf-8 -*-
# vim: set ft=python:
#
# Copyright (©) 2016-2023 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2023 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
"""Convenience utilities."""
import inspect
from collections import namedtuple
from contextlib import contextmanager
from typing import Iterable, Union
from functools import partial
from operator import contains
import numpy as np
-from . import (
+from ._tamaas import (
ContactSolver,
Model,
SurfaceGenerator1D,
SurfaceGenerator2D,
dtype,
set_log_level,
get_log_level,
LogLevel,
Logger,
)
__all__ = [
"log_context",
"publications",
"load_path",
"seeded_surfaces",
"hertz_surface",
"radial_average",
]
class NoConvergenceError(RuntimeError):
"""Convergence not reached exception."""
@contextmanager
def log_context(log_level: LogLevel):
"""Context manager to easily control Tamaas' logging level."""
current = get_log_level()
set_log_level(log_level)
try:
yield
finally:
set_log_level(current)
def publications(format_str="{pub.citation}\n\t{pub.doi}"):
"""Print publications associated with objects in use."""
Publication = namedtuple("Publication", ["citation", "doi"])
joss = Publication(
citation=(
"Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S. & Molinari, J.-F."
" Tamaas: a library for elastic-plastic contact of periodic rough"
" surfaces. Journal of Open Source Software 5, 2121 (2020)."),
doi="10.21105/joss.02121",
)
zenodo = Publication(
citation=(
"Frérot, L., Anciaux, G., Rey, V., Pham-Ba, S., "
"& Molinari, J.-F. Tamaas, a high-performance "
"library for periodic rough surface contact. Zenodo (2019)."),
doi="10.5281/zenodo.3479236",
)
_publications = {
k: v
for keys, v in [
(
(
"SurfaceGeneratorRandomPhase1D",
"SurfaceGeneratorRandomPhase2D",
),
[
Publication(
citation=(
"Wu, J.-J. Simulation of rough surfaces with FFT."
" Tribology International 33, 47–58 (2000)."),
doi="10.1016/S0301-679X(00)00016-5",
),
],
),
(
("SurfaceGeneratorFilter1D", "SurfaceGeneratorFilter2D"),
[
Publication(
citation=(
"Hu, Y. Z. & Tonder, K. Simulation of 3-D random"
" rough surface by 2-D digital filter and fourier"
" analysis. International Journal of Machine Tools"
" and Manufacture 32, 83–90 (1992)."),
doi="10.1016/0890-6955(92)90064-N",
),
],
),
(
("PolonskyKeerRey", ),
[
Publication(
citation=(
"Polonsky, I. A. & Keer, L. M. A numerical method"
" for solving rough contact problems based on the"
" multi-level multi-summation and conjugate"
" gradient techniques. Wear 231, 206–219 (1999)."),
doi="10.1016/S0043-1648(99)00113-1",
),
Publication(
citation=(
"Rey, V., Anciaux, G. & Molinari, J.-F. Normal"
" adhesive contact on rough surfaces: efficient"
" algorithm for FFT-based BEM resolution. Comput"
" Mech 1–13 (2017)."),
doi="10.1007/s00466-017-1392-5",
),
],
),
(
("DFSANESolver", "DFSANECXXSolver"),
[
Publication(
citation=(
"La Cruz, W., Martínez, J. & Raydan, M. Spectral"
" residual method without gradient information for"
" solving large-scale nonlinear systems of"
" equations. Math. Comp. 75, 1429–1448 (2006)."),
doi="10.1090/S0025-5718-06-01840-0",
),
],
),
(
("Residual", ),
[
Publication(
citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &"
" Anciaux, G. A Fourier-accelerated volume"
" integral method for elastoplastic contact."
" Computer Methods in Applied Mechanics and"
" Engineering 351, 951–976 (2019)."),
doi="10.1016/j.cma.2019.04.006",
),
],
),
(
("EPICSolver", ),
[
Publication(
citation=("Frérot, L., Bonnet, M., Molinari, J.-F. &"
" Anciaux, G. A Fourier-accelerated volume"
" integral method for elastoplastic contact."
" Computer Methods in Applied Mechanics and"
" Engineering 351, 951–976 (2019)."),
doi="10.1016/j.cma.2019.04.006",
),
Publication(
citation=(
"Jacq, C., Nélias, D., Lormand, G. & Girodin, D."
" Development of a Three-Dimensional"
" Semi-Analytical Elastic-Plastic Contact Code."
" Journal of Tribology 124, 653 (2002)."),
doi="10.1115/1.1467920",
),
],
),
(
("Condat", ),
[
Publication(
citation=(
"Condat, L. A Primal–Dual Splitting Method for"
" Convex Optimization Involving Lipschitzian,"
" Proximable and Linear Composite Terms. J Optim"
" Theory Appl 158, 460–479 (2012)."),
doi="10.1007/s10957-012-0245-9",
),
],
),
(
("KatoSaturated", ),
[
Publication(
citation=(
"Stanley, H. M. & Kato, T. An FFT-Based Method for"
" Rough Surface Contact. J. Tribol 119, 481–485"
" (1997)."),
doi="10.1115/1.2833523",
),
Publication(
citation=(
"Almqvist, A., Sahlin, F., Larsson, R. &"
" Glavatskih, S. On the dry elasto-plastic contact"
" of nominally flat surfaces. Tribology"
" International 40, 574–579 (2007)."),
doi="10.1016/j.triboint.2005.11.008",
),
],
),
] for k in keys
}
frame = inspect.stack()[1][0]
caller_vars = (frame.f_globals | frame.f_locals)
citable = filter(
partial(contains, _publications.keys()),
map(lambda x: type(x).__name__, caller_vars.values()),
)
citable = [joss, zenodo] \
+ list({pub for k in citable for pub in _publications[k]})
msg = "Please cite the following publications:\n\n"
msg += "\n".join(format_str.format(pub=pub) for pub in citable)
Logger().get(LogLevel.info) << msg
return citable
def load_path(
solver: ContactSolver,
loads: Iterable[Union[float, np.ndarray]],
verbose: bool = False,
callback=None,
) -> Iterable[Model]:
"""
Generate model objects solutions for a sequence of applied loads.
:param solver: a contact solver object
:param loads: an iterable sequence of loads
:param verbose: print info output of solver
:param callback: a callback executed after the yield
"""
log_level = LogLevel.info if verbose else LogLevel.warning
with log_context(log_level):
for load in loads:
if solver.solve(load) > solver.tolerance:
raise NoConvergenceError("Solver error exceeded tolerance")
yield solver.model
if callback is not None:
callback()
def seeded_surfaces(
generator: Union[SurfaceGenerator1D, SurfaceGenerator2D],
seeds: Iterable[int],
) -> Iterable[np.ndarray]:
"""
Generate rough surfaces with a prescribed seed sequence.
:param generator: surface generator object
:param seeds: random seed sequence
"""
for seed in seeds:
generator.random_seed = seed
yield generator.buildSurface()
def hertz_surface(system_size: Iterable[float], shape: Iterable[int],
radius: float) -> np.ndarray:
"""
Construct a parabolic surface.
:param system_size: size of the domain in each direction
:param shape: number of points in each direction
:param radius: radius of surface
"""
coords = map(
lambda L, N: np.linspace(0, L, N, endpoint=False, dtype=dtype),
system_size,
shape,
)
coords = np.meshgrid(*coords, "ij", sparse=True)
surface = (-1 / (2 * radius)) * sum(
(x - L / 2)**2 for x, L in zip(coords, system_size))
- return surface
+ return np.asanyarray(surface)
def radial_average(x: np.ndarray,
y: np.ndarray,
values: np.ndarray,
r: np.ndarray,
theta: np.ndarray,
method: str = 'linear',
endpoint: bool = False) -> np.ndarray:
"""Compute the radial average of a 2D field.
Averages radially for specified r values. See
:py:class:`scipy.interpolate.RegularGridInterpolator` for more details.
"""
try:
from scipy.interpolate import RegularGridInterpolator
except ImportError:
raise ImportError("Install scipy to use tamaas.utils.radial_average")
interpolator = RegularGridInterpolator((x, y), values, method=method)
rr, tt = np.meshgrid(r, theta, indexing='ij', sparse=True)
x, y = rr * np.cos(tt), rr * np.sin(tt)
X = np.vstack((x.flatten(), y.flatten())).T
return interpolator(X).reshape(x.shape).sum(axis=1) \
/ (theta.size - int(not endpoint))