Source code for IMTreatment.core.vectorfield

# -*- coding: utf-8 -*-
#!/bin/env python3

# Copyright (C) 2003-2007 Gaby Launay

# Author: Gaby Launay  <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0

# This file is part of IMTreatment.

# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.

# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import copy
import warnings

import matplotlib.pyplot as plt
import numpy as np
from scipy import ndimage

from .. import plotlib as pplt
import unum
from . import field, scalarfield as sf
from ..utils import make_unit
from ..utils.types import ARRAYTYPES, NUMBERTYPES, STRINGTYPES


[docs]class VectorField(field.Field): """ Class representing a vector field (2D field, with two components on each point). """ def __init__(self): super(VectorField, self).__init__() self.__comp_x = np.array([], dtype=float) self.__comp_y = np.array([], dtype=float) self.__mask = np.array([], dtype=bool) self.__unit_values = make_unit('') def __neg__(self): tmpvf = self.copy() tmpvf.comp_x = -tmpvf.comp_x tmpvf.comp_y = -tmpvf.comp_y return tmpvf def __add__(self, other): if isinstance(other, VectorField): # test unities system try: self.unit_values + other.unit_values self.unit_x + other.unit_x self.unit_y + other.unit_y except: raise ValueError("I think these units don't match, fox") # identical shape and axis if np.all(self.axe_x == other.axe_x) and \ np.all(self.axe_y == other.axe_y): tmpvf = self.copy() fact = (other.unit_values/self.unit_values).asNumber() tmpvf.comp_x = self.comp_x + other.comp_x*fact tmpvf.comp_y = self.comp_y + other.comp_y*fact tmpvf.mask = np.logical_or(self.mask, other.mask) return tmpvf # different shape, partially same axis else: # getting shared points new_ind_x = np.array([np.any(np.abs(val - other.axe_x) < np.abs(val)*1e-4) for val in self.axe_x]) new_ind_y = np.array([np.any(np.abs(val - other.axe_y) < np.abs(val)*1e-4) for val in self.axe_y]) new_ind_xo = np.array([np.any(np.abs(val - self.axe_x) < np.abs(val)*1e-4) for val in other.axe_x]) new_ind_yo = np.array([np.any(np.abs(val - self.axe_y) < np.abs(val)*1e-4) for val in other.axe_y]) if not np.any(new_ind_x) or not np.any(new_ind_y): raise ValueError("Incompatible shapes") new_ind_Y, new_ind_X = np.meshgrid(new_ind_y, new_ind_x) new_ind_value = np.logical_and(new_ind_X, new_ind_Y) new_ind_Yo, new_ind_Xo = np.meshgrid(new_ind_yo, new_ind_xo) new_ind_valueo = np.logical_and(new_ind_Xo, new_ind_Yo) # getting new axis and values new_axe_x = self.axe_x[new_ind_x] new_axe_y = self.axe_y[new_ind_y] fact = other.unit_values/self.unit_values new_comp_x = (self.comp_x[new_ind_value] + other.comp_x[new_ind_valueo] * fact.asNumber()) new_comp_y = (self.comp_y[new_ind_value] + other.comp_y[new_ind_valueo] * fact.asNumber()) new_comp_x = new_comp_x.reshape((len(new_axe_x), len(new_axe_y))) new_comp_y = new_comp_y.reshape((len(new_axe_x), len(new_axe_y))) new_mask = np.logical_or(self.mask[new_ind_value], other.mask[new_ind_valueo]) new_mask = new_mask.reshape((len(new_axe_x), len(new_axe_y))) # creating vf tmpvf = VectorField() tmpvf.import_from_arrays(new_axe_x, new_axe_y, new_comp_x, new_comp_y, mask=new_mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmpvf elif isinstance(other, ARRAYTYPES): other = np.array(other, subok=True) # Same shape if other.shape == self.shape: tmpvf = self.copy() tmpvf.comp_x = self.comp_x + other tmpvf.comp_y = self.comp_y + other tmpvf.mask = self.mask return tmpvf # signle vector elif other.shape == (2,): tmpvf = self.copy() tmpvf.comp_x += other[0] tmpvf.comp_y += other[1] tmpvf.mask = self.mask return tmpvf else: raise ValueError() elif isinstance(other, unum.Unum): tmpvf = self.copy() fact = (other / self.unit_values).asNumber() tmpvf.comp_x = self.comp_x + fact tmpvf.comp_y = self.comp_y + fact tmpvf.mask = self.mask return tmpvf elif isinstance(other, NUMBERTYPES): tmpvf = self.copy() tmpvf.comp_x = self.comp_x + other tmpvf.comp_y = self.comp_y + other tmpvf.mask = self.mask return tmpvf else: raise TypeError("You can only add a velocity field " "with others velocity fields") __radd__ = __add__ def __sub__(self, other): other_tmp = other.__neg__() tmpvf = self.__add__(other_tmp) return tmpvf def __rsub__(self, other): return other + self.__neg__() def __truediv__(self, other): if isinstance(other, ARRAYTYPES): other = np.array(other, subok=True) if other.shape != self.shape: raise ValueError() tmpvf = self.copy() tmpvf.comp_x = self.comp_x / other tmpvf.comp_y = self.comp_y / other tmpvf.mask = np.logical_or(self.mask, other == 0) return tmpvf elif isinstance(other, unum.Unum): tmpvf = self.copy() new_unit = tmpvf.unit_values/other scale = new_unit.asNumber() new_unit /= scale tmpvf.unit_values = new_unit tmpvf.comp_x *= scale tmpvf.comp_y *= scale tmpvf.mask = self.mask return tmpvf elif isinstance(other, NUMBERTYPES): tmpvf = self.copy() tmpvf.comp_x /= other tmpvf.comp_y /= other tmpvf.mask = self.mask return tmpvf else: raise TypeError("You can only divide a vector field " "by numbers") __div__ = __truediv__ def __rtruediv__(self, other): return other * self**(-1) __rdiv__ = __rtruediv__ def __mul__(self, other): if isinstance(other, ARRAYTYPES): other = np.array(other, subok=True) if other.shape != self.shape: raise ValueError() tmpvf = self.copy() tmpvf.comp_x = self.comp_x * other tmpvf.comp_y = self.comp_y * other tmpvf.mask = self.mask return tmpvf elif isinstance(other, unum.Unum): tmpvf = self.copy() new_unit = tmpvf.unit_values*other scale = new_unit.asNumber() new_unit /= scale tmpvf.unit_values = new_unit tmpvf.comp_x *= scale tmpvf.comp_y *= scale tmpvf.mask = self.mask return tmpvf elif isinstance(other, NUMBERTYPES): tmpvf = self.copy() tmpvf.comp_x *= other tmpvf.comp_y *= other tmpvf.mask = self.mask return tmpvf elif isinstance(other, sf.ScalarField): if other.shape != self.shape: raise ValueError() tmpvf = self.copy() tmpvf.comp_x *= other.values tmpvf.comp_y *= other.values tmpvf.mask = np.logical_or(other.mask, self.mask) return tmpvf else: raise TypeError("You cannot multiply Vectorfields object " "with {} objects".format(type(other))) __rmul__ = __mul__ def __pow__(self, number): if not isinstance(number, NUMBERTYPES): raise TypeError("You only can use a number for the power " "on a Vectorfield") tmpvf = self.copy() tmpvf.comp_x = np.power(tmpvf.comp_x, number) tmpvf.comp_y = np.power(tmpvf.comp_y, number) tmpvf.unit_values = np.power(tmpvf.unit_values, number) return tmpvf def __abs__(self): tmpvf = self.copy() tmpvf.comp_x = np.abs(tmpvf.comp_x) tmpvf.comp_y = np.abs(tmpvf.comp_y) return tmpvf def __iter__(self): mask = self.mask datax = self.comp_x datay = self.comp_y for ij, xy in field.Field.__iter__(self): i = ij[0] j = ij[1] if not mask[i, j]: yield ij, xy, [datax[i, j], datay[i, j]] def __eq__(self, obj): if not isinstance(obj, VectorField): return False if not self.shape == obj.shape: return False if not np.all(self.axe_x == obj.axe_x): return False if not np.all(self.axe_y == obj.axe_y): return False if not np.all(self.mask == obj.mask): return False if not np.all(self.comp_x[~self.mask] == obj.comp_x[~self.mask]): return False if not np.all(self.comp_y[~self.mask] == obj.comp_y[~self.mask]): return False if not self.unit_x == obj.unit_x: return False if not self.unit_y == obj.unit_y: return False if not self.unit_values == obj.unit_values: return False return True @property def comp_x(self): return self.__comp_x @comp_x.setter def comp_x(self, new_comp_x): if not isinstance(new_comp_x, ARRAYTYPES): raise TypeError() new_comp_x = np.array(new_comp_x) if not new_comp_x.shape == self.shape: raise ValueError("'comp_x' must be coherent with axis system") # storing dat self.__comp_x = new_comp_x @comp_x.deleter def comp_x(self): raise Exception("Nope, can't do that") @property def comp_x_as_sf(self): tmp_sf = sf.ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.comp_x, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf @property def comp_y(self): return self.__comp_y @comp_y.setter def comp_y(self, new_comp_y): if not isinstance(new_comp_y, ARRAYTYPES): raise TypeError() new_comp_y = np.array(new_comp_y) if not new_comp_y.shape == self.shape: raise ValueError() # storing data self.__comp_y = new_comp_y @comp_y.deleter def comp_y(self): raise Exception("Nope, can't do that") @property def comp_y_as_sf(self): tmp_sf = sf.ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.comp_y, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf @property def mask(self): return self.__mask @mask.setter def mask(self, new_mask): # check 'new_mask' coherence if isinstance(new_mask, bool): fill_value = new_mask new_mask = np.empty(self.shape, dtype=bool) new_mask.fill(fill_value) elif isinstance(new_mask, ARRAYTYPES): if not isinstance(new_mask.flat[0], np.bool_): raise TypeError() new_mask = np.array(new_mask, dtype=bool) else: raise TypeError("'mask' should be an array or a boolean," " not a {}".format(type(new_mask))) if self.shape != new_mask.shape: raise ValueError() # check if the new mask don'r reveal masked values if np.any(np.logical_not(new_mask[self.mask])): raise Warning("This mask reveal masked values, maybe you should" "use the 'fill' function instead") # nanify masked values self.comp_x[new_mask] = np.NaN self.comp_y[new_mask] = np.NaN # store mask self.__mask = new_mask @mask.deleter def mask(self): raise Exception("Nope, can't do that") @property def mask_as_sf(self): tmp_sf = sf.ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.mask, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf @property def unit_values(self): return self.__unit_values @unit_values.setter def unit_values(self, new_unit_values): if isinstance(new_unit_values, unum.Unum): if new_unit_values.asNumber() == 1: self.__unit_values = new_unit_values else: raise ValueError() elif isinstance(new_unit_values, STRINGTYPES): self.__unit_values = make_unit(new_unit_values) else: raise TypeError() @unit_values.deleter def unit_values(self): raise Exception("Nope, can't do that") @property def min(self): return np.min(self.magnitude[~self.mask]) @property def max(self): return np.max(self.magnitude[~self.mask]) @property def magnitude(self): """ Return a scalar field with the velocity field magnitude. """ comp_x, comp_y = self.comp_x, self.comp_y mask = self.mask values = (comp_x**2 + comp_y**2)**(.5) values[mask] = np.NaN return values @property def magnitude_as_sf(self): """ Return a scalarfield with the velocity field magnitude. """ tmp_sf = sf.ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.magnitude, mask=self.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf @property def theta(self): """ Return a scalar field with the vector angle (in reference of the unit_y vector [1, 0]). Parameters ---------- low_velocity_filter : number If not zero, points where V < Vmax*low_velocity_filter are masked. Returns ------- theta_sf : sf.ScalarField object Contening theta field. """ # get data comp_x, comp_y = self.comp_x, self.comp_y not_mask = np.logical_not(self.mask) theta = np.zeros(self.shape) # getting angle norm = self.magnitude not_mask = np.logical_and(not_mask, norm != 0) theta[not_mask] = comp_x[not_mask]/norm[not_mask] theta[not_mask] = np.arccos(theta[not_mask]) tmp_comp_y = comp_y.copy() tmp_comp_y[~not_mask] = 0 sup_not_mask = tmp_comp_y < 0 theta[sup_not_mask] = 2*np.pi - theta[sup_not_mask] return theta @property def theta_as_sf(self): """ Return a scalarfield with the velocity field angles. """ tmp_sf = sf.ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.theta, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf
[docs] def import_from_arrays(self, axe_x, axe_y, comp_x, comp_y, mask=False, unit_x="", unit_y="", unit_values=""): """ Set the vector field from a set of arrays. Parameters ---------- axe_x : array Discretized axis value along x axe_y : array Discretized axis value along y comp_x : array or masked array Values of the x component at the discritized points comp_y : array or masked array Values of the y component at the discritized points mask : array of boolean, optional Mask on comp_x and comp_y unit_x : string, optionnal Unit for the values of axe_x unit_y : string, optionnal Unit for the values of axe_y unit_values : string, optionnal Unit for the field components. """ # overwrite previous self.__clean() # Use numpy arrays axe_x = np.array(axe_x, dtype=float) axe_y = np.array(axe_y, dtype=float) comp_x = np.array(comp_x, dtype=float) comp_y = np.array(comp_y, dtype=float) if mask is not None and not isinstance(mask, bool): mask = np.array(mask, dtype=bool) # Be sure nan values are masked mask = np.logical_or(mask, np.isnan(comp_x)) mask = np.logical_or(mask, np.isnan(comp_y)) # Be sure axes are one-dimensional if axe_x.ndim >= 2: if np.all(axe_x[0, 0] == axe_x[:, 0]): axe_x = axe_x[0] else: axe_x = axe_x[:, 0] if axe_y.ndim >= 2: if np.all(axe_y[0, 0] == axe_y[:, 0]): axe_y = axe_y[0] else: axe_y = axe_y[:, 0] # Check if axis are evenly spaced delta_x = axe_x[1:] - axe_x[:-1] delta_y = axe_y[1:] - axe_y[:-1] epsx_abs = delta_x*1e-6 epsy_abs = delta_y*1e-6 if np.any(delta_y - delta_y[0] > epsy_abs) or \ np.any(delta_x - delta_x[0] > epsx_abs): warnings.warn("Axis are not evenly spaced.\n" "Consider using 'make_evenly_spaced' method" " to avoid bad surprises.") # Be sure comp_x and comp_y are of the good shape if len(axe_x) == comp_x.shape[1] and \ len(axe_y) == comp_x.shape[0] and \ len(axe_x) != len(axe_y): comp_x = comp_x.transpose() comp_y = comp_y.transpose() try: mask = mask.transpose() except: pass # Be sure axes are crescent if axe_x[0] > axe_x[-1]: axe_x = axe_x[::-1] comp_x = comp_x[::-1] comp_y = comp_y[::-1] if axe_y[0] > axe_y[-1]: axe_y = axe_y[::-1] comp_x = comp_x[:, ::-1] comp_y = comp_y[:, ::-1] # Store data self.axe_x = axe_x self.axe_y = axe_y self.comp_x = comp_x self.comp_y = comp_y self.mask = mask self.unit_x = unit_x self.unit_y = unit_y self.unit_values = unit_values
[docs] def check_consistency(self): """ Raise errors if the vectorfield show some incoherences. """ super(VectorField, self).check_consistency() # types if not isinstance(self.comp_x, np.ndarray): raise Exception() if not isinstance(self.comp_y, np.ndarray): raise Exception() if not isinstance(self.mask, np.ndarray): raise Exception() if not isinstance(self.unit_values, unum.Unum): raise Exception() # shapes shape = (len(self.axe_x), len(self.axe_y)) if not np.all(self.comp_x.shape == shape): raise Exception() if not np.all(self.comp_y.shape == shape): raise Exception() if not np.all(self.mask.shape == shape): raise Exception() # null values if np.any(np.isnan(self.comp_x[~self.mask])): raise Exception() if np.any(np.isnan(self.comp_y[~self.mask])): raise Exception()
[docs] def get_props(self): """ Print the VectorField main properties """ text = "Shape: {}".format(self.shape) unit_x = self.unit_x.strUnit() text += "Axe x: [{}..{}]{}".format(self.axe_x[0], self.axe_x[-1], unit_x) unit_y = self.unit_y.strUnit() text += "Axe y: [{}..{}]{}".format(self.axe_y[0], self.axe_y[-1], unit_y) unit_values = self.unit_values.strUnit() xmin = np.min(self.comp_x[~self.mask]) xmax = np.max(self.comp_x[~self.mask]) ymin = np.min(self.comp_y[~self.mask]) ymax = np.max(self.comp_y[~self.mask]) text += "Comp x: [{}..{}]{}".format(xmin, xmax, unit_values) text += "Comp y: [{}..{}]{}".format(ymin, ymax, unit_values) nmb_mask = np.sum(self.mask) nmb_tot = self.shape[0]*self.shape[1] text += "Masked values: {}/{}".format(nmb_mask, nmb_tot) return text
[docs] def get_value(self, x, y, ind=False, unit=False): """ Return the vector field components on the point (x, y). If ind is true, x and y are indices, else, x and y are value on axes (interpolated if necessary). """ return np.array([self.comp_x_as_sf.get_value(x, y, ind=ind, unit=unit), self.comp_y_as_sf.get_value(x, y, ind=ind, unit=unit)])
[docs] def get_profile(self, component, direction, position, ind=False, interp='linear'): """ Return a profile of the vector field component, at the given position (or at least at the nearest possible position). If position is an interval, the fonction return an average profile in this interval. Parameters ---------- component : string in ['vx', 'vy'] component to treat. direction : string in ['x', 'y'] Direction along which we choose a position. position : float or interval of float Position or interval in which we want a profile. ind : boolean, optional If 'True', position is taken as an indice Else (default), position is in the field units. interp : string in ['nearest', 'linear'] if 'nearest', get the profile at the nearest position on the grid, if 'linear', use linear interpolation to get the profile at the exact position Returns ------- profile : Profile object Asked profile. cutposition : array or number Final position or interval in which the profile has been taken. """ if component == 'vx': return self.comp_x_as_sf.get_profile(direction, position, ind, interp=interp) elif component == 'vy': return self.comp_y_as_sf.get_profile(direction, position, ind, interp=interp) else: raise TypeError("'component' must be 'vx' or 'vy'")
[docs] def copy(self): """ Return a copy of the vectorfield. """ return copy.deepcopy(self)
[docs] def get_norm(self, norm=2, normalized='perpoint'): """ Return the field norm """ values = np.concatenate((self.comp_x[~self.mask], self.comp_y[~self.mask])) res = (np.sum(np.abs(values)**norm))**(1./norm) if normalized == "perpoint": res /= np.sum(~self.mask) return res
[docs] def scale(self, scalex=None, scaley=None, scalev=None, inplace=False): """ Scale the VectorField. Parameters ---------- scalex, scaley, scalev : numbers or Unum objects Scale for the axis and the values. inplace : boolean . """ if inplace: tmp_f = self else: tmp_f = self.copy() # xy revx, revy = field.Field.scale(tmp_f, scalex=scalex, scaley=scaley, inplace=True, output_reverse=True) # v if scalev is None: pass elif isinstance(scalev, NUMBERTYPES): tmp_f.comp_x *= scalev tmp_f.comp_y *= scalev elif isinstance(scalev, unum.Unum): new_unit = tmp_f.unit_values*scalev fact = new_unit.asNumber() new_unit /= fact tmp_f.unit_values = new_unit tmp_f.comp_x *= fact tmp_f.comp_y *= fact else: raise TypeError() if revx and revy: tmp_f.comp_x = -tmp_f.comp_x[::-1, ::-1] tmp_f.comp_y = -tmp_f.comp_y[::-1, ::-1] elif revx: tmp_f.comp_x = -tmp_f.comp_x[::-1, :] tmp_f.comp_y = tmp_f.comp_y[::-1, :] elif revy: tmp_f.comp_x = tmp_f.comp_x[:, ::-1] tmp_f.comp_y = -tmp_f.comp_y[:, ::-1] # returning if not inplace: return tmp_f
[docs] def rotate(self, angle, inplace=False): """ Rotate the vector field. Parameters ---------- angle : integer Angle in degrees (positive for trigonometric direction). In order to preserve the orthogonal grid, only multiples of 90° are accepted (can be negative multiples). inplace : boolean, optional If 'True', vector field is rotated in place, else, the function return a rotated field. Returns ------- rotated_field : VectorField object, optional Rotated vector field. """ # check params if not isinstance(angle, NUMBERTYPES): raise TypeError() if angle % 90 != 0: raise ValueError() if not isinstance(inplace, bool): raise TypeError() # get data if inplace: tmp_field = self else: tmp_field = self.copy() # normalize angle angle = angle % 360 # rotate the parent field.Field.rotate(tmp_field, angle, inplace=True) # rotate nmb_rot90 = int(angle/90) comp_x = np.rot90(tmp_field.comp_x, nmb_rot90) comp_y = np.rot90(tmp_field.comp_y, nmb_rot90) mask = np.rot90(tmp_field.mask, nmb_rot90) comp_x2 = np.cos(angle/180.*np.pi)*comp_x - \ np.sin(angle/180.*np.pi)*comp_y comp_y2 = np.cos(angle/180.*np.pi)*comp_y + \ np.sin(angle/180.*np.pi)*comp_x tmp_field.__comp_x, tmp_field.__comp_y = comp_x, comp_y tmp_field.__comp_x = comp_x2 tmp_field.__comp_y = comp_y2 tmp_field.__mask = mask # returning if not inplace: return tmp_field
[docs] def change_unit(self, axe, new_unit): """ Change the unit of an axe. Parameters ---------- axe : string 'y' for changing the profile y axis unit 'x' for changing the profile x axis unit 'values' or changing values unit new_unit : Unum.unit object or string The new unit. """ if isinstance(new_unit, STRINGTYPES): new_unit = make_unit(new_unit) if not isinstance(new_unit, unum.Unum): raise TypeError() if not isinstance(axe, STRINGTYPES): raise TypeError() if axe == 'x': field.Field.change_unit(self, axe, new_unit) elif axe == 'y': field.Field.change_unit(self, axe, new_unit) elif axe == 'values': old_unit = self.unit_values new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.comp_x *= fact self.comp_y *= fact self.unit_values = new_unit/fact else: raise ValueError()
[docs] def smooth(self, tos='uniform', size=None, inplace=False, **kw): """ Smooth the vectorfield in place. Warning : fill up the field (should be used carefully with masked field borders) Parameters ---------- tos : string, optional Type of smoothing, can be 'uniform' (default) or 'gaussian' (See ndimage module documentation for more details) size : number, optional Size of the smoothing (is radius for 'uniform' and sigma for 'gaussian'). Default is 3 for 'uniform' and 1 for 'gaussian'. inplace : boolean, optional . kw : dic Additional parameters for ndimage methods (See ndimage documentation) """ if not isinstance(tos, STRINGTYPES): raise TypeError("'tos' must be a string") if size is None and tos == 'uniform': size = 3 elif size is None and tos == 'gaussian': size = 1 # getting field if inplace: tmp_vf = self else: tmp_vf = self.copy() # filling up the field before smoothing tmp_vf.fill(inplace=True) # smoothing if tos == "uniform": tmp_vf.comp_x = ndimage.uniform_filter(tmp_vf.comp_x, size, **kw) tmp_vf.comp_y = ndimage.uniform_filter(tmp_vf.comp_y, size, **kw) elif tos == "gaussian": tmp_vf.comp_x = ndimage.gaussian_filter(tmp_vf.comp_x, size, **kw) tmp_vf.comp_y = ndimage.gaussian_filter(tmp_vf.comp_y, size, **kw) else: raise ValueError("'tos' must be 'uniform' or 'gaussian'") # storing if not inplace: return tmp_vf
[docs] def make_evenly_spaced(self, interp="linear", res=1): """ Use interpolation to make the field evenly spaced Parameters ---------- interp : {‘linear’, ‘cubic’, ‘quintic’}, optional The kind of spline interpolation to use. Default is ‘linear’. res : number Resolution of the resulting field. A value of 1 meaning a spatial resolution equal to the smallest space along the two axis for the initial field. """ # get data vx = self.comp_x_as_sf vy = self.comp_y_as_sf # vx.make_evenly_spaced(interp=interp, res=res) vy.make_evenly_spaced(interp=interp, res=res) # store self.import_from_arrays(vx.axe_x, vx.axe_y, vx.values, vy.values, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values)
[docs] def fill(self, kind='linear', value=[0., 0.], inplace=False, reduce_tri=True, crop=False): """ Fill the masked part of the array. Parameters ---------- kind : string, optional Type of algorithm used to fill. 'value' : fill with the given value 'nearest' : fill with the nearest value 'linear' (default): fill using linear interpolation (Delaunay triangulation) 'cubic' : fill using cubic interpolation (Delaunay triangulation) value : 2x1 array of numbers Values used to fill (for kind='value'). inplace : boolean, optional If 'True', fill the sf.ScalarField in place. If 'False' (default), return a filled version of the field. reduce_tri : boolean, optional If 'True', treatment is used to reduce the triangulation effort (faster when a lot of masked values) If 'False', no treatment (faster when few masked values) crop : boolean, optional If 'True', TVF borders are croped before filling. """ # check parameters coherence if isinstance(value, NUMBERTYPES): value = [value, value] if not isinstance(value, ARRAYTYPES): raise TypeError() value = np.array(value) if not value.shape == (2,): raise ValueError() if crop: self.crop_masked_border(hard=False, inplace=True) # filling components comp_x = self.comp_x_as_sf comp_y = self.comp_y_as_sf new_comp_x = comp_x.fill(kind=kind, value=value[0], inplace=False, reduce_tri=reduce_tri) new_comp_y = comp_y.fill(kind=kind, value=value[1], inplace=False, reduce_tri=reduce_tri) # returning if inplace: self.comp_x = new_comp_x.values self.comp_y = new_comp_y.values mask = np.empty(self.shape, dtype=bool) mask.fill(False) self.__mask = mask else: vf = VectorField() vf.import_from_arrays(self.axe_x, self.axe_y, new_comp_x.values, new_comp_y.values, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return vf
[docs] def crop(self, intervx=None, intervy=None, ind=False, inplace=False): """ Crop the area in respect with given intervals. Parameters ---------- intervx : array, optional interval wanted along x intervy : array, optional interval wanted along y ind : boolean, optional If 'True', intervals are understood as indices along axis. If 'False' (default), intervals are understood in axis units. inplace : boolean, optional If 'True', the field is croped in place. """ if inplace: indmin_x, indmax_x, indmin_y, indmax_y = \ field.Field.crop(self, intervx, intervy, full_output=True, ind=ind, inplace=True) self.__comp_x = self.comp_x[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] self.__comp_y = self.comp_y[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] self.__mask = self.mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] else: indmin_x, indmax_x, indmin_y, indmax_y, cropfield = \ field.Field.crop(self, intervx=intervx, intervy=intervy, full_output=True, ind=ind) cropfield.__comp_x = self.comp_x[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] cropfield.__comp_y = self.comp_y[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] cropfield.__mask = self.mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] return cropfield
[docs] def crop_masked_border(self, hard=False, inplace=False): """ Crop the masked border of the field in place. Parameters ---------- hard : boolean, optional If 'True', partially masked border are croped as well. """ # if inplace: tmp_sf = self else: tmp_sf = self.copy() # checking masked values presence mask = tmp_sf.mask if not np.any(mask): return None # hard cropping if hard: # remove trivial borders tmp_sf.crop_masked_border(hard=False, inplace=True) # until there is no more masked values while np.any(tmp_sf.mask): # getting number of masked value on each border bd1 = np.sum(tmp_sf.mask[0, :]) bd2 = np.sum(tmp_sf.mask[-1, :]) bd3 = np.sum(tmp_sf.mask[:, 0]) bd4 = np.sum(tmp_sf.mask[:, -1]) # getting more masked border more_masked = np.argmax([bd1, bd2, bd3, bd4]) # deleting more masked border if more_masked == 0: len_x = len(tmp_sf.axe_x) tmp_sf.crop(intervx=[1, len_x], ind=True, inplace=True) elif more_masked == 1: len_x = len(tmp_sf.axe_x) tmp_sf.crop(intervx=[0, len_x - 2], ind=True, inplace=True) elif more_masked == 2: len_y = len(self.axe_y) tmp_sf.crop(intervy=[1, len_y], ind=True, inplace=True) elif more_masked == 3: len_y = len(tmp_sf.axe_y) tmp_sf.crop(intervy=[0, len_y - 2], ind=True, inplace=True) # soft cropping else: axe_x_m = np.logical_not(np.all(mask, axis=1)) axe_y_m = np.logical_not(np.all(mask, axis=0)) axe_x_min = np.where(axe_x_m)[0][0] axe_x_max = np.where(axe_x_m)[0][-1] axe_y_min = np.where(axe_y_m)[0][0] axe_y_max = np.where(axe_y_m)[0][-1] tmp_sf.crop([axe_x_min, axe_x_max], [axe_y_min, axe_y_max], ind=True, inplace=True) # returning if not inplace: return tmp_sf
[docs] def extend(self, nmb_left=0, nmb_right=0, nmb_up=0, nmb_down=0, inplace=False): """ Add columns or lines of masked values at the vectorfield. Parameters ---------- nmb_**** : integers Number of lines/columns to add in each direction. inplace : bool If 'False', return a new extended field, if 'True', modify the field inplace. Returns ------- Extended_field : Field object, optional Extended field. """ if inplace: field.Field.extend(self, nmb_left=nmb_left, nmb_right=nmb_right, nmb_up=nmb_up, nmb_down=nmb_down, inplace=True) new_shape = self.shape else: new_field = field.Field.extend(self, nmb_left=nmb_left, nmb_right=nmb_right, nmb_up=nmb_up, nmb_down=nmb_down, inplace=False) new_shape = new_field.shape new_Vx = np.zeros(new_shape, dtype=float) new_Vy = np.zeros(new_shape, dtype=float) if nmb_right == 0: slice_x = slice(nmb_left, new_Vx.shape[0] + 2) else: slice_x = slice(nmb_left, -nmb_right) if nmb_up == 0: slice_y = slice(nmb_down, new_Vx.shape[1] + 2) else: slice_y = slice(nmb_down, -nmb_up) new_Vx[slice_x, slice_y] = self.comp_x new_Vy[slice_x, slice_y] = self.comp_y new_mask = np.ones(new_shape, dtype=bool) new_mask[slice_x, slice_y] = self.mask if inplace: self.comp_x = new_Vx self.comp_y = new_Vy self.__mask = new_mask else: new_field.comp_x = new_Vx new_field.comp_y = new_Vy new_field.__mask = new_mask return new_field
[docs] def mirroring(self, direction, position, inds_to_mirror='all', mir_coef=1., inplace=False, interp=None, value=[0, 0]): """ Return a field with additional mirrored values. Parameters ---------- direction : string in ['x', 'y'] Axe on which place the symetry plane. position : number Position of the symetry plane along the given axe inds_to_mirror : integer Number of vector rows to symetrize (default is all) mir_coef : number or 2x1 array, optional Optional coefficient(s) applied only to the mirrored values. If ana array first value is for 'comp_x' and second one to 'comp_y' inplace : boolean, optional . interp : string, optional If specified, method used to fill the gap near the symetry plane by interpoaltion. 'value' : fill with the given value, 'nearest' : fill with the nearest value, 'linear' (default): fill using linear interpolation (Delaunay triangulation), 'cubic' : fill using cubic interpolation (Delaunay triangulation) value : array, optional Value at the symetry plane, in case of interpolation """ if direction not in ['x', 'y']: raise ValueError() # getting components vx = self.comp_x_as_sf vy = self.comp_y_as_sf xy_scale = self.xy_scale # treating sign changments if isinstance(mir_coef, NUMBERTYPES): if direction == 'x': coefx = -1 coefy = 1 else: coefx = 1 coefy = -1 coefx *= mir_coef coefy *= mir_coef elif isinstance(mir_coef, ARRAYTYPES): coefx = mir_coef[0] coefy = mir_coef[1] else: raise ValueError() # mirroring on components vx.mirroring(direction, position, inds_to_mirror=inds_to_mirror, interp=interp, value=value[0], inplace=True, mir_coef=coefx) vy.mirroring(direction, position, inds_to_mirror=inds_to_mirror, interp=interp, value=value[1], inplace=True, mir_coef=coefy) # storing if inplace: tmp_vf = self else: tmp_vf = VectorField() mask = np.logical_or(vx.mask, vy.mask) tmp_vf.import_from_arrays(vx.axe_x, vx.axe_y, vx.values, vy.values, mask=mask, unit_x=vx.unit_x, unit_y=vy.unit_y, unit_values=vx.unit_values) tmp_vf.xy_scale = xy_scale # returning if not inplace: return tmp_vf
[docs] def reduce_spatial_resolution(self, fact, inplace=False): """ Reduce the spatial resolution of the field by a factor 'fact' Parameters ---------- fact : int Reducing factor. inplace : boolean, optional . """ # reducing Vx = self.comp_x_as_sf Vy = self.comp_y_as_sf Vx.reduce_spatial_resolution(fact, inplace=True) Vy.reduce_spatial_resolution(fact, inplace=True) # returning if inplace: self.__init__() self.import_from_arrays(Vx.axe_x, Vx.axe_y, Vx.values, Vy.values, mask=Vx.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) else: vf = VectorField() vf.import_from_arrays(Vx.axe_x, Vx.axe_y, Vx.values, Vy.values, mask=Vx.mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return vf
def __clean(self): self.__init__() def _display(self, component=None, kind=None, axis='image', **plotargs): # adapt xy scale if streamplot is needed if kind == "stream" and self.xy_scale.asNumber() != 1.: magn = self.magnitude new_comp_x = self.comp_x*self.xy_scale.asNumber() new_magn = (new_comp_x**2 + self.comp_y**2)**.5 values = [new_comp_x/new_magn*magn, self.comp_y/new_magn*magn] elif component in [None, 'V']: values = [self.comp_x, self.comp_y] elif component in ['comp_x', 'x']: values = self.comp_x elif component in ['comp_y', 'y']: values = self.comp_y else: values = self.__getattribute__(component) dp = pplt.Displayer(x=self.axe_x, y=self.axe_y, values=values, kind=kind, **plotargs) plot = dp.draw(cb=False) pplt.DataCursorTextDisplayer(dp) unit_x, unit_y = self.unit_x, self.unit_y if unit_x.strUnit() == "[]": plt.xlabel("x") else: plt.xlabel("x " + unit_x.strUnit()) if unit_y.strUnit() == "[]": plt.ylabel("y") else: plt.ylabel("y " + unit_y.strUnit()) return plot
[docs] def display(self, component=None, kind=None, **plotargs): """ Display something from the vector field. If component is not given, a quiver is displayed. If component is an integer, the coresponding component of the field is displayed. Parameters ---------- component : string, optional Component to display, can be 'V', 'x', 'y', 'mask' kind : string, optinnal Scalar plots : if 'None': each datas are plotted (imshow), if 'contour': contours are ploted (contour), if 'contourf': filled contours are ploted (contourf). Vector plots : if 'quiver': quiver plot, if 'stream': streamlines. plotargs : dict Arguments passed to the function used to display the vector field. Returns ------- fig : figure reference Reference to the displayed figure """ displ = self._display(component, kind, **plotargs) unit_values = self.unit_values # Vx, Vy = self.comp_x, self.comp_y if component is None or component == 'V': if kind == 'quiver' or kind is None: if 'C' not in list(plotargs.keys()): cb = plt.colorbar(displ) if unit_values.strUnit() == "[]": cb.set_label("Magnitude") else: cb.set_label("Magnitude " + unit_values.strUnit()) # legendarrow = round(np.max([Vx.max(), Vy.max()])) # plt.quiverkey(displ, 1.075, 1.075, legendarrow, # "$" + str(legendarrow) # + unit_values.strUnit() + "$", # labelpos='W', fontproperties={'weight': 'bold'}) elif kind in ['stream', 'track']: if 'color' not in list(plotargs.keys()): cb = plt.colorbar(displ.lines) if unit_values.strUnit() == "[]": cb.set_label("Magnitude") else: cb.set_label("Magnitude " + unit_values.strUnit()) elif component in ['x', 'comp_x']: cb = plt.colorbar(displ) if unit_values.strUnit() == "[]": cb.set_label("Vx") else: cb.set_label("Vx " + unit_values.strUnit()) elif component in ['y', 'comp_y']: cb = plt.colorbar(displ) if unit_values.strUnit() == "[]": cb.set_label("Vy") else: cb.set_label("Vy " + unit_values.strUnit()) elif component == 'mask': cb = plt.colorbar(displ) cb.set_label("Mask") elif component == 'magnitude': cb = plt.colorbar(displ) if unit_values.strUnit() == "[]": cb.set_label("Magnitude") else: cb.set_label("Magnitude " + unit_values.strUnit()) else: raise ValueError("Unknown 'component' value") plt.title("") return displ