Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61701960
writer.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 8, 10:36
Size
6 KB
Mime Type
text/x-python
Expires
Fri, May 10, 10:36 (2 d)
Engine
blob
Format
Raw Data
Handle
17547015
Attached To
rUVW UVW
writer.py
View Options
import
xml.dom.minidom
as
dom
import
io
import
zlib
import
numpy
as
np
from
functools
import
reduce
from
operator
import
add
from
base64
import
b64encode
def
setAttributes
(
node
,
attributes
):
"""Set attributes of a node"""
for
item
in
attributes
.
items
():
node
.
setAttribute
(
*
item
)
def
encodeArray
(
array
,
level
):
def
compress
(
array
):
"""Compress array with zlib. Returns header and compressed data."""
raw_data
=
array
.
tobytes
()
max_block_size
=
2
**
15
# Enough blocks to span whole data
nblocks
=
len
(
raw_data
)
//
max_block_size
+
1
last_block_size
=
len
(
raw_data
)
%
max_block_size
# Compress regular blocks
compressed_data
=
[
zlib
.
compress
(
raw_data
[
i
*
max_block_size
:(
i
+
1
)
*
max_block_size
],
level
)
for
i
in
range
(
nblocks
-
1
)
]
# Compress last (smaller) block
compressed_data
.
append
(
zlib
.
compress
(
raw_data
[
-
last_block_size
:],
level
)
)
# Header data (cf https://vtk.org/Wiki/VTK_XML_Formats#Compressed_Data)
usize
=
max_block_size
psize
=
last_block_size
csize
=
[
len
(
x
)
for
x
in
compressed_data
]
header
=
np
.
array
([
nblocks
,
usize
,
psize
]
+
csize
,
dtype
=
np
.
uint32
)
return
header
.
tobytes
(),
b
""
.
join
(
compressed_data
)
def
raw
(
array
):
"""Returns header and array data in bytes."""
header
=
np
.
array
([
array
.
nbytes
],
dtype
=
np
.
uint32
)
return
header
.
tobytes
(),
array
.
tobytes
()
if
level
is
not
None
:
data
=
compress
(
array
)
else
:
data
=
raw
(
array
)
return
reduce
(
add
,
map
(
lambda
x
:
b64encode
(
x
)
.
decode
(),
data
))
class
Component
:
"""Generic component class capable of registering sub-components"""
def
__init__
(
self
,
name
,
parent_node
,
writer
):
self
.
writer
=
writer
self
.
document
=
writer
.
document
self
.
node
=
self
.
document
.
createElement
(
name
)
parent_node
.
appendChild
(
self
.
node
)
def
setAttributes
(
self
,
attributes
):
setAttributes
(
self
.
node
,
attributes
)
def
register
(
self
,
name
,
attributes
=
{}):
"""Register a sub-component"""
if
type
(
attributes
)
!=
dict
:
raise
Exception
(
'Cannot register attributes of type '
+
str
(
type
(
attributes
)))
sub_component
=
Component
(
name
,
self
.
node
,
self
.
writer
)
setAttributes
(
sub_component
.
node
,
attributes
)
return
sub_component
def
_addArrayNodeData
(
self
,
data_array
,
node
,
vtk_format
):
if
vtk_format
==
'ascii'
:
data_as_str
=
reduce
(
lambda
x
,
y
:
x
+
str
(
y
)
+
' '
,
data_array
.
flat_data
,
""
)
node
.
appendChild
(
self
.
document
.
createTextNode
(
data_as_str
))
elif
vtk_format
==
'binary'
:
node
.
appendChild
(
self
.
document
.
createTextNode
(
encodeArray
(
data_array
.
flat_data
,
level
=
self
.
writer
.
compression
)))
else
:
raise
Exception
(
'Unsupported VTK Format "{}"'
.
format
(
vtk_format
))
def
registerDataArray
(
self
,
data_array
,
vtk_format
=
'binary'
):
"""Register a DataArray object"""
array_component
=
Component
(
'DataArray'
,
self
.
node
,
self
.
writer
)
attributes
=
data_array
.
attributes
attributes
[
'format'
]
=
vtk_format
# Write array data
self
.
_addArrayNodeData
(
data_array
,
array_component
.
node
,
vtk_format
)
setAttributes
(
array_component
.
node
,
attributes
)
def
registerPDataArray
(
self
,
data_array
,
vtk_format
=
'binary'
):
"""Register a DataArray object in p-file"""
array_component
=
Component
(
'PDataArray'
,
self
.
node
,
self
.
writer
)
attributes
=
data_array
.
attributes
attributes
[
'format'
]
=
vtk_format
setAttributes
(
array_component
.
node
,
attributes
)
class
Writer
:
"""Generic XML handler for VTK files"""
def
__init__
(
self
,
vtk_format
,
compression
=
None
,
vtk_version
=
'0.1'
,
byte_order
=
'LittleEndian'
):
self
.
document
=
dom
.
getDOMImplementation
()
\
.
createDocument
(
None
,
'VTKFile'
,
None
)
self
.
root
=
self
.
document
.
documentElement
self
.
root
.
setAttribute
(
'type'
,
vtk_format
)
self
.
root
.
setAttribute
(
'version'
,
vtk_version
)
self
.
root
.
setAttribute
(
'byte_order'
,
byte_order
)
self
.
data_node
=
self
.
document
.
createElement
(
vtk_format
)
self
.
root
.
appendChild
(
self
.
data_node
)
self
.
size_indicator_bytes
=
np
.
dtype
(
np
.
uint32
)
.
itemsize
self
.
append_data_arrays
=
[]
if
compression
is
not
None
:
self
.
root
.
setAttribute
(
'compressor'
,
'vtkZLibDataCompressor'
)
if
type
(
compression
)
is
not
int
:
compression
=
-
1
else
:
if
compression
not
in
list
(
range
(
-
1
,
10
)):
raise
Exception
((
'compression level {} is not '
'recognized by zlib'
)
.
format
(
compression
))
self
.
compression
=
compression
def
setDataNodeAttributes
(
self
,
attributes
):
"""Set attributes for the entire dataset"""
setAttributes
(
self
.
data_node
,
attributes
)
def
registerPiece
(
self
,
attributes
=
{}):
"""Register a piece element"""
return
self
.
registerComponent
(
'Piece'
,
self
.
data_node
,
attributes
)
def
registerComponent
(
self
,
name
,
parent
,
attributes
=
{}):
comp
=
Component
(
name
,
parent
,
self
)
setAttributes
(
comp
.
node
,
attributes
)
return
comp
def
registerAppend
(
self
):
append_node
=
Component
(
'AppendedData'
,
self
.
root
,
self
)
setAttributes
(
append_node
.
node
,
{
'format'
:
'base64'
})
self
.
root
.
appendChild
(
append_node
.
node
)
data_str
=
b
"_"
for
data_array
in
self
.
append_data_arrays
:
data_str
+=
encodeArray
(
data_array
.
flat_data
)
text
=
self
.
document
.
createTextNode
(
data_str
.
decode
(
'ascii'
))
append_node
.
node
.
appendChild
(
text
)
def
write
(
self
,
fd
):
if
type
(
fd
)
==
str
:
with
open
(
fd
,
'w'
)
as
file
:
self
.
write
(
file
)
elif
issubclass
(
type
(
fd
),
io
.
TextIOBase
):
self
.
document
.
writexml
(
fd
,
indent
=
"
\n
"
,
addindent
=
" "
)
else
:
raise
RuntimeError
(
"Expected a path or "
+
"file handle, got {}"
.
format
(
type
(
fd
)))
def
__str__
(
self
):
"""Print XML to string"""
return
self
.
document
.
toprettyxml
()
Event Timeline
Log In to Comment