# -*- coding: utf-8 -*-
#!/bin/env python3
# Copyright (C) 2003-2007 Gaby Launay
# Author: Gaby Launay <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0
# This file is part of IMTreatment.
# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import matplotlib.pyplot as plt
from .. import plotlib as pplt
import numpy as np
import unum
import copy
from ..utils import make_unit, ProgressCounter
from ..utils.types import ARRAYTYPES, INTEGERTYPES, NUMBERTYPES, STRINGTYPES
from . import field as fld
from . import fields as flds
from . import scalarfield as sf
from . import points as pts
from . import profile as prof
from . import vectorfield as vf
[docs]class TemporalFields(flds.Fields, fld.Field):
"""
Class representing a set of time evolving fields.
All fields added to this object has to have the same axis system.
"""
def __init__(self):
fld.Field.__init__(self)
flds.Fields.__init__(self)
self.fields = []
self.__times = np.array([], dtype=float)
self.__unit_times = make_unit("")
self.field_type = None
self.axe_x = []
self.axe_y = []
self.unit_x = make_unit('')
self.unit_y = make_unit('')
self.unit_times = make_unit('')
def __add__(self, other):
if isinstance(other, self.fields[0].__class__):
tmp_TF = self.copy()
for i in np.arange(len(tmp_TF.fields)):
tmp_TF.fields[i] += other
return tmp_TF
elif isinstance(other, self.__class__):
tmp_tf = self.copy()
if np.all(self.times == other.times):
for i in np.arange(len(self.fields)):
tmp_tf.fields[i] += other.fields[i]
else:
for i in np.arange(len(other.fields)):
tmp_tf.add_field(other.fields[i])
return tmp_tf
else:
tmp_TF = self.copy()
for i in np.arange(len(tmp_TF.fields)):
tmp_TF.fields[i] += other
return tmp_TF
# else:
# raise TypeError("cannot concatenate {} with"
# " {}.".format(self.__class__, type(other)))
def __sub__(self, other):
return self.__add__(-other)
def __neg__(self):
tmp_tf = self.copy()
for i in np.arange(len(self.fields)):
tmp_tf.fields[i] = -tmp_tf.fields[i]
return tmp_tf
def __mul__(self, other):
if isinstance(other, self.__class__):
if not len(self) == len(other):
raise Exception()
if not np.all(self.axe_x == other.axe_x) \
and np.all(self.axe_y == other.axe_y):
raise Exception()
if not np.all(self.times == other.times):
raise Exception()
vfs = self.__class__()
for i in np.arange(len(self.fields)):
vfs.add_field(self.fields[i]*other.fields[i])
return vfs
elif isinstance(other, (NUMBERTYPES, unum.Unum)):
final_vfs = self.__class__()
for field in self.fields:
final_vfs.add_field(field*other)
return final_vfs
else:
raise TypeError("You can only multiply a temporal velocity field "
"by numbers")
__rmul__ = __mul__
def __truediv__(self, other):
if isinstance(other, self.__class__):
if not len(self) == len(other):
raise Exception()
if not np.all(self.axe_x == other.axe_x) \
and np.all(self.axe_y == other.axe_y):
raise Exception()
if not np.all(self.times == other.times):
raise Exception()
vfs = self.__class__()
for i in np.arange(len(self.fields)):
vfs.add_field(self.fields[i]/other.fields[i])
return vfs
elif isinstance(other, self.fields[0].__class__):
if not np.all(self.axe_x == other.axe_x) \
and np.all(self.axe_y == other.axe_y):
raise Exception()
vfs = self.__class__()
for i in np.arange(len(self.fields)):
vfs.add_field(self.fields[i]/other)
return vfs
elif isinstance(other, (NUMBERTYPES, unum.Unum)):
final_vfs = self.__class__()
for i, field in enumerate(self.fields):
final_vfs.add_field(other, time=self.times[i],
unit_times=self.unit_times)
return final_vfs
else:
raise TypeError("")
__div__ = __truediv__
def __pow__(self, number):
if not isinstance(number, NUMBERTYPES):
raise TypeError("You only can use a number for the power "
"on a Vectorfield")
final_vfs = self.__class__()
for field in self.fields:
final_vfs.add_field(np.power(field, number))
return final_vfs
def __iter__(self):
for i in np.arange(len(self.fields)):
yield self.times[i], self.fields[i]
def __eq__(self, obj):
if not isinstance(obj, self.__class__):
return False
if not fld.Field.__eq__(self, obj):
return False
if not flds.Fields.__eq__(self, obj):
return False
for attr in ['fields', 'times', 'axe_x', 'axe_y']:
if not np.all(self.__getattribute__(attr) ==
obj.__getattribute__(attr)):
return False
for attr in ['field_type', 'unit_x', 'unit_y', 'unit_times']:
if not (self.__getattribute__(attr) ==
obj.__getattribute__(attr)):
return False
return True
@fld.Field.axe_x.setter
def axe_x(self, value):
fld.Field.axe_x.fset(self, value)
for field in self.fields:
field.axe_x = value
field.xy_scale = self.xy_scale
@fld.Field.axe_y.setter
def axe_y(self, value):
fld.Field.axe_y.fset(self, value)
for field in self.fields:
field.axe_y = value
field.xy_scale = self.xy_scale
@fld.Field.unit_x.setter
def unit_x(self, value):
fld.Field.unit_x.fset(self, value)
for field in self.fields:
field.unit_x = value
@fld.Field.unit_y.setter
def unit_y(self, value):
fld.Field.unit_y.fset(self, value)
for field in self.fields:
field.unit_y = value
@property
def mask(self):
dim = (len(self.fields), self.shape[0], self.shape[1])
mask_f = np.empty(dim, dtype=bool)
for i, field in enumerate(self.fields):
mask_f[i, :, :] = field.mask[:, :]
return mask_f
@property
def mask_as_sf(self):
dim = len(self.fields)
mask_f = np.empty(dim, dtype=object)
for i, field in enumerate(self.fields):
mask_f[i] = field.mask_as_sf
return mask_f
@property
def mask_cum(self):
cum_mask = np.sum(self.mask, axis=0)
cum_mask = cum_mask == len(self.mask)
return cum_mask
@property
def mask_cum_as_sf(self):
cum_mask = self.mask_cum
tmp_sf = sf.ScalarField()
tmp_sf.import_from_arrays(self.axe_x, self.axe_y, cum_mask, mask=None,
unit_x=self.unit_x, unit_y=self.unit_y,
unit_values='')
return tmp_sf
@property
def times(self):
return self.__times
@times.setter
def times(self, values):
if not isinstance(values, ARRAYTYPES):
raise TypeError()
if len(self.fields) != len(values):
raise ValueError("New number of time ({}) do not corespond to "
"the number of fields ({})"
.format(len(values), len(self.fields)))
self.__times = values
@times.deleter
def times(self):
raise Exception("Nope, can't do that")
@property
def dt(self):
return self.times[1] - self.times[0]
@property
def unit_times(self):
return self.__unit_times
@unit_times.setter
def unit_times(self, new_unit_times):
if isinstance(new_unit_times, unum.Unum):
if new_unit_times.asNumber() == 1:
self.__unit_times = new_unit_times
else:
raise ValueError()
elif isinstance(new_unit_times, STRINGTYPES):
self.__unit_times == make_unit(new_unit_times)
else:
raise TypeError()
@unit_times.deleter
def unit_times(self):
raise Exception("Nope, can't do that")
@property
def unit_values(self):
if len(self.fields) != 0:
return self[0].unit_values
[docs] def get_mean_field(self, nmb_min=1, dtype=None):
"""
Calculate the mean velocity field, from all the fields.
Parameters
----------
nmb_min : integer, optional
Minimum number of values used to make a mean. else, the value is
masked
dtype : type
Specify the output values type (default to the same one as fields).
"""
# checks
if len(self.fields) == 0:
raise ValueError("There is no fields in this object")
if self.field_type == vf.VectorField:
value = [0., 0.]
else:
value = 0.
if not dtype:
dtype = self.fields[0]._values_dtype
#
result_f = self.fields[0].copy()
result_f.change_dtype(float)
result_f.fill(kind='value', value=value, crop=False, inplace=True)
mask_cum = np.zeros(self.shape, dtype=int)
mask_cum[np.logical_not(self.fields[0].mask)] += 1
i = 0
for field in self.fields[1::]:
i += 1
# print("{}, {}".format(i, field.unit_values))
added_field = field.copy()
added_field.fill(kind='value', value=0., inplace=True)
result_f += added_field
mask_cum[np.logical_not(field.mask)] += 1
mask = mask_cum <= nmb_min
result_f.mask = mask
result_f.change_dtype(dtype)
fact = mask_cum
fact[mask] = 1
result_f /= fact
result_f.xy_scale = self.fields[0].xy_scale
return result_f
[docs] def get_interpolated_field(self, time):
"""
Return the interpolated field happening at the time 'time'.
"""
# check
assert isinstance(time, NUMBERTYPES)
assert time >= self.times[0]
assert time <= self.times[-1]
# if time is in self.times
if np.any(self.times == time):
return self.fields[self.times == time][0]
# else, get the surrounding fields
ind_time = np.argwhere(self.times > time)[0][0]
denom = self.times[ind_time] - self.times[ind_time - 1]
coef1 = (self.times[ind_time] - time)/denom
coef2 = (time - self.times[ind_time - 1])/denom
new_field = self.fields[ind_time]*coef2 + \
self.fields[ind_time - 1]*coef1
new_field.time = time
# returning
assert isinstance(new_field, self.fields[0].__class__)
return new_field
[docs] def get_fluctuant_fields(self, nmb_min_mean=1):
"""
Calculate the fluctuant fields (fields minus mean field).
Parameters
----------
nmb_min_mean : number, optional
Parameter for mean computation (see 'get_mean_field' doc).
Returns
-------
fluct_fields : TemporalScalarFields or TemporalVectorFields object
Contening fluctuant fields.
"""
fluct_fields = self.__class__()
mean_field = self.get_mean_field(nmb_min=nmb_min_mean)
for i, field in enumerate(self.fields):
fluct_fields.add_field(field - mean_field, time=self.times[i],
unit_times=self.unit_times)
return fluct_fields
[docs] def get_spatial_spectrum(self, component, direction, intervx=None,
intervy=None, intervtime=None, welch_seglen=None,
scaling='base', fill='linear'):
"""
Return a spatial spectrum.
If more than one time are specified, spectrums are averaged.
Parameters
----------
component : string
Should be an attribute name of the stored fields.
direction : string
Direction in which perform the spectrum ('x' or 'y').
intervx and intervy : 2x1 arrays of number, optional
To chose the zone where to calculate the spectrum.
If not specified, the biggest possible interval is choosen.
intervtime : 2x1 array, optional
Interval of time on which averaged the spectrum.
welch_seglen : integer, optional
If specified, welch's method is used (dividing signal into
overlapping segments, and averaging periodogram) with the given
segments length (in number of points).
scaling : string, optional
If 'base' (default), result are in component unit.
If 'spectrum', the power spectrum is returned (in unit^2).
If 'density', the power spectral density is returned
(in unit^2/(1/unit_axe))
fill : string or float
Specifies the way to treat missing values.
A value for value filling.
A string ('linear', 'nearest' or 'cubic') for interpolation.
Notes
-----
If there is missing values on the field, 'fill' is used to linearly
interpolate the missing values (can impact the spectrum).
"""
# check parameters
try:
self[0].__getattribute__('{}_as_sf'.format(component))
except AttributeError():
raise ValueError()
if not isinstance(direction, STRINGTYPES):
raise TypeError()
if direction not in ['x', 'y']:
raise ValueError()
if intervtime is None:
intervtime = [self.times[0], self.times[-1]]
if not isinstance(intervtime, ARRAYTYPES):
raise TypeError()
intervtime = np.array(intervtime)
if not intervtime.shape == (2,):
raise ValueError()
if intervtime[0] < self.times[0]:
intervtime[0] = self.times[0]
if intervtime[-1] > self.times[-1]:
intervtime[-1] = self.times[-1]
if intervtime[0] >= intervtime[1]:
raise ValueError()
# loop on times
spec = 0
nmb = 0
for i, time in enumerate(self.times):
if time < intervtime[0] or time > intervtime[1]:
continue
comp = self[i].__getattribute__('{}_as_sf'.format(component))
if spec == 0:
spec = comp.get_spatial_spectrum(direction, intervx=intervx,
intervy=intervy,
welch_seglen=welch_seglen,
scaling=scaling, fill=fill)
else:
spec += comp.get_spatial_spectrum(direction, intervx=intervx,
intervy=intervy,
welch_seglen=welch_seglen,
scaling=scaling, fill=fill)
nmb += 1
# returning
spec /= nmb
return spec
[docs] def get_time_profile(self, component, pt, wanted_times=None, ind=False):
"""
Return a profile contening the time evolution of the given component.
Parameters
----------
component : string
Should be an attribute name of the stored fields.
pt : 2x1 array of numbers, or pts.Points object
Wanted position for the time profile, in axis units.
wanted_times : 2x1 array of numbers
Time interval in which getting profile (default is all).
ind : boolean, optional
If 'True', values are undersood as indices.
Returns
-------
profile : prof.Profile object
"""
# check parameters coherence
if not isinstance(component, STRINGTYPES):
raise TypeError("'component' must be a string")
if isinstance(pt, ARRAYTYPES):
if ind:
if pt[0] % 1 != 0 or pt[1] % 1 != 0:
raise ValueError()
ind_x = int(pt[0])
ind_y = int(pt[1])
else:
ind_x = self.get_indice_on_axe(1, pt[0], kind='nearest')
ind_y = self.get_indice_on_axe(2, pt[1], kind='nearest')
axe_x, axe_y = self.axe_x, self.axe_y
if not (0 <= ind_x < len(axe_x) and 0 <= ind_y < len(axe_y)):
raise ValueError("'x' ans 'y' values out of bounds")
pt = np.array([[ind_x, ind_y]]*len(self.times), dtype=int)
mask_times = np.zeros(len(self.times), dtype=bool)
if isinstance(pt, pts.Points):
mask_times = [time not in pt.v for time in self.times]
mask_times = np.array(mask_times, dtype=bool)
pt = [[self.get_indice_on_axe(1, pt.xy[i, 0], kind='nearest'),
self.get_indice_on_axe(2, pt.xy[i, 1], kind='nearest')]
for i in range(len(pt.xy))]
pt = np.array(pt, dtype=int)
if wanted_times is not None:
if wanted_times[-1] <= wanted_times[0]:
raise ValueError()
mask_times = np.logical_or(self.times < wanted_times[0],
mask_times)
mask_times = np.logical_or(self.times > wanted_times[1],
mask_times)
# getting wanted time if necessary
w_times_ind = np.arange(len(self.times))[~mask_times]
# getting component values
dim = len(w_times_ind)
compo = np.empty(dim, dtype=float)
masks = np.empty(dim, dtype=float)
for i, time_ind in enumerate(w_times_ind):
ind_x, ind_y = pt[i]
compo[i] = self.fields[time_ind].__getattribute__(component)[ind_x,
ind_y]
masks[i] = self.fields[time_ind].mask[ind_x, ind_y]
# gettign others datas
time = self.times[w_times_ind]
unit_time = self.unit_times
unit_values = self.unit_values
# getting position indices
return prof.Profile(time, compo, masks, unit_x=unit_time,
unit_y=unit_values)
[docs] def inject_time_profile(self, comp, pt, prof, ind=False):
"""
Overwrite the value at the given points with data from the time
profile.
Parameters
----------
comp : string
Should be an attribute name of the stored fields.
pt : 2x1 array of numbers, or pts.Points object
Wanted position for the time profile, in axis units.
prof : Profile object
Profile used to overwrite data at point
ind : boolean, optional
If 'True', values are understood as indices.
"""
# check
if not isinstance(comp, STRINGTYPES):
raise TypeError("'comp' must be a string")
if isinstance(pt, ARRAYTYPES):
if ind:
if pt[0] % 1 != 0 or pt[1] % 1 != 0:
raise ValueError()
ind_x = int(pt[0])
ind_y = int(pt[1])
else:
ind_x = self.get_indice_on_axe(1, pt[0], kind='nearest')
ind_y = self.get_indice_on_axe(2, pt[1], kind='nearest')
axe_x, axe_y = self.axe_x, self.axe_y
if not (0 <= ind_x < len(axe_x) and 0 <= ind_y < len(axe_y)):
raise ValueError("'x' ans 'y' values out of bounds")
pt = np.array([[ind_x, ind_y]]*len(self.times), dtype=int)
mask_times = np.zeros(len(self.times), dtype=bool)
if isinstance(pt, pts.Points):
mask_times = [time not in pt.v for time in self.times]
mask_times = np.array(mask_times, dtype=bool)
pt = [[self.get_indice_on_axe(1, pt.xy[i, 0], kind='nearest'),
self.get_indice_on_axe(2, pt.xy[i, 1], kind='nearest')]
for i in range(len(pt.xy))]
pt = np.array(pt, dtype=int)
if len(prof) != len(self):
raise ValueError()
# do it
for i in range(len(prof)):
self.fields[i].__getattribute__(comp)[ind_x, ind_y] = prof.y[i]
[docs] def get_temporal_spectrum(self, component, pt, ind=False,
wanted_times=None, welch_seglen=None,
scaling='base', fill='linear', mask_error=True,
detrend='constant'):
"""
Return a Profile object, with the temporal spectrum of
'component' on the point 'pt'.
Parameters
----------
component : string
.
pt : 2x1 array of numbers
.
ind : boolean
If true, 'pt' is read as indices,
else, 'pt' is read as coordinates.
wanted_times : 2x1 array, optional
Time interval in which compute spectrum (default is all).
welch_seglen : integer, optional
If specified, welch's method is used (dividing signal into
overlapping segments, and averaging periodogram) with the given
segments length (in number of points).
scaling : string, optional
If 'base' (default), result are in component unit.
If 'spectrum', the power spectrum is returned (in unit^2).
If 'density', the power spectral density is returned (in unit^2/Hz)
fill : string or float
Specifies the way to treat missing values.
A value for value filling.
A string ('linear', 'nearest', 'zero', 'slinear', 'quadratic,
'cubic' where 'slinear', 'quadratic' and 'cubic' refer to a spline
interpolation of first, second or third order) for interpolation.
mask_error : boolean
If 'False', instead of raising an error when masked value appear on
time profile, '(None, None)' is returned.
detrend : string, optional
Method used to detrend the profile. Can be 'none',
'constant' (default) or 'linear'.
Returns
-------
magn_prof : prof.Profile object
Magnitude spectrum.
"""
# checking parameters coherence
if not isinstance(pt, ARRAYTYPES):
raise TypeError("'pt' must be a 2x1 array")
if ind:
pt = np.array(pt, dtype=int)
else:
pt = np.array(pt, dtype=float)
if not pt.shape == (2,):
raise ValueError("'pt' must be a 2x1 array")
if ind and (not isinstance(pt[0], INTEGERTYPES) or
not isinstance(pt[1], INTEGERTYPES)):
raise TypeError("If 'ind' is True, 'pt' must be an array of two"
" integers")
if not isinstance(ind, bool):
raise TypeError("'ind' must be a boolean")
x = pt[0]
y = pt[1]
# getting time profile
time_prof = self.get_time_profile(component, pt=[x, y], ind=ind,
wanted_times=wanted_times)
magn_prof = time_prof.get_spectrum(welch_seglen=welch_seglen,
scaling=scaling, fill=fill,
mask_error=mask_error,
detrend=detrend)
return magn_prof
[docs] def get_temporal_spectrum_over_area(self, component, intervx, intervy,
ind=False, welch_seglen=None,
scaling='base', fill='linear',
detrend='constant'):
"""
Return a prof.Profile object, contening a mean spectrum of the given
component, on all the points included in the given intervals.
Parameters
----------
component : string
Scalar component ('Vx', 'Vy', 'magnitude', ...).
intervx, intervy : 2x1 arrays of numbers
Defining the square on which averaging the spectrum.
(in axes values)
ind : boolean
If true, 'pt' is read as indices,
else, 'pt' is read as coordinates.
welch_seglen : integer, optional
If specified, welch's method is used (dividing signal into
overlapping segments, and averaging periodogram) with the given
segments length (in number of points).
scaling : string, optional
If 'base' (default), result are in component unit.
If 'spectrum', the power spectrum is returned (in unit^2).
If 'density', the power spectral density is returned (in unit^2/Hz)
fill : string or float
Specifies the way to treat missing values.
A value for value filling.
A string ('linear', 'nearest', 'zero', 'slinear', 'quadratic,
'cubic' where 'slinear', 'quadratic' and 'cubic' refer to a spline
interpolation of first, second or third order) for interpolation.
detrend : string, optional
Method used to detrend the profile. Can be 'none',
'constant' (default) or 'linear'.
Returns
-------
magn_prof : prof.Profile object
Averaged magnitude spectrum.
"""
# checking parameters coherence
if not isinstance(component, STRINGTYPES):
raise TypeError("'component' must be a string")
if not isinstance(intervx, ARRAYTYPES):
raise TypeError("'intervx' must be an array")
if not isinstance(intervy, ARRAYTYPES):
raise TypeError("'intervy' must be an array")
if not isinstance(intervx[0], NUMBERTYPES):
raise TypeError("'intervx' must be an array of numbers")
if not isinstance(intervy[0], NUMBERTYPES):
raise TypeError("'intervy' must be an array of numbers")
axe_x, axe_y = self.axe_x, self.axe_y
# checking interval values and getting bound indices
if ind:
if not isinstance(intervx[0], int)\
or not isinstance(intervx[1], int)\
or not isinstance(intervy[0], int)\
or not isinstance(intervy[1], int):
raise TypeError("'intervx' and 'intervy' must be arrays of"
" integer if 'ind' is 'True'")
if intervx[0] < 0 or intervy[0] < 0\
or intervx[-1] >= len(axe_x)\
or intervy[-1] >= len(axe_y):
raise ValueError("intervals are out of bounds")
ind_x_min = intervx[0]
ind_x_max = intervx[1]
ind_y_min = intervy[0]
ind_y_max = intervy[1]
else:
axe_x_min = np.min(axe_x)
axe_x_max = np.max(axe_x)
axe_y_min = np.min(axe_y)
axe_y_max = np.max(axe_y)
if np.min(intervx) < axe_x_min\
or np.max(intervx) > axe_x_max\
or np.min(intervy) < axe_y_min\
or np.max(intervy) > axe_y_max:
raise ValueError("intervals ({}) are out of bounds ({})"
.format([intervx, intervy],
[[axe_x_min, axe_x_max],
[axe_y_min, axe_y_max]]))
ind_x_min = self.get_indice_on_axe(1, intervx[0])[-1]
ind_x_max = self.get_indice_on_axe(1, intervx[1])[0]
ind_y_min = self.get_indice_on_axe(2, intervy[0])[-1]
ind_y_max = self.get_indice_on_axe(2, intervy[1])[0]
# Averaging ponctual spectrums
magn = 0.
nmb_fields = (ind_x_max - ind_x_min + 1)*(ind_y_max - ind_y_min + 1)
real_nmb_fields = nmb_fields
for i in np.arange(ind_x_min, ind_x_max + 1):
for j in np.arange(ind_y_min, ind_y_max + 1):
tmp_m = self.get_temporal_spectrum(component, [i, j], ind=True,
welch_seglen=welch_seglen,
scaling=scaling,
fill=fill, mask_error=True,
detrend=detrend)
# check if the position is masked
if tmp_m is None:
real_nmb_fields -= 1
else:
magn = magn + tmp_m
if real_nmb_fields == 0:
raise Exception("I can't find a single non-masked time profile"
", maybe you will want to try 'zero_fill' "
"option")
magn = magn/real_nmb_fields
return magn
[docs] def get_spectrum_map(self, comp, welch_seglen=None, nmb_pic=1,
spec_smooth=None,
verbose=True):
"""
Return the temporal spectrum map.
Parameters
----------
comp : string
Component to get the spectrum from.
welch_seglen : integer
.
nmb_pic : integer
Number of succesive spectrum pic to detect
spec_smooth : number
.
verbose : bool
.
Returns
-------
map_freq_sf :
.
map_freq_quality_sf :
.
"""
# check
try:
self.fields[0].__getattribute__(comp)
except AttributeError():
raise ValueError()
# prepare
map_freq = []
map_freq_quality = []
map_freq_mask = np.zeros(self.shape, dtype=bool)
for i in range(nmb_pic):
map_freq.append(np.zeros(self.shape, dtype=float))
map_freq_quality.append(np.zeros(self.shape, dtype=float))
if verbose:
PG = ProgressCounter(init_mess="Begin spectrum map computation on {}"
.format(comp),
nmb_max=self.shape[0]*self.shape[1],
name_things="points")
# loop on field points
for i, x in enumerate(self.axe_x):
for j, y in enumerate(self.axe_y):
if verbose:
PG.print_progress()
# get local spectrum
tmp_prof = self.get_time_profile(comp, [x, y])
# check if should be masked
if np.sum(tmp_prof.mask)/float(len(tmp_prof)) > .5:
map_freq_mask[i, j] = True
continue
spec = tmp_prof.get_spectrum(welch_seglen=welch_seglen)
# smooth if necessary
if spec_smooth is not None:
spec.smooth(tos='gaussian', size=spec_smooth, inplace=True)
# get maximale frequences
for n in range(nmb_pic):
spec_max = spec.max
max_pos_ind = spec.get_value_position(spec_max,
ind=True)[0]
map_freq[n][i, j] = spec.x[max_pos_ind]
# get spectrum 'quality'
filt = np.logical_not(spec.mask)
spec_var = np.mean((spec.y[filt] -
np.mean(spec.y[filt]))**2)**.5
map_freq_quality[n][i, j] = (spec_max - spec.mean)/spec_var
# remove this particular pic
spec.mask[max_pos_ind] = True
# store results
maps_freq = []
maps_freq_quality = []
for i in range(nmb_pic):
map_freq_sf = sf.ScalarField()
map_freq_sf.import_from_arrays(axe_x=self.axe_x, axe_y=self.axe_y,
values=map_freq[i],
mask=map_freq_mask,
unit_x=self.unit_x,
unit_y=self.unit_y,
unit_values=spec.unit_y)
map_freq_quality_sf = sf.ScalarField()
map_freq_quality_sf.import_from_arrays(axe_x=self.axe_x,
axe_y=self.axe_y,
values=map_freq_quality[i],
mask=map_freq_mask,
unit_x=self.unit_x,
unit_y=self.unit_y,
unit_values='')
maps_freq.append(map_freq_sf)
maps_freq_quality.append(map_freq_quality_sf)
# return
if nmb_pic == 1:
return maps_freq[0], maps_freq_quality[0]
else:
return maps_freq, maps_freq_quality
def _get_comp_spectral_filtering(self, comp, fmin, fmax, order=2):
"""
Perform a temporal spectral filtering on the wanted component
Parameters
----------
fmin, fmax : numbers
Minimal and maximal frequencies
order : integer, optional
Butterworth filter order
Returns
-------
filt_tf : TemporalFields
Filtered temporal field
"""
# check
try:
tf = self.__getattribute__("{}_as_sf".format(comp))
except AttributeError():
raise ValueError()
# loop on space
xys = [[x, y]
for x in tf.axe_x
for y in tf.axe_y]
for x, y in xys:
tmp_prof = tf.get_time_profile('values', [x, y])
filt_tmp_prof = tmp_prof.spectral_filtering(fmin, fmax,
order=order)
tf.inject_time_profile('values', [x, y], filt_tmp_prof)
return tf
[docs] def get_recurrence_map(self, norm=2, verbose=False, bandwidth=None,
normalized=False):
"""
Return the recurrence map associated with the 2-norm.
Returns
-------
rec_map : sf.ScalarField object
.
"""
length = len(self.fields)
rec_map = np.zeros((length, length))
if verbose:
pc = ProgressCounter(init_mess="Begin recurence map computation",
nmb_max=int(length**2/2. + length/2.),
name_things='norms', perc_interv=1)
if self.field_type == vf.VectorField:
field_type = 0
nmb_val = self.shape[0]*self.shape[1]*2
elif self.field_type == sf.ScalarField:
field_type = 1
nmb_val = self.shape[0]*self.shape[1]
else:
raise Exception()
inv_norm = 1./norm
if norm % 2 == 0:
need_abs = False
else:
need_abs = True
# norm
if normalized:
if field_type == 0:
magnitude = self.get_mean_field().magnitude
normalizer = np.sum(magnitude**norm)**inv_norm/nmb_val
else:
magnitude = self.get_mean_field().values
normalizer = np.sum(magnitude**norm)**inv_norm/nmb_val
# double loop
for i in range(length):
for j in range(length):
if i > j:
continue
if i == j:
rec_map[i, j] = 0
continue
if bandwidth is not None:
if np.abs(i - j) > bandwidth:
rec_map[i, j] = np.nan
rec_map[j, i] = np.nan
continue
if verbose:
pc.print_progress()
# get difference fields
if field_type == 0:
valuesx = self.fields[i].comp_x - self.fields[j].comp_x
valuesy = self.fields[i].comp_y - self.fields[j].comp_y
values = np.concatenate((valuesx, valuesy))
elif field_type == 1:
values = self.fields[i].values - self.fields[j].values
# get norm
if need_abs:
res = np.sum(np.abs(values)**norm)**inv_norm/nmb_val
else:
res = np.sum(values**norm)**inv_norm/nmb_val
rec_map[i, j] = res
rec_map[j, i] = res
# normalize
if normalized:
rec_map /= normalizer
# return
rec_map_sf = sf.ScalarField()
rec_map_sf.import_from_arrays(axe_x=self.times, axe_y=self.times,
values=rec_map,
unit_x=self.unit_times,
unit_y=self.unit_times,
unit_values=self.unit_values)
return rec_map_sf
[docs] def extend(self, nmb_left=0, nmb_right=0, nmb_up=0, nmb_down=0,
inplace=False):
"""
Add columns or lines of masked values at the fields.
Parameters
----------
nmb_**** : integers
Number of lines/columns to add in each direction.
inplace : bool
If 'False', return a new extended field, if 'True', modify the
field inplace.
Returns
-------
Extended_field : TemporalFields object, optional
Extended field.
"""
if inplace:
tmp_tf = self
else:
tmp_tf = self.copy()
# scale axis
fld.Field.extend(tmp_tf, nmb_left=nmb_left, nmb_right=nmb_right,
nmb_up=nmb_up, nmb_down=nmb_down, inplace=True)
# scale fields
for i, _ in enumerate(tmp_tf.fields):
tmp_tf.fields[i].extend(nmb_left=nmb_left, nmb_right=nmb_right,
nmb_up=nmb_up, nmb_down=nmb_down,
inplace=True)
# return
if not inplace:
return tmp_tf
[docs] def scale(self, scalex=None, scaley=None, scalev=None, scalet=None,
inplace=False):
"""
Scale the Fields.
Parameters
----------
scalex, scaley, scalev : numbers or Unum objects
Scale for the axis and the values.
inplace : boolean
.
"""
if inplace:
tmp_f = self
else:
tmp_f = self.copy()
# scale the field (automaticly scale the fields axis)
# for compatibility purpose
if not hasattr(self, "xy_scale"):
self.xy_scale = make_unit('')
for field in self.fields:
if not hasattr(field, "xy_scale"):
field.xy_scale = self.xy_scale
fld.Field.scale(tmp_f, scalex=scalex, scaley=scaley,
inplace=True)
# scale the values
flds.Fields.scale(tmp_f, scalev=scalev, inplace=True)
# scale the time
if scalet is None:
pass
elif isinstance(scalet, NUMBERTYPES):
tmp_f.times *= scalet
elif isinstance(scalet, unum.Unum):
new_unit = tmp_f.unit_times*scalet
fact = new_unit.asNumber()
new_unit /= fact
tmp_f.unit_times = new_unit
tmp_f.times *= fact
else:
raise TypeError()
# returning
if not inplace:
return tmp_f
[docs] def make_evenly_spaced(self, interp='linear', res=1):
"""
Use interpolation to make the fields evenly spaced
Parameters
----------
interp : {‘linear’, ‘cubic’, ‘quintic’}, optional
The kind of spline interpolation to use. Default is ‘linear’.
res : number
Resolution of the resulting field.
A value of 1 meaning a spatial resolution equal to the smallest
space along the two axis for the initial field.
"""
# raise NotImplementedError('Not implemented yet')
for field in self.fields:
field.make_evenly_spaced(interp=interp, res=res)
self._Field__axe_x = self.fields[0].axe_x
self._Field__axe_y = self.fields[0].axe_y
self.__mask = False
[docs] def change_unit(self, axe, new_unit):
"""
Change the unit of an axe.
Parameters
----------
axe : string
'y' for changing the profile y axis unit
'x' for changing the profile x axis unit
'values' for changing values unit
'time' for changing time unit
new_unit : Unum.unit object or string
The new unit.
"""
if isinstance(new_unit, STRINGTYPES):
new_unit = make_unit(new_unit)
if not isinstance(new_unit, unum.Unum):
raise TypeError()
if not isinstance(axe, STRINGTYPES):
raise TypeError()
if axe in ['x', 'y', 'values']:
for field in self.fields:
field.change_unit(axe, new_unit)
elif axe == 'time':
old_unit = self.unit_times
new_unit = old_unit.asUnit(new_unit)
fact = new_unit.asNumber()
self.times *= fact
self.unit_times = new_unit/fact
else:
raise ValueError()
if axe in ['x', 'y']:
fld.Field.change_unit(self, axe, new_unit)
[docs] def add_field(self, field, time=0., unit_times="", copy=True):
"""
Add a field to the existing fields.
Parameters
----------
field : vf.VectorField or sf.ScalarField object
The field to add.
time : number
time associated to the field.
unit_time : Unum object
time unit.
"""
# TODO : pas de vérification de la cohérence des unités !
# checking parameters
if not isinstance(field, (vf.VectorField, sf.ScalarField)):
raise TypeError()
if self.field_type is None:
self.field_type = field.__class__
if not isinstance(field, self.field_type):
raise TypeError()
if not isinstance(time, NUMBERTYPES):
raise TypeError("'time' should be a number, not {}"
.format(type(time)))
if isinstance(unit_times, unum.Unum):
if unit_times.asNumber() != 1:
raise ValueError()
elif isinstance(unit_times, STRINGTYPES):
unit_times = make_unit(unit_times)
else:
raise TypeError()
# if this is the first field
if len(self.fields) == 0:
self.axe_x = field.axe_x
self.axe_y = field.axe_y
self.unit_x = field.unit_x
self.unit_y = field.unit_y
self.unit_times = unit_times
self.__times = np.asarray([time], dtype=float)
self.field_type = field.__class__
# if not
else:
# checking field type
if not isinstance(field, self.field_type):
raise TypeError()
# checking axis
axe_x, axe_y = self.axe_x, self.axe_y
vaxe_x, vaxe_y = field.axe_x, field.axe_y
if not np.all(axe_x == vaxe_x) and np.all(axe_y == vaxe_y):
raise ValueError("Axes of the new field must be consistent "
"with current axes")
# storing time
time = (time*self.unit_times/unit_times).asNumber()
self.__times = np.append(self.__times, time)
# use default constructor
flds.Fields.add_field(self, field, copy=copy)
# sorting the field with time
self.__sort_field_by_time()
[docs] def remove_fields(self, fieldnumbers):
"""
Remove field(s) of the existing fields.
Parameters
----------
fieldnumber : integer or list of integers
Velocity field(s) number(s) to remove.
"""
if isinstance(fieldnumbers, INTEGERTYPES):
fieldnumbers = [fieldnumbers]
for nmb in fieldnumbers:
self.__times = np.delete(self.times, nmb)
flds.Fields.remove_field(self, fieldnumbers)
[docs] def reduce_spatial_resolution(self, fact, inplace=False, verbose=False):
"""
Reduce the spatial resolution of the fields by a factor 'fact'
Parameters
----------
fact : int
Reducing factor.
inplace : boolean, optional
.
"""
if inplace:
tmpfs = self
else:
tmpfs = self.copy()
# verbose
if verbose:
pg = ProgressCounter(f"Reducing resolution by {fact}",
len(self), name_things='fields')
#
for f in tmpfs.fields:
f.reduce_spatial_resolution(fact=fact, inplace=True)
pg.print_progress()
#
self.axe_x = self.fields[0].axe_x
self.axe_y = self.fields[0].axe_y
return tmpfs
[docs] def reduce_temporal_resolution(self, nmb_in_interval, mean=True,
inplace=False):
"""
Return a TemporalVelocityFields, contening one field for each
'nmb_in_interval' field in the initial TFVS.
Parameters
----------
nmb_in_interval : integer
Length of the interval.
(one field is kept for each 'nmb_in_interval fields)
mean : boolean, optional
If 'True', the resulting fields are average over the interval.
Else, fields are taken directly.
inplace : boolean, optional
Returns
-------
TVFS : TemporalVelocityFields
"""
# cehck parameters
if not isinstance(nmb_in_interval, int):
raise TypeError("'nmb_in_interval' must be an integer")
if nmb_in_interval == 1:
return self.copy()
if nmb_in_interval >= len(self):
raise ValueError("'nmb_in_interval' is too big")
if not isinstance(mean, bool):
raise TypeError("'mean' must be a boolean")
#
tmp_TFS = self.__class__()
i = 0
times = self.times
while True:
tmp_f = self[i]
time = times[i]
if mean:
for j in np.arange(i + 1, i + nmb_in_interval):
tmp_f += self[j]
time += times[j]
tmp_f /= nmb_in_interval
time /= nmb_in_interval
tmp_TFS.add_field(tmp_f, time, self.unit_times)
i += nmb_in_interval
if i + nmb_in_interval >= len(self):
break
# returning
if inplace:
self.fields = tmp_TFS.fields
self.times = tmp_TFS.times
else:
return tmp_TFS
[docs] def augment_temporal_resolution(self, fact=2, inplace=False):
"""
Augment the temporal resolution using temporal interpoalation.
Parameters
----------
fact : integer
Temporal resolution ratio.
inplace : bool
.
"""
# check
assert type(fact) in [int], "TypeError"
assert fact > 0, "ValueError"
assert type(inplace) == bool, "TypeError"
# fact = 1 (fool...)
if fact == 1:
if inplace:
return None
else:
return self.copy()
# get data
if inplace:
tf = self
else:
tf = self.copy()
# get new times
new_times = []
for i in range(len(tf.times) - 1):
tmp_times = np.linspace(tf.times[i], tf.times[i + 1],
fact + 1)[0:-1]
new_times.append(tmp_times)
new_times = np.array(new_times).flatten()
new_times = np.append(new_times, tf.times[-1])
# loop on new times
new_fields = []
for time in new_times:
new_fields.append(tf.get_interpolated_field(time))
# store
tf.fields = new_fields
tf.times = new_times
# returning
if not inplace:
return tf
[docs] def crop_masked_border(self, hard=False, inplace=False):
"""
Crop the masked border of the velocity fields in place.
Parameters
----------
hard : boolean, optional
If 'True', partially masked border are croped as well.
inplace : boolean, optional
If 'True', crop the F in place,
else, return a croped TF.
"""
#get cumulated mask
mask_cum = self.mask_cum
# checking masked values presence
if not np.any(mask_cum):
return None
# hard cropping
if hard:
if inplace:
tmp_tf = self
else:
tmp_tf = self.copy()
# remove trivial borders
tmp_tf.crop_masked_border(hard=False, inplace=True)
# until there is no more masked values
while True:
# getting mask
masks = tmp_tf.mask
mask = np.sum(masks, axis=0)
mask = mask == len(tmp_tf.fields)
# getting number of masked value on each border
bd1 = np.sum(mask[0, :])
bd2 = np.sum(mask[-1, :])
bd3 = np.sum(mask[:, 0])
bd4 = np.sum(mask[:, -1])
# getting more masked border
more_masked = np.argmax([bd1, bd2, bd3, bd4])
# check remaining masked values
if [bd1, bd2, bd3, bd4][more_masked] == 0:
break
# deleting more masked border
if more_masked == 0:
len_x = len(tmp_tf.axe_x)
tmp_tf.crop(intervx=[1, len_x], ind=True, inplace=True)
elif more_masked == 1:
len_x = len(tmp_tf.axe_x)
tmp_tf.crop(intervx=[0, len_x - 2], ind=True,
inplace=True)
elif more_masked == 2:
len_y = len(tmp_tf.axe_y)
tmp_tf.crop(intervy=[1, len_y], ind=True, inplace=True)
elif more_masked == 3:
len_y = len(tmp_tf.axe_y)
tmp_tf.crop(intervy=[0, len_y - 2], ind=True, inplace=True)
if not inplace:
return tmp_tf
# soft cropping
else:
# getting positions to remove
# (column or line with only masked values)
axe_y_m = ~np.all(mask_cum, axis=0)
axe_x_m = ~np.all(mask_cum, axis=1)
# skip if nothing to do
if not np.any(axe_y_m) or not np.any(axe_x_m):
return None
# getting indices where we need to cut
axe_x_min = np.where(axe_x_m)[0][0]
axe_x_max = np.where(axe_x_m)[0][-1]
axe_y_min = np.where(axe_y_m)[0][0]
axe_y_max = np.where(axe_y_m)[0][-1]
# crop
if inplace:
self.crop(intervx=[axe_x_min, axe_x_max],
intervy=[axe_y_min, axe_y_max], ind=True,
inplace=True)
else:
tmp_tf = self.copy()
tmp_tf.crop(intervx=[axe_x_min, axe_x_max],
intervy=[axe_y_min, axe_y_max], ind=True,
inplace=True)
return tmp_tf
[docs] def mirroring(self, direction, position, inds_to_mirror='all', mir_coef=1.,
inplace=False, interp=None, value=[0, 0]):
"""
Return the fields with additional mirrored values.
Parameters
----------
direction : integer
Axe on which place the symetry plane (1 for x and 2 for y)
position : number
Position of the symetry plane along the given axe
inds_to_mirror : integer
Number of vector rows to symetrize (default is all)
mir_coef : number or 2x1 array, optional
Optional coefficient(s) applied only to the mirrored values.
It can be an array first value is for 'comp_x' and second one to
'comp_y' (for vector fields)
inplace : boolean, optional
.
interp : string, optional
If specified, method used to fill the gap near the
symetry plane by interpoaltion.
'value' : fill with the given value,
'nearest' : fill with the nearest value,
'linear' (default): fill using linear interpolation
(Delaunay triangulation),
'cubic' : fill using cubic interpolation (Delaunay triangulation)
value : array, optional
Value at the symetry plane, in case of interpolation
"""
if inplace:
tmp_tf = self
else:
tmp_tf = self.copy()
# mirror fields
for i in range(len(self.fields)):
tmp_tf.fields[i].mirroring(direction=direction, position=position,
inds_to_mirror=inds_to_mirror,
mir_coef=mir_coef, inplace=True,
interp=interp, value=value)
# update field
tmp_tf.__axe_x = self.fields[0].axe_x
tmp_tf.__axe_y = self.fields[0].axe_y
# return
if not inplace:
return tmp_tf
[docs] def remove_weird_fields(self, std_coef=3.29, treatment='interpolate',
inplace=False):
"""
Look at the time evolution of spatial mean magnitude to identify and
replace weird fields.
Parameters
----------
std_coef : number
Fields associated with mean magnitude outside the interval
[mean - std_coef*std, mean - std_coef*std] are treated as weird
fields. Default value of '3.29' corespond for a 99.9% interval.
treatment : string in ['remove', 'interpolate']
Type of treatment for the weird fields
(default is 'interpolate')
inplace : bool
.
Returns
-------
tf : TemporalField
treated temporal field
"""
# get data
if inplace:
tmp_tf = self
else:
tmp_tf = self.copy()
# get weird fields indices
mean_magn = []
for field in tmp_tf.fields:
mean_magn.append(np.sum(field.magnitude[~field.mask]))
mean = np.mean(mean_magn)
mean_eps = np.std(mean_magn)*std_coef
filt = np.logical_or(mean_magn < mean - mean_eps,
mean_magn > mean + mean_eps)
weird_inds = np.arange(len(tmp_tf))[filt]
# treat weird fields
if treatment == 'interpolate':
# replace weird fields with interpolations
for weird_ind in weird_inds:
eps = 1
while True:
if (weird_ind + eps in weird_inds or
weird_ind - eps in weird_inds):
eps += 1
else:
break
tmp_tf.fields[weird_ind] = (tmp_tf.fields[weird_ind + eps] +
tmp_tf.fields[weird_ind - eps])/2
elif treatment == 'remove':
tmp_tf.remove_fields(weird_inds)
else:
raise ValueError()
# return
if not inplace:
return tmp_tf
[docs] def crop(self, intervx=None, intervy=None, intervt=None, full_output=False,
ind=False, inplace=False):
"""
Return a croped field in respect with given intervals.
Parameters
----------
intervx : array, optional
interval wanted along x
intervy : array, optional
interval wanted along y
intervt : array, optional
interval wanted along time
full_output : boolean, optional
If 'True', cutting indices are alson returned
inplace : boolean, optional
If 'True', fields are croped in place.
"""
# check parameters
if intervt is not None:
if not isinstance(intervt, ARRAYTYPES):
raise TypeError()
intervt = np.array(intervt, dtype=float)
if intervt.shape != (2, ):
raise ValueError()
# get wanted times
if intervt is not None:
if not ind:
if intervt[0] < self.times[0]:
ind1 = 0
elif intervt[0] > self.times[-1]:
raise ValueError()
else:
ind1 = np.where(intervt[0] <= self.times)[0][0]
if intervt[1] > self.times[-1]:
ind2 = len(self.times) - 1
elif intervt[1] < self.times[0]:
raise ValueError()
else:
ind2 = np.where(intervt[1] >= self.times)[0][-1]
intervt = [ind1, ind2]
else:
intervt = [int(intervt[0]), int(intervt[1])]
# crop
if inplace:
cropfield = self
else:
cropfield = self.copy()
# temporal
if intervt is not None:
cropfield.fields = cropfield.fields[intervt[0]:intervt[1] + 1]
cropfield.times = cropfield.times[intervt[0]:intervt[1] + 1]
# spatial
fld.Field.crop(cropfield, intervx=intervx, intervy=intervy, ind=ind,
inplace=True)
for field in cropfield.fields:
field.crop(intervx=intervx, intervy=intervy, ind=ind,
inplace=True)
# returning
if not inplace:
return cropfield
[docs] def set_origin(self, x=None, y=None):
"""
Modify the axis in order to place the origin at the actual point (x, y)
Parameters
----------
x : number
y : number
"""
fld.Field.set_origin(self, x, y)
[docs] def copy(self):
"""
Return a copy of the velocityfields
"""
return copy.deepcopy(self)
def __sort_field_by_time(self):
if len(self.fields) in [0, 1]:
return None
ind_sort = np.argsort(self.times)
self.times = self.times[ind_sort]
self.fields = self.fields[ind_sort]
[docs] def display_multiple(self, component=None, kind=None, inds=None,
sharecb=False, sharex=False, sharey=False,
ncol=None, nrow=None, **plotargs):
"""
Display a component of the velocity fields.
Parameters
----------
component : string, optional
component to display
kind : string, optional
Kind of display wanted.
fields_ind : array of indices
Indices of fields to display.
samecb : boolean, optional
If 'True', the same color system is used for all the fields.
You have to pass 'vmin' and 'vmax', to have correct results.
ncol, nrow : int, optional
Wanted number of columns and rows. If not specified, these values
are computed so that ncol ~ nrow.
plotargs : dict, optional
Arguments passed to the function used to display the vector field.
"""
nmb_fields = len(inds)
# getting values
if component is None or component == 'V':
try:
values = [[self.fields[ind].comp_x, self.fields[ind].comp_y]
for ind in inds]
except AttributeError:
values = [self.fields[ind].values for ind in inds]
else:
values = [self.fields[ind].__getattribute__(component)
for ind in inds]
# display
if sharex or sharey:
plotargs['adjustable'] = 'datalim'
db = pplt.Displayer(x=[self.axe_x]*nmb_fields,
y=[self.axe_y]*nmb_fields,
values=values, kind=kind, **plotargs)
plot = db.draw_multiple(inds=list(range(len(inds))), sharecb=sharecb,
sharex=sharex,
sharey=sharey, ncol=ncol, nrow=nrow)
return plot
[docs] def display(self, compo=None, kind=None, sharecb=True, buffer_size=100,
**plotargs):
"""
Create a windows to display temporals field, controlled by buttons.
Parameters
----------
compo: string
Component to plot.
kind: string
Kind of plot to use.
sharecb: boolean
Do all the vector field serie has to share the same colorbar or
not.
buffer_size: number
Number of displays to keep in memory (faster, but use memory).
**plotargs : dic
Arguments passed to the plot command.
Display control
---------------
The display can be controlled useing the button, but also the keyboard:
space, right arrow or + : next field
backspace, left arrow or - : previous field
up arrow : last field
down arrow : first field
number + enter : goto a specific frame
p : play the animated fields
number + i : set the animation increment
number + t : set the animation time interval (ms)
q : close
s : save an image
"""
nmb_fields = len(self.fields)
# getting values
if compo is None or compo == 'V':
try:
values = np.asarray([[field.comp_x, field.comp_y]
for field in self.fields])
except AttributeError:
values = np.asarray([field.values for field in self.fields])
else:
values = np.asarray([field.__getattribute__(compo)
for field in self.fields])
# check arguments for colorbar drawning
if 'norm' in list(plotargs.keys()):
sharecb = True
normcb = plotargs['norm']
else:
normcb = None
# display
db = pplt.Displayer(x=[self.axe_x]*nmb_fields,
y=[self.axe_y]*nmb_fields,
values=values, kind=kind,
buffer_size=buffer_size,
**plotargs)
win = pplt.ButtonManager(db,
xlabel="X " + self.unit_x.strUnit(),
ylabel="Y " + self.unit_y.strUnit(),
sharecb=sharecb, normcb=normcb)
pplt.DataCursorTextDisplayer(db)
return db
[docs] def display_animate(self, compo=None, interval=500, fields_inds=None,
repeat=True,
**plotargs):
"""
Display fields animated in time.
Parameters
----------
compo : string
Composante to display
interval : number, optionnal
interval between two frames in milliseconds.
fields_ind : array of indices
Indices of wanted fields. by default, all the fields are displayed
repeat : boolean, optional
if True, the animation is repeated infinitely.
additional arguments can be passed (scale, vmin, vmax,...)
"""
from matplotlib import animation
if fields_inds is None:
fields_inds = len(self.fields)
# getting data
if self.field_type == vf.VectorField:
if compo == 'V' or compo is None:
comp = self.fields
elif compo == 'magnitude':
comp = self.magnitude_as_sf
elif compo == 'x':
comp = self.Vx_as_sf
elif compo == 'y':
comp = self.Vy_as_sf
elif compo == 'mask':
comp = self.mask_as_sf
else:
raise ValueError()
elif self.field_type == sf.ScalarField:
if compo == 'values' or compo is None:
comp = self.values_as_sf
elif compo == 'mask':
comp = self.mask_as_sf
else:
raise ValueError()
else:
raise TypeError()
if 'kind' in list(plotargs.keys()):
kind = plotargs['kind']
else:
kind = None
# display a vector field (quiver)
if isinstance(comp[0], vf.VectorField)\
and (kind is None or kind == "quiver"):
fig = plt.figure()
ax = plt.gca()
displ = comp[0].display(**plotargs)
ttl = plt.title('')
anim = animation.FuncAnimation(fig, self._update_vf,
frames=fields_inds,
interval=interval, blit=False,
repeat=repeat,
fargs=(fig, ax, displ, ttl, comp,
compo, plotargs))
return anim,
# display a scalar field (contour, contourf or imshow) or a streamplot
elif isinstance(comp[0], sf.ScalarField)\
or isinstance(comp[0], vf.VectorField):
fig = plt.figure()
ax = plt.gca()
displ = comp[0].display(**plotargs)
ttl = plt.suptitle('')
anim = animation.FuncAnimation(fig, self._update_sf,
frames=fields_inds,
interval=interval, blit=False,
repeat=repeat,
fargs=(fig, ax, displ, ttl, comp,
compo, plotargs))
return anim,
else:
raise ValueError("I don't know any '{}' composant".format(compo))
[docs] def record_animation(self, anim, filepath, kind='gif', fps=30, dpi=100,
bitrate=50, imagemagick_path=None):
"""
Record an animation in a gif file.
You must create an animation (using 'display_animate' for example)
before calling this method.
You may have to specify the path to imagemagick in orfer to use it.
Parameters
----------
"""
import matplotlib
if imagemagick_path is None:
imagemagick_path = r"C:\Program Files\ImageMagick\convert.exe"
matplotlib.rc('animation', convert_path=imagemagick_path)
if kind == 'gif':
writer = matplotlib.animation.ImageMagickWriter(fps=fps,
bitrate=bitrate)
anim.save(filepath, writer=writer, fps=fps, dpi=dpi)
elif kind == 'mp4':
anim.save(filepath, writer='fmpeg', fps=fps, bitrate=bitrate)
def _update_sf(self, num, fig, ax, displ, ttl, comp, compo, plotargs):
plt.sca(ax)
ax.cla()
displ = comp[num]._display(**plotargs)
title = "{}, at t={:.3} {}"\
.format(compo, float(self.times[num]),
self.unit_times.strUnit())
ttl.set_text(title)
return displ,
def _update_vf(self, num, fig, ax, displ, ttl, comp, compo, plotargs):
plt.sca(ax)
ax.cla()
displ = comp[num]._display(**plotargs)
title = "{}, at t={:.2f} {}"\
.format(compo, float(self.times[num]),
self.unit_times.strUnit())
ttl.set_text(title)
return displ,