Page MenuHomec4science
No OneTemporary

File Metadata

Wed, May 1, 11:15

# -*- mode:python; coding: utf-8 -*-
# @file
# Copyright (©) 2016-2021 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <>.
Dumpers for the class tamaas.Model
from __future__ import print_function
from pathlib import Path
import json
import io
import numpy as np
from .. import ModelDumper, model_type, mpi, type_traits, ModelFactory, Model
from ._helper import (
step_dump, directory_dump, local_slice, _is_surface_field, _basic_types,
_reverse_trait_map = {
'model_type.' + t.__name__: mtype for mtype, t in type_traits.items()
def _get_attributes(model):
"Get model attributes"
return {
'model_type': str(model.type),
'system_size': model.system_size,
'discretization': model.global_shape,
def _create_model(attrs):
"Create a model from attribute dictionaty"
mtype = _reverse_trait_map[attrs['model_type']]
# netcdf4 converts 1-lists attributes to numbers
for attr in ['system_size', 'discretization']:
if not isinstance(attrs[attr], (list, np.ndarray)):
attrs[attr] = [attrs[attr]]
return ModelFactory.createModel(mtype,
class ModelJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Model):
model = obj
attrs = _get_attributes(model)
model_dict = {
'attrs': attrs,
'fields': {},
'operators': [],
for field in model:
model_dict['fields'][field] = model[field].tolist()
for op in model.operators:
return model_dict
return json.JSONEncoder.default(self, obj)
class JSONDumper(ModelDumper):
def __init__(self, file_descriptor):
super(JSONDumper, self).__init__()
self.fd = file_descriptor
def dump_to_file(self, fd, model):
json.dump(model, fd, cls=ModelJSONEncoder)
def dump(self, model):
if not isinstance(self.fd, io.TextIOBase):
with open(self.fd, 'w') as fd:
self.dump_to_file(fd, model)
self.dump_to_file(self.fd, model)
def read_from_file(self, fd):
properties = json.load(fd)
model = _create_model(properties['attrs'])
for name, field in properties['fields'].items():
v = np.asarray(field)
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[name] = v
return model
def read(self, file_descriptor):
if not isinstance(file_descriptor, io.TextIOBase):
with open(file_descriptor, 'r') as fd:
return self.read_from_file(fd)
return self.read_from_file(file_descriptor)
class FieldDumper(ModelDumper):
"""Abstract dumper for python classes using fields"""
postfix = ''
extension = ''
name_format = "{basename}{postfix}.{extension}"
def __init__(self, basename, *fields, **kwargs):
"""Construct with desired fields"""
super(FieldDumper, self).__init__()
self.basename = basename
self.fields = list(fields)
self.all_fields = kwargs.get('all_fields', False)
def add_field(self, field):
"""Add another field to the dump"""
if field not in self.fields:
def dump_to_file(self, file_descriptor, model):
"""Dump to a file (name or handle)"""
def get_fields(self, model):
"""Get the desired fields"""
if not self.all_fields:
requested_fields = self.fields
requested_fields = list(model)
return {field: model[field] for field in requested_fields}
def dump(self, model):
"Dump model"
self.dump_to_file(self.file_path, model)
def read(self, file_descriptor):
raise NotImplementedError('read() method not implemented in {}'
def file_path(self):
"""Get the default filename"""
return self.name_format.format(basename=self.basename,
class NumpyDumper(FieldDumper):
"""Dumper to compressed numpy files"""
extension = 'npz'
def dump_to_file(self, file_descriptor, model):
"""Saving to compressed multi-field Numpy format"""
if mpi.size() > 1:
raise RuntimeError("NumpyDumper does not function "
"at all in parallel")
np.savez_compressed(file_descriptor, attrs=_get_attributes(model),
def read(self, file_descriptor):
"Create model from Numpy file"
data = np.load(file_descriptor, mmap_mode='r')
model = _create_model(data['attrs'].item())
for k, v in filter(lambda k: k[0] != 'attrs', data.items()):
if model.type in _basic_types:
v = v.reshape(list(v.shape) + [1])
model[k] = v
return model
import h5py
class H5Dumper(FieldDumper):
"""Dumper to HDF5 file format"""
extension = 'h5'
def _hdf5_args(self):
if mpi.size() > 1:
from mpi4py import MPI # noqa
mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD)
comp_args = {} # compression does not work in parallel
mpi_args = {}
comp_args = dict(compression='gzip', compression_opts=7)
return mpi_args, comp_args
def dump_to_file(self, file_descriptor, model):
"""Saving to HDF5 with metadata about the model"""
# Setup for MPI
if not h5py.get_config().mpi and mpi.size() > 1:
raise RuntimeError("HDF5 does not have MPI support")
mpi_args, comp_args = self._hdf5_args()
with h5py.File(file_descriptor, 'w', **mpi_args) as handle:
# Writing data
for name, field in self.get_fields(model).items():
shape = list(field.shape)
if mpi.size() > 1:
xdim = 0 if _is_surface_field(field, model) else 1
shape[xdim] = mpi_args['comm'].allreduce(shape[xdim])
dset = handle.create_dataset(name, shape, field.dtype,
dset[local_slice(field, model)] = field
# Writing metadata
for name, attr in _get_attributes(model).items():
handle.attrs[name] = attr
def read(self, file_descriptor):
"Create model from HDF5 file"
mpi_args, _ = self._hdf5_args()
with h5py.File(file_descriptor, 'r', **mpi_args) as handle:
model = _create_model(handle.attrs)
for k, v in handle.items():
if model.type in _basic_types:
v = np.asarray(v).reshape(list(v.shape) + [1])
model[k] = v[local_slice(v, model)]
return model
except ImportError:
import uvw # noqa
class UVWDumper(FieldDumper):
"""Dumper to VTK files for elasto-plastic calculations"""
extension = 'vtr'
forbidden_fields = ['traction', 'gap']
def dump_to_file(self, file_descriptor, model):
"""Dump displacements, plastic deformations and stresses"""
if mpi.size() > 1:
raise RuntimeError("UVWDumper does not function "
"properly in parallel")
bdim = len(model.boundary_shape)
# Local MPI size
lsize = model.shape
gsize = mpi.global_shape(model.boundary_shape)
gshape = gsize
if len(lsize) > bdim:
gshape = [model.shape[0]] + gshape
# Space coordinates
coordinates = [np.linspace(0, L, N, endpoint=False)
for L, N in zip(model.system_size, gshape)]
# If model has subsurfce domain, z-coordinate is always first
dimension_indices = np.arange(bdim)
if len(lsize) > bdim:
dimension_indices += 1
dimension_indices = np.concatenate((dimension_indices, [0]))
coordinates[0] = \
np.linspace(0, model.system_size[0], gshape[0])
offset = np.zeros_like(dimension_indices)
offset[0] = mpi.local_offset(gsize)
rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \
else uvw.parallel.PRectilinearGrid
# Creating rectilinear grid with correct order for components
coordlist = [coordinates[i][o:o+lsize[i]]
for i, o in zip(dimension_indices, offset)]
grid = rectgrid(
file_descriptor, coordlist,
# Iterator over fields we want to dump
fields_it = filter(lambda t: t[0] not in self.forbidden_fields,
# We make fields periodic for visualization
for name, field in fields_it:
array = uvw.DataArray(field, dimension_indices, name)
class UVWGroupDumper(FieldDumper):
"Dumper to ParaViewData files"
extension = 'pvd'
def __init__(self, basename, *fields, **kwargs):
"""Construct with desired fields"""
super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs)
subdir = Path('paraview') / basename + '-VTR'
subdir.mkdir(parents=True, exist_ok=True)
self.uvw_dumper = UVWDumper(
Path(basename + '-VTR') / basename, *fields, **kwargs
) = uvw.ParaViewData(self.file_path, compression=True)
def dump_to_file(self, file_descriptor, model):
self.uvw_dumper.file_path.replace('paraview/', ''),
except ImportError:
from netCDF4 import Dataset
class NetCDFDumper(FieldDumper):
"""Dumper to netCDF4 files"""
extension = "nc"
boundary_fields = ['traction', 'gap']
def _file_setup(self, grp, model):
grp.createDimension('frame', None)
# Attibutes
for k, v in _get_attributes(model).items():
grp.setncattr(k, v)
# Local dimensions
voigt_dim = type_traits[model.type].voigt
components = type_traits[model.type].components
self._vec = grp.createDimension('spatial', components)
self._tens = grp.createDimension('Voigt', voigt_dim)
self.model_info = model.global_shape, model.type
global_boundary_shape = mpi.global_shape(model.boundary_shape)
# Create boundary dimensions
for label, size, length in zip(
grp.createDimension(label, size)
coord = grp.createVariable(label, 'f8', (label,))
coord[:] = np.linspace(0, length, size, endpoint=False)
grp, model,
lambda f: _is_surface_field(f[1], model),
global_boundary_shape, "xy"
# Create volume dimension
if model.type in {model_type.volume_1d, model_type.volume_2d}:
size = model.shape[0]
grp.createDimension("z", size)
coord = grp.createVariable("z", 'f8', ("z",))
coord[:] = np.linspace(0, model.system_size[0], size)
grp, model,
lambda f: not _is_surface_field(f[1], model),
model.global_shape, "zxy"
self.has_setup = True
def dump_to_file(self, file_descriptor, model):
if mpi.size() > 1:
raise RuntimeError("NetCDFDumper does not function "
"properly in parallel")
mode = 'a' if Path(file_descriptor).is_file() \
and getattr(self, 'has_setup', False) else 'w'
with Dataset(file_descriptor, mode,
parallel=mpi.size() > 1) as rootgrp:
if rootgrp.dimensions == {}:
self._file_setup(rootgrp, model)
if self.model_info != (model.global_shape, model.type):
raise Exception("Unexpected model {}".format(model))
self._dump_generic(rootgrp, model)
def _create_variables(self, grp, model, predicate,
shape, dimensions):
field_dim = len(shape)
fields = list(filter(predicate, self.get_fields(model).items()))
dim_labels = list(dimensions[:field_dim])
for label, data in fields:
local_dim = []
# If we have an extra component
if data.ndim > field_dim:
if data.shape[-1] == self._tens.size:
local_dim = []
elif data.shape[-1] == self._vec.size:
local_dim = []
raise ValueError(
"{} has unexpected number of components ({})"
.format(label, data.shape[-1]))
grp.createVariable(label, 'f8',
['frame'] + dim_labels + local_dim,
def _dump_generic(self, grp, model):
fields = self.get_fields(model).items()
new_frame = grp.dimensions['frame'].size
for label, data in fields:
var = grp[label]
slice_in_global = (new_frame,) + local_slice(data, model)
var[slice_in_global] = np.array(data, dtype=np.double)
def read(self, file_descriptor):
"Create model with last frame"
with Dataset(file_descriptor, 'r',
format='NETCDF4_CLASSIC') as rootgrp:
attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()}
model = _create_model(attrs)
dims = rootgrp.dimensions.keys()
for k, v in filter(lambda k: k[0] not in dims,
v = v[-1, :]
if model.type in _basic_types:
v = np.asarray(v).reshape(list(v.shape) + [1])
model[k] = v
return model
except ImportError:

Event Timeline