# -*- coding: utf-8 -*-
#!/bin/env python3
# Copyright (C) 2003-2007 Gaby Launay
# Author: Gaby Launay <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0
# This file is part of IMTreatment.
# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gc
import gzip
import os
from glob import glob
import warnings
try:
import cv2
IS_CV2 = True
except ImportError:
IS_CV2 = False
try:
import h5py
IS_H5PY = True
except ImportError:
IS_H5PY = False
import numpy as np
from ..core import (Points, Profile, ScalarField, SpatialScalarFields,
SpatialVectorFields, TemporalScalarFields,
TemporalVectorFields,
VectorField)
from ..vortex_detection import CritPoints
from ..utils import ProgressCounter, make_unit
from ..utils.types import ARRAYTYPES, STRINGTYPES
try:
import pickle as pickle
except:
import pickle
import scipy.io as spio
import scipy.misc as spmisc
from os import path
import re
def check_path(filepath, newfile=False):
    """
    Normalize a path and check that it points to something that exists.

    Parameters
    ----------
    filepath : string
        Path to normalize and check.
    newfile : boolean, optional
        If 'True', the last component of 'filepath' is considered to be a
        file that does not exist yet: only the folder containing it is
        checked.

    Returns
    -------
    filepath : string
        The normalized path.

    Raises
    ------
    TypeError
        If 'filepath' is not a string.
    ValueError
        If 'newfile' is not a boolean.
    FileNotFoundError
        If a component of the path does not exist. The message names the
        first missing component and the folder it was searched in.
    """
    if not isinstance(filepath, STRINGTYPES):
        raise TypeError()
    if not isinstance(newfile, bool):
        raise ValueError()
    filepath = path.normpath(filepath)
    # for a file to be created, only its parent folder has to exist
    filename = None
    if newfile:
        filepath, filename = path.split(filepath)
    if not path.exists(filepath):
        # break the path into its components, deepest one first
        components = []
        head = filepath
        while True:
            head, tail = path.split(head)
            if tail == "":
                # nothing left to split: keep the root (if any) and stop
                if head != "":
                    components.append(head)
                break
            components.append(tail)
        # walk down from the root to find the first missing component
        existing = ""
        for component in reversed(components):
            if component == "":
                break
            candidate = path.join(existing, component)
            if not path.exists(candidate):
                # a relative path is resolved from the working directory
                if existing == "":
                    existing = os.getcwd()
                err_mess = r"'{}' directory/file not found in '{}' directory." \
                    .format(component, existing)
                raise FileNotFoundError(err_mess)
            existing = candidate
    # put back the name of the file to create
    if newfile:
        filepath = path.join(filepath, filename)
    return filepath
def find_file_in_path(regs, dirpath, ask=False):
    """
    Search recursively, under a root folder, for the folders containing at
    least one file whose name matches one of the given regular expressions.

    Parameters
    ----------
    regs : list of string
        Regular expressions. They are applied with 're.match', i.e. anchored
        at the beginning of the file name.
    dirpath : string
        Root path to search from.
    ask : bool
        If 'True' and several folders are found, ask (on the standard input)
        for the wanted folder and return only this one.

    Returns
    -------
    folders : list of string
        List of the folders containing wanted files.

    Raises
    ------
    ValueError
        If 'dirpath' is not an existing directory.
    TypeError
        If 'regs' is not an array-like.
    """
    if not path.isdir(dirpath):
        raise ValueError("'dirpath' must be an existing directory")
    if not isinstance(regs, ARRAYTYPES):
        raise TypeError("'regs' must be a list of regular expressions")
    # compile once rather than for each file
    patterns = [re.compile(str(reg)) for reg in regs]
    # the match is evaluated afresh for each folder, so that a folder without
    # any file is never reported because of the result of the previous one
    dir_paths = [root for root, _, files in os.walk(dirpath)
                 if any(pattern.match(f)
                        for f in files for pattern in patterns)]
    # let the user choose when several folders are candidates
    if ask and len(dir_paths) > 1:
        print(("{} folders found :".format(len(dir_paths))))
        for i, p in enumerate(dir_paths):
            print((" {} : {}".format(i + 1, p)))
        rep = 0
        while not 1 <= rep <= len(dir_paths):
            rep = input("Want to go with wich one ?\n")
            try:
                rep = int(rep)
            except ValueError:
                # not a number: ask again
                rep = 0
        dir_paths = [dir_paths[rep - 1]]
    return dir_paths
def _matlab_entries(entries):
    """Merge the parsed form of each (name, value) pair in a single dict."""
    dic = {}
    for key, value in entries:
        dic.update(matlab_parser(value, key))
    return dic


def matlab_parser(obj, name):
    """
    Convert an object into a (possibly nested) dictionary of simple values.

    The result is what 'export_to_matlab' hands to 'scipy.io.savemat'.

    Parameters
    ----------
    obj :
        Object to convert. Can be 'None', a number, a string, a boolean, a
        numpy scalar, a list or tuple of those, or a Points, Profile,
        VectorField, ScalarField or CritPoints object.
    name : string
        Key under which the converted object is stored.

    Returns
    -------
    dic : dict
        Dictionary with 'name' as only key. 'None' is stored as the string
        "None", simple values and sequences of simple values are stored
        as they are, other sequences are stored as a list of dictionaries
        (one per element, with keys 'obj0', 'obj1', ...) and IMTreatment
        objects are stored as a dictionary of their attributes.

    Raises
    ------
    IOError
        If 'obj' (or one of its elements) is of an unsupported type.
    """
    # numpy scalars are what comes out of 'list(array)'
    classic_types = (int, float, str, bool, np.number, np.bool_)
    array_types = (list, tuple)
    if obj is None:
        return {name: "None"}
    if isinstance(obj, classic_types):
        return {name: obj}
    if isinstance(obj, array_types):
        # only the first level is inspected
        if all(isinstance(val, classic_types + array_types) for val in obj):
            return {name: obj}
        return {name: [matlab_parser(val, f"obj{i}")
                       for i, val in enumerate(obj)]}
    if isinstance(obj, Points):
        # '[1:-1]' drops the first and last characters of the unit string
        entries = [('v', list(obj.v)),
                   ('xy', [list(comp) for comp in obj.xy]),
                   ('unit_x', obj.unit_x.strUnit()[1:-1]),
                   ('unit_y', obj.unit_y.strUnit()[1:-1]),
                   ('unit_v', obj.unit_v.strUnit()[1:-1])]
    elif isinstance(obj, Profile):
        entries = [('x', list(obj.x)),
                   ('y', list(obj.y)),
                   ('mask', [int(val) for val in obj.mask]),
                   ('unit_x', obj.unit_x.strUnit()[1:-1]),
                   ('unit_y', obj.unit_y.strUnit()[1:-1])]
    elif isinstance(obj, (VectorField, ScalarField)):
        X, Y = np.meshgrid(obj.axe_x, obj.axe_y, indexing='ij')
        entries = [('axe_x', list(obj.axe_x)),
                   ('axe_y', list(obj.axe_y)),
                   ('X', [list(comp) for comp in X]),
                   ('Y', [list(comp) for comp in Y])]
        # VectorField is tested first, as in the original chain of tests
        if isinstance(obj, VectorField):
            entries += [('comp_x', [list(comp) for comp in obj.comp_x]),
                        ('comp_y', [list(comp) for comp in obj.comp_y])]
        else:
            entries += [('values', [list(comp) for comp in obj.values])]
        entries += [('mask', [list(comp) for comp in obj.mask]),
                    ('unit_x', obj.unit_x.strUnit()[1:-1]),
                    ('unit_y', obj.unit_y.strUnit()[1:-1]),
                    ('unit_values', obj.unit_values.strUnit()[1:-1])]
    elif isinstance(obj, CritPoints):
        eps = obj.current_epsilon

        def traj(attr):
            # trajectories are only exported when an epsilon is defined
            return list(getattr(obj, attr)) if eps is not None else None

        # units are stored as returned by 'strUnit()' here (not stripped)
        entries = [('epsilon', eps),
                   ('foc', list(obj.foc)),
                   ('foc_traj', traj('foc_traj')),
                   ('foc_c', list(obj.foc_c)),
                   ('foc_c_traj', traj('foc_c_traj')),
                   ('node_i', list(obj.node_i)),
                   ('node_o', list(obj.node_o)),
                   ('node_i_traj', traj('node_i_traj')),
                   ('node_o_traj', traj('node_o_traj')),
                   ('sadd', list(obj.sadd)),
                   ('sadd_traj', traj('sadd_traj')),
                   ('times', list(obj.times)),
                   ('unit_time', obj.unit_time.strUnit()),
                   ('unit_x', obj.unit_x.strUnit()),
                   ('unit_y', obj.unit_y.strUnit())]
    else:
        raise IOError("Can't parser that : \n {}".format(obj))
    return {name: _matlab_entries(entries)}
def import_from_file(filepath, **kw):
    """
    Load and return the object stored in the specified file.

    '.imt' files are read as pickle files and '.cimt' files as
    gzip-compressed pickle files.

    Warning: unpickling can execute arbitrary code, only load files coming
    from a trusted source.

    Parameters
    ----------
    filepath : string
        Path specifying the file to load.
    kw :
        Accepted but not used.

    Returns
    -------
    obj :
        The object stored in the file.

    Raises
    ------
    IOError
        If the file extension is neither '.imt' nor '.cimt'.
    """
    filepath = check_path(filepath)
    extension = path.splitext(filepath)[1]
    if extension == ".imt":
        opener = open
    elif extension == ".cimt":
        opener = gzip.open
    else:
        raise IOError("File is not readable "
                      "(unknown extension : {})".format(extension))
    # the garbage collector is switched off during the loading (presumably
    # to speed it up) and put back in its previous state whatever happens
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        with opener(filepath, 'rb') as f:
            obj = pickle.load(f)
    finally:
        if gc_was_enabled:
            gc.enable()
    return obj
