Source code for IMTreatment.core.spatialfields

# -*- coding: utf-8 -*-
#!/bin/env python3

# Copyright (C) 2003-2007 Gaby Launay

# Author: Gaby Launay  <>
# URL:
# Version: 1.0

# This file is part of IMTreatment.

# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.

# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.

import matplotlib.pyplot as plt
import numpy as np
import unum
import copy
from ..utils import make_unit
from . import fields as flds
from . import scalarfield as sf
from . import vectorfield as vf
from . import profile as prof

[docs]class SpatialFields(flds.Fields): """ """ def __init__(self): self.unit_x = make_unit('') self.unit_y = make_unit('') self.unit_values = make_unit('') self.fields_type = None def __neg__(self): tmp_tf = self.copy() for i in np.arange(len(self.fields)): tmp_tf.fields[i] = -tmp_tf.fields[i] return tmp_tf def __mul__(self, other): if isinstance(other, (NUMBERTYPES, unum.Unum)): final_vfs = self.__class__() for field in self.fields: final_vfs.add_field(field*other) return final_vfs else: raise TypeError("You can only multiply a temporal velocity field " "by numbers") __rmul__ = __mul__ def __truediv__(self, other): if isinstance(other, (NUMBERTYPES, unum.Unum)): final_vfs = self.__class__.__init__() for field in self.fields: final_vfs.add_field(field/other) return final_vfs else: raise TypeError("You can only divide a temporal velocity field " "by numbers") __div__ = __truediv__ def __pow__(self, number): if not isinstance(number, NUMBERTYPES): raise TypeError("You only can use a number for the power " "on a Vectorfield") final_vfs = self.__class__() for field in self.fields: final_vfs.add_field(np.power(field, number)) return final_vfs def __iter__(self): for i in np.arange(len(self.fields)): yield self.times[i], self.fields[i] @property def mask(self): dim = (len(self.fields), self.shape[0], self.shape[1]) mask_f = np.empty(dim, dtype=bool) for i, field in enumerate(self.fields): mask_f[i, :, :] = field.mask[:, :] return mask_f @property def mask_as_sf(self): dim = len(self.fields) mask_f = np.empty(dim, dtype=object) for i, field in enumerate(self.fields): mask_f[i] = field.mask_as_sf return mask_f @property def unit_x(self): return self.__unit_x @unit_x.setter def unit_x(self, unit): self.__unit_x = unit for field in self.fields: field.unit_x = unit @property def unit_y(self): return self.__unit_y @unit_y.setter def unit_y(self, unit): self.__unit_y = unit for field in self.fields: field.unit_y = unit @property def unit_values(self): return self._SpatialFields__unit_values @unit_values.setter def unit_values(self, unit): self.__unit_values = unit for field in self.fields: field.unit_values = unit @property def x_min(self): return np.min([field.axe_x[0] for field in self.fields]) @property def x_max(self): return np.max([field.axe_x[-1] for field in self.fields]) @property def y_min(self): return np.min([field.axe_y[0] for field in self.fields]) @property def y_max(self): return np.max([field.axe_y[-1] for field in self.fields])
[docs] def add_field(self, field, copy=True): """ """ # check if not isinstance(field, self.fields_type): raise TypeError() # first field if len(self.fields) == 0: self.unit_x = field.unit_x self.unit_y = field.unit_y self.unit_values = field.unit_values # other ones else: try: field.change_unit('x', self.unit_x) field.change_unit('y', self.unit_y) field.change_unit('values', self.unit_values) except unum.IncompatibleUnitsError: raise ValueError("Inconsistent unit system") # crop fields field.crop_masked_border(hard=False, inplace=True) # add field flds.Fields.add_field(self, field, copy=copy)
[docs] def get_value(self, x, y, unit=False, error=True): """ Return the field component(s) on the point (x, y). Parameters ---------- x, y : number Point coordinates unit : boolean, optional If 'True', component(s) is(are) returned with its unit. error : boolean, optional If 'True', raise an error if the asked point is outside the fields. If 'False', return 'None' """ # get interesting fields inter_ind = [] for i, field in enumerate(self.fields): if (x > field.axe_x[0] and x < field.axe_x[-1] and y > field.axe_y[0] and y < field.axe_y[-1]): inter_ind.append(i) # get values (mean over fields if necessary) if len(inter_ind) == 0: if error: raise ValueError("coordinates outside the fields") else: return None elif len(inter_ind) == 1: values = self.fields[inter_ind[0]].get_value(x, y, ind=False, unit=False) else: values = self.fields[inter_ind[0]].get_value(x, y, ind=False, unit=False) for field in self.fields[inter_ind][1::]: values += field.get_value(x, y, ind=False, unit=False) values /= len(inter_ind) return values
[docs] def get_values_on_grid(self, axe_x, axe_y): """ Return a all the fields in a single evenly-spaced grid. (Use interpolation to get the data on the grid points) Parameters ---------- axe_x, axe_y : arrays of ndim 1 Representing the grid axis. """ # check if not isinstance(axe_x, ARRAYTYPES): raise TypeError() if not isinstance(axe_y, ARRAYTYPES): raise TypeError() axe_x = np.array(axe_x) axe_y = np.array(axe_y) if isinstance(self.fields[0], sf.ScalarField): values = np.zeros(shape=(len(axe_x), len(axe_y)), dtype=float) mask = np.zeros(shape=(len(axe_x), len(axe_y)), dtype=bool) for i, x in enumerate(axe_x): for j, y in enumerate(axe_y): val = self.get_value(x, y, unit=False, error=False) if val is None: mask[i, j] = True else: values[i, j] = val tmp_f = sf.ScalarField tmp_f.import_from_arrays(axe_x, axe_y, values, mask=mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) elif isinstance(self.fields[0], vf.VectorField): Vx = np.zeros(shape=(len(axe_x), len(axe_y)), dtype=float) Vy = np.zeros(shape=(len(axe_x), len(axe_y)), dtype=float) mask = np.zeros(shape=(len(axe_x), len(axe_y)), dtype=bool) for i, x in enumerate(axe_x): for j, y in enumerate(axe_y): val = self.get_value(x, y, unit=False, error=False) if val is None: mask[i, j] = True else: Vx[i, j] = val[0] Vy[i, j] = val[1] tmp_f = vf.VectorField() tmp_f.import_from_arrays(axe_x, axe_y, Vx, Vy, mask=mask, unit_x=self.unit_x, unit_y=self.unit_y, unit_values=self.unit_values) else: raise Exception() return tmp_f
[docs] def get_profile(self, direction, position, component=None): """ Return a profile of the current fields. Parameters ---------- direction : integer Direction along which we choose a position (1 for x and 2 for y) position : float, interval of float Position, interval in which we want a profile component : string Component wanted for the profile. Returns ------- profile : Profile object Wanted profile """ # getting data if self.fields_type == vf.VectorField: if component is None: component = 'magnitude' else: try: comp = self.__getattribute__("{}_as_sf".format(component)) except AttributeError: raise ValueError() elif self.fields_type == sf.ScalarField: if component is None: component = "values" try: comp = self.__getattribute__("{}_as_sf".format(component)) except AttributeError: raise ValueError() # get fields of interest inter_ind = [] if direction == 1: for i, field in enumerate(self.fields): if np.any(position < field.axe_x[-1]) and \ np.any(position > field.axe_x[0]): inter_ind.append(i) elif direction == 2: for i, field in enumerate(self.fields): if np.any(position < field.axe_y[-1]) and \ np.any(position > field.axe_y[0]): inter_ind.append(i) # get profiles if len(inter_ind) == 0: return None elif len(inter_ind) == 1: return comp[inter_ind[0]].get_profile(direction=direction, position=position, ind=False)[0] else: # get profiles x = np.array([]) y = np.array([]) for ind in inter_ind: tmp_comp = comp[ind] tmp_prof = tmp_comp.get_profile(direction=direction, position=position, ind=False)[0] x = np.concatenate((x, tmp_prof.x)) y = np.concatenate((y, tmp_prof.y)) # recreate profile object ind_sort = np.argsort(x) x = x[ind_sort] y = y[ind_sort] fin_prof = prof.Profile(x, y, mask=False, unit_x=tmp_prof.unit_x, unit_y=tmp_prof.unit_y) return fin_prof
def _display(self, compo=None, **plotargs): """ """ # check params if len(self.fields) == 0: raise Exception() # getting data if self.fields_type == vf.VectorField: if compo == 'V' or compo is None: comp = self.fields else: try: comp = self.__getattribute__("{}_as_sf".format(compo)) except AttributeError: raise ValueError() elif self.fields_type == sf.ScalarField: if compo is None: compo = "values" try: comp = self.__getattribute__("{}_as_sf".format(compo)) except AttributeError: raise ValueError() else: raise TypeError() # getting max and min v_min = np.min([field.min for field in comp]) v_max = np.max([field.max for field in comp]) if 'vmin' in list(plotargs.keys()): v_min = plotargs.pop('vmin') if 'vmax' in list(plotargs.keys()): v_max = plotargs.pop('vmax') norm = plt.Normalize(v_min, v_max) if 'norm' not in list(plotargs.keys()): plotargs['norm'] = norm # setting default kind of display if 'kind' not in list(plotargs.keys()): plotargs['kind'] = None if plotargs['kind'] == 'stream': if 'density' not in list(plotargs.keys()): plotargs['density'] = 1. plotargs['density'] = plotargs['density']/(len(self.fields))**.5 # display for field in comp: field._display(**plotargs) plt.xlim(self.x_min, self.x_max) plt.ylim(self.y_min, self.y_max)
[docs] def display(self, compo=None, **plotargs): self._display(compo=compo, **plotargs) plt.axis('image') cb = plt.colorbar() cb.set_label(self.unit_values.strUnit()) plt.tight_layout()
[docs] def copy(self): return copy.deepcopy(self)