Source code for IMTreatment.core.points

# -*- coding: utf-8 -*-
#!/bin/env python3

# Copyright (C) 2003-2007 Gaby Launay

# Author: Gaby Launay  <>
# URL:
# Version: 1.0

# This file is part of IMTreatment.

# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.

# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.

import matplotlib.pyplot as plt
from .. import plotlib as pplt
import numpy as np
import unum
import copy
from scipy import stats
from scipy import ndimage
from ..utils import make_unit

[docs]class Points(object): def __init__(self, xy=np.empty((0, 2), dtype=float), v=[], unit_x='', unit_y='', unit_v='', name=''): """ Class representing a set of points. You can use 'make_unit' to provide unities. Parameters ---------- xy : nx2 array. Representing the coordinates of each point of the set (n points). v : n array, optional Representing values attached at each points. unit_x : Unit object, optional X unit_y. unit_y : Unit object, optional Y unit_y. unit_v : Unit object, optional values unit_y. name : string, optional Name of the points set """ self.__v = [] if len(xy) == 0: xy = np.empty((0, 2), dtype=float) self.xy = xy self.v = v self.unit_v = unit_v self.unit_x = unit_x self.unit_y = unit_y = name def __iter__(self): if self.v is None or len(self.v) == 0: for i in np.arange(len(self.xy)): yield self.xy[i] else: for i in np.arange(len(self.xy)): yield self.xy[i], self.v[i] def __len__(self): return self.xy.shape[0] def __add__(self, another): if isinstance(another, Points): # trivial additions if len(self.xy) == 0: return another.copy() elif len(another.xy) == 0: return self.copy() # checking unit systems if len(self.xy) != 0: try: self.unit_x + another.unit_x self.unit_y + another.unit_y if self.v is not None and another.v is not None: self.unit_v + another.unit_v except unum.IncompatibleUnitsError: raise ValueError("Units system are not the same") else: self.unit_x = another.unit_x self.unit_y = another.unit_y if another.v is not None: self.unit_v = another.unit_v # compacting coordinates if another.xy.shape == (0,): new_xy = self.xy elif self.xy.shape == (0,): xy = another.xy xy[:, 0] = xy[:, 0]*(self.unit_x/another.unit_x).asNumber() xy[:, 1] = xy[:, 1]*(self.unit_y/another.unit_y).asNumber() new_xy = xy elif another.xy.shape == (0,) and self.xy.shape == (0,): new_xy = np.array([[]]) else: xy = another.xy xy[:, 0] = xy[:, 0]*(self.unit_x/another.unit_x).asNumber() xy[:, 1] = xy[:, 1]*(self.unit_y/another.unit_y).asNumber() new_xy = np.append(self.xy, xy, axis=0) # testing v presence v_presence = True if self.v is None and another.v is None: if len(self.xy) != 0: v_presence = False elif self.v is not None and another.v is not None: v_presence = True else: raise Exception() # compacting points and returning if v_presence: if self.v is None and another.v is None: v = np.array([]) elif self.v is None: v = another.v*(self.unit_v/another.unit_v).asNumber() elif another.v is None: v = self.v else: v_tmp = another.v*(self.unit_v/another.unit_v).asNumber() v = np.append(self.v, v_tmp) return Points(new_xy, v, unit_x=self.unit_x, unit_y=self.unit_y, unit_v=self.unit_v) else: return Points(new_xy, unit_x=self.unit_x, unit_y=self.unit_y) else: raise Exception("You can't add {} to Points objects" .format(type(another))) def __eq__(self, obj): if not isinstance(obj, Points): return False if not np.all(self.xy == obj.xy): return False if not np.all(self.v == obj.v): return False if not self.unit_x == obj.unit_x: return False if not self.unit_y == obj.unit_y: return False if not self.unit_v == obj.unit_v: return False return True @property def xy(self): return self.__xy @xy.setter def xy(self, values): values = np.asarray(values, dtype=float) if len(values != 0): if not values.ndim == 2: raise ValueError("ndim of xy is {} and should be 2" .format(values.ndim)) if not values.shape[1] == 2: raise ValueError() self.__xy = values if len(values) != len(self.__v): self.__v = np.array([]) @xy.deleter def xy(self): raise Exception("Nope, can't do that") @property def v(self): return self.__v @v.setter def v(self, values): if values is None: self__v = None else: values = np.array(values, subok=True) if not values.ndim == 1: raise ValueError() if not len(values) in [0, len(self.__xy)]: raise ValueError() self.__v = values @v.deleter def v(self): raise Exception("Nope, can't do that") @property def unit_x(self): return self.__unit_x @unit_x.setter def unit_x(self, unit): if isinstance(unit, unum.Unum): self.__unit_x = unit elif isinstance(unit, STRINGTYPES): try: self.__unit_x = make_unit(unit) except (ValueError, TypeError): raise Exception() else: raise Exception() @unit_x.deleter def unit_x(self): raise Exception("Nope, can't delete 'unit_x'") @property def unit_y(self): return self.__unit_y @unit_y.setter def unit_y(self, unit): if isinstance(unit, unum.Unum): self.__unit_y = unit elif isinstance(unit, STRINGTYPES): try: self.__unit_y = make_unit(unit) except (ValueError, TypeError): raise Exception() else: raise Exception() @unit_y.deleter def unit_y(self): raise Exception("Nope, can't delete 'unit_y'") @property def unit_v(self): return self.__unit_v @unit_v.setter def unit_v(self, unit): if isinstance(unit, unum.Unum): self.__unit_v = unit elif isinstance(unit, STRINGTYPES): try: self.__unit_v = make_unit(unit) except (ValueError, TypeError): raise Exception() else: raise Exception() @unit_v.deleter def unit_v(self): raise Exception("Nope, can't delete 'unit_v'") @property def name(self): return self.__name @name.setter def name(self, name): if isinstance(name, STRINGTYPES): self.__name = name else: raise Exception() @name.deleter def name(self): raise Exception("Nope, can't delete 'name'")
[docs] def copy(self): """ Return a copy of the Points object. """ return copy.deepcopy(self)
[docs] def get_points_density(self, bw_method=None, resolution=100, output_format='normalized', raw=False): """ Return a ScalarField with points density. Parameters ---------- bw_method : str, scalar or callable, optional The method used to calculate the estimator bandwidth. This can be 'scott', 'silverman', a scalar constant or a callable. If a scalar, this will be used as percent of the data std. If a callable, it should take a gaussian_kde instance as only parameter and return a scalar. If None (default), 'scott' is used. resolution : integer, optional Resolution for the resulting field. output_format : string, optional 'normalized' (default) : give position probability (integral egal 1). 'ponderated' : give position probability ponderated by the number or points (integral egal number of points). 'concentration' : give local concentration (in point per surface). raw : boolean, optional If 'False' (default), return a ScalarField object, if 'True', return numpy array. Returns ------- density : array, ScalarField object or None Return 'None' if there is not enough points in the cloud. """ # checking points length if len(self.xy) < 2: return None # getting data min_x = np.min(self.xy[:, 0]) max_x = np.max(self.xy[:, 0]) min_y = np.min(self.xy[:, 1]) max_y = np.max(self.xy[:, 1]) # getting min, max values and resolution in each direction width_x = max_x - min_x width_y = max_y - min_y if width_x == 0 and width_y == 0: raise Exception() elif width_x == 0: min_x = min_x - width_y/2. max_x = max_x + width_y/2. width_x = width_y res_x = resolution res_y = resolution elif width_y == 0: min_y = min_y - width_x/2. max_y = max_y + width_x/2. width_y = width_x res_x = resolution res_y = resolution elif width_x > width_y: res_x = resolution res_y = int(np.round(resolution*width_y/width_x)) else: res_y = resolution res_x = int(np.round(resolution*width_x/width_y)) if res_x < 2 or res_y < 2: raise ValueError() # check potential singular covariance matrix situations if (np.all(self.xy[:, 0] == self.xy[0, 0]) or np.all(self.xy[:, 1] == self.xy[0, 1])): return None # get kernel using scipy std = np.mean([np.std(self.xy[:, 0]), np.std(self.xy[:, 1])]) if isinstance(bw_method, NUMBERTYPES): if width_x > width_y: ad_len = width_y else: ad_len = width_x ad_bw_method = bw_method*std/ad_len else: ad_bw_method = bw_method kernel = stats.gaussian_kde(self.xy.transpose(), bw_method=ad_bw_method) # little adaptation to avoid streched density map if width_x > width_y: kernel.inv_cov[0, 0] = np.max([kernel.inv_cov]) else: kernel.inv_cov[1, 1] = np.max([kernel.inv_cov]) kernel.inv_cov[0, 1] *= 0 kernel.inv_cov[1, 0] *= 0 # creating grid if width_x > width_y: dx_border = kernel.factor*width_y/2. dy_border = dx_border else: dx_border = kernel.factor*width_x/2. dy_border = dx_border axe_x = np.linspace(min_x - dx_border, max_x + dx_border, res_x) axe_y = np.linspace(min_y - dy_border, max_y + dy_border, res_y) X, Y = np.meshgrid(axe_x, axe_y) X = X.flatten() Y = Y.flatten() positions = np.array([[X[i], Y[i]] for i in np.arange(len(X))]).transpose() # estimating density values = kernel(positions) values = values.reshape((res_y, res_x)).transpose() # normalize (not normalized yet because of the modification of inv_cov) dx = axe_x[1] - axe_x[0] dy = axe_y[1] - axe_y[0] values /= np.sum(np.sum(values))*(dx)*(dy) # adapt to wanted output_format if output_format is None or output_format == "normalized": unit_values = make_unit('') elif output_format == 'ponderated': values = values*len(self.xy) unit_values = make_unit('') elif output_format == "percentage": values = values*100 unit_values = make_unit('') elif output_format == "concentration": unit_values = 1/self.unit_x/self.unit_y values = values*len(self.xy) else: raise ValueError() # return if np.all(np.isnan(values)) or np.all(values == np.inf): return None if raw: return values else: from .scalarfield import ScalarField sf = ScalarField() sf.import_from_arrays(axe_x, axe_y, values, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=unit_values) return sf
[docs] def get_points_density2(self, res, subres=None, raw=False, ponderated=False): """ Return a ScalarField with points density. Parameters ---------- res : number or 2x1 array of numbers fdensity field number of subdivision. Can be the same number for both axis, or one number per axis (need to give a tuple). raw : boolean, optional If 'False' (default), return a ScalarField object, if 'True', return numpy array. ponderated : boolean, optiona If 'True', values associated to points are used to ponderate the density field. Default is 'False'. subres : odd integer, optional If specified, a subgrid of resolution res*subres is used to make result more accurate. """ # checking parameters if isinstance(res, int): res_x = res res_y = res elif isinstance(res, ARRAYTYPES): if len(res) != 2: raise ValueError() res_x = res[0] res_y = res[1] else: raise TypeError() if not isinstance(raw, bool): raise TypeError() if not isinstance(ponderated, bool): raise TypeError() if isinstance(subres, int) and subres > 0: subres = np.floor(subres/2)*2 subres2 = (subres)/2 elif subres is None: pass else: raise TypeError() # If we use a subgrid if subres is not None: # creating grid min_x = np.min(self.xy[:, 0]) max_x = np.max(self.xy[:, 0]) min_y = np.min(self.xy[:, 1]) max_y = np.max(self.xy[:, 1]) dx = (max_x - min_x)/(res_x) dy = (max_y - min_y)/(res_y) sub_dx = dx/subres sub_dy = dy/subres axe_x = np.arange(min_x - dx/2, max_x + dx/2 + sub_dx, sub_dx) axe_y = np.arange(min_y - dy/2, max_y + dy/2 + sub_dy, sub_dy) values = np.zeros((len(axe_x), len(axe_y))) # filling grid with density for i, pt in enumerate(self.xy): x = pt[0] y = pt[1] ind_x = np.argmin(np.abs(axe_x - x)) ind_y = np.argmin(np.abs(axe_y - y)) slic_x = slice(ind_x - subres2 + 1, ind_x + subres2) slic_y = slice(ind_y - subres2 + 1, ind_y + subres2) if ponderated: values[slic_x, slic_y] += self.v[i] else: values[slic_x, slic_y] += 1 values /= (dx*dy) values = values[subres2:-subres2, subres2:-subres2] axe_x = axe_x[subres2:-subres2] axe_y = axe_y[subres2:-subres2] # if we do not use a subgrid else: # creating grid min_x = np.min(self.xy[:, 0]) max_x = np.max(self.xy[:, 0]) min_y = np.min(self.xy[:, 1]) max_y = np.max(self.xy[:, 1]) axe_x, dx = np.linspace(min_x, max_x, res_x, retstep=True) axe_y, dy = np.linspace(min_y, max_y, res_y, retstep=True) values = np.zeros((len(axe_x), len(axe_y))) # filling grid with density for i, pt in enumerate(self.xy): x = pt[0] y = pt[1] ind_x = np.argmin(np.abs(axe_x - x)) ind_y = np.argmin(np.abs(axe_y - y)) if ponderated: values[ind_x, ind_y] += self.v[i] else: values[ind_x, ind_y] += 1 values /= (dx*dy) # return the field if raw: return values else: from .scalarfield import ScalarField sf = ScalarField() if ponderated: unit_values = self.unit_v/self.unit_x/self.unit_y else: unit_values = 1/self.unit_x/self.unit_y sf.import_from_arrays(axe_x, axe_y, values, mask=False, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=unit_values) return sf
[docs] def get_clusters(self, eps, min_samples=5): """ Perform DBSCAN clustering from vector array or distance matrix. (see sklearn.cluster.DBSCAN) Notes ------ DBSCAN - Density-Based Spatial Clustering of Applications with Noise. Finds core samples of high density and expands clusters from them. Good for data which contains clusters of similar density. """ try: import sklearn.cluster as clst except ImportError: raise Exception("You need to install `sklearn` to use this " "functionnality") X = self.xy db = clst.DBSCAN(eps=eps, min_samples=min_samples).fit(X) labels = db.labels_ uniq_labels = np.array(list(set(labels))) nmb_cluster = len(uniq_labels) if -1 in uniq_labels: nmb_cluster -= 1 # create separated points objects points = [] for lab in uniq_labels: filt = labels == lab if len(self.v) == len(self.xy): new_v = self.v[filt] else: new_v = self.v tmp_pts = Points(self.xy[filt], new_v, unit_x=self.unit_x, unit_y=self.unit_y, unit_v=self.unit_v) points.append(tmp_pts) return points
[docs] def get_envelope(self, alpha=None): """ Return the convex or concave hull (if alpha specified) for the set of points. Parameters ---------- alpha : number maximum distance between two points of the hull. Notes ----- Credit to mlaloux ( """ # import shapely functions try: from shapely.geometry import mapping from shapely.geometry import MultiLineString from shapely.ops import polygonize, cascaded_union except ImportError: raise Exception("This functionnality need 'shapely' module") if alpha is None: alpha = np.inf # triangulate from scipy.spatial import Delaunay points = self.xy tri = Delaunay(points) # add and sort points edges = set() edge_points = [] def add_edge(i, j): """Add a line between the i-th and j-th points, if not in the list already""" if (i, j) in edges or (j, i) in edges: return edges.add((i, j)) edge_points.append(points[[i, j]]) points = np.array(points) for ia, ib, ic in tri.vertices: pa = points[ia] pb = points[ib] pc = points[ic] # Lengths of sides of triangle a = np.sqrt((pa[0]-pb[0])**2 + (pa[1]-pb[1])**2) b = np.sqrt((pb[0]-pc[0])**2 + (pb[1]-pc[1])**2) c = np.sqrt((pc[0]-pa[0])**2 + (pc[1]-pa[1])**2) # Semiperimeter of triangle s = (a + b + c)/2.0 # Area of triangle by Heron's formula area = np.sqrt(s*(s-a)*(s-b)*(s-c)) circum_r = a*b*c/(4.0*area) # Here's the radius filter. #if circum_r < 1.0/alpha: if circum_r < alpha: add_edge(ia, ib) add_edge(ib, ic) add_edge(ic, ia) # concatenate polygons m = MultiLineString(edge_points) triangles = list(polygonize(m)) hull = mapping(cascaded_union(triangles))['coordinates'][0] hull = np.array(hull, dtype=float) if hull.ndim == 3: hull = hull[0] # transform to Points object pt = Points(xy=hull, v=[], unit_x=self.unit_x, unit_y=self.unit_y) return pt
[docs] def get_velocity(self, incr=1, smooth=0, xaxis='time'): """ Assuming that associated 'v' values are times for each points, compute the velocity of the trajectory. Parameters ---------- incr : integer, optional Increment use to get used points (default is 1). smooth : number, optional Cut off frequency for the lowpass filter. xaxis : string, optional Value to put in the profile x axis, can be 'time' (default), 'x' or 'y'. Returns ------- Vx : Profile object Profile of x velocity versus time. Vy : Profile object Profile of y velocity versus time. """ if smooth < 0: raise ValueError() if xaxis not in ['time', 'x', 'y']: raise ValueError() # checking 'v' presence if len(self.v) == 0: raise Exception() # sorting points by time ind_sort = np.argsort(self.v) times = self.v[ind_sort] xy = self.xy[ind_sort] x = xy[:, 0] y = xy[:, 1] # using increment if necessary if incr != 1: x = x[::incr] y = y[::incr] times = times[::incr] dx = x[1::] - x[:-1] dy = y[1::] - y[:-1] dt = times[1::] - times[:-1] # smoothing if necessary if smooth != 0: tmp_pts = Points(list(zip(x, y)), times) tmp_pts.smooth(tos='lowpass', size=smooth, inplace=True) x = tmp_pts.xy[:, 0] y = tmp_pts.xy[:, 1] # getting velocity between points Vx = np.array([(x[i + 1] - x[i])/dt[i] for i in np.arange(len(x) - 1)]) Vy = np.array([(y[i + 1] - y[i])/dt[i] for i in np.arange(len(y) - 1)]) # getting xaxis if xaxis == 'time': x_prof = times[:-1] + dt/2. elif xaxis == 'x': x_prof = x[:-1] + dx/2. elif xaxis == 'y': x_prof = y[:-1] + dy/2. # returning profiles from .profile import Profile unit_Vx = self.unit_x/self.unit_v Vx *= unit_Vx.asNumber() unit_Vx /= unit_Vx.asNumber() prof_x = Profile(x_prof, Vx, mask=False, unit_x=self.unit_v, unit_y=unit_Vx) unit_Vy = self.unit_y/self.unit_v Vy *= unit_Vy.asNumber() unit_Vy /= unit_Vy.asNumber() prof_y = Profile(x_prof, Vy, mask=False, unit_x=self.unit_v, unit_y=unit_Vy) return prof_x, prof_y
[docs] def get_evolution_on_sf(self, SF, axe_x=None): """ Return the evolution of the value represented by a scalar field, on the path of the trajectory. Parameters ---------- SF : ScalarField object axe_x : string, optional What put in the x axis (can be 'x', 'y', 'v'). default is 'v' when available and 'x' else. Returns ------- evol : Profile object """ # check parameters from .scalarfield import ScalarField if not isinstance(SF, ScalarField): raise TypeError() if len(self.xy) == 0: from .profile import Profile return Profile() if axe_x is None: if len(self.v) == len(self.xy): axe_x = 'v' else: axe_x = 'x' if not isinstance(axe_x, STRINGTYPES): raise TypeError() # get x values if axe_x == 'v': if len(self.v) == 0: raise ValueError() x_prof = self.v unit_x = self.unit_v elif axe_x == 'x': x_prof = self.xy[:, 0] unit_x = self.unit_x elif axe_x == 'y': x_prof = self.xy[:, 1] unit_x = self.unit_y else: raise ValueError() # get the y value y_prof = np.empty((len(self.xy)), dtype=float) for i, pt in enumerate(self.xy): y_prof[i] = SF.get_value(*pt, ind=False, unit=False) mask = np.isnan(y_prof) unit_y = SF.unit_values # returning evol = Profile(x_prof, y_prof, mask=mask, unit_x=unit_x, unit_y=unit_y) return evol
[docs] def get_evolution_on_tsf(self, TSF, axe_x=None): """ Return the evolution of the value represented by scalar fields, on the path of the trajectory. Timse of the TSF must be consistent with the times of the Points. Parameters ---------- TSF : tsf.TemporalScalarField object axe_x : string, optional What put in the x axis (can be 'x', 'y', 'v'). default is 'v' (associated with time) Returns ------- evol : Profile object """ # check parameters from . import temporalscalarfields as tsf from .profile import Profile if not isinstance(TSF, tsf.TemporalScalarFields): raise TypeError() if len(self.xy) == 0: return Profile() if axe_x is None: axe_x = 'v' if not isinstance(axe_x, STRINGTYPES): raise TypeError() # get x values if axe_x == 'v': if len(self.v) == 0: raise ValueError() x_prof = self.v unit_x = self.unit_v elif axe_x == 'x': x_prof = self.xy[:, 0] unit_x = self.unit_x elif axe_x == 'y': x_prof = self.xy[:, 1] unit_x = self.unit_y else: raise ValueError() # get the y value times = self.v y_prof = np.empty((len(self.xy)), dtype=float) for i, pt in enumerate(self.xy): time = times[i] SF = TSF.fields[TSF.times == time][0] y_prof[i] = SF.get_value(*pt, ind=False, unit=False) mask = np.isnan(y_prof) unit_y = TSF.unit_values # returning evol = Profile(x_prof, y_prof, mask=mask, unit_x=unit_x, unit_y=unit_y) return evol
[docs] def fit(self, kind='polynomial', order=2, simplify=False): """ Return the parametric coefficients of the fitting curve on the points. Parameters ---------- kind : string, optional The kind of fitting used. Can be 'polynomial' or 'ellipse'. order : integer Approximation order for the fitting. Simplify : boolean or string, optional Can be False (default), 'x' or 'y'. Perform a simplification (see Points.Siplify()) before the fitting. Returns ------- p : array, only for polynomial fitting Polynomial coefficients, highest power first radii : array, only for ellipse fitting Ellipse demi-axes radii. center : array, only for ellipse fitting Ellipse center coordinates. alpha : number Angle between the x axis and the major axis. """ if not isinstance(order, int): raise TypeError("'order' must be an integer") if not isinstance(kind, STRINGTYPES): raise TypeError("'kind' must be a string") if not simplify: xytmp = self.xy elif simplify == 'x': xytmp = self.simplify(axe=0).xy elif simplify == 'y': xytmp = self.simplify(axe=1).xy if kind == 'polynomial': p = np.polyfit(xytmp[:, 0], xytmp[:, 1], deg=order) return p elif kind == 'ellipse': import fit_ellipse as fte res = fte.fit_ellipse(xytmp) radii, center, alpha = fte.get_parameters(res) return radii, center, alpha
[docs] def add(self, pt, v=None): """ Add a new point. Parameters ---------- pt : 2x1 array of numbers Point to add. v : number, optional Value of the point (needed if other points have values). """ # check parameters if not isinstance(pt, ARRAYTYPES): raise TypeError() pt = np.array(pt, subok=True) if not pt.shape == (2,): raise ValueError() if not isinstance(pt[0], NUMBERTYPES): raise TypeError() if v is not None: if not isinstance(v, NUMBERTYPES): raise TypeError() # store new data self.__xy = np.append(self.xy, [pt], axis=0) if v is None and self.v.shape[0] != 0: raise ValueError('You should specify an associated value : v') if v is not None: self.__v = np.append(self.v, v)
[docs] def remove(self, ind): """ Remove the point number 'ind' of the points cloud. In place. Parameters ---------- ind : integer or array of integer """ if isinstance(ind, INTEGERTYPES): ind = [ind] elif isinstance(ind, ARRAYTYPES): if not np.all([isinstance(val, int) for val in ind]): raise TypeError("'ind' must be an integer or an array of" " integer") ind = np.array(ind) else: raise TypeError("'ind' must be an integer or an array of integer") tmp_v = self.v.copy() self.xy = np.delete(self.xy, ind, axis=0) if len(self.v) == len(self.xy) + 1: self.v = np.delete(tmp_v, ind, axis=0)
[docs] def change_unit(self, axe, new_unit): """ Change the unit of an axe. Parameters ---------- axe : string 'y' for changing the profile y axis unit 'x' for changing the profile x axis unit 'v' for changing the profile values unit new_unit : Unum.unit object or string The new unit. """ if isinstance(new_unit, STRINGTYPES): new_unit = make_unit(new_unit) if not isinstance(new_unit, unum.Unum): raise TypeError() if not isinstance(axe, STRINGTYPES): raise TypeError() if axe == 'x': old_unit = self.unit_x new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.xy[:, 0] *= fact self.unit_x = new_unit/fact elif axe == 'y': old_unit = self.unit_y new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.xy[:, 1] *= fact self.unit_y = new_unit/fact elif axe == 'v': old_unit = self.unit_v new_unit = old_unit.asUnit(new_unit) fact = new_unit.asNumber() self.v *= fact self.unit_v = new_unit/fact else: raise ValueError()
[docs] def set_origin(self, x=None, y=None): """ Set the given point (x, y) as the new referential. Parameters ---------- x: number . y: number . """ # Check if x is not None: if not isinstance(x, NUMBERTYPES): raise TypeError() self.xy[:, 0] -= x if y is not None: if not isinstance(y, NUMBERTYPES): raise TypeError() self.xy[:, 1] -= y
[docs] def crop(self, intervx=None, intervy=None, intervv=None, inplace=True, ind=False): """ Crop the points cloud. Parameters ---------- intervx : 2x1 tuple Interval on x axis intervy : 2x1 tuple Interval on y axis intervv : 2x1 tuple Interval on v values Returns ------- tmp_pts : Points object croped version of the point cloud. """ # if inplace: tmp_pts = self else: tmp_pts = self.copy() # Getting cropping mask if ind: if intervx is not None or intervy is not None: raise ValueError() mask = np.ones(len(self.xy), dtype=bool) mask[intervv[0]:intervv[1]] = False else: mask = np.zeros(len(self.xy), dtype=bool) if intervx is not None: out_zone = np.logical_or(self.xy[:, 0] < intervx[0], self.xy[:, 0] > intervx[1]) mask = np.logical_or(mask, out_zone) if intervy is not None: out_zone = np.logical_or(self.xy[:, 1] < intervy[0], self.xy[:, 1] > intervy[1]) mask = np.logical_or(mask, out_zone) if intervv is not None and len(self.v) != 0: out_zone = np.logical_or(self.v < intervv[0], self.v > intervv[1]) mask = np.logical_or(mask, out_zone) # Cropping values tmp_pts.__xy = tmp_pts.xy[~mask, :] if len(tmp_pts.v) != 0: tmp_pts.__v = tmp_pts.v[~mask] # returning if not inplace: return tmp_pts
[docs] def cut(self, intervx=None, intervy=None): """ Return a point cloud where the given area has been removed. Parameters ---------- intervx : 2x1 tuple Interval on x axis intervy : 2x1 tuple Interval on y axis Returns ------- tmp_pts : Points object Cutted version of the point cloud. """ tmp_pts = self.copy() mask = np.ones(len(self.xy)) if intervx is not None: out_zone = np.logical_and(self.xy[:, 0] > intervx[0], self.xy[:, 0] < intervx[1]) mask = np.logical_and(mask, out_zone) if intervy is not None: out_zone = np.logical_and(self.xy[:, 1] > intervy[0], self.xy[:, 1] < intervy[1]) mask = np.logical_and(mask, out_zone) tmp_pts.xy = tmp_pts.xy[~mask, :] if len(tmp_pts.v) != 0: tmp_pts.v = tmp_pts.v[~mask] return tmp_pts
[docs] def scale(self, scalex=1., scaley=1., scalev=1., inplace=False): """ Change the scale of the axis. Parameters ---------- scalex, scaley, scalev : numbers or Unum objects scales along x, y and v inplace : boolean, optional If 'True', scaling is done in place, else, a new instance is returned. """ # check params if not isinstance(scalex, NUMBERTYPES + (unum.Unum, )): raise TypeError() if not isinstance(scaley, NUMBERTYPES + (unum.Unum, )): raise TypeError() if not isinstance(scalev, NUMBERTYPES + (unum.Unum, )): raise TypeError() if not isinstance(inplace, bool): raise TypeError() if inplace: tmp_pt = self else: tmp_pt = self.copy() # adapt unit if isinstance(scalex, unum.Unum): new_unit = scalex*tmp_pt.unit_x fact = new_unit.asNumber() new_unit /= fact tmp_pt.unit_x = new_unit scalex = fact if isinstance(scaley, unum.Unum): new_unit = scaley*tmp_pt.unit_y fact = new_unit.asNumber() new_unit /= fact tmp_pt.unit_y = new_unit scaley = fact if isinstance(scalev, unum.Unum): new_unit = scalev*tmp_pt.unit_v fact = new_unit.asNumber() new_unit /= fact tmp_pt.unit_v = new_unit scalev = fact # loop if scalex != 1. or scaley != 1.: tmp_pt.xy *= np.array([scalex, scaley]) if scalev != 1.: tmp_pt.v *= scalev # returning if not inplace: return tmp_pt
[docs] def rotate(self, angle, inplace=False): """ Rotate the point set. Parameters ---------- angle: number Rotation angle in radian. """ # Checks if inplace: tmpp = self else: tmpp = self.copy() # x, y = self.xy[:, 0], self.xy[:, 1] new_x = x*np.cos(angle) - y*np.sin(angle) new_y = y*np.cos(angle) + x*np.sin(angle) tmpp.xy[:, 0] = new_x tmpp.xy[:, 1] = new_y # Return return tmpp
[docs] def remove_doublons(self, method='average', inplace=False, eps_rel=1e-6): """ Replace values associated to the same 'v' by their average. Parameters ---------- method : string in {'average', 'max', 'min'} Method used to remove the doublons. """ if len(self.v) == 0: raise Exception() if inplace: tmp_pt = self else: tmp_pt = self.copy() vs = tmp_pt.v ord_magn = (np.sum(vs**2)/len(vs))**.5 nmb_dec = -int(round(np.log10(ord_magn*eps_rel))) tmp_vs = np.round(vs, decimals=nmb_dec) new_vs = np.sort(list(set(tmp_vs))) if method == 'average': new_xy = [np.mean(tmp_pt.xy[tmp_vs == vi], axis=0) for vi in new_vs] elif method == 'min': new_xy = [np.min(tmp_pt.xy[tmp_vs == vi], axis=0) for vi in new_vs] elif method == 'max': new_xy = [np.max(tmp_pt.xy[tmp_vs == vi], axis=0) for vi in new_vs] else: raise ValueError() tmp_pt.xy = new_xy tmp_pt.v = new_vs # returning if not inplace: return tmp_pt
[docs] def reverse(self): """ Return a Points object where x and y axis are swaped. """ tmp_pt = self.copy() xy_tmp = tmp_pt.xy*0 xy_tmp[:, 0] = tmp_pt.xy[:, 1] xy_tmp[:, 1] = tmp_pt.xy[:, 0] tmp_pt.xy = xy_tmp return tmp_pt
[docs] def decompose(self): """ return a tuple of Points object, with only one point per object. """ if len(self) == 1: return [self] if len(self) != len(self.v): raise Exception() pts_tupl = [] for i in np.arange(len(self)): pts_tupl.append(Points([self.xy[i]], [self.v[i]], self.unit_x, self.unit_y, self.unit_v, return pts_tupl
[docs] def sort(self, ref='x', inplace=False): """ Sort the points according to the reference. Parameters ---------- ref : string or array of indice can be 'x', 'y' or 'v' to sort according those values or an array of indice inplace ; boolean If 'True', sort in place, else, return an new sorted instance. """ # check parameters if isinstance(ref, STRINGTYPES): if ref not in ['x', 'y', 'v']: raise ValueError elif isinstance(ref, ARRAYTYPES): ref = np.array(ref, dtype=int) if len(ref) != len(self.xy): raise ValueError() else: raise TypeError() if not isinstance(inplace, bool): raise TypeError() # get order if ref == 'x': order = np.argsort(self.xy[:, 0]) elif ref == 'y': order = np.argsort(self.xy[:, 1]) elif ref == 'v': if len(self.v) == 0: raise ValueError() order = np.argsort(self.v) else: order = ref # reordering if inplace: tmp_pt = self else: tmp_pt = self.copy() tmp_pt.xy = tmp_pt.xy[order] if len(tmp_pt.v) == len(tmp_pt.xy): tmp_pt.v = tmp_pt.v[order] # returning if not inplace: return tmp_pt
[docs] def order_on_line(self, inplace=False): """ Re-order the set of points to try to create a line. """ if inplace: tmp_pts = self else: tmp_pts = self.copy() # from sklearn.neighbors import NearestNeighbors clf = NearestNeighbors(2).fit(self.xy) G = clf.kneighbors_graph() import networkx as nx T = nx.from_scipy_sparse_matrix(G) order = list(nx.dfs_preorder_nodes(T, 0)) paths = [list(nx.dfs_preorder_nodes(T, i)) for i in range(len(self.xy))] mindist = np.inf minidx = 0 for i in range(len(self.xy)): p = paths[i] # order of nodes ordered = self.xy[p] # ordered nodes # find cost of that order by the sum of euclidean distances between points (i) and (i+1) cost = (((ordered[:-1] - ordered[1:])**2).sum(1)).sum() if cost < mindist: mindist = cost minidx = i opt_order = paths[minidx] new_x = self.xy[:, 0][opt_order] new_y = self.xy[:, 1][opt_order] # return tmp_pts.xy = np.array(list(zip(new_x, new_y))) return tmp_pts
[docs] def remove_nans(self, inplace=False): """ Remove the points containing nans values. """ if inplace: tmp_pts = self else: tmp_pts = self.copy() # get indices to remove inds = np.logical_or(np.isnan(tmp_pts.xy[:, 0]), np.isnan(tmp_pts.xy[:, 1])) inds = np.logical_or(inds, np.isnan(tmp_pts.v)) # remove for ind in np.where(inds)[0][::-1]: tmp_pts.remove(ind) # return if not inplace: return tmp_pts
[docs] def augment_resolution(self, fact=2, interp='linear', inplace=False): """ Augment the temporal resolution of the points. Only have sense if points are sorted to set some kind of trajectory. Parameters ---------- fact : integer Resolution augmentation needed (default is '2', for a result profile with twice more points) interp : string in ['linear', 'nearest', slinear', 'quadratic, 'cubic'] Specifies the kind of interpolation as a string (Default is 'linear'). slinear', 'quadratic' and 'cubic' refer to a spline interpolation of first, second or third order. inplace bool . Note ---- If masked values are present, they are interpolated as well, using the surrounding values. """ if inplace: tmp_pts = self else: tmp_pts = self.copy() is_v = len(self.v) == len(self.xy) # get and interpolate from .profile import Profile tmp_x = Profile(list(range(len(self.xy))), self.xy[:, 0]) tmp_x.augment_resolution(fact=fact, interp=interp, inplace=True) tmp_y = Profile(list(range(len(self.xy))), self.xy[:, 1]) tmp_y.augment_resolution(fact=fact, interp=interp, inplace=True) if is_v: tmp_v = Profile(list(range(len(self.xy))), self.v) tmp_v.augment_resolution(fact=fact, interp=interp, inplace=True) # store tmp_pts.xy = list(zip(tmp_x.y, tmp_y.y)) if is_v: tmp_pts.v = tmp_v.y if not inplace: return tmp_pts
[docs] def smooth(self, tos='gaussian', size=None, inplace=False, **kw): """ Return a smoothed points field. Parameters ---------- tos : string, optional Type of smoothing, can be 'uniform' (default), 'gaussian' or 'lowpass'. size : number, optional radius of the smoothing for 'uniform', radius of the smoothing for 'gaussian', cut off frequency for 'lowpass' Default are 3 for 'uniform', 1 for 'gaussian' and 0.1 for ' lowpass'. inplace : boolean If 'False', return a smoothed points field else, smooth in place. kw : dic Additional parameters for ndimage methods (See ndimage documentation) """ if not isinstance(tos, STRINGTYPES): raise TypeError("'tos' must be a string") if size is None and tos == 'uniform': size = 3 elif size is None and tos == 'gaussian': size = 1 elif size is None and tos == 'lowpass': size = 0.1 # default smoothing border mode to 'nearest' if tos in ['uniform', 'gaussian'] and 'mode' not in list(kw.keys()): kw.update({'mode': 'nearest'}) # getting data if not inplace: tmp_pts = self.copy() y = self.xy[:, 1] x = self.xy[:, 0] if len(self.v) != 0: v = self.v # smoothing if tos == "uniform": x = ndimage.uniform_filter(x, size, **kw) y = ndimage.uniform_filter(y, size, **kw) if len(self.v) != 0: v = ndimage.uniform_filter(v, size, **kw) elif tos == "gaussian": x = ndimage.gaussian_filter(x, size, **kw) y = ndimage.gaussian_filter(y, size, **kw) if len(self.v) != 0: v = ndimage.gaussian_filter(v, size, **kw) elif tos == 'lowpass': from scipy import signal x = self.xy[:, 0] y = self.xy[:, 1] N = 2 Wn = size B, A = signal.butter(N, Wn, output='ba') x = signal.filtfilt(B, A, x) y = signal.filtfilt(B, A, y) if len(self.v) != 0: v = signal.filtfilt(B, A, v) else: raise ValueError("'tos' must be 'uniform', 'gaussian' or " "'lowpass'") # storing if inplace: self.xy[:, 0] = x self.xy[:, 1] = y if len(self.v) != 0: self.v = v else: tmp_pts.xy[:, 0] = x tmp_pts.xy[:, 1] = y if len(tmp_pts.v) != 0: tmp_pts.v = v return tmp_pts
def _display(self, kind=None, axe_x=None, axe_y=None, axe_color=None, **plotargs): if len(self.xy) == 0: return None # x values if axe_x == 'x' or axe_x is None: x_values = self.xy[:, 0] elif axe_x == 'y': x_values = self.xy[:, 1] elif axe_x == 'v': if len(self.v) != len(self.xy): raise Exception() x_values = self.v else: raise ValueError() # y values if axe_y == 'x': y_values = self.xy[:, 0] elif axe_y == 'y' or axe_y is None: y_values = self.xy[:, 1] elif axe_y == 'v': if len(self.v) != len(self.xy): raise Exception() y_values = self.v else: raise ValueError() # color values if axe_color == 'x': color_values = self.xy[:, 0] elif axe_color == 'y': color_values = self.xy[:, 1] elif axe_color in [None, 'v']: if len(self.v) != len(self.xy): color_values = None if ('c' not in plotargs.keys() and 'color' not in plotargs.keys()): plotargs['color'] = 'k' color_values = self.v else: raise ValueError() # check c and color if 'c' in plotargs.keys(): plotargs['color'] = None if 'color' in plotargs.keys(): plotargs['c'] = None # display dp = pplt.Displayer(x=x_values, y=y_values, values=color_values, kind=kind, **plotargs) ax = dp.draw() return ax
[docs] def display(self, kind=None, axe_x=None, axe_y=None, axe_color=None, **plotargs): """ Display the set of points. Parameters ---------- kind : string, optional Can be 'plot' (default if points have not values). or 'scatter' (default if points have values). or 'colored_plot'. axe_x, axe_y, axe_color : strings in ['x', 'y', 'v'] To determine wich value has to be plotted along which axis, and whith value is used to color the scattered points. Default plot 'y' to 'x' with colors from 'v'. **plotargs : dic Additionnal arguments sent to 'plot' or 'scatter' """ # default values if axe_x is None: if axe_y != 'x': axe_x = 'x' else: axe_x = 'y' if axe_y is None: if axe_x != 'y': axe_y = 'y' else: axe_y = 'x' if axe_color is None: axes = ['x', 'y', 'v'] try: axes.remove(axe_x) axes.remove(axe_y) except ValueError: axes = ['v'] axe_color = axes[0] # display the values plot = self._display(kind, axe_x=axe_x, axe_y=axe_y, axe_color=axe_color, **plotargs) if len(self.v) != 0 and kind is not 'plot': cb = plt.colorbar(plot) cb.set_label(self.unit_v.strUnit()) # cb label if axe_color == 'x': cb.set_label('X ' + self.unit_x.strUnit()) elif axe_color == 'y': cb.set_label('Y ' + self.unit_y.strUnit()) else: cb.set_label('V ' + self.unit_v.strUnit()) # x axis label if axe_x == 'x': plt.xlabel('X ' + self.unit_x.strUnit()) elif axe_x == 'y': plt.xlabel('Y ' + self.unit_y.strUnit()) else: plt.xlabel('V ' + self.unit_v.strUnit()) # y axis label if axe_y == 'x': plt.ylabel('X ' + self.unit_x.strUnit()) elif axe_y == 'y': plt.ylabel('Y ' + self.unit_y.strUnit()) else: plt.ylabel('V ' + self.unit_v.strUnit()) if is None: plt.title('Set of points') else: plt.title( plt.axis('equal') return plot
[docs] def display3D(self, kind='plot', xlabel='', ylabel='', zlabel='', title='', **plotargs): """ Display the points on a 3D graph. Parameters ---------- kind : string, optional Kind of graph to use, can be 'plot' or 'surf'. xlabel, ylabel, zlabel : string, optional Label fo each axis (respectively 'x', 'y', and 'v') title : strin, optional Title **plotargs : Additional parameters feeded to matplotlib """ # create 3D plot ax = plt.gca(projection='3d') # display data if kind == 'plot': ax.plot(self.xy[:, 0], self.xy[:, 1], self.v, **plotargs) elif kind == 'surf': ax.plot_trisurf(self.xy[:, 0], self.xy[:, 1], self.v) else: raise ValueError() # labels ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_zlabel(zlabel) # title ax.set_title(title) # plt.tight_layout() return ax
[docs] def export_to_profile(self, axe_x='x', axe_y='y'): """ Export the unsorted point object to a sorted Profile object. Parameters ---------- axe_x, axe_y : strings in ['x', 'y', 'v'] Which value used to construct the profile """ # check if axe_x not in ['x', 'y', 'v']: raise ValueError() if axe_y not in ['x', 'y', 'v']: raise ValueError() # get data if axe_x == 'x': x = self.xy[:, 0] unit_x = self.unit_x elif axe_x == 'y': x = self.xy[:, 1] unit_x = self.unit_y else: x = self.v unit_x = self.unit_v if axe_y == 'x': y = self.xy[:, 0] unit_y = self.unit_x elif axe_y == 'y': y = self.xy[:, 1] unit_y = self.unit_y else: y = self.v unit_y = self.unit_v # construct profile from .profile import Profile prof = Profile(x=x, y=y, mask=False, unit_x=unit_x, unit_y=unit_y) return prof