def export_to_file(obj, filepath, compressed=True, **kw):
    """
    Write the object in the specified file, using pickle.

    If existing, the specified file will be truncated. If not, it will
    be created. The extension '.cimt' (compressed) or '.imt'
    (not compressed) is appended to 'filepath' when it is missing.

    Parameters
    ----------
    obj :
        Object to store (anything that can be pickled).
    filepath : string
        Path specifying where to save the object.
    compressed : boolean, optional
        If 'True' (default), the file is compressed using gzip.
    kw :
        Accepted but not used.

    Raises
    ------
    TypeError
        If 'compressed' is not a boolean.
    """
    filepath = check_path(filepath, newfile=True)
    if not isinstance(compressed, bool):
        raise TypeError("'compressed' must be a boolean")
    if compressed:
        extension, opener = ".cimt", gzip.open
    else:
        extension, opener = ".imt", open
    if path.splitext(filepath)[1] != extension:
        filepath = filepath + extension
    # the garbage collector is switched off during the writing (presumably
    # to speed it up) and put back in its previous state whatever happens;
    # the 'with' statement closes the file even if pickling fails
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        with opener(filepath, 'wb') as f:
            pickle.dump(obj, f, protocol=-1)
    finally:
        if gc_was_enabled:
            gc.enable()
def imts_to_imt(imts_path, imt_path, kind):
    """
    Concatenate some .imt (or .cimt) files to one file.

    Files are read in the alphabetical order of their names. Files that do
    not contain the kind of field implied by 'kind' are ignored.

    Parameters
    ----------
    imts_path : string
        Path to the folder containing the .imt files.
    imt_path : string
        Path to store the new imt file.
    kind : string
        Kind of object for the new imt file
        (can be 'TSF' for TemporalScalarFields, 'SSF' for SpatialScalarFields,
        'TVF' for TemporalVectorFields, 'SVF' for SpatialVectorFields)

    Raises
    ------
    TypeError
        If 'kind' is not a string.
    ValueError
        If 'kind' is not one of the supported kinds.
    """
    imts_path = check_path(imts_path)
    imt_path = check_path(imt_path, newfile=True)
    if not isinstance(kind, STRINGTYPES):
        raise TypeError("'kind' must be a string")
    # expected type of the fields and container to fill, for each kind
    kinds = {'TSF': (ScalarField, TemporalScalarFields),
             'SSF': (ScalarField, SpatialScalarFields),
             'TVF': (VectorField, TemporalVectorFields),
             'SVF': (VectorField, SpatialVectorFields)}
    if kind not in kinds:
        raise ValueError("'kind' must be 'TSF', 'SSF', 'TVF' or 'SVF'")
    field_type, container = kinds[kind]
    fields = container()
    # 'glob' returns files in arbitrary order: sort them so that the order
    # of the fields does not depend on the file system
    for tmp_path in sorted(glob(path.join(imts_path, "*"))):
        if path.splitext(tmp_path)[1] not in ['.imt', '.cimt']:
            continue
        field = import_from_file(tmp_path)
        if isinstance(field, field_type):
            fields.add_field(field)
    export_to_file(fields, imt_path)
def _matlab_keys(data):
    """Return the entry names of a loaded Matlab structure."""
    if isinstance(data, np.ndarray):
        return list(data.dtype.fields.keys())
    return list(data.keys())


def _matlab_unwrap(value):
    """Strip the nested containers of length 1 wrapped around a value."""
    # strings are left alone: the first character of a one-character string
    # is the string itself, so unwrapping it would never end
    while not isinstance(value, str):
        try:
            if len(value) != 1:
                break
        except TypeError:
            # unsized value (scalar): nothing left to unwrap
            break
        value = value[0]
    return value


def import_from_matlab(filepath, obj, show_struct=False, **kwargs):
    """
    Import data from a matlab (.mat) file.

    The file is read with 'scipy.io.loadmat' and, when scipy reports the
    format as not implemented, with 'h5py'.
    If the file contains both a 'Data' and a 'P' entry, the data are searched
    in the 'Data' entry.

    Parameters
    ----------
    filepath : string
        Path of the matlab file
    obj : string in ['ScalarField', 'VectorField', 'Profile',
                     'Points']
        Kind of object to import to.
    show_struct: boolean
        If True, just show the structure of the file
        (and return it, so you can play with it)
    kwargs :
        Rest of the keyword arguments should indicate where to find
        the necessary information in the matlab dictionary. By default,
        each piece of information is searched under its own name.

    Returns
    -------
    res :
        Object of the kind given by 'obj', or the loaded structure if
        'show_struct' is 'True'.

    Raises
    ------
    ValueError
        If 'obj' is not one of the supported kinds.
    Exception
        If the file can only be read with h5py and h5py is not installed.

    Example
    -------
    With a matlab file containing a dictionary with the
    following entries: 'x', 'x_unit', 'y', 'y_unit', 'u', 'v'

    >>> vf = import_from_matlab('data.mat', 'VectorField',
    ...                         axe_x='x', axe_y='y',
    ...                         unit_x='x_unit', unit_y='y_unit',
    ...                         comp_x='u', comp_y='v')
    """
    # class to build and names of the needed entries, for each kind
    builders = {
        'ScalarField': (ScalarField,
                        ('axe_x', 'axe_y', 'unit_x', 'unit_y', 'values',
                         'mask', 'unit_values')),
        'VectorField': (VectorField,
                        ('axe_x', 'axe_y', 'comp_x', 'comp_y', 'mask',
                         'unit_x', 'unit_y', 'unit_values')),
        'Profile': (Profile, ('x', 'y', 'mask', 'unit_x', 'unit_y')),
        'Points': (Points, ('xy', 'v', 'unit_x', 'unit_y', 'unit_v'))}
    if obj not in builders:
        raise ValueError("'obj' must be one of {}".format(list(builders)))
    res, entry_names = builders[obj]
    locations = {entry_name: entry_name for entry_name in entry_names}
    locations.update(kwargs)
    filepath = check_path(filepath)
    # Get matlab dictionary
    h5file = None
    try:
        data = spio.loadmat(filepath)
    except NotImplementedError:
        # h5py is only needed for the files scipy cannot read
        if not IS_H5PY:
            raise Exception('h5py needs to be installed to import from '
                            'matlab.') from None
        h5file = h5py.File(filepath, 'r')
        data = h5file
    keep_open = False
    try:
        keys = _matlab_keys(data)
        # pivmat !
        if 'Data' in keys and 'P' in keys:
            data = data['Data']
            keys = _matlab_keys(data)
        # Just show the structure
        if show_struct:
            print(f"Available keys in {filepath}:\n{keys}")
            # the returned structure may still need the file
            keep_open = True
            return data
        # Create building dict
        build_dic = {}
        for key, entry in locations.items():
            if entry not in keys:
                continue
            tdata = _matlab_unwrap(data[entry])
            if key in ['unit_x', 'unit_y', 'unit_values', 'unit_v']:
                if isinstance(tdata, str):
                    build_dic[key] = tdata
                else:
                    # unit stored as an array of character codes
                    build_dic[key] = ''.join(chr(int(n))
                                             for n in np.ravel(tdata))
            else:
                build_dic[key] = np.array(tdata)
    finally:
        if h5file is not None and not keep_open:
            h5file.close()
    # Fill the object
    if obj in ['ScalarField', 'VectorField']:
        ress = res()
        ress.import_from_arrays(**build_dic)
    else:
        ress = res(**build_dic)
    return ress
def export_to_matlab(obj, filepath, **kw):
    """
    Write an object in a matlab file.

    The object is converted by 'matlab_parser' and the result is written
    with 'scipy.io.savemat'.

    Parameters
    ----------
    obj :
        Object to export (see 'matlab_parser' for the supported types).
    filepath : string
        Path where to write the matlab file.
    kw :
        Additional arguments for 'scipy.io.savemat'.
    """
    target = check_path(filepath, newfile=True)
    content = matlab_parser(obj, "tmp")["tmp"]
    spio.savemat(target, content, **kw)
def export_to_vtk(obj, filepath, axis=None, **kw):
    """
    Export the field to a .vtk file, for Mayavi use.

    Parameters
    ----------
    obj : ScalarField, VectorField or Points object
        Object to export.
    filepath : string
        Path where to write the vtk file.
    axis : tuple of strings
        By default, field axe are set to (x,y), if you want
        different axis, you have to specified them here.
        For example, "('z', 'y')", put the x field axis values
        in vtk z axis, and y field axis in y vtk axis.
    line : boolean (only for Points object)
        If 'True', lines between points are written instead of points.
    kw :
        Other keyword arguments are only used for Points objects
        (they are ignored for fields).

    Raises
    ------
    TypeError
        If 'obj' is of an unsupported type.
    """
    if isinstance(obj, ScalarField):
        __export_sf_to_vtk(obj, filepath, axis)
    elif isinstance(obj, VectorField):
        __export_vf_to_vtk(obj, filepath, axis)
    elif isinstance(obj, Points):
        # 'axis' has to be forwarded explicitly: it is not part of 'kw'
        __export_pts_to_vtk(obj, filepath, axis=axis, **kw)
    else:
        raise TypeError("Cannot (yet) export this kind of object to vtk")
