# -*- coding: utf-8 -*-
#!/bin/env python3
# Copyright (C) 2003-2007 Gaby Launay
# Author: Gaby Launay <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0
# This file is part of IMTreatment.
# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import matplotlib.pyplot as plt
from .. import plotlib as pplt
import numpy as np
import unum
import copy
from scipy import stats
from scipy import ndimage
from ..utils.types import ARRAYTYPES, INTEGERTYPES, NUMBERTYPES, STRINGTYPES
from ..utils import make_unit
[docs]class Points(object):
def __init__(self, xy=np.empty((0, 2), dtype=float), v=[],
unit_x='', unit_y='', unit_v='', name=''):
"""
Class representing a set of points.
You can use 'make_unit' to provide unities.
Parameters
----------
xy : nx2 array.
Representing the coordinates of each point of the set (n points).
v : n array, optional
Representing values attached at each points.
unit_x : Unit object, optional
X unit_y.
unit_y : Unit object, optional
Y unit_y.
unit_v : Unit object, optional
values unit_y.
name : string, optional
Name of the points set
"""
self.__v = []
if len(xy) == 0:
xy = np.empty((0, 2), dtype=float)
self.xy = xy
self.v = v
self.unit_v = unit_v
self.unit_x = unit_x
self.unit_y = unit_y
self.name = name
def __iter__(self):
if self.v is None or len(self.v) == 0:
for i in np.arange(len(self.xy)):
yield self.xy[i]
else:
for i in np.arange(len(self.xy)):
yield self.xy[i], self.v[i]
def __len__(self):
return self.xy.shape[0]
def __add__(self, another):
if isinstance(another, Points):
# trivial additions
if len(self.xy) == 0:
return another.copy()
elif len(another.xy) == 0:
return self.copy()
# checking unit systems
if len(self.xy) != 0:
try:
self.unit_x + another.unit_x
self.unit_y + another.unit_y
if self.v is not None and another.v is not None:
self.unit_v + another.unit_v
except unum.IncompatibleUnitsError:
raise ValueError("Units system are not the same")
else:
self.unit_x = another.unit_x
self.unit_y = another.unit_y
if another.v is not None:
self.unit_v = another.unit_v
# compacting coordinates
if another.xy.shape == (0,):
new_xy = self.xy
elif self.xy.shape == (0,):
xy = another.xy
xy[:, 0] = xy[:, 0]*(self.unit_x/another.unit_x).asNumber()
xy[:, 1] = xy[:, 1]*(self.unit_y/another.unit_y).asNumber()
new_xy = xy
elif another.xy.shape == (0,) and self.xy.shape == (0,):
new_xy = np.array([[]])
else:
xy = another.xy
xy[:, 0] = xy[:, 0]*(self.unit_x/another.unit_x).asNumber()
xy[:, 1] = xy[:, 1]*(self.unit_y/another.unit_y).asNumber()
new_xy = np.append(self.xy, xy, axis=0)
# testing v presence
v_presence = True
if self.v is None and another.v is None:
if len(self.xy) != 0:
v_presence = False
elif self.v is not None and another.v is not None:
v_presence = True
else:
raise Exception()
# compacting points and returning
if v_presence:
if self.v is None and another.v is None:
v = np.array([])
elif self.v is None:
v = another.v*(self.unit_v/another.unit_v).asNumber()
elif another.v is None:
v = self.v
else:
v_tmp = another.v*(self.unit_v/another.unit_v).asNumber()
v = np.append(self.v, v_tmp)
return Points(new_xy, v,
unit_x=self.unit_x,
unit_y=self.unit_y,
unit_v=self.unit_v)
else:
return Points(new_xy,
unit_x=self.unit_x,
unit_y=self.unit_y)
else:
raise Exception("You can't add {} to Points objects"
.format(type(another)))
def __eq__(self, obj):
if not isinstance(obj, Points):
return False
if not np.all(self.xy == obj.xy):
return False
if not np.all(self.v == obj.v):
return False
if not self.unit_x == obj.unit_x:
return False
if not self.unit_y == obj.unit_y:
return False
if not self.unit_v == obj.unit_v:
return False
return True
@property
def xy(self):
return self.__xy
@xy.setter
def xy(self, values):
values = np.asarray(values, dtype=float)
if len(values != 0):
if not values.ndim == 2:
raise ValueError("ndim of xy is {} and should be 2"
.format(values.ndim))
if not values.shape[1] == 2:
raise ValueError()
self.__xy = values
if len(values) != len(self.__v):
self.__v = np.array([])
@xy.deleter
def xy(self):
raise Exception("Nope, can't do that")
@property
def v(self):
return self.__v
@v.setter
def v(self, values):
if values is None:
self__v = None
else:
values = np.array(values, subok=True)
if not values.ndim == 1:
raise ValueError()
if not len(values) in [0, len(self.__xy)]:
raise ValueError()
self.__v = values
@v.deleter
def v(self):
raise Exception("Nope, can't do that")
@property
def unit_x(self):
return self.__unit_x
@unit_x.setter
def unit_x(self, unit):
if isinstance(unit, unum.Unum):
self.__unit_x = unit
elif isinstance(unit, STRINGTYPES):
try:
self.__unit_x = make_unit(unit)
except (ValueError, TypeError):
raise Exception()
else:
raise Exception()
@unit_x.deleter
def unit_x(self):
raise Exception("Nope, can't delete 'unit_x'")
@property
def unit_y(self):
return self.__unit_y
@unit_y.setter
def unit_y(self, unit):
if isinstance(unit, unum.Unum):
self.__unit_y = unit
elif isinstance(unit, STRINGTYPES):
try:
self.__unit_y = make_unit(unit)
except (ValueError, TypeError):
raise Exception()
else:
raise Exception()
@unit_y.deleter
def unit_y(self):
raise Exception("Nope, can't delete 'unit_y'")
@property
def unit_v(self):
return self.__unit_v
@unit_v.setter
def unit_v(self, unit):
if isinstance(unit, unum.Unum):
self.__unit_v = unit
elif isinstance(unit, STRINGTYPES):
try:
self.__unit_v = make_unit(unit)
except (ValueError, TypeError):
raise Exception()
else:
raise Exception()
@unit_v.deleter
def unit_v(self):
raise Exception("Nope, can't delete 'unit_v'")
@property
def name(self):
return self.__name
@name.setter
def name(self, name):
if isinstance(name, STRINGTYPES):
self.__name = name
else:
raise Exception()
@name.deleter
def name(self):
raise Exception("Nope, can't delete 'name'")
[docs] def copy(self):
"""
Return a copy of the Points object.
"""
return copy.deepcopy(self)
[docs] def get_points_density(self, bw_method=None, resolution=100,
output_format='normalized', raw=False):
"""
Return a ScalarField with points density.
Parameters
----------
bw_method : str, scalar or callable, optional
The method used to calculate the estimator bandwidth.
This can be 'scott', 'silverman', a scalar constant or
a callable. If a scalar, this will be used as percent of
the data std. If a callable, it should take a gaussian_kde
instance as only parameter and return a scalar.
If None (default), 'scott' is used.
resolution : integer, optional
Resolution for the resulting field.
output_format : string, optional
'normalized' (default) : give position probability
(integral egal 1).
'ponderated' : give position probability ponderated by the number
or points (integral egal number of points).
'concentration' : give local concentration (in point per surface).
raw : boolean, optional
If 'False' (default), return a ScalarField object,
if 'True', return numpy array.
Returns
-------
density : array, ScalarField object or None
Return 'None' if there is not enough points in the cloud.
"""
# checking points length
if len(self.xy) < 2:
return None
# getting data
min_x = np.min(self.xy[:, 0])
max_x = np.max(self.xy[:, 0])
min_y = np.min(self.xy[:, 1])
max_y = np.max(self.xy[:, 1])
# getting min, max values and resolution in each direction
width_x = max_x - min_x
width_y = max_y - min_y
if width_x == 0 and width_y == 0:
raise Exception()
elif width_x == 0:
min_x = min_x - width_y/2.
max_x = max_x + width_y/2.
width_x = width_y
res_x = resolution
res_y = resolution
elif width_y == 0:
min_y = min_y - width_x/2.
max_y = max_y + width_x/2.
width_y = width_x
res_x = resolution
res_y = resolution
elif width_x > width_y:
res_x = resolution
res_y = int(np.round(resolution*width_y/width_x))
else:
res_y = resolution
res_x = int(np.round(resolution*width_x/width_y))
if res_x < 2 or res_y < 2:
raise ValueError()
# check potential singular covariance matrix situations
if (np.all(self.xy[:, 0] == self.xy[0, 0]) or
np.all(self.xy[:, 1] == self.xy[0, 1])):
return None
# get kernel using scipy
std = np.mean([np.std(self.xy[:, 0]), np.std(self.xy[:, 1])])
if isinstance(bw_method, NUMBERTYPES):
if width_x > width_y:
ad_len = width_y
else:
ad_len = width_x
ad_bw_method = bw_method*std/ad_len
else:
ad_bw_method = bw_method
kernel = stats.gaussian_kde(self.xy.transpose(),
bw_method=ad_bw_method)
# little adaptation to avoid streched density map
if width_x > width_y:
kernel.inv_cov[0, 0] = np.max([kernel.inv_cov])
else:
kernel.inv_cov[1, 1] = np.max([kernel.inv_cov])
kernel.inv_cov[0, 1] *= 0
kernel.inv_cov[1, 0] *= 0
# creating grid
if width_x > width_y:
dx_border = kernel.factor*width_y/2.
dy_border = dx_border
else:
dx_border = kernel.factor*width_x/2.
dy_border = dx_border
axe_x = np.linspace(min_x - dx_border, max_x + dx_border, res_x)
axe_y = np.linspace(min_y - dy_border, max_y + dy_border, res_y)
X, Y = np.meshgrid(axe_x, axe_y)
X = X.flatten()
Y = Y.flatten()
positions = np.array([[X[i], Y[i]]
for i in np.arange(len(X))]).transpose()
# estimating density
values = kernel(positions)
values = values.reshape((res_y, res_x)).transpose()
# normalize (not normalized yet because of the modification of inv_cov)
dx = axe_x[1] - axe_x[0]
dy = axe_y[1] - axe_y[0]
values /= np.sum(np.sum(values))*(dx)*(dy)
# adapt to wanted output_format
if output_format is None or output_format == "normalized":
unit_values = make_unit('')
elif output_format == 'ponderated':
values = values*len(self.xy)
unit_values = make_unit('')
elif output_format == "percentage":
values = values*100
unit_values = make_unit('')
elif output_format == "concentration":
unit_values = 1/self.unit_x/self.unit_y
values = values*len(self.xy)
else:
raise ValueError()
# return
if np.all(np.isnan(values)) or np.all(values == np.inf):
return None
if raw:
return values
else:
from .scalarfield import ScalarField
sf = ScalarField()
sf.import_from_arrays(axe_x, axe_y, values, mask=False,
unit_x=self.unit_x, unit_y=self.unit_y,
unit_values=unit_values)
return sf
[docs] def get_points_density2(self, res, subres=None, raw=False,
ponderated=False):
"""
Return a ScalarField with points density.
Parameters
----------
res : number or 2x1 array of numbers
fdensity field number of subdivision.
Can be the same number for both axis, or one number per axis
(need to give a tuple).
raw : boolean, optional
If 'False' (default), return a ScalarField object,
if 'True', return numpy array.
ponderated : boolean, optiona
If 'True', values associated to points are used to ponderate the
density field. Default is 'False'.
subres : odd integer, optional
If specified, a subgrid of resolution res*subres is used to
make result more accurate.
"""
# checking parameters
if isinstance(res, int):
res_x = res
res_y = res
elif isinstance(res, ARRAYTYPES):
if len(res) != 2:
raise ValueError()
res_x = res[0]
res_y = res[1]
else:
raise TypeError()
if not isinstance(raw, bool):
raise TypeError()
if not isinstance(ponderated, bool):
raise TypeError()
if isinstance(subres, int) and subres > 0:
subres = np.floor(subres/2)*2
subres2 = (subres)/2
elif subres is None:
pass
else:
raise TypeError()
# If we use a subgrid
if subres is not None:
# creating grid
min_x = np.min(self.xy[:, 0])
max_x = np.max(self.xy[:, 0])
min_y = np.min(self.xy[:, 1])
max_y = np.max(self.xy[:, 1])
dx = (max_x - min_x)/(res_x)
dy = (max_y - min_y)/(res_y)
sub_dx = dx/subres
sub_dy = dy/subres
axe_x = np.arange(min_x - dx/2, max_x + dx/2 + sub_dx, sub_dx)
axe_y = np.arange(min_y - dy/2, max_y + dy/2 + sub_dy, sub_dy)
values = np.zeros((len(axe_x), len(axe_y)))
# filling grid with density
for i, pt in enumerate(self.xy):
x = pt[0]
y = pt[1]
ind_x = np.argmin(np.abs(axe_x - x))
ind_y = np.argmin(np.abs(axe_y - y))
slic_x = slice(ind_x - subres2 + 1, ind_x + subres2)
slic_y = slice(ind_y - subres2 + 1, ind_y + subres2)
if ponderated:
values[slic_x, slic_y] += self.v[i]
else:
values[slic_x, slic_y] += 1
values /= (dx*dy)
values = values[subres2:-subres2, subres2:-subres2]
axe_x = axe_x[subres2:-subres2]
axe_y = axe_y[subres2:-subres2]
# if we do not use a subgrid
else:
# creating grid
min_x = np.min(self.xy[:, 0])
max_x = np.max(self.xy[:, 0])
min_y = np.min(self.xy[:, 1])
max_y = np.max(self.xy[:, 1])
axe_x, dx = np.linspace(min_x, max_x, res_x, retstep=True)
axe_y, dy = np.linspace(min_y, max_y, res_y, retstep=True)
values = np.zeros((len(axe_x), len(axe_y)))
# filling grid with density
for i, pt in enumerate(self.xy):
x = pt[0]
y = pt[1]
ind_x = np.argmin(np.abs(axe_x - x))
ind_y = np.argmin(np.abs(axe_y - y))
if ponderated:
values[ind_x, ind_y] += self.v[i]
else:
values[ind_x, ind_y] += 1
values /= (dx*dy)
# return the field
if raw:
return values
else:
from .scalarfield import ScalarField
sf = ScalarField()
if ponderated:
unit_values = self.unit_v/self.unit_x/self.unit_y
else:
unit_values = 1/self.unit_x/self.unit_y
sf.import_from_arrays(axe_x, axe_y, values, mask=False,
unit_x=self.unit_x, unit_y=self.unit_y,
unit_values=unit_values)
return sf
[docs] def get_clusters(self, eps, min_samples=5):
"""
Perform DBSCAN clustering from vector array or distance matrix.
(see sklearn.cluster.DBSCAN)
Notes
------
DBSCAN - Density-Based Spatial Clustering of Applications with Noise.
Finds core samples of high density and expands clusters from them.
Good for data which contains clusters of similar density.
"""
try:
import sklearn.cluster as clst
except ImportError:
raise Exception("You need to install `sklearn` to use this "
"functionnality")
X = self.xy
db = clst.DBSCAN(eps=eps, min_samples=min_samples).fit(X)
labels = db.labels_
uniq_labels = np.array(list(set(labels)))
nmb_cluster = len(uniq_labels)
if -1 in uniq_labels:
nmb_cluster -= 1
# create separated points objects
points = []
for lab in uniq_labels:
filt = labels == lab
if len(self.v) == len(self.xy):
new_v = self.v[filt]
else:
new_v = self.v
tmp_pts = Points(self.xy[filt], new_v, unit_x=self.unit_x,
unit_y=self.unit_y, unit_v=self.unit_v)
points.append(tmp_pts)
return points
[docs] def get_envelope(self, alpha=None):
"""
Return the convex or concave hull (if alpha specified) for the set of
points.
Parameters
----------
alpha : number
maximum distance between two points of the hull.
Notes
-----
Credit to mlaloux
(https://github.com/mlaloux/Python--alpha-shape_concave_hull)
"""
# import shapely functions
try:
from shapely.geometry import mapping
from shapely.geometry import MultiLineString
from shapely.ops import polygonize, cascaded_union
except ImportError:
raise Exception("This functionnality need 'shapely' module")
if alpha is None:
alpha = np.inf
# triangulate
from scipy.spatial import Delaunay
points = self.xy
tri = Delaunay(points)
# add and sort points
edges = set()
edge_points = []
def add_edge(i, j):
"""Add a line between the i-th and j-th points,
if not in the list already"""
if (i, j) in edges or (j, i) in edges:
return
edges.add((i, j))
edge_points.append(points[[i, j]])
points = np.array(points)
for ia, ib, ic in tri.vertices:
pa = points[ia]
pb = points[ib]
pc = points[ic]
# Lengths of sides of triangle
a = np.sqrt((pa[0]-pb[0])**2 + (pa[1]-pb[1])**2)
b = np.sqrt((pb[0]-pc[0])**2 + (pb[1]-pc[1])**2)
c = np.sqrt((pc[0]-pa[0])**2 + (pc[1]-pa[1])**2)
# Semiperimeter of triangle
s = (a + b + c)/2.0
# Area of triangle by Heron's formula
area = np.sqrt(s*(s-a)*(s-b)*(s-c))
circum_r = a*b*c/(4.0*area)
# Here's the radius filter.
#if circum_r < 1.0/alpha:
if circum_r < alpha:
add_edge(ia, ib)
add_edge(ib, ic)
add_edge(ic, ia)
# concatenate polygons
m = MultiLineString(edge_points)
triangles = list(polygonize(m))
hull = mapping(cascaded_union(triangles))['coordinates'][0]
hull = np.array(hull, dtype=float)
if hull.ndim == 3:
hull = hull[0]
# transform to Points object
pt = Points(xy=hull, v=[], unit_x=self.unit_x,
unit_y=self.unit_y)
return pt
[docs] def get_velocity(self, incr=1, smooth=0, xaxis='time'):
"""
Assuming that associated 'v' values are times for each points,
compute the velocity of the trajectory.
Parameters
----------
incr : integer, optional
Increment use to get used points (default is 1).
smooth : number, optional
Cut off frequency for the lowpass filter.
xaxis : string, optional
Value to put in the profile x axis, can be 'time' (default), 'x'
or 'y'.
Returns
-------
Vx : Profile object
Profile of x velocity versus time.
Vy : Profile object
Profile of y velocity versus time.
"""
if smooth < 0:
raise ValueError()
if xaxis not in ['time', 'x', 'y']:
raise ValueError()
# checking 'v' presence
if len(self.v) == 0:
raise Exception()
# sorting points by time
ind_sort = np.argsort(self.v)
times = self.v[ind_sort]
xy = self.xy[ind_sort]
x = xy[:, 0]
y = xy[:, 1]
# using increment if necessary
if incr != 1:
x = x[::incr]
y = y[::incr]
times = times[::incr]
dx = x[1::] - x[:-1]
dy = y[1::] - y[:-1]
dt = times[1::] - times[:-1]
# smoothing if necessary
if smooth != 0:
tmp_pts = Points(list(zip(x, y)), times)
tmp_pts.smooth(tos='lowpass', size=smooth, inplace=True)
x = tmp_pts.xy[:, 0]
y = tmp_pts.xy[:, 1]
# getting velocity between points
Vx = np.array([(x[i + 1] - x[i])/dt[i]
for i in np.arange(len(x) - 1)])
Vy = np.array([(y[i + 1] - y[i])/dt[i]
for i in np.arange(len(y) - 1)])
# getting xaxis
if xaxis == 'time':
x_prof = times[:-1] + dt/2.
elif xaxis == 'x':
x_prof = x[:-1] + dx/2.
elif xaxis == 'y':
x_prof = y[:-1] + dy/2.
# returning profiles
from .profile import Profile
unit_Vx = self.unit_x/self.unit_v
Vx *= unit_Vx.asNumber()
unit_Vx /= unit_Vx.asNumber()
prof_x = Profile(x_prof, Vx, mask=False, unit_x=self.unit_v,
unit_y=unit_Vx)
unit_Vy = self.unit_y/self.unit_v
Vy *= unit_Vy.asNumber()
unit_Vy /= unit_Vy.asNumber()
prof_y = Profile(x_prof, Vy, mask=False, unit_x=self.unit_v,
unit_y=unit_Vy)
return prof_x, prof_y
[docs] def get_evolution_on_sf(self, SF, axe_x=None):
"""
Return the evolution of the value represented by a scalar field, on
the path of the trajectory.
Parameters
----------
SF : ScalarField object
axe_x : string, optional
What put in the x axis (can be 'x', 'y', 'v').
default is 'v' when available and 'x' else.
Returns
-------
evol : Profile object
"""
# check parameters
from .scalarfield import ScalarField
if not isinstance(SF, ScalarField):
raise TypeError()
if len(self.xy) == 0:
from .profile import Profile
return Profile()
if axe_x is None:
if len(self.v) == len(self.xy):
axe_x = 'v'
else:
axe_x = 'x'
if not isinstance(axe_x, STRINGTYPES):
raise TypeError()
# get x values
if axe_x == 'v':
if len(self.v) == 0:
raise ValueError()
x_prof = self.v
unit_x = self.unit_v
elif axe_x == 'x':
x_prof = self.xy[:, 0]
unit_x = self.unit_x
elif axe_x == 'y':
x_prof = self.xy[:, 1]
unit_x = self.unit_y
else:
raise ValueError()
# get the y value
y_prof = np.empty((len(self.xy)), dtype=float)
for i, pt in enumerate(self.xy):
y_prof[i] = SF.get_value(*pt, ind=False, unit=False)
mask = np.isnan(y_prof)
unit_y = SF.unit_values
# returning
evol = Profile(x_prof, y_prof, mask=mask, unit_x=unit_x,
unit_y=unit_y)
return evol
[docs] def get_evolution_on_tsf(self, TSF, axe_x=None):
"""
Return the evolution of the value represented by scalar fields, on
the path of the trajectory.
Timse of the TSF must be consistent with the times of the Points.
Parameters
----------
TSF : tsf.TemporalScalarField object
axe_x : string, optional
What put in the x axis (can be 'x', 'y', 'v').
default is 'v' (associated with time)
Returns
-------
evol : Profile object
"""
# check parameters
from . import temporalscalarfields as tsf
from .profile import Profile
if not isinstance(TSF, tsf.TemporalScalarFields):
raise TypeError()
if len(self.xy) == 0:
return Profile()
if axe_x is None:
axe_x = 'v'
if not isinstance(axe_x, STRINGTYPES):
raise TypeError()
# get x values
if axe_x == 'v':
if len(self.v) == 0:
raise ValueError()
x_prof = self.v
unit_x = self.unit_v
elif axe_x == 'x':
x_prof = self.xy[:, 0]
unit_x = self.unit_x
elif axe_x == 'y':
x_prof = self.xy[:, 1]
unit_x = self.unit_y
else:
raise ValueError()
# get the y value
times = self.v
y_prof = np.empty((len(self.xy)), dtype=float)
for i, pt in enumerate(self.xy):
time = times[i]
SF = TSF.fields[TSF.times == time][0]
y_prof[i] = SF.get_value(*pt, ind=False, unit=False)
mask = np.isnan(y_prof)
unit_y = TSF.unit_values
# returning
evol = Profile(x_prof, y_prof, mask=mask, unit_x=unit_x,
unit_y=unit_y)
return evol
[docs] def fit(self, kind='polynomial', order=2, simplify=False):
"""
Return the parametric coefficients of the fitting curve on the points.
Parameters
----------
kind : string, optional
The kind of fitting used. Can be 'polynomial' or 'ellipse'.
order : integer
Approximation order for the fitting.
Simplify : boolean or string, optional
Can be False (default), 'x' or 'y'. Perform a simplification
(see Points.Siplify()) before the fitting.
Returns
-------
p : array, only for polynomial fitting
Polynomial coefficients, highest power first
radii : array, only for ellipse fitting
Ellipse demi-axes radii.
center : array, only for ellipse fitting
Ellipse center coordinates.
alpha : number
Angle between the x axis and the major axis.
"""
if not isinstance(order, int):
raise TypeError("'order' must be an integer")
if not isinstance(kind, STRINGTYPES):
raise TypeError("'kind' must be a string")
if not simplify:
xytmp = self.xy
elif simplify == 'x':
xytmp = self.simplify(axe=0).xy
elif simplify == 'y':
xytmp = self.simplify(axe=1).xy
if kind == 'polynomial':
p = np.polyfit(xytmp[:, 0], xytmp[:, 1], deg=order)
return p
elif kind == 'ellipse':
import fit_ellipse as fte
res = fte.fit_ellipse(xytmp)
radii, center, alpha = fte.get_parameters(res)
return radii, center, alpha
[docs] def add(self, pt, v=None):
"""
Add a new point.
Parameters
----------
pt : 2x1 array of numbers
Point to add.
v : number, optional
Value of the point (needed if other points have values).
"""
# check parameters
if not isinstance(pt, ARRAYTYPES):
raise TypeError()
pt = np.array(pt, subok=True)
if not pt.shape == (2,):
raise ValueError()
if not isinstance(pt[0], NUMBERTYPES):
raise TypeError()
if v is not None:
if not isinstance(v, NUMBERTYPES):
raise TypeError()
# store new data
self.__xy = np.append(self.xy, [pt], axis=0)
if v is None and self.v.shape[0] != 0:
raise ValueError('You should specify an associated value : v')
if v is not None:
self.__v = np.append(self.v, v)
[docs] def remove(self, ind):
"""
Remove the point number 'ind' of the points cloud.
In place.
Parameters
----------
ind : integer or array of integer
"""
if isinstance(ind, INTEGERTYPES):
ind = [ind]
elif isinstance(ind, ARRAYTYPES):
if not np.all([isinstance(val, int) for val in ind]):
raise TypeError("'ind' must be an integer or an array of"
" integer")
ind = np.array(ind)
else:
raise TypeError("'ind' must be an integer or an array of integer")
tmp_v = self.v.copy()
self.xy = np.delete(self.xy, ind, axis=0)
if len(self.v) == len(self.xy) + 1:
self.v = np.delete(tmp_v, ind, axis=0)
[docs] def change_unit(self, axe, new_unit):
"""
Change the unit of an axe.
Parameters
----------
axe : string
'y' for changing the profile y axis unit
'x' for changing the profile x axis unit
'v' for changing the profile values unit
new_unit : Unum.unit object or string
The new unit.
"""
if isinstance(new_unit, STRINGTYPES):
new_unit = make_unit(new_unit)
if not isinstance(new_unit, unum.Unum):
raise TypeError()
if not isinstance(axe, STRINGTYPES):
raise TypeError()
if axe == 'x':
old_unit = self.unit_x
new_unit = old_unit.asUnit(new_unit)
fact = new_unit.asNumber()
self.xy[:, 0] *= fact
self.unit_x = new_unit/fact
elif axe == 'y':
old_unit = self.unit_y
new_unit = old_unit.asUnit(new_unit)
fact = new_unit.asNumber()
self.xy[:, 1] *= fact
self.unit_y = new_unit/fact
elif axe == 'v':
old_unit = self.unit_v
new_unit = old_unit.asUnit(new_unit)
fact = new_unit.asNumber()
self.v *= fact
self.unit_v = new_unit/fact
else:
raise ValueError()
[docs] def set_origin(self, x=None, y=None):
"""
Set the given point (x, y) as the new referential.
Parameters
----------
x: number
.
y: number
.
"""
# Check
if x is not None:
if not isinstance(x, NUMBERTYPES):
raise TypeError()
self.xy[:, 0] -= x
if y is not None:
if not isinstance(y, NUMBERTYPES):
raise TypeError()
self.xy[:, 1] -= y
[docs] def crop(self, intervx=None, intervy=None, intervv=None, inplace=True,
ind=False):
"""
Crop the points cloud.
Parameters
----------
intervx : 2x1 tuple
Interval on x axis
intervy : 2x1 tuple
Interval on y axis
intervv : 2x1 tuple
Interval on v values
Returns
-------
tmp_pts : Points object
croped version of the point cloud.
"""
#
if inplace:
tmp_pts = self
else:
tmp_pts = self.copy()
# Getting cropping mask
if ind:
if intervx is not None or intervy is not None:
raise ValueError()
mask = np.ones(len(self.xy), dtype=bool)
mask[intervv[0]:intervv[1]] = False
else:
mask = np.zeros(len(self.xy), dtype=bool)
if intervx is not None:
out_zone = np.logical_or(self.xy[:, 0] < intervx[0],
self.xy[:, 0] > intervx[1])
mask = np.logical_or(mask, out_zone)
if intervy is not None:
out_zone = np.logical_or(self.xy[:, 1] < intervy[0],
self.xy[:, 1] > intervy[1])
mask = np.logical_or(mask, out_zone)
if intervv is not None and len(self.v) != 0:
out_zone = np.logical_or(self.v < intervv[0],
self.v > intervv[1])
mask = np.logical_or(mask, out_zone)
# Cropping values
tmp_pts.__xy = tmp_pts.xy[~mask, :]
if len(tmp_pts.v) != 0:
tmp_pts.__v = tmp_pts.v[~mask]
# returning
if not inplace:
return tmp_pts
[docs] def cut(self, intervx=None, intervy=None):
"""
Return a point cloud where the given area has been removed.
Parameters
----------
intervx : 2x1 tuple
Interval on x axis
intervy : 2x1 tuple
Interval on y axis
Returns
-------
tmp_pts : Points object
Cutted version of the point cloud.
"""
tmp_pts = self.copy()
mask = np.ones(len(self.xy))
if intervx is not None:
out_zone = np.logical_and(self.xy[:, 0] > intervx[0],
self.xy[:, 0] < intervx[1])
mask = np.logical_and(mask, out_zone)
if intervy is not None:
out_zone = np.logical_and(self.xy[:, 1] > intervy[0],
self.xy[:, 1] < intervy[1])
mask = np.logical_and(mask, out_zone)
tmp_pts.xy = tmp_pts.xy[~mask, :]
if len(tmp_pts.v) != 0:
tmp_pts.v = tmp_pts.v[~mask]
return tmp_pts
[docs] def scale(self, scalex=1., scaley=1., scalev=1., inplace=False):
"""
Change the scale of the axis.
Parameters
----------
scalex, scaley, scalev : numbers or Unum objects
scales along x, y and v
inplace : boolean, optional
If 'True', scaling is done in place, else, a new instance is
returned.
"""
# check params
if not isinstance(scalex, NUMBERTYPES + (unum.Unum, )):
raise TypeError()
if not isinstance(scaley, NUMBERTYPES + (unum.Unum, )):
raise TypeError()
if not isinstance(scalev, NUMBERTYPES + (unum.Unum, )):
raise TypeError()
if not isinstance(inplace, bool):
raise TypeError()
if inplace:
tmp_pt = self
else:
tmp_pt = self.copy()
# adapt unit
if isinstance(scalex, unum.Unum):
new_unit = scalex*tmp_pt.unit_x
fact = new_unit.asNumber()
new_unit /= fact
tmp_pt.unit_x = new_unit
scalex = fact
if isinstance(scaley, unum.Unum):
new_unit = scaley*tmp_pt.unit_y
fact = new_unit.asNumber()
new_unit /= fact
tmp_pt.unit_y = new_unit
scaley = fact
if isinstance(scalev, unum.Unum):
new_unit = scalev*tmp_pt.unit_v
fact = new_unit.asNumber()
new_unit /= fact
tmp_pt.unit_v = new_unit
scalev = fact
# loop
if scalex != 1. or scaley != 1.:
tmp_pt.xy *= np.array([scalex, scaley])
if scalev != 1.:
tmp_pt.v *= scalev
# returning
if not inplace:
return tmp_pt
[docs] def rotate(self, angle, inplace=False):
"""
Rotate the point set.
Parameters
----------
angle: number
Rotation angle in radian.
"""
# Checks
if inplace:
tmpp = self
else:
tmpp = self.copy()
#
x, y = self.xy[:, 0], self.xy[:, 1]
new_x = x*np.cos(angle) - y*np.sin(angle)
new_y = y*np.cos(angle) + x*np.sin(angle)
tmpp.xy[:, 0] = new_x
tmpp.xy[:, 1] = new_y
# Return
return tmpp
[docs] def remove_doublons(self, method='average', inplace=False, eps_rel=1e-6):
"""
Replace values associated to the same 'v' by their average.
Parameters
----------
method : string in {'average', 'max', 'min'}
Method used to remove the doublons.
"""
if len(self.v) == 0:
raise Exception()
if inplace:
tmp_pt = self
else:
tmp_pt = self.copy()
vs = tmp_pt.v
ord_magn = (np.sum(vs**2)/len(vs))**.5
nmb_dec = -int(round(np.log10(ord_magn*eps_rel)))
tmp_vs = np.round(vs, decimals=nmb_dec)
new_vs = np.sort(list(set(tmp_vs)))
if method == 'average':
new_xy = [np.mean(tmp_pt.xy[tmp_vs == vi], axis=0)
for vi in new_vs]
elif method == 'min':
new_xy = [np.min(tmp_pt.xy[tmp_vs == vi], axis=0)
for vi in new_vs]
elif method == 'max':
new_xy = [np.max(tmp_pt.xy[tmp_vs == vi], axis=0)
for vi in new_vs]
else:
raise ValueError()
tmp_pt.xy = new_xy
tmp_pt.v = new_vs
# returning
if not inplace:
return tmp_pt
[docs] def reverse(self):
"""
Return a Points object where x and y axis are swaped.
"""
tmp_pt = self.copy()
xy_tmp = tmp_pt.xy*0
xy_tmp[:, 0] = tmp_pt.xy[:, 1]
xy_tmp[:, 1] = tmp_pt.xy[:, 0]
tmp_pt.xy = xy_tmp
return tmp_pt
[docs] def decompose(self):
"""
return a tuple of Points object, with only one point per object.
"""
if len(self) == 1:
return [self]
if len(self) != len(self.v):
raise Exception()
pts_tupl = []
for i in np.arange(len(self)):
pts_tupl.append(Points([self.xy[i]], [self.v[i]], self.unit_x,
self.unit_y, self.unit_v, self.name))
return pts_tupl
[docs] def sort(self, ref='x', inplace=False):
"""
Sort the points according to the reference.
Parameters
----------
ref : string or array of indice
can be 'x', 'y' or 'v' to sort according those values or an
array of indice
inplace ; boolean
If 'True', sort in place, else, return an new sorted instance.
"""
# check parameters
if isinstance(ref, STRINGTYPES):
if ref not in ['x', 'y', 'v']:
raise ValueError
elif isinstance(ref, ARRAYTYPES):
ref = np.array(ref, dtype=int)
if len(ref) != len(self.xy):
raise ValueError()
else:
raise TypeError()
if not isinstance(inplace, bool):
raise TypeError()
# get order
if ref == 'x':
order = np.argsort(self.xy[:, 0])
elif ref == 'y':
order = np.argsort(self.xy[:, 1])
elif ref == 'v':
if len(self.v) == 0:
raise ValueError()
order = np.argsort(self.v)
else:
order = ref
# reordering
if inplace:
tmp_pt = self
else:
tmp_pt = self.copy()
tmp_pt.xy = tmp_pt.xy[order]
if len(tmp_pt.v) == len(tmp_pt.xy):
tmp_pt.v = tmp_pt.v[order]
# returning
if not inplace:
return tmp_pt
[docs] def order_on_line(self, inplace=False):
"""
Re-order the set of points to try to create a line.
"""
if inplace:
tmp_pts = self
else:
tmp_pts = self.copy()
#
from sklearn.neighbors import NearestNeighbors
clf = NearestNeighbors(2).fit(self.xy)
G = clf.kneighbors_graph()
import networkx as nx
T = nx.from_scipy_sparse_matrix(G)
order = list(nx.dfs_preorder_nodes(T, 0))
paths = [list(nx.dfs_preorder_nodes(T, i)) for i in range(len(self.xy))]
mindist = np.inf
minidx = 0
for i in range(len(self.xy)):
p = paths[i] # order of nodes
ordered = self.xy[p] # ordered nodes
# find cost of that order by the sum of euclidean distances between points (i) and (i+1)
cost = (((ordered[:-1] - ordered[1:])**2).sum(1)).sum()
if cost < mindist:
mindist = cost
minidx = i
opt_order = paths[minidx]
new_x = self.xy[:, 0][opt_order]
new_y = self.xy[:, 1][opt_order]
# return
tmp_pts.xy = np.array(list(zip(new_x, new_y)))
return tmp_pts
[docs] def remove_nans(self, inplace=False):
"""
Remove the points containing nans values.
"""
if inplace:
tmp_pts = self
else:
tmp_pts = self.copy()
# get indices to remove
inds = np.logical_or(np.isnan(tmp_pts.xy[:, 0]),
np.isnan(tmp_pts.xy[:, 1]))
inds = np.logical_or(inds, np.isnan(tmp_pts.v))
# remove
for ind in np.where(inds)[0][::-1]:
tmp_pts.remove(ind)
# return
if not inplace:
return tmp_pts
[docs] def augment_resolution(self, fact=2, interp='linear', inplace=False):
"""
Augment the temporal resolution of the points.
Only have sense if points are sorted to set some kind of trajectory.
Parameters
----------
fact : integer
Resolution augmentation needed (default is '2', for a result
profile with twice more points)
interp : string in ['linear', 'nearest', slinear', 'quadratic, 'cubic']
Specifies the kind of interpolation as a string
(Default is 'linear'). slinear', 'quadratic' and 'cubic' refer
to a spline interpolation of first, second or third order.
inplace bool
.
Note
----
If masked values are present, they are interpolated as well, using the
surrounding values.
"""
if inplace:
tmp_pts = self
else:
tmp_pts = self.copy()
is_v = len(self.v) == len(self.xy)
# get and interpolate
from .profile import Profile
tmp_x = Profile(list(range(len(self.xy))), self.xy[:, 0])
tmp_x.augment_resolution(fact=fact, interp=interp, inplace=True)
tmp_y = Profile(list(range(len(self.xy))), self.xy[:, 1])
tmp_y.augment_resolution(fact=fact, interp=interp, inplace=True)
if is_v:
tmp_v = Profile(list(range(len(self.xy))), self.v)
tmp_v.augment_resolution(fact=fact, interp=interp, inplace=True)
# store
tmp_pts.xy = list(zip(tmp_x.y, tmp_y.y))
if is_v:
tmp_pts.v = tmp_v.y
if not inplace:
return tmp_pts
[docs] def smooth(self, tos='gaussian', size=None, inplace=False, **kw):
"""
Return a smoothed points field.
Parameters
----------
tos : string, optional
Type of smoothing, can be 'uniform' (default), 'gaussian'
or 'lowpass'.
size : number, optional
radius of the smoothing for 'uniform',
radius of the smoothing for 'gaussian',
cut off frequency for 'lowpass'
Default are 3 for 'uniform', 1 for 'gaussian' and 0.1 for '
lowpass'.
inplace : boolean
If 'False', return a smoothed points field
else, smooth in place.
kw : dic
Additional parameters for ndimage methods
(See ndimage documentation)
"""
if not isinstance(tos, STRINGTYPES):
raise TypeError("'tos' must be a string")
if size is None and tos == 'uniform':
size = 3
elif size is None and tos == 'gaussian':
size = 1
elif size is None and tos == 'lowpass':
size = 0.1
# default smoothing border mode to 'nearest'
if tos in ['uniform', 'gaussian'] and 'mode' not in list(kw.keys()):
kw.update({'mode': 'nearest'})
# getting data
if not inplace:
tmp_pts = self.copy()
y = self.xy[:, 1]
x = self.xy[:, 0]
if len(self.v) != 0:
v = self.v
# smoothing
if tos == "uniform":
x = ndimage.uniform_filter(x, size, **kw)
y = ndimage.uniform_filter(y, size, **kw)
if len(self.v) != 0:
v = ndimage.uniform_filter(v, size, **kw)
elif tos == "gaussian":
x = ndimage.gaussian_filter(x, size, **kw)
y = ndimage.gaussian_filter(y, size, **kw)
if len(self.v) != 0:
v = ndimage.gaussian_filter(v, size, **kw)
elif tos == 'lowpass':
from scipy import signal
x = self.xy[:, 0]
y = self.xy[:, 1]
N = 2
Wn = size
B, A = signal.butter(N, Wn, output='ba')
x = signal.filtfilt(B, A, x)
y = signal.filtfilt(B, A, y)
if len(self.v) != 0:
v = signal.filtfilt(B, A, v)
else:
raise ValueError("'tos' must be 'uniform', 'gaussian' or "
"'lowpass'")
# storing
if inplace:
self.xy[:, 0] = x
self.xy[:, 1] = y
if len(self.v) != 0:
self.v = v
else:
tmp_pts.xy[:, 0] = x
tmp_pts.xy[:, 1] = y
if len(tmp_pts.v) != 0:
tmp_pts.v = v
return tmp_pts
def _display(self, kind=None, axe_x=None, axe_y=None, axe_color=None,
**plotargs):
if len(self.xy) == 0:
return None
# x values
if axe_x == 'x' or axe_x is None:
x_values = self.xy[:, 0]
elif axe_x == 'y':
x_values = self.xy[:, 1]
elif axe_x == 'v':
if len(self.v) != len(self.xy):
raise Exception()
x_values = self.v
else:
raise ValueError()
# y values
if axe_y == 'x':
y_values = self.xy[:, 0]
elif axe_y == 'y' or axe_y is None:
y_values = self.xy[:, 1]
elif axe_y == 'v':
if len(self.v) != len(self.xy):
raise Exception()
y_values = self.v
else:
raise ValueError()
# color values
if axe_color == 'x':
color_values = self.xy[:, 0]
elif axe_color == 'y':
color_values = self.xy[:, 1]
elif axe_color in [None, 'v']:
if len(self.v) != len(self.xy):
color_values = None
if ('c' not in plotargs.keys()
and 'color' not in plotargs.keys()):
plotargs['color'] = 'k'
color_values = self.v
else:
raise ValueError()
# check c and color
if 'c' in plotargs.keys():
plotargs['color'] = None
if 'color' in plotargs.keys():
plotargs['c'] = None
# display
dp = pplt.Displayer(x=x_values, y=y_values, values=color_values,
kind=kind, **plotargs)
ax = dp.draw()
return ax
[docs] def display(self, kind=None, axe_x=None, axe_y=None, axe_color=None,
**plotargs):
"""
Display the set of points.
Parameters
----------
kind : string, optional
Can be 'plot' (default if points have not values).
or 'scatter' (default if points have values).
or 'colored_plot'.
axe_x, axe_y, axe_color : strings in ['x', 'y', 'v']
To determine wich value has to be plotted along which axis, and
whith value is used to color the scattered points.
Default plot 'y' to 'x' with colors from 'v'.
**plotargs : dic
Additionnal arguments sent to 'plot' or 'scatter'
"""
# default values
if axe_x is None:
if axe_y != 'x':
axe_x = 'x'
else:
axe_x = 'y'
if axe_y is None:
if axe_x != 'y':
axe_y = 'y'
else:
axe_y = 'x'
if axe_color is None:
axes = ['x', 'y', 'v']
try:
axes.remove(axe_x)
axes.remove(axe_y)
except ValueError:
axes = ['v']
axe_color = axes[0]
# display the values
plot = self._display(kind, axe_x=axe_x, axe_y=axe_y,
axe_color=axe_color, **plotargs)
if len(self.v) != 0 and kind is not 'plot':
cb = plt.colorbar(plot)
cb.set_label(self.unit_v.strUnit())
# cb label
if axe_color == 'x':
cb.set_label('X ' + self.unit_x.strUnit())
elif axe_color == 'y':
cb.set_label('Y ' + self.unit_y.strUnit())
else:
cb.set_label('V ' + self.unit_v.strUnit())
# x axis label
if axe_x == 'x':
plt.xlabel('X ' + self.unit_x.strUnit())
elif axe_x == 'y':
plt.xlabel('Y ' + self.unit_y.strUnit())
else:
plt.xlabel('V ' + self.unit_v.strUnit())
# y axis label
if axe_y == 'x':
plt.ylabel('X ' + self.unit_x.strUnit())
elif axe_y == 'y':
plt.ylabel('Y ' + self.unit_y.strUnit())
else:
plt.ylabel('V ' + self.unit_v.strUnit())
if self.name is None:
plt.title('Set of points')
else:
plt.title(self.name)
plt.axis('equal')
return plot
[docs] def display3D(self, kind='plot', xlabel='', ylabel='', zlabel='',
title='', **plotargs):
"""
Display the points on a 3D graph.
Parameters
----------
kind : string, optional
Kind of graph to use, can be 'plot' or 'surf'.
xlabel, ylabel, zlabel : string, optional
Label fo each axis (respectively 'x', 'y', and 'v')
title : strin, optional
Title
**plotargs :
Additional parameters feeded to matplotlib
"""
# create 3D plot
ax = plt.gca(projection='3d')
# display data
if kind == 'plot':
ax.plot(self.xy[:, 0], self.xy[:, 1], self.v, **plotargs)
elif kind == 'surf':
ax.plot_trisurf(self.xy[:, 0], self.xy[:, 1], self.v)
else:
raise ValueError()
# labels
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_zlabel(zlabel)
# title
ax.set_title(title)
#
plt.tight_layout()
return ax
[docs] def export_to_profile(self, axe_x='x', axe_y='y'):
"""
Export the unsorted point object to a sorted Profile object.
Parameters
----------
axe_x, axe_y : strings in ['x', 'y', 'v']
Which value used to construct the profile
"""
# check
if axe_x not in ['x', 'y', 'v']:
raise ValueError()
if axe_y not in ['x', 'y', 'v']:
raise ValueError()
# get data
if axe_x == 'x':
x = self.xy[:, 0]
unit_x = self.unit_x
elif axe_x == 'y':
x = self.xy[:, 1]
unit_x = self.unit_y
else:
x = self.v
unit_x = self.unit_v
if axe_y == 'x':
y = self.xy[:, 0]
unit_y = self.unit_x
elif axe_y == 'y':
y = self.xy[:, 1]
unit_y = self.unit_y
else:
y = self.v
unit_y = self.unit_v
# construct profile
from .profile import Profile
prof = Profile(x=x, y=y, mask=False, unit_x=unit_x, unit_y=unit_y)
return prof