Source code for IMTreatment.core.scalarfield

# -*- coding: utf-8 -*-
#!/bin/env python3

# Copyright (C) 2003-2007 Gaby Launay

# Author: Gaby Launay  <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0

# This file is part of IMTreatment.

# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.

# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import copy
import warnings

import scipy.ndimage.measurements as msr
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate as spinterp
import unum
from scipy import ndimage

from .. import plotlib as pplt
from . import field as fld, profile as prof, points as pts
from ..utils import make_unit
from ..utils.types import ARRAYTYPES, NUMBERTYPES, STRINGTYPES


[docs]class ScalarField(fld.Field): """ Class representing a scalar field (2D field, with one component on each point). """ def __init__(self): fld.Field.__init__(self) self.__values = np.array([]) self._values_dtype = None self.__mask = np.array([], dtype=bool) self.__unit_values = make_unit("") def __eq__(self, another): if not isinstance(another, ScalarField): return False if not np.allclose(self.axe_x, another.axe_x): return False if not np.allclose(self.axe_y, another.axe_y): return False if not np.all(self.mask == another.mask): return False if not np.allclose(self.values[~self.mask], another.values[~another.mask]): return False if not np.all(self.unit_x == another.unit_x): return False if not np.all(self.unit_y == another.unit_y): return False if not np.all(self.unit_values == another.unit_values): return False return True def __neg__(self): tmpsf = self.copy() tmpsf.values = -tmpsf.values return tmpsf def __add__(self, otherone): # if we add with a ScalarField object if isinstance(otherone, ScalarField): # test unities system try: self.unit_values + otherone.unit_values self.unit_x + otherone.unit_x self.unit_y + otherone.unit_y except: raise ValueError("I think these units don't match, fox") # identical shape and axis if np.all(self.axe_x == otherone.axe_x) and \ np.all(self.axe_y == otherone.axe_y): tmpsf = self.copy() fact = otherone.unit_values/self.unit_values tmpsf.values += np.array(otherone.values*fact.asNumber(), dtype=self._values_dtype) tmpsf.mask = np.logical_or(self.mask, otherone.mask) # different shape, partially same axis else: # getting shared points new_ind_x = np.array([val in otherone.axe_x for val in self.axe_x]) new_ind_y = np.array([val in otherone.axe_y for val in self.axe_y]) new_ind_xo = np.array([val in self.axe_x for val in otherone.axe_x]) new_ind_yo = np.array([val in self.axe_y for val in otherone.axe_y]) if not np.any(new_ind_x) or not np.any(new_ind_y): raise ValueError("Incompatible shapes") new_ind_Y, new_ind_X = np.meshgrid(new_ind_y, new_ind_x) new_ind_value = np.logical_and(new_ind_X, new_ind_Y) new_ind_Yo, new_ind_Xo = np.meshgrid(new_ind_yo, new_ind_xo) new_ind_valueo = np.logical_and(new_ind_Xo, new_ind_Yo) # getting new axis and values new_axe_x = self.axe_x[new_ind_x] new_axe_y = self.axe_y[new_ind_y] fact = otherone.unit_values/self.unit_values new_values = (self.values[new_ind_value] + otherone.values[new_ind_valueo] * fact.asNumber()) new_values = new_values.reshape((len(new_axe_x), len(new_axe_y))) new_mask = np.logical_or(self.mask[new_ind_value], otherone.mask[new_ind_valueo]) new_mask = new_mask.reshape((len(new_axe_x), len(new_axe_y))) # creating sf tmpsf = ScalarField() tmpsf.import_from_arrays(new_axe_x, new_axe_y, new_values, mask=new_mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmpsf # if we add with a number elif isinstance(otherone, NUMBERTYPES): tmpsf = self.copy() tmpsf.values += otherone return tmpsf elif isinstance(otherone, unum.Unum): try: self.unit_values + otherone except: raise ValueError("Given number have to be consistent with" "the scalar field (same units)") tmpsf = self.copy() tmpsf.values += (otherone/self.unit_values).asNumber() return tmpsf else: raise TypeError("You can only add a scalarfield " "with others scalarfields or with numbers") def __radd__(self, obj): return self.__add__(obj) def __sub__(self, obj): return self.__add__(-obj) def __rsub__(self, obj): return self.__neg__() + obj def __truediv__(self, obj): if isinstance(obj, NUMBERTYPES): tmpsf = self.copy() tmpsf.values /= obj return tmpsf elif isinstance(obj, unum.Unum): tmpsf = self.copy() unit_values = tmpsf.unit_values / obj tmpsf.values *= unit_values.asNumber() unit_values /= unit_values.asNumber() tmpsf.unit_values = unit_values return tmpsf elif isinstance(obj, ARRAYTYPES): obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() tmpsf = self.copy() mask = np.logical_or(self.mask, obj == 0) not_mask = np.logical_not(mask) tmpsf.values[not_mask] = tmpsf.values[not_mask] / obj[not_mask] tmpsf.mask = mask return tmpsf elif isinstance(obj, ScalarField): if np.any(self.axe_x != obj.axe_x)\ or np.any(self.axe_y != obj.axe_y)\ or self.unit_x != obj.unit_x\ or self.unit_y != obj.unit_y: raise ValueError("Fields are not consistent") tmpsf = self.copy() filt_nan = obj.values != 0 values = np.zeros(shape=self.values.shape) values[filt_nan] = self.values[filt_nan]/obj.values[filt_nan] mask = np.logical_or(self.mask, obj.mask) mask = np.logical_or(mask, np.logical_not(filt_nan)) unit = self.unit_values / obj.unit_values tmpsf.values = values*unit.asNumber() tmpsf.mask = mask tmpsf.unit_values = unit/unit.asNumber() return tmpsf else: raise TypeError("Unsupported operation between {} and a " "ScalarField object".format(type(obj))) __div__ = __truediv__ def __rtruediv__(self, obj): if isinstance(obj, NUMBERTYPES): tmpsf = self.copy() tmpsf.values = obj/tmpsf.values tmpsf.unit_values = 1/tmpsf.unit_values return tmpsf elif isinstance(obj, unum.Unum): tmpsf = self.copy() tmpsf.values = obj.asNumber()/tmpsf.values tmpsf.unit_values = obj/obj.asNumber()/tmpsf.unit_values return tmpsf elif isinstance(obj, ARRAYTYPES): obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() tmpsf = self.copy() mask = np.logical_or(self.mask, obj == 0) not_mask = np.logical_not(mask) tmpsf.values[not_mask] = obj[not_mask] / tmpsf.values[not_mask] tmpsf.mask = mask return tmpsf elif isinstance(obj, ScalarField): if np.any(self.axe_x != obj.axe_x)\ or np.any(self.axe_y != obj.axe_y)\ or self.unit_x != obj.unit_x\ or self.unit_y != obj.unit_y: raise ValueError("Fields are not consistent") tmpsf = self.copy() values = obj.values / self.values mask = np.logical_or(self.mask, obj.mask) unit = obj.unit_values / self.unit_values tmpsf.values = values*unit.asNumber() tmpsf.mask = mask tmpsf.unit_values = unit/unit.asNumber() return tmpsf else: raise TypeError("Unsupported operation between {} and a " "ScalarField object".format(type(obj))) def __mul__(self, obj): if isinstance(obj, NUMBERTYPES): tmpsf = self.copy() tmpsf.values *= obj tmpsf.mask = self.mask return tmpsf elif isinstance(obj, unum.Unum): tmpsf = self.copy() tmpsf.values *= obj.asNumber() tmpsf.unit_values *= obj/obj.asNumber() tmpsf.mask = self.mask return tmpsf elif isinstance(obj, ARRAYTYPES): obj = np.array(obj, subok=True) if not obj.shape == self.shape: raise ValueError() tmpsf = self.copy() mask = self.mask not_mask = np.logical_not(mask) tmpsf.values[not_mask] *= obj[not_mask] tmpsf.mask = mask return tmpsf elif isinstance(obj, np.ma.MaskedArray): if obj.shape != self.values.shape: raise ValueError("Fields are not consistent") tmpsf = self.copy() tmpsf.values *= obj tmpsf.mask = obj.mask return tmpsf elif isinstance(obj, ScalarField): if np.any(self.axe_x != obj.axe_x)\ or np.any(self.axe_y != obj.axe_y)\ or self.unit_x != obj.unit_x\ or self.unit_y != obj.unit_y: raise ValueError("Fields are not consistent") tmpsf = self.copy() values = self.values * obj.values mask = np.logical_or(self.mask, obj.mask) unit = self.unit_values * obj.unit_values tmpsf.values = values*unit.asNumber() tmpsf.mask = mask tmpsf.unit_values = unit/unit.asNumber() return tmpsf else: raise TypeError("Unsupported operation between {} and a " "ScalarField object".format(type(obj))) __rmul__ = __mul__ def __abs__(self): tmpsf = self.copy() tmpsf.values = np.abs(tmpsf.values) return tmpsf def __pow__(self, number): if not isinstance(number, NUMBERTYPES): raise TypeError("You only can use a number for the power " "on a Scalar field") tmpsf = self.copy() tmpsf.values[np.logical_not(tmpsf.mask)] \ = np.power(tmpsf.values[np.logical_not(tmpsf.mask)], number) tmpsf.mask = self.mask tmpsf.unit_values = np.power(tmpsf.unit_values, number) return tmpsf def __iter__(self): data = self.values mask = self.mask for ij, xy in fld.Field.__iter__(self): i = ij[0] j = ij[1] if not mask[i, j]: yield ij, xy, data[i, j] @property def values(self): val = self.__values try: val[self.mask] = np.nan except ValueError: val[self.mask] = 0 return val @values.setter def values(self, new_values): if not isinstance(new_values, ARRAYTYPES): raise TypeError() new_values = np.asarray(new_values) if self.shape == new_values.shape: # adapting mask to 'nan' values self.__mask = np.isnan(new_values) # storing data self.__values = new_values else: raise ValueError("'values' should have the same shape as the " "original values : {}, not {}." .format(self.shape, new_values.shape)) @values.deleter def values(self): raise Exception("Nope, can't do that") @property def mask(self): return self.__mask @mask.setter def mask(self, new_mask): # check 'new_mask' coherence if isinstance(new_mask, (bool, np.bool_)): fill_value = new_mask new_mask = np.empty(self.shape, dtype=bool) new_mask.fill(fill_value) elif isinstance(new_mask, ARRAYTYPES): new_mask = np.asarray(new_mask, dtype=bool) else: raise TypeError("'mask' should be an array or a boolean," " not a {}".format(type(new_mask))) if self.shape != new_mask.shape: raise ValueError() # # check if the new mask don'r reveal masked values # if np.any(np.logical_not(new_mask[self.mask])): # raise Warning("This mask reveal masked values, maybe you should" # "use the 'fill' function instead") # # nanify masked values # self.values[new_mask] = np.NaN # store mask self.__mask = new_mask @mask.deleter def mask(self): raise Exception("Nope, can't do that") @property def mask_as_sf(self): tmp_sf = ScalarField() tmp_sf.import_from_arrays(self.axe_x, self.axe_y, self.mask, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return tmp_sf @property def unit_values(self): return self.__unit_values @unit_values.setter def unit_values(self, new_unit_values): if isinstance(new_unit_values, unum.Unum): if (new_unit_values.asNumber() - 1.) < 1e-10: self.__unit_values = new_unit_values else: raise ValueError() elif isinstance(new_unit_values, STRINGTYPES): self.__unit_values = make_unit(new_unit_values) else: raise TypeError() @unit_values.deleter def unit_values(self): raise Exception("Nope, can't do that") @property def min(self): return np.min(self.values[np.logical_not(self.mask)]) @property def max(self): return np.max(self.values[np.logical_not(self.mask)]) @property def mean(self): return np.mean(self.values[np.logical_not(self.mask)]) @property def median(self): return np.median(self.values[np.logical_not(self.mask)])
[docs] def import_from_arrays(self, axe_x, axe_y, values, mask=None, unit_x="", unit_y="", unit_values="", dtype=np.float, dontchecknans=False, dontcheckunits=False): """ Set the field from a set of arrays. Parameters ---------- axe_x : array Discretized axis value along x axe_y : array Discretized axis value along y values : array or masked array Values of the field at the discritized points unit_x : String unit, optionnal Unit for the values of axe_x unit_y : String unit, optionnal Unit for the values of axe_y unit_values : String unit, optionnal Unit for the scalar field dontchecknans: boolean Don't check for nans values (faster) dontcheckunits: boolean Don't check for unit consistency (faster) dtype: Numerical type Numerical type to store the data to. Should be a type supported by numpy arrays. Default to 'float'. """ # # overwrite previous # self.__clean() # Use numpy arrays self._values_dtype = dtype axe_x = np.array(axe_x, dtype=float) axe_y = np.array(axe_y, dtype=float) values = np.array(values, dtype=self._values_dtype) if mask is None: mask = np.zeros(values.shape, dtype=bool) elif isinstance(mask, bool): nmask = np.empty(values.shape, dtype=bool) nmask.fill(mask) mask = nmask else: mask = np.array(mask, dtype=bool) # Be sure nan values are masked if not dontchecknans: if np.isnan(np.sum(values)): mask = np.logical_or(mask, np.isnan(values)) # Be sure axes are one-dimensional if axe_x.ndim >= 2: if np.all(axe_x[0, 0] == axe_x[:, 0]): axe_x = axe_x[0] else: axe_x = axe_x[:, 0] if axe_y.ndim >= 2: if np.all(axe_y[0, 0] == axe_y[:, 0]): axe_y = axe_y[0] else: axe_y = axe_y[:, 0] # Check if axis are evenly spaced delta_x = axe_x[1:] - axe_x[:-1] delta_y = axe_y[1:] - axe_y[:-1] epsx_abs = delta_x*1e-6 epsy_abs = delta_y*1e-6 if np.any(delta_y - delta_y[0] > epsy_abs) or \ np.any(delta_x - delta_x[0] > epsx_abs): warnings.warn("Axis are not evenly spaced.\n" "Consider using 'make_evenly_spaced' method" " to avoid bad surprises.") # Be sure values is of the good shape if len(axe_x) == values.shape[1] and \ len(axe_y) == values.shape[0] and \ len(axe_x) != len(axe_y): values = values.transpose() try: mask = mask.transpose() except: pass # Be sure axes are crescent if axe_x[0] > axe_x[-1]: axe_x = axe_x[::-1] values = values[::-1] if axe_y[0] > axe_y[-1]: axe_y = axe_y[::-1] values = values[:, ::-1] # storing datas self.axe_x = axe_x self.axe_y = axe_y self.__values = values self.__mask = mask if dontcheckunits: self._Field__unit_x = unit_x self._Field__unit_y = unit_y else: self.unit_x = unit_x self.unit_y = unit_y self.unit_values = unit_values
[docs] def get_props(self): """ Print the ScalarField main properties """ text = "Shape: {}".format(self.shape) unit_x = self.unit_x.strUnit() text += "Axe x: [{}..{}]{}".format(self.axe_x[0], self.axe_x[-1], unit_x) unit_y = self.unit_y.strUnit() text += "Axe y: [{}..{}]{}".format(self.axe_y[0], self.axe_y[-1], unit_y) unit_values = self.unit_values.strUnit() text += "Values: [{}..{}]{}".format(self.min, self.max, unit_values) nmb_mask = np.sum(self.mask) nmb_tot = self.shape[0]*self.shape[1] text += "Masked values: {}/{}".format(nmb_mask, nmb_tot) return text
[docs] def get_value(self, x, y, ind=False, unit=False): """ Return the scalar field value on the point (x, y). If ind is true, x and y are indices, else, x and y are value on axes (interpolated if necessary). """ if not isinstance(ind, bool): raise TypeError("'ind' must be a boolean") if ind: if not isinstance(x, int) or not isinstance(y, int): raise TypeError("'x' and 'y' must be integers") if x > len(self.axe_x) - 1 or y > len(self.axe_y) - 1\ or x < 0 or y < 0: raise ValueError("'x' and 'y' must be correct indices") else: if not isinstance(x, NUMBERTYPES)\ or not isinstance(y, NUMBERTYPES): raise TypeError("'x' and 'y' must be numbers") if x > self.axe_x[-1] or y > self.axe_y[-1]\ or x < self.axe_x[0] or y < self.axe_y[0]: raise ValueError("'x' and 'y' are out of axes") if unit: unit = self.unit_values else: unit = 1. if ind: return self.values[x, y]*unit else: ind_x = None ind_y = None # getting indices interval inds_x = self.get_indice_on_axe(1, x) inds_y = self.get_indice_on_axe(2, y) # if something masked if np.sum(self.mask[inds_x, inds_y]) != 0: res = np.NaN # if we are on a grid point elif inds_x[0] == inds_x[1] and inds_y[0] == inds_y[1]: res = self.values[inds_x[0], inds_y[0]]*unit # if we are on a x grid branch elif inds_x[0] == inds_x[1]: ind_x = inds_x[0] pos_y1 = self.axe_y[inds_y[0]] pos_y2 = self.axe_y[inds_y[1]] value1 = self.values[ind_x, inds_y[0]] value2 = self.values[ind_x, inds_y[1]] i_value = ((value2*np.abs(pos_y1 - y) + value1*np.abs(pos_y2 - y)) / np.abs(pos_y1 - pos_y2)) res = i_value*unit # if we are on a y grid branch elif inds_y[0] == inds_y[1]: ind_y = inds_y[0] pos_x1 = self.axe_x[inds_x[0]] pos_x2 = self.axe_x[inds_x[1]] value1 = self.values[inds_x[0], ind_y] value2 = self.values[inds_x[1], ind_y] i_value = ((value2*np.abs(pos_x1 - x) + value1*np.abs(pos_x2 - x)) / np.abs(pos_x1 - pos_x2)) return i_value*unit # if we are in the middle of nowhere (linear interpolation) else: ind_x = inds_x[0] ind_y = inds_y[0] a, b = np.meshgrid(self.axe_x[ind_x:ind_x + 2], self.axe_y[ind_y:ind_y + 2], indexing='ij') values = self.values[ind_x:ind_x + 2, ind_y:ind_y + 2] a = a.flatten() b = b.flatten() pts = list(zip(a, b)) interp_vx = spinterp.LinearNDInterpolator(pts, values.flatten()) i_value = float(interp_vx(x, y)) res = i_value*unit return res
[docs] def get_zones_centers(self, bornes=[0.75, 1], rel=True, kind='ponderated'): """ Return a pts.Points object contening centers of the zones lying in the given bornes. Parameters ---------- bornes : 2x1 array, optionnal Trigger values determining the zones. '[inferior borne, superior borne]' rel : Boolean If 'rel' is 'True' (default), values of 'bornes' are relative to the extremum values of the field. If 'rel' is 'False', values of bornes are treated like absolute values. kind : string, optional if 'kind' is 'center', given points are geometrical centers, if 'kind' is 'extremum', given points are extrema (min or max) on zones if 'kind' is 'ponderated'(default, given points are centers of mass, ponderated by the scaler field. Returns ------- pts : pts.Points object Contening the centers coordinates """ # correcting python's problem with egality... bornes[0] -= 0.00001*abs(bornes[0]) bornes[1] += 0.00001*abs(bornes[1]) # checking parameters coherence if not isinstance(bornes, ARRAYTYPES): raise TypeError("'bornes' must be an array") if not isinstance(bornes, np.ndarray): bornes = np.array(bornes, dtype=float) if not bornes.shape == (2,): raise ValueError("'bornes' must be a 2x1 array") if bornes[0] == bornes[1]: return None if not bornes[0] < bornes[1]: raise ValueError("'bornes' must be crescent") if not isinstance(rel, bool): raise TypeError("'rel' must be a boolean") if not isinstance(kind, STRINGTYPES): raise TypeError("'kind' must be a string") # compute minimum and maximum if 'rel=True' if rel: if bornes[0]*bornes[1] < 0: raise ValueError("In relative 'bornes' must have the same" " sign") mini = self.min maxi = self.max if np.abs(bornes[0]) > np.abs(bornes[1]): bornes[1] = abs(maxi - mini)*bornes[1] + maxi bornes[0] = abs(maxi - mini)*bornes[0] + maxi else: bornes[1] = abs(maxi - mini)*bornes[1] + mini bornes[0] = abs(maxi - mini)*bornes[0] + mini # check if the zone exist else: mini = self.min maxi = self.max if maxi < bornes[0] or mini > bornes[1]: return None # getting data values = self.values mask = self.mask if np.any(mask): warnings.warn("There is masked values, algorithm can give " "strange results") # check if there is more than one point superior aoi = np.logical_and(values >= bornes[0], values <= bornes[1]) if np.sum(aoi) == 1: inds = np.where(aoi) x = self.axe_x[inds[0][0]] y = self.axe_y[inds[1][0]] return pts.Points([[x, y]], unit_x=self.unit_x, unit_y=self.unit_y) zones = np.logical_and(np.logical_and(values >= bornes[0], values <= bornes[1]), np.logical_not(mask)) # compute the center with labelzones labeledzones, nmbzones = msr.label(zones) inds = [] if kind == 'extremum': mins, _, ind_min, ind_max = msr.extrema(values, labeledzones, np.arange(nmbzones) + 1) for i in np.arange(len(mins)): if bornes[np.argmax(np.abs(bornes))] < 0: inds.append(ind_min[i]) else: inds.append(ind_max[i]) elif kind == 'center': inds = msr.center_of_mass(np.ones(self.shape), labeledzones, np.arange(nmbzones) + 1) elif kind == 'ponderated': inds = msr.center_of_mass(np.abs(values), labeledzones, np.arange(nmbzones) + 1) else: raise ValueError("Invalid value for 'kind'") coords = [] for ind in inds: indx = ind[0] indy = ind[1] if indx % 1 == 0: x = self.axe_x[int(indx)] else: dx = self.axe_x[1] - self.axe_x[0] x = self.axe_x[int(indx)] + dx*(indx % 1) if indy % 1 == 0: y = self.axe_y[int(indy)] else: dy = self.axe_y[1] - self.axe_y[0] y = self.axe_y[int(indy)] + dy*(indy % 1) coords.append([x, y]) coords = np.array(coords, dtype=float) if len(coords) == 0: return None return pts.Points(coords, unit_x=self.unit_x, unit_y=self.unit_y)
[docs] def get_nearest_extrema(self, pts, extrema='max', ind=False): """ For a given set of points, return the positions of the nearest local extrema (minimum or maximum). Parameters ---------- pts : Nx2 array Set of pts.Points position. Returns ------- extremum_pos : Nx2 array """ # get data tmp_sf = self.copy() tmp_sf.mirroring(direction='x', position=tmp_sf.axe_x[0], inds_to_mirror=1, inplace=True) tmp_sf.mirroring(direction='x', position=tmp_sf.axe_x[-1], inds_to_mirror=1, inplace=True) tmp_sf.mirroring(direction='y', position=tmp_sf.axe_y[0], inds_to_mirror=1, inplace=True) tmp_sf.mirroring(direction='y', position=tmp_sf.axe_y[-1], inds_to_mirror=1, inplace=True) dx = tmp_sf.axe_x[1] - tmp_sf.axe_x[0] dy = tmp_sf.axe_y[1] - tmp_sf.axe_y[0] # get gradient field grad_x, grad_y = np.gradient(tmp_sf.values, dx, dy) from . import vectorfield as vf tmp_vf = vf.VectorField() tmp_vf.import_from_arrays(tmp_sf.axe_x, tmp_sf.axe_y, grad_x, grad_y, unit_x=tmp_sf.unit_x, unit_y=tmp_sf.unit_y, unit_values=tmp_sf.unit_values) # extract the streamline from the gradient field from ..field_treatment import get_streamlines if extrema == 'min': reverse = True else: reverse = False sts = get_streamlines(tmp_vf, pts, reverse=reverse, resolution=0.1) # get the final converged points extremum_pos = [] if isinstance(sts, ARRAYTYPES): for i, st in enumerate(sts): if len(st.xy) == 0: extremum_pos.append(pts[i]) else: extremum_pos.append(st.xy[-1]) else: extremum_pos.append(sts.xy[-1]) extremum_pos = np.array(extremum_pos) # returning return extremum_pos
[docs] def get_profile(self, direction, position, ind=False, interp='linear'): """ Return a profile of the scalar field, at the given position (or at least at the nearest possible position). If position is an interval, the fonction return an average profile in this interval. Parameters ---------- direction : string in ['x', 'y'] Direction along which we choose a position. position : float, interval of float or string Position, interval in which we want a profile or 'all' ind : boolean If 'True', position has to be given in indices If 'False' (default), position has to be given in axis unit. interp : string in ['nearest', 'linear'] if 'nearest', get the profile at the nearest position on the grid, if 'linear', use linear interpolation to get the profile at the exact position Returns ------- profile : prof.Profile object Wanted profile """ # checking parameters if direction not in ['x', 'y']: raise TypeError("'direction' must be 'x' or 'y'") if not isinstance(position, NUMBERTYPES + ARRAYTYPES + STRINGTYPES): raise TypeError() if isinstance(position, ARRAYTYPES): position = np.array(position, dtype=float) if not position.shape == (2,): raise ValueError("'position' must be a number or an interval") elif isinstance(position, STRINGTYPES): if position != 'all': raise ValueError() if not isinstance(ind, bool): raise TypeError() if not isinstance(interp, STRINGTYPES): raise TypeError() if interp not in ['nearest', 'linear']: raise ValueError() # getting data if direction == 'x': axe = self.axe_x unit_x = self.unit_y unit_y = self.unit_values else: axe = self.axe_y unit_x = self.unit_x unit_y = self.unit_values # applying interval type if isinstance(position, ARRAYTYPES) and not ind: for pos in position: if pos > axe.max(): pos = axe.max() if pos < axe.min(): pos = axe.min() elif isinstance(position, ARRAYTYPES) and ind: if np.min(position) < -len(axe) + 1 or \ np.max(position) > len(axe) - 1: raise ValueError("'position' must be included in" " the choosen axis values") elif isinstance(position, NUMBERTYPES) and not ind: if position > axe.max() or position < axe.min(): raise ValueError("'position' must be included in the choosen" " axis values (here [{0},{1}])" .format(axe.min(), axe.max())) elif isinstance(position, NUMBERTYPES) and ind: if np.min(position) < 0 or np.max(position) > len(axe) - 1: raise ValueError("'position' must be included in the choosen" " axis values (here [{0},{1}])" .format(0, len(axe) - 1)) elif position == 'all': position = np.array([axe[0], axe[-1]]) else: raise ValueError() # if use interpolation if isinstance(position, NUMBERTYPES) and interp == 'linear': if direction == 'x': axe = self.axe_y if ind: position = self.axe_x[position] vals = [self.get_value(position, axe_i) for axe_i in axe] tmp_prof = prof.Profile(x=axe, y=vals, mask=False, unit_x=self.unit_y, unit_y=self.unit_values) if direction == 'y': axe = self.axe_x if ind: position = self.axe_y[position] vals = [self.get_value(axe_i, position) for axe_i in axe] tmp_prof = prof.Profile(x=axe, y=vals, mask=False, unit_x=self.unit_x, unit_y=self.unit_values) return tmp_prof # if not if isinstance(position, NUMBERTYPES) and not ind: for i in np.arange(1, len(axe)): if (axe[i] >= position and axe[i-1] <= position) \ or (axe[i] <= position and axe[i-1] >= position): break if np.abs(position - axe[i]) > np.abs(position - axe[i-1]): finalindice = i-1 else: finalindice = i if direction == 'x': prof_mask = self.mask[finalindice, :] profile = self.values[finalindice, :] axe = self.axe_y else: prof_mask = self.mask[:, finalindice] profile = self.values[:, finalindice] axe = self.axe_x elif isinstance(position, NUMBERTYPES) and ind: if direction == 'x': prof_mask = self.mask[position, :] profile = self.values[position, :] axe = self.axe_y else: prof_mask = self.mask[:, position] profile = self.values[:, position] axe = self.axe_x # Calculation of the profile for an interval of position elif isinstance(position, ARRAYTYPES) and not ind: axe_mask = np.logical_and(axe >= position[0], axe <= position[1]) if direction == 'x': prof_mask = self.mask[axe_mask, :].mean(0) profile = self.values[axe_mask, :].mean(0) axe = self.axe_y else: prof_mask = self.mask[:, axe_mask].mean(1) profile = self.values[:, axe_mask].mean(1) axe = self.axe_x elif isinstance(position, ARRAYTYPES) and ind: if direction == 'x': prof_mask = self.mask[position[0]:position[1], :].mean(0) profile = self.values[position[0]:position[1], :].mean(0) axe = self.axe_y else: prof_mask = self.mask[:, position[0]:position[1]].mean(1) profile = self.values[:, position[0]:position[1]].mean(1) axe = self.axe_x return prof.Profile(axe, profile, prof_mask, unit_x, unit_y)
[docs] def get_histogram(self, cum=False, normalized=False, bins=None, range=None): """ Return the image histogram. Parameters ========== cum: boolean If True, get a cumulative histogram. Returns ======= hist: array of numbers Histogram. """ if bins is None: bins = 10 hist, xs = np.histogram(self.values.flatten(), bins=bins, range=range, density=normalized) xs = xs[0:-1] + np.mean(xs[0:2]) if cum: hist = np.cumsum(hist) return prof.Profile(xs, hist, mask=False, unit_x=self.unit_values, unit_y="counts")
[docs] def get_spatial_autocorrelation(self, direction, window_len=None): """ Return the spatial auto-correlation along the wanted direction. Take the middle point for reference for correlation computation. Parameters ---------- direction : string 'x' or 'y' window_len : integer, optional Window length for sweep correlation. if 'None' (default), all the signal is used, and boundary effect can be seen. Returns ------- profile : prof.Profile object Spatial correlation """ # Direction X if direction == 'x': if window_len is None: window_len = int(len(self.axe_x) - 2) # loop on profiles cor = np.zeros(int(np.floor(window_len/2.)*2 + 1)) nmb = 0 for i, y in enumerate(self.axe_y): tmp_prof = self.get_profile('y', i, ind=True) cor += tmp_prof.get_auto_correlation(window_len, raw=True) nmb += 1 cor /= nmb # returning dx = self.axe_x[1] - self.axe_x[0] x = np.arange(0, len(cor)*dx, dx) return prof.Profile(x=x, y=cor, unit_x=self.unit_x, unit_y=make_unit('')) elif direction == 'y': if window_len is None: window_len = int(len(self.axe_y) - 2) # loop on profiles cor = np.zeros(int(np.floor(window_len/2.)*2 + 1)) nmb = 0 for i, x in enumerate(self.axe_x): tmp_prof = self.get_profile('x', i, ind=True) cor += tmp_prof.get_auto_correlation(window_len, raw=True) nmb += 1 cor /= nmb # returning dy = self.axe_y[1] - self.axe_y[0] y = np.arange(0, len(cor)*dy, dy) return prof.Profile(x=y, y=cor, unit_x=self.unit_y, unit_y=make_unit('')) else: raise ValueError()
[docs] def get_spatial_spectrum(self, direction, intervx=None, intervy=None, welch_seglen=None, scaling='base', fill='linear'): """ Return a spatial spectrum. Parameters ---------- direction : string 'x' or 'y'. intervx and intervy : 2x1 arrays of number, optional To chose the zone where to calculate the spectrum. If not specified, the biggest possible interval is choosen. welch_seglen : integer, optional If specified, welch's method is used (dividing signal into overlapping segments, and averaging periodogram) with the given segments length (in number of points). scaling : string, optional If 'base' (default), result are in component unit. If 'spectrum', the power spectrum is returned (in unit^2). If 'density', the power spectral density is returned (in unit^2/(1/unit_axe)) fill : string or float Specifies the way to treat missing values. A value for value filling. A string ('linear', 'nearest' or 'cubic') for interpolation. Returns ------- spec : prof.Profile object Magnitude spectrum. Notes ----- If there is missing values on the field, 'fill' is used to linearly interpolate the missing values (can impact the spectrum). """ # check parameters if not isinstance(direction, STRINGTYPES): raise TypeError() if direction not in ['x', 'y']: raise ValueError() if intervx is None: intervx = [self.axe_x[0], self.axe_x[-1]] if not isinstance(intervx, ARRAYTYPES): raise TypeError() intervx = np.array(intervx) if intervx[0] < self.axe_x[0]: intervx[0] = self.axe_x[0] if intervx[1] > self.axe_x[-1]: intervx[1] = self.axe_x[-1] if intervx[1] <= intervx[0]: raise ValueError() if intervy is None: intervy = [self.axe_y[0], self.axe_y[-1]] if not isinstance(intervy, ARRAYTYPES): raise TypeError() intervy = np.array(intervy) if intervy[0] < self.axe_y[0]: intervy[0] = self.axe_y[0] if intervy[1] > self.axe_y[-1]: intervy[1] = self.axe_y[-1] if intervy[1] <= intervy[0]: raise ValueError() if isinstance(fill, NUMBERTYPES): value = fill fill = 'value' else: value = 0. # getting data tmp_SF = self.crop(intervx=intervx, intervy=intervy, inplace=False) tmp_SF.fill(kind=fill, value=value, inplace=True, reduce_tri=True) # getting spectrum if direction == 'x': # first spectrum prof = tmp_SF.get_profile('y', tmp_SF.axe_y[0]) spec = prof.get_spectrum(welch_seglen=welch_seglen, scaling=scaling, fill=fill) # otherones for y in tmp_SF.axe_y[1::]: prof = tmp_SF.get_profile('y', y) spec += prof.get_spectrum(welch_seglen=welch_seglen, scaling=scaling, fill=fill) spec /= len(tmp_SF.axe_y) else: # first spectrum prof = tmp_SF.get_profile('x', tmp_SF.axe_x[0]) spec = prof.get_spectrum(welch_seglen=welch_seglen, scaling=scaling, fill=fill) # otherones for x in tmp_SF.axe_x[1::]: prof = tmp_SF.get_profile('x', x) spec += prof.get_spectrum(welch_seglen=welch_seglen, scaling=scaling, fill=fill) spec /= len(tmp_SF.axe_x) return spec
[docs] def get_norm(self, norm=2, normalized='perpoint'): """ Return the field norm """ values = self.values[~self.mask] norm = (np.sum(np.abs(values)**norm))**(1./norm) if normalized == 'perpoint': norm /= np.sum(~self.mask) return norm
[docs] def get_interpolator(self, interp="linear"): """ Return the field interpolator. Parameters ---------- kind : {‘linear’, ‘cubic’, ‘quintic’}, optional The kind of spline interpolation to use. Default is ‘linear’. """ return spinterp.interp2d(self.axe_x, self.axe_y, self.values.transpose(), kind=interp)
[docs] def integrate_over_line(self, direction, interval): """ Return the integral on an interval and along a direction ('x' or 'y'). Discretized integral is computed with a trapezoidal algorithm. Parameters ---------- direction : string in ['x', 'y'] Direction along which we choose a position. interval : interval of numbers Interval on which we want to calculate the integral. Returns ------- integral : float Result of the integrale calcul unit : Unit object Unit of the result """ profile = self.get_profile(direction, interval) if np.any(profile.mask): raise Exception("Masked values on the line") integrale = np.trapz(profile.y, profile.x) if direction == 1: unit = self.unit_values*self.unit_y else: unit = self.unit_values*self.unit_x return integrale*unit
[docs] def integrate_over_surface(self, intervx=None, intervy=None): """ Return the integral on a surface. Discretized integral is computed with a very rustic algorithm which just sum the value on the surface. if 'intervx' and 'intervy' are given, return the integral over the delimited surface. WARNING : Only works (and badly) with regular axes. Parameters ---------- intervx : interval of numbers, optional Interval along x on which we want to compute the integrale. intervy : interval of numbers, optional Interval along y on which we want to compute the integrale. Returns ------- integral : float Result of the integrale computation. unit : Unit object The unit of the integrale result. """ if intervx is None: intervx = [-np.inf, np.inf] if intervy is None: intervy = [-np.inf, np.inf] cropfield = self.crop(intervx=intervx, intervy=intervy, inplace=False) if np.any(cropfield.mask): raise Exception("Masked values on the surface") axe2_x, axe2_y = cropfield.axe_x, cropfield.axe_y unit_x, unit_y = cropfield.unit_x, cropfield.unit_y integral = (cropfield.values.sum() * np.abs(axe2_x[-1] - axe2_x[0]) * np.abs(axe2_y[-1] - axe2_y[0]) / len(axe2_x) / len(axe2_y)) unit = cropfield.unit_values*unit_x*unit_y return integral*unit
[docs] def copy(self): """ Return a copy of the scalarfield. """ return copy.deepcopy(self)
[docs] def export_to_scatter(self, mask=None): """ Return the scalar field under the form of a pts.Points object. Parameters ---------- mask : array of boolean, optional Mask to choose values to extract (values are taken where mask is False). Returns ------- Pts : pts.Points object Contening the ScalarField points. """ if mask is None: mask = np.zeros(self.shape) if not isinstance(mask, ARRAYTYPES): raise TypeError("'mask' must be an array of boolean") mask = np.array(mask) if mask.shape != self.shape: raise ValueError("'mask' must have the same dimensions as" "the ScalarField :{}".format(self.shape)) # récupération du masque mask = np.logical_or(mask, self.mask) tmp_pts = None v = np.array([], dtype=float) # boucle sur les points for inds, pos, value in self: if mask[inds[0], inds[1]]: continue if tmp_pts is None: tmp_pts = [pos] else: tmp_pts = np.append(tmp_pts, [pos], axis=0) v = np.append(v, value) return pts.Points(tmp_pts, v, self.unit_x, self.unit_y, self.unit_values)
[docs] def scale(self, scalex=None, scaley=None, scalev=None, inplace=False): """ Scale the ScalarField. Parameters ---------- scalex, scaley, scalev : numbers or Unum objects Scale for the axis and the values inplace : boolean . """ if inplace: tmp_f = self else: tmp_f = self.copy() # xy revx, revy = fld.Field.scale(tmp_f, scalex=scalex, scaley=scaley, inplace=True, output_reverse=True) # v if scalev is None: pass elif isinstance(scalev, NUMBERTYPES): tmp_f.values *= scalev elif isinstance(scalev, unum.Unum): new_unit = tmp_f.unit_values*scalev fact = new_unit.asNumber() new_unit /= fact tmp_f.unit_values = new_unit tmp_f.values *= fact else: raise TypeError() if revx and revy: tmp_f.values = tmp_f.values[::-1, ::-1] elif revx: tmp_f.values = tmp_f.values[::-1, :] elif revy: tmp_f.values = tmp_f.values[:, ::-1] # returning if not inplace: return tmp_f
[docs] def rotate(self, angle, inplace=False): """ Rotate the scalar field. Parameters ---------- angle : integer Angle in degrees (positive for trigonometric direction). In order to preserve the orthogonal grid, only multiples of 90° are accepted (can be negative multiples). inplace : boolean, optional If 'True', scalar field is rotated in place, else, the function return a rotated field. Returns ------- rotated_field : ScalarField object, optional Rotated scalar field. """ # check params if not isinstance(angle, NUMBERTYPES): raise TypeError() if angle % 90 != 0: raise ValueError() if not isinstance(inplace, bool): raise TypeError() # get data if inplace: tmp_field = self else: tmp_field = self.copy() # normalize angle angle = angle % 360 # rotate the parent fld.Field.rotate(tmp_field, angle, inplace=True) # rotate nmb_rot90 = int(angle/90) tmp_field.__values = np.rot90(tmp_field.values, nmb_rot90) tmp_field.__mask = np.rot90(tmp_field.mask, nmb_rot90) # returning if not inplace: return tmp_field
[docs] def change_dtype(self, new_type): """ Change the values dtype. """ if new_type != self._values_dtype: self.values = np.array(self.values, dtype=new_type) self._values_dtype = new_type
[docs] def change_unit(self, axe, new_unit): """ Change the unit of an axe. Parameters ---------- axe : string 'y' for changing the profile y axis unit 'x' for changing the profile x axis unit 'values' or changing values unit new_unit : Unum.unit object or string The new unit. """ if isinstance(new_unit, STRINGTYPES): new_unit = make_unit(new_unit) if not isinstance(new_unit, unum.Unum): raise TypeError() if not isinstance(axe, STRINGTYPES): raise TypeError() if axe == 'x': fld.Field.change_unit(self, axe, new_unit) elif axe == 'y': fld.Field.change_unit(self, axe, new_unit) elif axe == 'values': old_unit = self.unit_values new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.values *= fact self.unit_values = new_unit/fact else: raise ValueError()
[docs] def crop(self, intervx=None, intervy=None, ind=False, inplace=False): """ Crop the area in respect with given intervals. Parameters ---------- intervx : array, optional interval wanted along x intervy : array, optional interval wanted along y ind : boolean, optional If 'True', intervals are understood as indices along axis. If 'False' (default), intervals are understood in axis units. inplace : boolean, optional If 'True', the field is croped in place. """ if inplace: values = self.values mask = self.mask indmin_x, indmax_x, indmin_y, indmax_y = \ fld.Field.crop(self, intervx, intervy, full_output=True, ind=ind, inplace=True) self.__values = values[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] self.__mask = mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] else: indmin_x, indmax_x, indmin_y, indmax_y, cropfield = \ fld.Field.crop(self, intervx=intervx, intervy=intervy, full_output=True, ind=ind, inplace=False) cropfield.__values = self.values[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] cropfield.__mask = self.mask[indmin_x:indmax_x + 1, indmin_y:indmax_y + 1] return cropfield
[docs] def extend(self, nmb_left=0, nmb_right=0, nmb_up=0, nmb_down=0, value=None, inplace=False, ind=True): """ Add columns or lines of masked values at the scalarfield. Parameters ---------- nmb_**** : integers Number of lines/columns to add in each direction. value : None or number Value used to fill the new columns and lines. If 'value' is not given, new columns and lines are masked. inplace : bool If 'False', return a new extended field, if 'True', modify the field inplace. Returns ------- Extended_field : Field object, optional Extended field. """ if not ind: dx = self.axe_x[1] - self.axe_x[0] dy = self.axe_y[1] - self.axe_y[0] nmb_left = np.ceil(nmb_left/dx) nmb_right = np.ceil(nmb_right/dx) nmb_up = np.ceil(nmb_up/dy) nmb_down = np.ceil(nmb_down/dy) ind = True # check params if not (isinstance(nmb_left, int) or nmb_left % 1 == 0): raise TypeError() if not (isinstance(nmb_right, int) or nmb_right % 1 == 0): raise TypeError() if not (isinstance(nmb_up, int) or nmb_up % 1 == 0): raise TypeError() if not (isinstance(nmb_down, int) or nmb_down % 1 == 0): raise TypeError() nmb_left = int(nmb_left) nmb_right = int(nmb_right) nmb_up = int(nmb_up) nmb_down = int(nmb_down) if np.any(np.array([nmb_left, nmb_right, nmb_up, nmb_down]) < 0): raise ValueError() # used herited method to extend the field tmpsf = fld.Field.extend(self, nmb_left=nmb_left, nmb_right=nmb_right, nmb_up=nmb_up, nmb_down=nmb_down, inplace=False) new_shape = tmpsf.shape # extend the value ans mask if value is None: new_values = np.zeros(new_shape, dtype=self._values_dtype) new_mask = np.ones(new_shape, dtype=bool) else: new_values = np.ones(new_shape, dtype=self._values_dtype)*value new_mask = np.zeros(new_shape, dtype=bool) if nmb_right == 0: slice_x = slice(nmb_left, new_values.shape[0] + 2) else: slice_x = slice(nmb_left, -nmb_right) if nmb_up == 0: slice_y = slice(nmb_down, new_values.shape[1] + 2) else: slice_y = slice(nmb_down, -nmb_up) new_values[slice_x, slice_y] = self.values new_mask[slice_x, slice_y] = self.mask # return if inplace: self.import_from_arrays(axe_x=tmpsf.axe_x, axe_y=tmpsf.axe_y, values=new_values, mask=new_mask, unit_x=tmpsf.unit_x, unit_y=tmpsf.unit_y, unit_values=tmpsf.unit_values) else: new_field.values = new_values new_field.mask = new_mask return new_field
[docs] def mirroring(self, direction, position, inds_to_mirror='all', mir_coef=1, interp=None, value=0, inplace=False): """ Return a field with additional mirrored values. Parameters ---------- direction : string in ['x', 'y'] Axe on which place the symetry plane position : number Position of the symetry plane alogn the given axe inds_to_mirror : integer Number of vector rows to symetrize (default is all) mir_coef : number, optional Optional coefficient applied only to the mirrored values. inplace : boolean, optional . interp : string, optional If specified, method used to fill the gap near the symetry plane by interpoaltion. 'value' : fill with the given value, 'nearest' : fill with the nearest value, 'linear' (default): fill using linear interpolation (Delaunay triangulation), 'cubic' : fill using cubic interpolation (Delaunay triangulation) value : array, optional Value at the symetry plane, in case of interpolation """ # check params if direction not in ['x', 'y']: raise TypeError() if not isinstance(position, NUMBERTYPES): raise TypeError("Position should be a number, not {}" .format(type(position))) position = float(position) # get data axe_x = self.axe_x axe_y = self.axe_y if inplace: tmp_vf = self else: tmp_vf = self.copy() tmp_vf.crop_masked_border(inplace=True) # get side to mirror if direction == 'x': axe = axe_x x_median = (axe_x[-1] + axe_x[0])/2. delta = axe_x[1] - axe_x[0] if position < axe_x[0]: border = axe_x[0] side = 'left' elif position > axe_x[-1]: border = axe_x[-1] side = 'right' elif position < x_median: tmp_vf.crop(intervx=[position, axe_x[-1]], ind=False, inplace=True) side = 'left' axe_x = tmp_vf.axe_x border = axe_x[0] elif position > x_median: tmp_vf.crop(intervx=[axe_x[0], position], ind=False, inplace=True) side = 'right' axe_x = tmp_vf.axe_x border = axe_x[-1] else: raise ValueError() elif direction == 'y': axe = axe_y y_median = (axe_y[-1] + axe_y[0])/2. delta = axe_y[1] - axe_y[0] if position < axe_y[0]: border = axe_y[0] side = 'down' elif position > axe_y[-1]: border = axe_y[-1] side = 'up' elif position < y_median: tmp_vf.crop(intervy=[position, axe_y[-1]], ind=False, inplace=True) side = 'down' axe_y = tmp_vf.axe_y border = axe_y[0] elif position > y_median: tmp_vf.crop(intervy=[axe_y[0], position], ind=False, inplace=True) side = 'up' axe_y = tmp_vf.axe_y border = axe_y[-1] else: raise ValueError() else: raise ValueError() # get length to mirror if inds_to_mirror == 'all' or inds_to_mirror > len(axe): inds_to_mirror = len(axe) - 1 if side in ['left', 'down']: delta_gap = -(position - border)/delta else: delta_gap = (position - border)/delta inds_to_add = np.ceil(inds_to_mirror + 2*delta_gap) - 1 # extend the field tmp_dic = {'nmb_{}'.format(side): inds_to_add} tmp_vf.extend(inplace=True, **tmp_dic) new_axe_x = tmp_vf.axe_x new_axe_y = tmp_vf.axe_y # filling mirrored part with interpolated values for i, x in enumerate(new_axe_x): for j, y in enumerate(new_axe_y): # if point is not masked if not tmp_vf.mask[i, j]: continue # get mirror point position if direction == 'x': mir_pos = [position - (x - position), y] else: mir_pos = [x, position - (y - position)] # if mirror point is outside hte field if mir_pos[0] < new_axe_x[0] or mir_pos[0] > new_axe_x[-1] \ or mir_pos[1] < new_axe_y[0] \ or mir_pos[1] > new_axe_y[-1]: continue # get mirror point value mir_val = tmp_vf.get_value(*mir_pos, ind=False) # if mirror point can't be interpolated (masked) if np.isnan(mir_val): continue # Sotring the new value in the field tmp_vf.values[i, j] = mir_val*mir_coef tmp_vf.mask[i, j] = False # getting mask masked_values = np.any(tmp_vf.mask) # interpolating between mirror images if interp is None: pass elif interp == 'value' and masked_values: tmp_vf.fill(kind='value', value=value, inplace=True, crop=False, reduce_tri=False) elif interp == 'linear' and masked_values: # getting data new_axe_x = tmp_vf.axe_x new_axe_y = tmp_vf.axe_y values = tmp_vf.values mask = tmp_vf.mask # direction x if side in ['right', 'left']: # get last column ind_last = np.where(mask[:, 0])[0][0] - 1 # get number of missing values nmb_masked = np.sum(mask[:, 0]) # loop on lines for i in np.arange(len(new_axe_y)): tmp_val = np.linspace(values[ind_last - 1, i], values[ind_last + nmb_masked, i], nmb_masked + 2)[1:-1] values[ind_last:ind_last + nmb_masked, i] = tmp_val else: # get last column ind_last = np.where(mask[0, :])[0][0] # get number of missing values nmb_masked = np.sum(mask[0, :]) # loop on lines for i in np.arange(len(new_axe_x)): tmp_val = np.linspace(values[i, ind_last - 1], values[i, ind_last + nmb_masked], nmb_masked + 2)[1:-1] values[i, ind_last:ind_last + nmb_masked] = tmp_val tmp_vf.values = values tmp_vf.mask = False # slower interplation method elif interp is not None and masked_values: # getting data x, y = tmp_vf.axe_x, tmp_vf.axe_y values = tmp_vf.values mask = tmp_vf.mask # geting filters from mask if interp in ['nearest', 'linear', 'cubic']: X, Y = np.meshgrid(x, y, indexing='ij') xy = [X.flat[:], Y.flat[:]] xy = np.transpose(xy) filt = np.logical_not(mask) xy_masked = xy[mask.flatten()] # getting the zone to interpolate xy_good = xy[filt.flatten()] values_good = values[filt] else: raise ValueError() # adding the value at the symetry plane if direction == 'x': addit_xy = list(zip([position]*len(tmp_vf.axe_y), tmp_vf.axe_y)) addit_values = [value]*len(tmp_vf.axe_y) else: addit_xy = list(zip(tmp_vf.axe_x, [position]*len(tmp_vf.axe_x))) addit_values = [value]*len(tmp_vf.axe_x) xy_good = np.concatenate((xy_good, addit_xy)) values_good = np.concatenate((values_good, addit_values)) # if interpolation if interp == 'value': values[mask] = value elif interp == 'nearest': nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[mask] = nearest(xy_masked) elif interp == 'linear': linear = spinterp.LinearNDInterpolator(xy_good, values_good) values[mask] = linear(xy_masked) new_mask = np.isnan(values) if np.any(new_mask): nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[new_mask] = nearest(xy[new_mask.flatten()]) elif interp == 'cubic': cubic = spinterp.CloughTocher2DInterpolator(xy_good, values_good) values[mask] = cubic(xy_masked) new_mask = np.isnan(values) if np.any(new_mask): nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[new_mask] = nearest(xy[new_mask.flatten()]) else: raise ValueError("unknown 'tof' value") tmp_vf.values = values tmp_vf.mask = False # returning if not inplace: return tmp_vf
[docs] def crop_masked_border(self, hard=False, inplace=False): """ Crop the masked border of the field in place or not. Parameters ---------- hard : boolean, optional If 'True', partially masked border are croped as well. """ # if inplace: tmp_vf = self else: tmp_vf = self.copy() # checking masked values presence mask = tmp_vf.mask if not np.any(mask): return None # hard cropping if hard: # remove trivial borders tmp_vf.crop_masked_border(hard=False, inplace=True) # until there is no more masked values while np.any(tmp_vf.mask): # getting number of masked value on each border bd1 = np.sum(tmp_vf.mask[0, :]) bd2 = np.sum(tmp_vf.mask[-1, :]) bd3 = np.sum(tmp_vf.mask[:, 0]) bd4 = np.sum(tmp_vf.mask[:, -1]) # getting more masked border more_masked = np.argmax([bd1, bd2, bd3, bd4]) # deleting more masked border if more_masked == 0: len_x = len(tmp_vf.axe_x) tmp_vf.crop(intervx=[1, len_x], ind=True, inplace=True) elif more_masked == 1: len_x = len(tmp_vf.axe_x) tmp_vf.crop(intervx=[0, len_x - 2], ind=True, inplace=True) elif more_masked == 2: len_y = len(tmp_vf.axe_y) tmp_vf.crop(intervy=[1, len_y], ind=True, inplace=True) elif more_masked == 3: len_y = len(tmp_vf.axe_y) tmp_vf.crop(intervy=[0, len_y - 2], ind=True, inplace=True) # soft cropping else: axe_x_m = np.logical_not(np.all(mask, axis=1)) axe_y_m = np.logical_not(np.all(mask, axis=0)) axe_x_min = np.where(axe_x_m)[0][0] axe_x_max = np.where(axe_x_m)[0][-1] axe_y_min = np.where(axe_y_m)[0][0] axe_y_max = np.where(axe_y_m)[0][-1] tmp_vf.crop([axe_x_min, axe_x_max], [axe_y_min, axe_y_max], ind=True, inplace=True) # returning if not inplace: return tmp_vf
[docs] def fill(self, kind='linear', value=0., inplace=False, reduce_tri=True, crop=False): """ Fill the masked part of the array. Parameters ---------- kind : string, optional Type of algorithm used to fill. 'value' : fill with the given value 'nearest' : fill with the nearest value 'linear' (default): fill using linear interpolation (Delaunay triangulation) 'cubic' : fill using cubic interpolation (Delaunay triangulation) value : number Value used to fill (for kind='value'). inplace : boolean, optional If 'True', fill the ScalarField in place. If 'False' (default), return a filled version of the field. reduce_tri : boolean, optional If 'True', treatment is used to reduce the triangulation effort (faster when a lot of masked values) If 'False', no treatment (faster when few masked values) crop : boolean, optional If 'True', SF borders are croped before filling. """ # check parameters coherence if not isinstance(kind, STRINGTYPES): raise TypeError("'kind' must be a string") if not isinstance(value, NUMBERTYPES): raise TypeError("'value' must be a number") if crop: self.crop_masked_border(hard=False, inplace=True) # getting data x, y = self.axe_x, self.axe_y values = self.values mask = self.mask if kind in ['nearest', 'linear', 'cubic']: X, Y = np.meshgrid(x, y, indexing='ij') xy = [X.flat[:], Y.flat[:]] xy = np.transpose(xy) filt = np.logical_not(mask) xy_masked = xy[mask.flatten()] # getting the zone to interpolate if reduce_tri and kind in ['nearest', 'linear', 'cubic']: import scipy.ndimage as spim dilated = spim.binary_dilation(self.mask, np.arange(9).reshape((3, 3))) filt_good = np.logical_and(filt, dilated) xy_good = xy[filt_good.flatten()] values_good = values[filt_good] elif not reduce_tri and kind in ['nearest', 'linear', 'cubic']: xy_good = xy[filt.flatten()] values_good = values[filt] else: pass # if there is nothing to do... if not np.any(mask): pass # if interpolation elif kind == 'value': values[mask] = value elif kind == 'nearest': nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[mask] = nearest(xy_masked) elif kind == 'linear': linear = spinterp.LinearNDInterpolator(xy_good, values_good) values[mask] = linear(xy_masked) new_mask = np.isnan(values) if np.any(new_mask): nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[new_mask] = nearest(xy[new_mask.flatten()]) elif kind == 'cubic': cubic = spinterp.CloughTocher2DInterpolator(xy_good, values_good) values[mask] = cubic(xy_masked) new_mask = np.isnan(values) if np.any(new_mask): nearest = spinterp.NearestNDInterpolator(xy_good, values_good) values[new_mask] = nearest(xy[new_mask.flatten()]) else: raise ValueError("unknown 'tof' value") # returning if inplace: self.values = values self.mask = False else: sf = ScalarField() sf.import_from_arrays(x, y, values, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) return sf
[docs] def smooth(self, tos='uniform', size=None, inplace=False, **kw): """ Smooth the scalarfield in place. Warning : fill up the field (should be used carefully with masked field borders) Parameters ---------- tos : string, optional Type of smoothing, can be 'uniform' (default) or 'gaussian' (See ndimage module documentation for more details) size : number, optional Size of the smoothing (is radius for 'uniform' and sigma for 'gaussian') in indice number. Default is 3 for 'uniform' and 1 for 'gaussian'. inplace : boolean, optional If True, Field is smoothed in place, else, the smoothed field is returned. kw : dic Additional parameters for ndimage methods (See ndimage documentation) """ if not isinstance(tos, STRINGTYPES): raise TypeError("'tos' must be a string") if size is None and tos == 'uniform': size = 3 elif size is None and tos == 'gaussian': size = 1 # filling up the field before smoothing if inplace: self.fill(inplace=True) values = self.values else: tmp_sf = self.fill(inplace=False) values = tmp_sf.values # smoothing if tos == "uniform": values = ndimage.uniform_filter(values, size, **kw) elif tos == "gaussian": values = ndimage.gaussian_filter(values, size, **kw) else: raise ValueError("'tos' must be 'uniform' or 'gaussian'") # storing if inplace: self.values = values else: tmp_sf.values = values return tmp_sf
[docs] def make_evenly_spaced(self, interp="linear", res=1): """ Use interpolation to make the field evenly spaced Parameters ---------- interp : {‘linear’, ‘cubic’, ‘quintic’}, optional The kind of spline interpolation to use. Default is ‘linear’. res : number Resolution of the resulting field. A value of 1 meaning a spatial resolution equal to the smallest space along the two axis for the initial field. """ # get data axex = self.axe_x axey = self.axe_y dx = np.min(axex[1:] - axex[:-1])/res dy = np.min(axey[1:] - axey[:-1])/res Dx = axex[-1] - axex[0] Dy = axey[-1] - axey[0] # interp = self.get_interpolator(interp=interp) # new_x = np.arange(axex[0], axex[-1] + .1*dx, dx) new_x = np.linspace(axex[0], axex[-1], int(Dx/dx)) # new_y = np.arange(axey[0], axey[-1] + .1*dy, dy) new_y = np.linspace(axey[0], axey[-1], int(Dy/dy)) new_values = interp(new_x, new_y) # store self.import_from_arrays(new_x, new_y, new_values.transpose(), mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values)
[docs] def reduce_spatial_resolution(self, fact, inplace=False): """ Reduce the spatial resolution of the field by a factor 'fact' Parameters ---------- fact : int Reducing factor. inplace : boolean, optional . """ if not isinstance(fact, int): raise TypeError() if fact < 1: raise ValueError() if fact == 1: if inplace: pass else: return self.copy() if fact % 2 == 0: pair = True else: pair = False # get new axis axe_x = self.axe_x axe_y = self.axe_y if pair: new_axe_x = (axe_x[np.arange(fact/2 - 1, len(axe_x) - fact/2, fact, dtype=int)] + axe_x[np.arange(fact/2, len(axe_x) - fact/2 + 1, fact, dtype=int)])/2. new_axe_y = (axe_y[np.arange(fact/2 - 1, len(axe_y) - fact/2, fact, dtype=int)] + axe_y[np.arange(fact/2, len(axe_y) - fact/2 + 1, fact, dtype=int)])/2. else: new_axe_x = axe_x[np.arange((fact - 1)/2, len(axe_x) - (fact - 1)/2, fact, dtype=int)] new_axe_y = axe_y[np.arange((fact - 1)/2, len(axe_y) - (fact - 1)/2, fact, dtype=int)] # get new values values = self.values mask = self.mask if pair: inds_x = np.arange(fact/2, len(axe_x) - fact/2 + 1, fact, dtype=int) inds_y = np.arange(fact/2, len(axe_y) - fact/2 + 1, fact, dtype=int) new_values = np.zeros((len(inds_x), len(inds_y))) new_mask = np.zeros((len(inds_x), len(inds_y))) for i in np.arange(len(inds_x)): intervx = slice(inds_x[i] - int(fact/2), inds_x[i] + int(fact/2)) for j in np.arange(len(inds_y)): intervy = slice(inds_y[j] - int(fact/2), inds_y[j] + int(fact/2)) if np.all(mask[intervx, intervy]): new_mask[i, j] = True new_values[i, j] = 0. else: new_values[i, j] = np.mean(values[intervx, intervy] [~mask[intervx, intervy]]) else: inds_x = np.arange((fact - 1)/2, len(axe_x) - (fact - 1)/2, fact) inds_y = np.arange((fact - 1)/2, len(axe_y) - (fact - 1)/2, fact) new_values = np.zeros((len(inds_x), len(inds_y))) new_mask = np.zeros((len(inds_x), len(inds_y))) for i in np.arange(len(inds_x)): intervx = slice(inds_x[i] - (fact - 1)/2, inds_x[i] + (fact - 1)/2 + 1) for j in np.arange(len(inds_y)): intervy = slice(inds_y[j] - (fact - 1)/2, inds_y[j] + (fact - 1)/2 + 1) if np.all(mask[intervx, intervy]): new_mask[i, j] = True new_values[i, j] = 0. else: new_values[i, j] = np.mean(values[intervx, intervy] [~mask[intervx, intervy]]) # ensuring right data type new_values = np.array(new_values, dtype=self._values_dtype) # returning if inplace: self.__init__() self.import_from_arrays(new_axe_x, new_axe_y, new_values, mask=new_mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values, dtype=self._values_dtype) else: sf = ScalarField() sf.import_from_arrays(new_axe_x, new_axe_y, new_values, mask=new_mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values, dtype=self._values_dtype) return sf
def __clean(self): self.__init__() def _display(self, component=None, kind=None, **plotargs): # getting datas axe_x, axe_y = self.axe_x, self.axe_y unit_x, unit_y = self.unit_x, self.unit_y X, Y = np.meshgrid(self.axe_y, self.axe_x) # getting wanted component if component is None or component == 'values': values = self.values.astype(dtype=self._values_dtype) mask = self.mask try: values[mask] = np.NaN except ValueError: values[mask] = 0 elif component == 'mask': values = self.mask else: raise ValueError("unknown value of 'component' parameter : {}" .format(component)) dp = pplt.Displayer(x=axe_x, y=axe_y, values=values, kind=kind, **plotargs) plot = dp.draw() pplt.DataCursorTextDisplayer(dp) # setting labels if unit_x.strUnit() == "[]": plt.xlabel("x") else: plt.xlabel("x " + unit_x.strUnit()) if unit_y.strUnit() == "[]": plt.ylabel("y") else: plt.ylabel("y " + unit_y.strUnit()) return plot
[docs] def display(self, component=None, kind=None, **plotargs): """ Display the scalar field. Parameters ---------- component : string, optional Component to display, can be 'values' or 'mask' kind : string, optinnal If 'imshow': (default) each datas are plotted (imshow), if 'contour': contours are ploted (contour), if 'contourf': filled contours are ploted (contourf). **plotargs : dict Arguments passed to the 'contourf' function used to display the scalar field. Returns ------- fig : figure reference Reference to the displayed figure. """ displ = self._display(component, kind, **plotargs) plt.title("") cb = plt.colorbar(displ) if self.unit_values.strUnit() == "[]": cb.set_label("Values") else: cb.set_label("Values {}".format(self.unit_values.strUnit())) # search for limits in case of masked field if component != 'mask': mask = self.mask for i in np.arange(len(self.axe_x)): if not np.all(mask[i, :]): break xmin = self.axe_x[i] for i in np.arange(len(self.axe_x) - 1, -1, -1): if not np.all(mask[i, :]): break xmax = self.axe_x[i] for i in np.arange(len(self.axe_y)): if not np.all(mask[:, i]): break ymin = self.axe_y[i] for i in np.arange(len(self.axe_y) - 1, -1, -1): if not np.all(mask[:, i]): break ymax = self.axe_y[i] plt.xlim([xmin, xmax]) plt.ylim([ymin, ymax]) return displ