def __export_pts_to_vtk(pts, filepath, axis=None, line=False):
    """
    Export the Points object to a .vtk file, for Mayavi use.

    Parameters
    ----------
    pts : Point object
        Points to export. If they have no values, zeros are written.
    filepath : string
        Path where to write the vtk file.
    axis : tuple of strings, optional
        By default, points field axe are set to (x,y), if you want
        different axis, you have to specified them here.
        For example, "('z', 'y')", put the x points field axis values
        in vtk z axis, and y points field axis in y vtk axis.
    line : boolean, optional
        If 'True', lines between points are written instead of points.

    Raises
    ------
    Exception
        If 'pyvtk' is not installed.
    ValueError
        If the folder of 'filepath' does not exist or if 'axis' contains
        unknown or identical names.
    TypeError
        If 'axis' or 'line' are not of the expected type.
    """
    try:
        import pyvtk
    except ImportError:
        raise Exception("You need to install pyvtk to use this "
                        "functionnality")
    # a bare file name has an empty folder, meaning the working directory
    if not path.exists(path.dirname(filepath) or os.curdir):
        raise ValueError("'filepath' is not a valid path")
    if axis is None:
        axis = ('x', 'y')
    if not isinstance(axis, ARRAYTYPES):
        raise TypeError("'axis' must be a 2x1 tuple")
    if not isinstance(axis[0], STRINGTYPES) \
            or not isinstance(axis[1], STRINGTYPES):
        raise TypeError("'axis' must be a 2x1 tuple of strings")
    if not axis[0] in ['x', 'y', 'z'] or not axis[1] in ['x', 'y', 'z']:
        raise ValueError("'axis' strings must be 'x', 'y' or 'z'")
    if axis[0] == axis[1]:
        raise ValueError("'axis' strings must be different")
    if not isinstance(line, bool):
        raise TypeError("'line' must be a boolean")
    nmb_pts = pts.xy.shape[0]
    values = pts.v if pts.v is not None else np.zeros(nmb_pts)
    point_data = pyvtk.PointData(pyvtk.Scalars(values, 'Points values'))
    # the vtk axis that is not used stays filled with zeros
    coords = {name: np.zeros(nmb_pts) for name in ('x', 'y', 'z')}
    coords[axis[0]] = pts.xy[:, 0]
    coords[axis[1]] = pts.xy[:, 1]
    vtk_pts = list(zip(coords['x'], coords['y'], coords['z']))
    vertex = np.arange(nmb_pts)
    if line:
        grid = pyvtk.UnstructuredGrid(vtk_pts, line=vertex)
    else:
        grid = pyvtk.UnstructuredGrid(vtk_pts, vertex=vertex)
    data = pyvtk.VtkData(grid, 'Scalar Field from python', point_data)
    data.tofile(filepath)
def __export_sf_to_vtk(obj, filepath, axis=None):
    """
    Export the scalar field to a .vtk file, for Mayavi use.

    Parameters
    ----------
    obj : ScalarField object
        Field to export.
    filepath : string
        Path where to write the vtk file.
    axis : tuple of strings
        By default, scalar field axe are set to (x,y), if you want
        different axis, you have to specified them here.
        For example, "('z', 'y')", put the x scalar field axis values
        in vtk z axis, and y scalar field axis in y vtk axis.

    Raises
    ------
    ValueError
        If the folder of 'filepath' does not exist or if 'axis' contains
        unknown or identical names.
    TypeError
        If 'axis' is not a tuple of strings.
    """
    import pyvtk
    # a bare file name has an empty folder, meaning the working directory
    if not path.exists(path.dirname(filepath) or os.curdir):
        raise ValueError("'filepath' is not a valid path")
    if axis is None:
        axis = ('x', 'y')
    if not isinstance(axis, ARRAYTYPES):
        raise TypeError("'axis' must be a 2x1 tuple")
    if not isinstance(axis[0], STRINGTYPES) \
            or not isinstance(axis[1], STRINGTYPES):
        raise TypeError("'axis' must be a 2x1 tuple of strings")
    if not axis[0] in ['x', 'y', 'z'] or not axis[1] in ['x', 'y', 'z']:
        raise ValueError("'axis' strings must be 'x', 'y' or 'z'")
    if axis[0] == axis[1]:
        raise ValueError("'axis' strings must be different")
    # NOTE(review): values are flattened in numpy default order; check that
    # it is the point order expected by the vtk grid - TODO confirm
    V = obj.values.flatten()
    point_data = pyvtk.PointData(pyvtk.Scalars(V, 'Scalar Field'))
    # the vtk axis that is not used is reduced to the single coordinate 0
    coords = {'x': 0., 'y': 0., 'z': 0.}
    coords[axis[0]] = obj.axe_x
    coords[axis[1]] = obj.axe_y
    grid = pyvtk.RectilinearGrid(coords['x'], coords['y'], coords['z'])
    data = pyvtk.VtkData(grid, 'Scalar Field from python', point_data)
    data.tofile(filepath)
def __export_vf_to_vtk(obj, filepath, axis=None):
    """
    Export the vector field to a .vtk file, for Mayavi use.

    Parameters
    ----------
    obj : VectorField object
        Field to export.
    filepath : string
        Path where to write the vtk file.
    axis : tuple of strings
        By default, vector field axe are set to (x,y), if you want
        different axis, you have to specified them here.
        For example, "('z', 'y')", put the x vector field axis values
        (and the x component) in vtk z axis, and y vector field axis
        (and the y component) in y vtk axis.

    Raises
    ------
    ValueError
        If the folder of 'filepath' does not exist or if 'axis' contains
        unknown or identical names.
    TypeError
        If 'axis' is not a tuple of strings.
    """
    import pyvtk
    # a bare file name has an empty folder, meaning the working directory
    if not path.exists(path.dirname(filepath) or os.curdir):
        raise ValueError("'filepath' is not a valid path")
    if axis is None:
        axis = ('x', 'y')
    if not isinstance(axis, ARRAYTYPES):
        raise TypeError("'axis' must be a 2x1 tuple")
    if not isinstance(axis[0], STRINGTYPES) \
            or not isinstance(axis[1], STRINGTYPES):
        raise TypeError("'axis' must be a 2x1 tuple of strings")
    if not axis[0] in ['x', 'y', 'z'] or not axis[1] in ['x', 'y', 'z']:
        raise ValueError("'axis' strings must be 'x', 'y' or 'z'")
    if axis[0] == axis[1]:
        raise ValueError("'axis' strings must be different")
    # NOTE(review): components are flattened in numpy default order; check
    # that it is the point order expected by the vtk grid - TODO confirm
    Vx = obj.comp_x.flatten()
    Vy = obj.comp_y.flatten()
    # the vtk axis that is not used is reduced to the single coordinate 0,
    # with a null vector component
    coords = {'x': 0., 'y': 0., 'z': 0.}
    comps = {name: np.zeros(Vx.shape) for name in ('x', 'y', 'z')}
    coords[axis[0]], comps[axis[0]] = obj.axe_x, Vx
    coords[axis[1]], comps[axis[1]] = obj.axe_y, Vy
    vect = list(zip(comps['x'], comps['y'], comps['z']))
    point_data = pyvtk.PointData(pyvtk.Vectors(vect, "Vector field"))
    grid = pyvtk.RectilinearGrid(coords['x'], coords['y'], coords['z'])
    data = pyvtk.VtkData(grid, 'Vector Field from python', point_data)
    data.tofile(filepath)
def _get_imx_buffers(filename):
    """
    Read a davis file with the 'ReadIM' module and return its content.

    Parameters
    ----------
    filename : string
        Path to the file to read.

    Returns
    -------
    fmt :
        'image_sub_type' attribute of the buffer.
    vectorGrid :
        'vectorGrid' attribute of the buffer.
    arrays : numpy array
        Data of the buffer, with its two last axes swapped.
    atts : dict
        Attributes of the file, as converted by 'ReadIM.extra.att2dict'.

    Raises
    ------
    Exception
        If the 'ReadIM' module is not installed.
    """
    try:
        import ReadIM
    except ModuleNotFoundError:
        raise Exception("You need the 'ReadIM' module to import from davis"
                        " files")
    raw_buffer, raw_atts = ReadIM.extra.get_Buffer_andAttributeList(filename)
    frames, array_buffer = ReadIM.extra.buffer_as_array(raw_buffer)
    # 'np.array' makes a copy of the data, taken before the buffers are
    # destroyed below
    arrays = np.array(frames.transpose((0, 2, 1)))
    atts = ReadIM.extra.att2dict(raw_atts)
    fmt = raw_buffer.image_sub_type
    vectorGrid = raw_buffer.vectorGrid
    for buff in (raw_buffer, array_buffer):
        ReadIM.DestroyBuffer(buff)
    return fmt, vectorGrid, arrays, atts
def _im7_axis(scale_att, nmb):
    """
    Build an increasing axis of 'nmb' points from a '_SCALE_*' attribute.

    Return the axis, its unit and whether the data have to be reversed
    along this axis (negative step).
    """
    scale = scale_att.split("\n")
    unit = scale[1]
    scale_val = scale[0].split(' ')
    step = float(scale_val[0])
    init = float(scale_val[1])
    if step < 0:
        return init + np.arange(nmb - 1, -1, -1)*step, unit, True
    return init + np.arange(nmb)*step, unit, False


def import_from_IM7(filename, infos=False):
    """
    Import a scalar field from a .IM7 file.

    Parameters
    ----------
    filename : string
        Path to the IM7 file.
    infos : boolean, optional
        If 'True', also return a dictionary with informations on the im7

    Returns
    -------
    field : ScalarField object
        The imported field.
    atts : dict
        Attributes of the file (only if 'infos' is 'True').

    Raises
    ------
    TypeError
        If 'filename' is not a string.
    ValueError
        If the file does not exist, is not an IM7 file, or does not contain
        one or two frames.
    """
    if not isinstance(filename, STRINGTYPES):
        raise TypeError("'filename' must be a string")
    if not path.exists(filename):
        raise ValueError("I did not find your file, boy")
    _, ext = path.splitext(filename)
    if ext not in (".im7", ".IM7"):
        raise ValueError("I need the file to be an IM7 file (not a {} file)"
                         .format(ext))
    # Importing from buffer
    _, _, v_array, atts = _get_imx_buffers(filename)
    nmb_frames = v_array.shape[0]
    if nmb_frames == 2:
        # with two frames, the first one is used as mask
        mask = v_array[0]
        values = v_array[1]
    elif nmb_frames == 1:
        values = v_array[0]
        mask = np.zeros(values.shape, dtype=bool)
    else:
        raise ValueError("Cannot import an IM7 file with {} frames"
                         .format(nmb_frames))
    # Values: '_SCALE_I' gives "factor offset" on a line and the unit on
    # the next one
    scale_i = atts['_SCALE_I'].split("\n")
    scale_val = scale_i[0].split(' ')
    unit_values = scale_i[1]
    # both numbers are parsed before being applied, so that the values
    # cannot be scaled twice when only one of them is an integer
    try:
        factor, offset = int(scale_val[0]), int(scale_val[1])
    except ValueError:
        factor, offset = float(scale_val[0]), float(scale_val[1])
    values *= factor
    values += offset
    # X
    axe_x, unit_x, reverse_x = _im7_axis(atts['_SCALE_X'], values.shape[0])
    if reverse_x:
        values = values[::-1, :]
        mask = mask[::-1, :]
    # Y
    axe_y, unit_y, reverse_y = _im7_axis(atts['_SCALE_Y'], values.shape[1])
    if reverse_y:
        values = values[:, ::-1]
        mask = mask[:, ::-1]
    # returning
    tmpsf = ScalarField()
    tmpsf.import_from_arrays(axe_x=axe_x, axe_y=axe_y, values=values,
                             mask=mask,
                             unit_x=unit_x, unit_y=unit_y,
                             unit_values=unit_values)
    if infos:
        return tmpsf, atts
    return tmpsf
def import_from_IM7s(fieldspath, kind='TSF', fieldnumbers=None, incr=1):
    """
    Import scalar fields from .IM7 files.

    'fieldspath' can be a tuple of paths to im7 files, or the path to a
    folder. In the latter case, all the im7 files present in the folder are
    imported, in the alphabetical order of their names. If the folder
    contains no im7 file, its sub-folders are searched (and the user is
    asked to choose if several of them contain im7 files).

    Parameters
    ----------
    fieldspath : string or tuple of string
        Folder containing the im7 files, or paths to the im7 files.
    kind : string, optional
        Kind of object to create with IM7 files.
        (can be 'TSF' for TemporalScalarFields
        or 'SSF' for SpatialScalarFields).
    fieldnumbers : 2x1 tuple of int
        Interval of fields to import, default is all.
    incr : integer
        Incrementation between fields to take. Default is 1, meaning all
        fields are taken.

    Returns
    -------
    fields : TemporalScalarFields or SpatialScalarFields object
        The imported fields.

    Raises
    ------
    TypeError
        If an argument is not of the expected type.
    ValueError
        If no im7 file is found, if 'incr' is not positive or if 'kind' is
        unknown.
    """
    def list_im7(folder):
        # im7 files directly under 'folder'
        return [f for f in glob(path.join(folder, '*'))
                if path.splitext(f)[-1] in ['.im7', '.IM7']]

    # check parameters
    if isinstance(fieldspath, ARRAYTYPES):
        if len(fieldspath) == 0:
            raise ValueError("'fieldspath' is empty")
        if not isinstance(fieldspath[0], STRINGTYPES):
            raise TypeError("'fieldspath' must be a string or a tuple of"
                            " string")
        fieldspaths = np.asarray(fieldspath, dtype=str)
    elif isinstance(fieldspath, STRINGTYPES):
        fieldspath = check_path(fieldspath)
        paths = list_im7(fieldspath)
        # if no file found, search recursively
        if len(paths) == 0:
            # patterns match names ending with the extension only
            poss_paths = find_file_in_path([r'.*\.im7$', r'.*\.IM7$'],
                                           fieldspath, ask=True)
            if len(poss_paths) == 0:
                raise ValueError("No IM7 file found in '{}'"
                                 .format(fieldspath))
            paths = list_im7(poss_paths[0])
        # Sort paths by file name
        fieldspaths = np.asarray(sorted(paths, key=path.basename), dtype=str)
    else:
        raise TypeError("'fieldspath' must be a string or a tuple of"
                        " string")
    if fieldnumbers is not None:
        if not isinstance(fieldnumbers, ARRAYTYPES):
            raise TypeError("'fieldnumbers' must be a 2x1 array")
        if not len(fieldnumbers) == 2:
            raise TypeError("'fieldnumbers' must be a 2x1 array")
        if not isinstance(fieldnumbers[0], int) \
                or not isinstance(fieldnumbers[1], int):
            raise TypeError("'fieldnumbers' must be an array of integers")
    else:
        fieldnumbers = [0, len(fieldspaths)]
    if not isinstance(incr, int):
        raise TypeError("'incr' must be an integer")
    if incr <= 0:
        raise ValueError("'incr' must be positive")
    # Import
    if kind == 'TSF':
        fields = TemporalScalarFields()
    elif kind == 'SSF':
        fields = SpatialScalarFields()
    else:
        raise ValueError("'kind' must be 'TSF' or 'SSF'")
    start, end = fieldnumbers
    t = 0.
    # loop on files
    for p in fieldspaths[start:end:incr]:
        tmp_sf, infos = import_from_IM7(p, infos=True)
        # 'FrameDt0' is read as "<time step> <unit>"; a unit-less step of 1
        # is used when the file does not provide it
        try:
            dt = infos['FrameDt0'].split()
        except KeyError:
            dt = [1., ""]
        unit_time = make_unit(dt[1])
        t += float(dt[0])*incr
        if kind == 'TSF':
            fields.add_field(tmp_sf, t, unit_time)
        else:
            fields.add_field(tmp_sf)
    return fields
def import_from_VC7(filename, infos=False, add_fields=False):
    """
    Import a vector field or a velocity field from a .VC7 file

    Parameters
    ----------
    filename : string
        Path to the file to import.
    infos : boolean, optional
        If 'True', also return a dictionary with informations on the vc7
    add_fields : boolean, optional
        If 'True', also return a list containing additional fields
        contained in the vc7 field (peak ratio, correlation value, ...).
        The list is empty if the file contains no additional field.

    Returns
    -------
    field : VectorField object
        The imported field. Returned alone if 'infos' and 'add_fields'
        are 'False', else as the first element of a tuple.
    atts : dict
        Attributes of the file (only if 'infos' is 'True').
    suppl_fields : list of ScalarField objects
        Additional fields (only if 'add_fields' is 'True').

    Raises
    ------
    ValueError
        If the file is not a vc7 file or if its buffer format is not
        supported.
    """
    # check parameters
    filename = check_path(filename)
    _, ext = path.splitext(filename)
    if ext not in (".vc7", ".VC7"):
        raise ValueError("'filename' must be a vc7 file")
    # Importing from buffer
    fmt, vectorGrid, v_array, atts = _get_imx_buffers(filename)
    # Values and Mask
    if fmt == 2:
        Vx = v_array[0]
        Vy = v_array[1]
        mask = np.zeros(Vx.shape, dtype=bool)
    elif fmt == 3 or fmt == 1:
        # points are masked where frame 0 or frame 9 is null
        mask = np.logical_or(np.logical_not(v_array[0]),
                             np.logical_not(v_array[9]))
        Vx = v_array[1]
        Vy = v_array[2]
    else:
        raise ValueError("Unsupported VC7 buffer format: {}".format(fmt))
    # null vectors are masked too
    mask = np.logical_or(mask, np.logical_and(Vx == 0., Vy == 0.))
    # additional fields if necessary
    suppl_fields = []
    if add_fields and fmt in [1, 3]:
        # NOTE(review): these frames are transposed while Vx and Vy are
        # not - confirm the resulting shape is the expected one
        suppl_fields = [np.transpose(np.array(v_array[i]))
                        for i in range(4, v_array.shape[0])]
    # Get and apply scale on values: '_SCALE_I' gives "factor offset" on a
    # line and the unit on the next one
    scale_i = atts['_SCALE_I'].split("\n")
    unit_values = scale_i[1]
    scale_val = scale_i[0].split(' ')
    # both numbers are parsed before being applied, so that the values
    # cannot be scaled twice when only one of them is an integer
    try:
        factor, offset = int(scale_val[0]), int(scale_val[1])
    except ValueError:
        factor, offset = float(scale_val[0]), float(scale_val[1])
    for comp in (Vx, Vy):
        comp *= factor
        comp += offset
    # Get and apply scale on X
    scale_x = atts['_SCALE_X'].split("\n")
    unit_x = scale_x[1]
    scale_val = scale_x[0].split(' ')
    x_init = float(scale_val[1])
    dx = float(scale_val[0])*vectorGrid
    len_axe_x = Vx.shape[0]
    if dx < 0:
        # reversed axis: data are flipped and the x component changes sign
        axe_x = x_init + np.arange(len_axe_x - 1, -1, -1)*dx
        Vx = -Vx[::-1, :]
        Vy = Vy[::-1, :]
        mask = mask[::-1, :]
        suppl_fields = [field[::-1, :] for field in suppl_fields]
    else:
        axe_x = x_init + np.arange(len_axe_x)*dx
    # Get and apply scale on Y
    scale_y = atts['_SCALE_Y'].split("\n")
    unit_y = scale_y[1]
    scale_val = scale_y[0].split(' ')
    y_init = float(scale_val[1])
    dy = float(scale_val[0])*vectorGrid
    len_axe_y = Vx.shape[1]
    if dy < 0 or unit_y == 'pixel':
        # reversed axis: data are flipped and the y component changes sign
        axe_y = y_init + np.arange(len_axe_y - 1, -1, -1)*dy
        Vx = Vx[:, ::-1]
        Vy = -Vy[:, ::-1]
        mask = mask[:, ::-1]
        suppl_fields = [field[:, ::-1] for field in suppl_fields]
    else:
        axe_y = y_init + np.arange(len_axe_y)*dy
    # returning
    tmpvf = VectorField()
    tmpvf.import_from_arrays(axe_x, axe_y, Vx, Vy, mask=mask, unit_x=unit_x,
                             unit_y=unit_y, unit_values=unit_values)
    if not infos and not add_fields:
        return tmpvf
    res = (tmpvf,)
    if infos:
        res += (atts,)
    if add_fields:
        suppl_sfs = []
        for values in suppl_fields:
            tmp_field = ScalarField()
            tmp_field.import_from_arrays(axe_x, axe_y, values,
                                         unit_x=unit_x, unit_y=unit_y,
                                         unit_values='')
            suppl_sfs.append(tmp_field)
        res += (suppl_sfs,)
    return res
def import_from_VC7s(fieldspath, kind='TVF', fieldnumbers=None, incr=1,
                     add_fields=False, verbose=False):
    """
    Import velocity fields from .VC7 files.

    'fieldspath' can be a tuple of paths to vc7 files, or the path to a
    folder. In the latter case, all the vc7 files present in the folder are
    imported, in the alphabetical order of their names.

    Parameters
    ----------
    fieldspath : string or tuple of string
        If no '.vc7' are found directly under 'fieldspath', present folders are
        recursively searched for '.vc7' files.
    kind : string, optional
        Kind of object to create with VC7 files.
        (can be 'TVF' or 'SVF').
    fieldnumbers : 2x1 tuple of int
        Interval of fields to import, default is all.
    incr : integer
        Incrementation between fields to take. Default is 1, meaning all
        fields are taken.
    add_fields : boolean, optional
        If 'True', also return a list containing additional fields
        contained in the vc7 field (peak ratio, correlation value, ...).
    verbose : bool, optional
        If 'True', print the progression of the importation.

    Returns
    -------
    fields : TemporalVectorFields or SpatialVectorFields object
        The imported fields.
    suppl_fields : list of TemporalScalarFields objects
        Additional fields (only if 'add_fields' is 'True' and the files
        contain additional fields).

    Raises
    ------
    TypeError
        If an argument is not of the expected type.
    ValueError
        If no vc7 file is found, if 'incr' is not positive or if 'kind' is
        unknown.
    """
    def list_vc7(folder):
        # vc7 files directly under 'folder'
        return [f for f in glob(path.join(folder, '*'))
                if path.splitext(f)[-1] in ['.vc7', '.VC7']]

    # check and adapt 'fieldspath'
    if isinstance(fieldspath, ARRAYTYPES):
        if len(fieldspath) == 0:
            raise ValueError("'fieldspath' is empty")
        if not isinstance(fieldspath[0], STRINGTYPES):
            raise TypeError("'fieldspath' must be a string or a tuple of"
                            " string")
        paths = fieldspath
    elif isinstance(fieldspath, STRINGTYPES):
        fieldspath = check_path(fieldspath)
        paths = list_vc7(fieldspath)
        # if no file found, search recursively
        if len(paths) == 0:
            # patterns match names ending with the extension only
            poss_paths = find_file_in_path([r'.*\.vc7$', r'.*\.VC7$'],
                                           fieldspath, ask=True)
            if len(poss_paths) == 0:
                raise ValueError("No VC7 file found in '{}'"
                                 .format(fieldspath))
            paths = list_vc7(poss_paths[0])
        # Sort paths by file name
        paths = np.array(sorted(paths, key=path.basename))
    else:
        raise TypeError("'fieldspath' must be a string or a tuple of"
                        " string")
    # check and adapt 'fieldnumbers'
    if fieldnumbers is not None:
        if not isinstance(fieldnumbers, ARRAYTYPES):
            raise TypeError("'fieldnumbers' must be a 2x1 array")
        if not len(fieldnumbers) == 2:
            raise TypeError("'fieldnumbers' must be a 2x1 array")
        if not isinstance(fieldnumbers[0], int) \
                or not isinstance(fieldnumbers[1], int):
            raise TypeError("'fieldnumbers' must be an array of integers")
    else:
        fieldnumbers = [0, len(paths)]
    # check 'incr'
    if not isinstance(incr, int):
        raise TypeError("'incr' must be an integer")
    if incr <= 0:
        raise ValueError("'incr' must be positive")
    # Prepare containers
    if kind == 'TVF':
        fields = TemporalVectorFields()
    elif kind == 'SVF':
        fields = SpatialVectorFields()
    else:
        raise ValueError("'kind' must be 'TVF' or 'SVF'")
    # initialize counter, with the number of files really imported
    start, end = fieldnumbers
    selected = paths[start:end:incr]
    nmb_files = len(selected)
    pc = ProgressCounter(init_mess="Begin importation of {} VC7 files"
                         .format(nmb_files),
                         nmb_max=nmb_files,
                         name_things="VC7 files",
                         perc_interv=10)
    # loop on files
    t = 0.
    suppl_fields = None
    for p in selected:
        if verbose:
            pc.print_progress()
        if add_fields:
            tmp_vf, infos, suppl = import_from_VC7(p, infos=True,
                                                   add_fields=True)
        else:
            tmp_vf, infos = import_from_VC7(p, infos=True)
            suppl = []
        # 'FrameDt0' is read as "<time step> <unit>"; a unit-less step of 1
        # is used when the file does not provide it
        try:
            dt = infos['FrameDt0'].split()
            unit_time = make_unit(dt[1])
            dt = float(dt[0])
        except KeyError:
            dt = 1.
            unit_time = make_unit("")
        t += dt*incr
        # only temporal containers are given a time, as in 'import_from_IM7s'
        if kind == 'TVF':
            fields.add_field(tmp_vf, t, unit_time)
        else:
            fields.add_field(tmp_vf)
        # containers for the additional fields are created from the first
        # file, instead of importing a file just to count them
        if suppl_fields is None:
            suppl_fields = [TemporalScalarFields() for _ in suppl]
        for j, field in enumerate(suppl):
            suppl_fields[j].add_field(field, t, unit_time)
    # return
    if add_fields and suppl_fields:
        return fields, suppl_fields
    return fields
def IM7_to_imt(im7_path, imt_path, kind='SF', compressed=True, **kwargs):
    """
    Transform IM7 (davis) file(s) into imt exploitable file(s).

    Parameters
    ----------
    im7_path : path to file or directory
        Path to the IM7 file(s), can be the path to a single file or the
        path to a directory containing multiple files.
    imt_path : path to file or directory
        Path where to save the imt file(s). When 'im7_path' is a directory
        and 'kind' is 'SF', one imt file per IM7 file is written inside
        'imt_path'; in the other cases a single file is written.
    kind : string
        Kind of object to store when 'im7_path' is a directory (can be
        'TSF' for TemporalScalarFields, 'SSF' for SpatialScalarFields or
        'SF' for multiple ScalarField). Not used for a single file.
    compressed : boolean, optional
        If 'True' (default), the file is compressed using gzip.
    kwargs : dict, optional
        Additional arguments for 'import_from_***()'.

    Raises
    ------
    ValueError
        If 'im7_path' is neither a file nor a directory, or if 'kind' is
        not one of 'SF', 'SSF' or 'TSF' when importing a directory.
    """
    # checking parameters
    im7_path = check_path(im7_path)
    imt_path = check_path(imt_path, newfile=True)
    # single file
    if path.isfile(im7_path):
        SF = import_from_IM7(im7_path, **kwargs)
        export_to_file(SF, imt_path, compressed=compressed)
        return
    if not path.isdir(im7_path):
        raise ValueError()
    # directory, stored as one set of fields
    if kind in ('SSF', 'TSF'):
        ST_SF = import_from_IM7s(im7_path, kind=kind, **kwargs)
        export_to_file(ST_SF, imt_path, compressed=compressed)
    # directory, stored as one file per field
    elif kind == 'SF':
        for tmp_path in sorted(glob(path.join(im7_path, "*"))):
            name, ext = path.splitext(path.basename(tmp_path))
            if ext not in ('.im7', '.IM7'):
                continue
            SF = import_from_IM7(tmp_path, **kwargs)
            export_to_file(SF, path.join(imt_path, name),
                           compressed=compressed)
    else:
        # an unknown kind used to be silently ignored
        raise ValueError("'kind' should be 'SF', 'SSF' or 'TSF', not {}"
                         .format(kind))
def VC7_to_imt(vc7_path, imt_path, kind='VF', compressed=True, **kwargs):
    """
    Transform VC7 (davis) file(s) into imt exploitable file(s).

    Parameters
    ----------
    vc7_path : path to file or directory
        Path to the VC7 file(s), can be the path to a single file or the
        path to a directory containing multiple files.
    imt_path : path to file or directory
        Path where to save the imt file(s). When 'vc7_path' is a directory
        and 'kind' is 'VF', one imt file per VC7 file is written inside
        'imt_path'; in the other cases a single file is written.
    kind : string
        Kind of object to store when 'vc7_path' is a directory (can be
        'TVF' for TemporalVectorFields, 'SVF' for SpatialVectorFields or
        'VF' for multiple VectorField). Not used for a single file.
    compressed : boolean, optional
        If 'True' (default), the file is compressed using gzip.
    kwargs : dict, optional
        Additional arguments for 'import_from_***()'.

    Raises
    ------
    ValueError
        If 'vc7_path' is neither a file nor a directory, or if 'kind' is
        not one of 'VF', 'SVF' or 'TVF' when importing a directory.
    """
    # checking parameters
    vc7_path = check_path(vc7_path)
    # the destination does not have to exist yet (same as 'IM7_to_imt')
    imt_path = check_path(imt_path, newfile=True)
    # single file
    if path.isfile(vc7_path):
        # a VC7 file holds a vector field: use the VC7 importer
        VF = import_from_VC7(vc7_path, **kwargs)
        export_to_file(VF, imt_path, compressed=compressed)
        return
    if not path.isdir(vc7_path):
        raise ValueError()
    # directory, stored as one set of fields
    if kind in ('SVF', 'TVF'):
        ST_VF = import_from_VC7s(vc7_path, kind=kind, **kwargs)
        export_to_file(ST_VF, imt_path, compressed=compressed)
    # directory, stored as one file per field
    elif kind == 'VF':
        for tmp_path in sorted(glob(path.join(vc7_path, "*"))):
            name, ext = path.splitext(path.basename(tmp_path))
            if ext not in ('.vc7', '.VC7'):
                continue
            VF = import_from_VC7(tmp_path, **kwargs)
            export_to_file(VF, path.join(imt_path, name),
                           compressed=compressed)
    else:
        # an unknown kind used to be silently ignored
        raise ValueError("'kind' should be 'VF', 'SVF' or 'TVF', not {}"
                         .format(kind))
def import_from_picture(filepath, axe_x=None, axe_y=None, unit_x='', unit_y='',
                        unit_values='', dtype=float):
    """
    Import a scalar field from a picture file.

    The picture is read as a grayscale image, then transposed and flipped
    so that the first axis of the values follows 'axe_x' and the second
    one follows 'axe_y'.

    Parameters
    ----------
    filepath : string
        Path to the picture file (png, jpg, jpeg or bmp).
    axe_x, axe_y : arrays, optional
        Axes of the field. Their lengths have to match the picture size.
        By default, pixel indices are used.
    unit_x, unit_y, unit_values : optional
        Units passed to 'ScalarField.import_from_arrays()'.
    dtype :
        Type of the stored values (default to float)

    Returns
    -------
    tmp_sf : ScalarField object
        Field holding the picture values.

    Raises
    ------
    ValueError
        If the file extension is not supported or if the length of a given
        axis does not match the picture size.
    """
    usable_ext = ('.png', '.jpg', '.jpeg', '.bmp')
    filepath = check_path(filepath)
    _, ext = path.splitext(filepath)
    # case-insensitive, so that '.Png' is accepted like '.png' and '.PNG'
    if ext.lower() not in usable_ext:
        raise ValueError("I need the file to be a supported picture file "
                         "(not a {} file)".format(ext))
    # importing from file
    # NOTE(review): 'scipy.misc.imread' has been removed in SciPy 1.2, this
    # call needs an older SciPy - TODO confirm the supported versions
    values = spmisc.imread(filepath, flatten=True).transpose()[:, ::-1]
    values = np.asarray(values, dtype=dtype)
    # set axis
    if axe_x is None:
        axe_x = np.arange(values.shape[0])
    elif len(axe_x) != values.shape[0]:
        raise ValueError("'axe_x' should have {} values, not {}"
                         .format(values.shape[0], len(axe_x)))
    if axe_y is None:
        axe_y = np.arange(values.shape[1])
    elif len(axe_y) != values.shape[1]:
        raise ValueError("'axe_y' should have {} values, not {}"
                         .format(values.shape[1], len(axe_y)))
    # create SF
    tmp_sf = ScalarField()
    tmp_sf.import_from_arrays(axe_x, axe_y, values, unit_x=unit_x,
                              unit_y=unit_y, unit_values=unit_values,
                              dtype=dtype)
    # return
    return tmp_sf
def import_from_pictures(filepath, axe_x=None, axe_y=None, unit_x='',
                         unit_y='', unit_values='', times=None,
                         unit_times='', dtype=float, fieldnumbers=None, incr=1,
                         verbose=False):
    """
    Import scalar fields from a bunch of picture files.

    Parameters
    ----------
    filepath : string
        Pathname pattern (as understood by 'glob') matching the files.
        Matching files are taken in sorted order.
    axe_x, axe_y, unit_x, unit_y, unit_values, dtype :
        Passed to 'import_from_picture()' for each file.
    times : array, optional
        Times associated to the matched files (one value per file).
        By default, the file indices are used.
    unit_times : optional
        Time unit passed to 'add_field()'.
    fieldnumbers: 2x1 array
        Interval of fields to import, default is all.
        The upper bound is clipped to the number of matched files.
    incr : integer
        Increment (incr=2 will load only 1 picture over 2).
    verbose : boolean, optional
        If 'True', print the importation progress.

    Returns
    -------
    tmp_tsf : TemporalScalarFields object
        Imported fields.

    Raises
    ------
    ValueError
        If 'times' does not have one value per matched file or if the
        lower bound of 'fieldnumbers' is negative.
    """
    # get paths ('check_path' is not used: 'filepath' is a pattern)
    paths = sorted(glob(filepath))
    nmb_paths = len(paths)
    tmp_tsf = TemporalScalarFields()
    # check times
    if times is None:
        times = np.arange(nmb_paths)
    elif len(times) != nmb_paths:
        raise ValueError()
    # filter by field number (without modifying the given 'fieldnumbers')
    if fieldnumbers is None:
        start, end = 0, nmb_paths
    else:
        start = fieldnumbers[0]
        if start < 0:
            raise ValueError()
        end = min(fieldnumbers[1], nmb_paths)
    indices = np.arange(start, end, incr)
    if verbose:
        pg = ProgressCounter(init_mess="Importing pictures",
                             nmb_max=len(indices),
                             name_things="pictures")
    # loop on paths
    for i in indices:
        tmp_sf = import_from_picture(paths[i], axe_x=axe_x, axe_y=axe_y,
                                     unit_x=unit_x, unit_y=unit_y,
                                     unit_values=unit_values,
                                     dtype=dtype)
        tmp_tsf.add_field(tmp_sf, times[i], unit_times=unit_times)
        if verbose:
            pg.print_progress()
    # returning
    return tmp_tsf
def import_from_video(filepath, dx=1, dy=1, unit_x='',
                      unit_y='', unit_values='', dt=1,
                      unit_times='', dtype=float, frame_inds=None, incr=1,
                      verbose=False):
    """
    Import scalar fields from a video.

    Each imported frame is converted to grayscale, then transposed and
    flipped so that the first axis of the values follows x and the second
    one follows y.

    Parameters
    ----------
    filepath : string
        Path to the video file.
    dx, dy : numbers, optional
        Pixel size along x and y, used to build the axes (default to 1).
    unit_x, unit_y, unit_values : optional
        Units passed to 'ScalarField.import_from_arrays()'.
    dt : number, optional
        Time step between two successive frames of the video
        (default to 1). The time of frame number 'i' is 'i*dt'.
    unit_times : optional
        Time unit passed to 'add_field()'.
    dtype :
        Type of the stored values (default to float)
    frame_inds: 2x1 array
        Interval of frames to import, default is all.
        The upper bound is clipped to the frame count of the video.
    incr : integer
        Increment (incr=2 will load only 1 frame over 2).
        Only frames whose number is a multiple of 'incr' are imported.
    verbose : boolean, optional
        If 'True', print the importation progress.

    Returns
    -------
    tsf : TemporalScalarFields object
        Imported frames.

    Raises
    ------
    Exception
        If opencv is not installed.
    """
    # Check if cv2 is available
    if not IS_CV2:
        raise Exception('This feature needs opencv to be installed')
    # Check file
    filepath = check_path(filepath)
    # Open video stream
    vid = cv2.VideoCapture()
    vid.open(filepath)
    try:
        max_frame = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
        # filter by frame number (without modifying the given 'frame_inds')
        if frame_inds is None:
            start, end = 0, max_frame
        else:
            start = frame_inds[0]
            end = min(frame_inds[1], max_frame)
        frame_numbers = np.arange(0, end, 1)
        if verbose:
            nmb_frames = int(np.sum((frame_numbers >= start)
                                    & (frame_numbers % incr == 0)))
            pg = ProgressCounter(init_mess="Importing video",
                                 nmb_max=nmb_frames,
                                 name_things="frames")
        # loop on frames
        tsf = TemporalScalarFields()
        t = 0
        for i in frame_numbers:
            # unwanted frames are grabbed (not decoded) to move forward
            if i < start or i % incr != 0:
                t += dt
                vid.grab()
                continue
            success, im = vid.read()
            if not success:
                warnings.warn(f"Can't decode frame number {i}")
                break
            # opencv decodes frames in BGR channel order
            im = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
            im = im.transpose()[:, ::-1]
            axe_x = np.arange(im.shape[0])*dx
            axe_y = np.arange(im.shape[1])*dy
            sf = ScalarField()
            sf.import_from_arrays(axe_x, axe_y, im, mask=False,
                                  unit_x=unit_x, unit_y=unit_y,
                                  unit_values=unit_values,
                                  dtype=dtype)
            tsf.add_field(sf, time=t, unit_times=unit_times, copy=False)
            t += dt
            if verbose:
                pg.print_progress()
    finally:
        # always give the video stream back
        vid.release()
    # returning
    return tsf
def export_to_picture(SF, filepath):
    """
    Export a scalar field to a picture file.

    The values are flipped and transposed before being saved (reverse of
    the operation done in 'import_from_picture()').

    Parameters
    ----------
    SF : ScalarField object
        Field to export (its 'values' attribute is saved).
    filepath : string
        Path to the picture file.
    """
    # the picture is a new file: it does not have to exist yet
    filepath = check_path(filepath, newfile=True)
    values = SF.values[:, ::-1].transpose()
    # NOTE(review): 'scipy.misc.imsave' has been removed in SciPy 1.2, this
    # call needs an older SciPy - TODO confirm the supported versions
    spmisc.imsave(filepath, values)
def export_to_pictures(SFs, filepath):
    """
    Export scalar fields to picture files.

    One png file is written per field, named '{filepath}_{number}.png',
    where 'number' is the field index padded to 5 digits.

    Parameters
    ----------
    SFs : array of ScalarField, SpatialScalarFields or TemporalScalarFields
        Fields to export.
    filepath : string
        Path to the picture files. Should include a name for the images
        (without the extension).

    Raises
    ------
    TypeError
        If 'SFs' is not of a supported type.
    """
    # check
    filepath = check_path(filepath, newfile=True)
    # get
    if isinstance(SFs, ARRAYTYPES):
        fields = SFs
    elif isinstance(SFs, (SpatialScalarFields, TemporalScalarFields)):
        fields = SFs.fields
    else:
        # unsupported input used to be silently ignored
        raise TypeError()
    # same orientation as in 'export_to_picture()', whatever the input type
    values = [field.values[:, ::-1].transpose() for field in fields]
    # save
    # NOTE(review): 'scipy.misc.imsave' has been removed in SciPy 1.2, this
    # call needs an older SciPy - TODO confirm the supported versions
    for i, val in enumerate(values):
        spmisc.imsave("{}_{:0>5}.png".format(filepath, i), val)
def export_to_video(SFs, filepath, fps=24, colormap=None):
    """
    Export scalar fields to a video file.

    Values are normalized between 0 and 1 using the extrema over all the
    fields, then written frame by frame with the 'MJPG' codec.

    Parameters
    ----------
    SFs : array of ScalarField, SpatialScalarFields or TemporalScalarFields
        Fields to export (one frame per field).
    filepath : string
        Path to the video file.
    fps : number, optional
        Number of frames per second (default to 24).
    colormap : callable, optional
        Function mapping an array of values between 0 and 1 to an array of
        colors whose three first channels are red, green and blue between
        0 and 1. By default, frames are in shades of gray.

    Raises
    ------
    Exception
        If opencv is not installed.
    TypeError
        If 'SFs' is not of a supported type.
    ValueError
        If there is no field to export.
    """
    # Check if cv2 is available
    if not IS_CV2:
        raise Exception('This feature needs opencv to be installed')
    # check
    filepath = check_path(filepath, newfile=True)
    # get
    if isinstance(SFs, ARRAYTYPES):
        fields = SFs
    elif isinstance(SFs, (SpatialScalarFields, TemporalScalarFields)):
        fields = SFs.fields
    else:
        raise TypeError()
    # same orientation as in 'export_to_picture()', whatever the input type
    values = np.array([field.values[:, ::-1].transpose()
                       for field in fields])
    if len(values) == 0:
        raise ValueError("No field to export")
    # normalize between 0 and 1
    maxi = np.max(values)
    mini = np.min(values)
    if maxi == mini:
        # constant fields: avoid dividing by zero
        values = np.zeros(values.shape)
    else:
        values = (values - mini)/(maxi - mini)
    # save as a video using opencv
    vid = cv2.VideoWriter(filename=filepath,
                          fourcc=cv2.VideoWriter_fourcc(*"MJPG"),
                          fps=fps,
                          frameSize=(values[0].shape[1],
                                     values[0].shape[0]))
    try:
        for val in values:
            if colormap is None:
                tmpval = cv2.cvtColor((val*255).astype('uint8'),
                                      cv2.COLOR_GRAY2RGB)
            else:
                tmpval = np.array(colormap(val)[:, :, 0:3]*255,
                                  dtype=np.uint8)
                # reverse the channel order of the colormap output
                tmpval = tmpval[:, :, ::-1]
            vid.write(tmpval)
    finally:
        # always close the video file
        vid.release()
def import_profile_from_ascii(filepath, x_col=1, y_col=2,
                              unit_x=make_unit(""), unit_y=make_unit(""),
                              **kwargs):
    """
    Build a Profile object from the columns of an ascii file.

    Parameters
    ----------
    filepath : string
        Path to the ascii file.
    x_col, y_col : integer, optional
        Column numbers of the x and y values (beginning at 1).
    unit_x, unit_y : Unit objects, optional
        Units of the x and y values.
    **kwargs :
        Additional parameters passed to the numpy function 'genfromtxt()',
        for example:
        'delimiter' to specify the delimiter between columns.
        'skip_header' to specify the number of lines to skip at the
        beginning of the file.
        'names' is ignored.

    Returns
    -------
    prof : Profile object
        Profile made of the two selected columns, without mask.
    """
    # check
    filepath = check_path(filepath)
    # validating parameters
    if not all(isinstance(col, int) for col in (x_col, y_col)):
        raise TypeError("'x_col', 'y_col' must be integers")
    if min(x_col, y_col) < 1:
        raise ValueError("Colonne number out of range")
    # 'names' is dropped: it would make 'genfromtxt' return a structured
    # array, which cannot be indexed by column number
    kwargs.pop('names', None)
    # extract data from file
    table = np.genfromtxt(filepath, **kwargs)
    # build the profile from the selected columns
    return Profile(table[:, x_col - 1], table[:, y_col - 1], mask=False,
                   unit_x=unit_x, unit_y=unit_y)
def import_pts_from_ascii(filepath, x_col=1, y_col=2, v_col=None,
                          unit_x=make_unit(""), unit_y=make_unit(""),
                          unit_v=make_unit(""), **kwargs):
    """
    Import a Points object from an ascii file.

    Parameters
    ----------
    filepath : string
        Path to the ascii file.
    x_col, y_col, v_col : integer, optional
        Column numbers for the given variables (beginning at 1).
        If 'v_col' is None (default) or 0, points are imported without
        values.
    unit_x, unit_y, unit_v : Unit objects, optional
        Units for the given variables.
    **kwargs :
        Additional parameters passed to the numpy function 'genfromtxt()',
        for example:
        'delimiter' to specify the delimiter between columns.
        'skip_header' to specify the number of lines to skip at the
        beginning of the file.
        'names' is ignored.

    Returns
    -------
    pts : Points object
        Imported points.

    Raises
    ------
    TypeError
        If a column number is not an integer.
    ValueError
        If a column number is out of range.
    """
    # check
    filepath = check_path(filepath)
    # validating parameters
    if v_col is None:
        v_col = 0
    if not all(isinstance(col, int) for col in (x_col, y_col, v_col)):
        raise TypeError("'x_col', 'y_col' and 'v_col' must be integers")
    # a negative 'v_col' would silently select a column from the end
    if x_col < 1 or y_col < 1 or v_col < 0:
        raise ValueError("Colonne number out of range")
    # 'names' is dropped: it would make 'genfromtxt' return a structured
    # array, which cannot be indexed by column number
    kwargs.pop('names', None)
    # extract data from file
    data = np.genfromtxt(filepath, **kwargs)
    # get axes
    x = data[:, x_col-1]
    y = data[:, y_col-1]
    v = data[:, v_col-1] if v_col != 0 else None
    return Points(list(zip(x, y)), v, unit_x, unit_y, unit_v)
def import_sf_from_ascii(filepath, x_col=1, y_col=2, vx_col=3,
                         unit_x=make_unit(""),
                         unit_y=make_unit(""),
                         unit_values=make_unit(""), **kwargs):
    """
    Import a scalarfield from an ascii file.

    The file should contain one point per line. Axes are made of the
    sorted unique values of the x and y columns. Points missing from the
    file and 'nan' values are masked.

    Parameters
    ----------
    filepath : string
        Path to the ascii file.
    x_col, y_col, vx_col: integer, optional
        Column numbers for the given variables (beginning at 1).
    unit_x, unit_y, unit_values : Unit objects, optional
        Units for the given variables.
    **kwargs :
        Additional parameters passed to the numpy function 'genfromtxt()',
        for example:
        'delimiter' to specify the delimiter between columns.
        'skip_header' to specify the number of lines to skip at the
        beginning of the file.
        'names' is ignored.

    Returns
    -------
    tmpsf : ScalarField object
        Imported field.

    Raises
    ------
    TypeError
        If a column number is not an integer.
    ValueError
        If a column number is lower than 1.
    """
    # check
    filepath = check_path(filepath)
    # validating parameters
    if not all(isinstance(col, int) for col in (x_col, y_col, vx_col)):
        raise TypeError("'x_col', 'y_col' and 'vx_col' must be integers")
    if x_col < 1 or y_col < 1 or vx_col < 1:
        raise ValueError("Colonne number out of range")
    # 'names' is dropped: it would make 'genfromtxt' return a structured
    # array, which cannot be indexed by column number
    kwargs.pop('names', None)
    # extract data from file
    data = np.genfromtxt(filepath, **kwargs)
    # get axes (sorted unique values) and the position of each point on them
    x = data[:, x_col-1]
    y = data[:, y_col-1]
    vx = data[:, vx_col-1]
    x_org, x_inds = np.unique(x, return_inverse=True)
    y_org, y_inds = np.unique(y, return_inverse=True)
    nmb_x = len(x_org)
    nmb_y = len(y_org)
    # the structured layouts can only be tested on a complete grid
    # (reshaping would fail otherwise)
    complete = x.size == nmb_x*nmb_y
    if complete and np.allclose(np.mean(x.reshape(nmb_x, nmb_y), axis=1),
                                x_org):
        # structured, x constant on each block of lines
        vx_org = vx.reshape(nmb_x, nmb_y)
        mask = np.isnan(vx_org)
    elif complete and np.allclose(np.mean(x.reshape(nmb_y, nmb_x), axis=0),
                                  x_org):
        # structured, x varying on each block of lines
        # NOTE(review): the flip presumably expects y to be stored in
        # decreasing order in the file - TODO confirm
        vx_org = np.fliplr(vx.reshape(nmb_y, nmb_x).transpose())
        mask = np.isnan(vx_org)
    else:
        # unstructured or incomplete: put each value at its place,
        # points not present in the file stay masked
        vx_org = np.zeros((nmb_x, nmb_y))
        filled = np.zeros((nmb_x, nmb_y), dtype=bool)
        vx_org[x_inds, y_inds] = vx
        filled[x_inds, y_inds] = True
        # Treating 'nan' values
        mask = np.logical_or(~filled, np.isnan(vx_org))
    # store field in attributes
    tmpsf = ScalarField()
    tmpsf.import_from_arrays(x_org, y_org, vx_org, mask=mask, unit_x=unit_x,
                             unit_y=unit_y, unit_values=unit_values)
    return tmpsf
def import_vf_from_ascii(filepath, x_col=1, y_col=2, vx_col=3,
                         vy_col=4, unit_x=make_unit(""),
                         unit_y=make_unit(""),
                         unit_values=make_unit(""), **kwargs):
    """
    Import a vectorfield from an ascii file.

    The file should contain one point per line. Axes are made of the
    sorted unique values of the x and y columns. Points missing from the
    file and points with a 'nan' component are masked.

    Parameters
    ----------
    filepath : string
        Path to the ascii file.
    x_col, y_col, vx_col, vy_col : integer, optional
        Column numbers for the given variables (beginning at 1).
    unit_x, unit_y, unit_values : Unit objects, optional
        Units for the given variables.
    **kwargs :
        Additional parameters passed to the numpy function 'genfromtxt()',
        for example:
        'delimiter' to specify the delimiter between columns.
        'skip_header' to specify the number of lines to skip at the
        beginning of the file.
        'names' is ignored.

    Returns
    -------
    tmpvf : VectorField object
        Imported field.

    Raises
    ------
    TypeError
        If a column number is not an integer.
    ValueError
        If a column number is lower than 1.
    """
    # check
    filepath = check_path(filepath)
    # validating parameters
    cols = (x_col, y_col, vx_col, vy_col)
    if not all(isinstance(col, int) for col in cols):
        raise TypeError("'x_col', 'y_col', 'vx_col' and 'vy_col' must "
                        "be integers")
    if min(cols) < 1:
        raise ValueError("Colonne number out of range")
    # 'names' is dropped: it would make 'genfromtxt' return a structured
    # array, which cannot be indexed by column number
    kwargs.pop('names', None)
    # extract data from file
    data = np.genfromtxt(filepath, **kwargs)
    # get axes (sorted unique values) and the position of each point on them
    x = data[:, x_col-1]
    y = data[:, y_col-1]
    vx = data[:, vx_col-1]
    vy = data[:, vy_col-1]
    x_org, x_inds = np.unique(x, return_inverse=True)
    y_org, y_inds = np.unique(y, return_inverse=True)
    shape = (x_org.shape[0], y_org.shape[0])
    # put each value at its place, points not present in the file stay
    # masked (to handle missing values)
    vx_data = np.zeros(shape)
    vy_data = np.zeros(shape)
    filled = np.zeros(shape, dtype=bool)
    vx_data[x_inds, y_inds] = vx
    vy_data[x_inds, y_inds] = vy
    filled[x_inds, y_inds] = True
    # Treating 'nan' values
    vx_org = np.ma.masked_array(vx_data,
                                np.logical_or(~filled, np.isnan(vx_data)))
    vy_org = np.ma.masked_array(vy_data,
                                np.logical_or(~filled, np.isnan(vy_data)))
    # a vector is masked as soon as one of its components is
    mask = np.logical_or(np.ma.getmaskarray(vx_org),
                         np.ma.getmaskarray(vy_org))
    # store field in attributes
    tmpvf = VectorField()
    tmpvf.import_from_arrays(x_org, y_org, vx_org, vy_org,
                             mask=mask,
                             unit_x=unit_x, unit_y=unit_y,
                             unit_values=unit_values)
    return tmpvf
def import_vfs_from_ascii(filepath, kind='TVF', incr=1, interval=None,
                          x_col=1, y_col=2, vx_col=3,
                          vy_col=4, unit_x=make_unit(""),
                          unit_y=make_unit(""),
                          unit_values=make_unit(""), times=None,
                          unit_time=make_unit(''), **kwargs):
    """
    Import velocityfields from ascii files.

    Parameters
    ----------
    filepath : string
        Pathname pattern to the ascii files.
    kind : string, optional
        Kind of object to create ('TVF' for TemporalVectorFields (default)
        or 'SVF' for SpatialVectorFields).
    incr : integer, optional
        Increment value between two fields taken.
    interval : 2x1 array, optional
        Interval in which take fields (indices of the first and of the
        last file to import, both included). Default is all.
    x_col, y_col, vx_col, vy_col : integer, optional
        Column numbers for the given variables (beginning at 1).
    unit_x, unit_y, unit_values : Unit objects, optional
        Units for the given variables.
    times : array of number, optional
        Times of the instantaneous fields (one value per matched file).
        By default, the file indices are used.
    unit_time : Unit object, optional
        Time unit.
    **kwargs :
        Additional parameters passed to the numpy function 'genfromtxt()',
        for example:
        'delimiter' to specify the delimiter between columns.
        'skip_header' to specify the number of lines to skip at the
        beginning of the file.

    Returns
    -------
    fields : TemporalVectorFields or SpatialVectorFields object
        Imported fields.

    Note
    ----
    txt files are taken in alpha-numerical order
    ('file2.txt' is taken before 'file20.txt').
    So you should name your files properly.
    """
    # check parameters
    if not isinstance(incr, int):
        raise TypeError("'incr' must be an integer")
    if incr < 1:
        raise ValueError("'incr' must be superior to 1")
    if interval is not None:
        if not isinstance(interval, ARRAYTYPES):
            raise TypeError("'interval' must be an array")
        if not len(interval) == 2:
            raise ValueError("'interval' must be a 2x1 array")
        if interval[0] > interval[1]:
            interval = [interval[1], interval[0]]
    # 'filepath' is a pattern, so only its type can be checked here
    # (as in 'import_from_pictures()', 'check_path' is not used on it)
    if not isinstance(filepath, STRINGTYPES):
        raise TypeError()
    # get the files, in the documented alpha-numerical order
    file_paths = sorted(glob(filepath))
    nmb_files = len(file_paths)
    if nmb_files == 0:
        raise ValueError("No file is matching '{}'".format(filepath))
    if interval is None:
        interval = [0, nmb_files - 1]
    # the upper bound is included, so it has to be a valid index
    if interval[0] < 0 or interval[1] >= nmb_files:
        raise ValueError("'interval' is out of bounds")
    if times is None or len(times) == 0:
        times = np.arange(nmb_files)
    if nmb_files != len(times):
        raise ValueError("Not enough values in 'times'")
    # prepare container
    if kind == 'TVF':
        fields = TemporalVectorFields()
    elif kind == 'SVF':
        fields = SpatialVectorFields()
    else:
        raise ValueError()
    # loop on files
    ref_path_len = len(file_paths[0])
    for i in range(interval[0], interval[1] + 1, incr):
        file_path = file_paths[i]
        # names of different lengths are not sorted in numerical order
        if len(file_path) != ref_path_len:
            raise Warning("You should check your files names,"
                          "i may have taken them in the wrong order.")
        tmp_vf = import_vf_from_ascii(file_path, x_col=x_col, y_col=y_col,
                                      vx_col=vx_col, vy_col=vy_col,
                                      unit_x=unit_x, unit_y=unit_y,
                                      unit_values=unit_values, **kwargs)
        # NOTE(review): same call form as in 'import_from_VC7s()' -
        # TODO confirm the meaning of the second argument for 'SVF'
        fields.add_field(tmp_vf, times[i], unit_time)
    return fields
def export_to_ascii(obj, filepath):
    """
    Export an object to ascii file(s).

    The file begins with a commented header line giving the name and the
    unit of each column, followed by one tab-separated line per point.

    Parameters
    ----------
    obj : VectorField, ScalarField, Profile, Points or CritPoints object
        Object to export. For a CritPoints object, a directory named after
        'filepath' (without its extension) is used, and one file is
        written in it per type of critical point and per time.
    filepath : string
        Path to the ascii file.

    Raises
    ------
    TypeError
        If the type of 'obj' is not supported.
    """
    # check
    filepath = check_path(filepath, newfile=True)
    # obj type
    if isinstance(obj, VectorField):
        header = "# X {}\tY {}\tVx {}\tVy {}\n"\
            .format(obj.unit_x.strUnit(),
                    obj.unit_y.strUnit(),
                    obj.unit_values.strUnit(),
                    obj.unit_values.strUnit())
        rows = ((x, y, obj.comp_x[i, j], obj.comp_y[i, j])
                for i, x in enumerate(obj.axe_x)
                for j, y in enumerate(obj.axe_y))
        _write_ascii_rows(filepath, header, rows)
    elif isinstance(obj, ScalarField):
        header = "# X {}\tY {}\tValue {}\n"\
            .format(obj.unit_x.strUnit(),
                    obj.unit_y.strUnit(),
                    obj.unit_values.strUnit())
        rows = ((x, y, obj.values[i, j])
                for i, x in enumerate(obj.axe_x)
                for j, y in enumerate(obj.axe_y))
        _write_ascii_rows(filepath, header, rows)
    elif isinstance(obj, Profile):
        header = "# X {}\tY {}\n"\
            .format(obj.unit_x.strUnit(),
                    obj.unit_y.strUnit())
        _write_ascii_rows(filepath, header, zip(obj.x, obj.y))
    elif isinstance(obj, Points):
        if len(obj.v) == 0:
            # points without values: two columns only
            header = "# X {}\tY {}\n"\
                .format(obj.unit_x.strUnit(),
                        obj.unit_y.strUnit())
            rows = zip(obj.xy[:, 0], obj.xy[:, 1])
        else:
            header = "# X {}\tY {}\tV {}\n"\
                .format(obj.unit_x.strUnit(),
                        obj.unit_y.strUnit(),
                        obj.unit_v.strUnit())
            rows = zip(obj.xy[:, 0], obj.xy[:, 1], obj.v)
        _write_ascii_rows(filepath, header, rows)
    elif isinstance(obj, CritPoints):
        # files are stored in a directory named after the given path
        basename = os.path.splitext(filepath)[0]
        try:
            os.mkdir(basename)
        except FileExistsError:
            pass
        t_unit = obj.unit_time.strUnit()
        if t_unit == "[]":
            t_unit = ""
        # one file per type of critical point and per time
        for typ in obj.cp_types:
            pts = getattr(obj, typ)
            for t, pt in zip(obj.times, pts):
                export_to_ascii(pt, "{}/{}_t={}{}.txt".format(basename,
                                                              typ, t, t_unit))
    else:
        raise TypeError()


def _write_ascii_rows(filepath, header, rows):
    """
    Write 'header', then one tab-separated line per item of 'rows'.
    """
    # the context manager closes the file even if a row cannot be written
    with open(filepath, 'w') as f:
        f.write(header)
        for row in rows:
            f.write("\t".join("{}".format(val) for val in row) + "\n")