# -*- coding: utf-8 -*-
#!/bin/env python3
# Copyright (C) 2003-2007 Gaby Launay
# Author: Gaby Launay <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0
# This file is part of IMTreatment.
# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
import scipy.interpolate as spinterp
from ..utils.types import ARRAYTYPES, INTEGERTYPES, NUMBERTYPES, STRINGTYPES
from ..utils import make_unit
from . import scalarfield as sf
from . import profile as prof
from . import temporalscalarfields as tsf
from . import temporalfields as tf
[docs]class TemporalVectorFields(tf.TemporalFields):
"""
Class representing a set of time-evolving velocity fields.
"""
@property
def Vx_as_sf(self):
values = tsf.TemporalScalarFields()
for i, field in enumerate(self.fields):
values.add_field(field.comp_x_as_sf, time=self.times[i],
unit_times=self.unit_times)
return values
@property
def Vx(self):
dim = (len(self), self.shape[0], self.shape[1])
values = np.empty(dim, dtype=float)
for i, field in enumerate(self.fields):
values[i, :, :] = field.comp_x[:, :]
return values
@property
def Vy_as_sf(self):
values = tsf.TemporalScalarFields()
for i, field in enumerate(self.fields):
values.add_field(field.comp_y_as_sf, time=self.times[i],
unit_times=self.unit_times)
return values
@property
def Vy(self):
dim = (len(self), self.shape[0], self.shape[1])
values = np.empty(dim, dtype=float)
for i, field in enumerate(self.fields):
values[i, :, :] = field.comp_y[:, :]
return values
@property
def magnitude_as_sf(self):
values = tsf.TemporalScalarFields()
for i, field in enumerate(self.fields):
values.add_field(field.magnitude_as_sf, time=self.times[i],
unit_times=self.unit_times)
return values
@property
def magnitude(self):
dim = (len(self), self.shape[0], self.shape[1])
values = np.empty(dim, dtype=float)
for i, field in enumerate(self.fields):
values[i, :, :] = field.magnitude[:, :]
return values
@property
def theta_as_sf(self):
values = tsf.TemporalScalarFields()
for i, field in enumerate(self.fields):
values.add_field(field.theta_as_sf, time=self.times[i],
unit_times=self.unit_times)
return values
@property
def theta(self):
dim = (len(self), self.shape[0], self.shape[1])
values = np.empty(dim, dtype=float)
for i, field in enumerate(self.fields):
values[i, :, :] = field.theta[:, :]
return values
[docs] def get_time_auto_correlation(self):
"""
Return auto correlation based on Vx and Vy.
"""
Vx0 = self.fields[0].comp_x
Vy0 = self.fields[0].comp_y
norm = np.mean(Vx0*Vx0 + Vy0*Vy0)
corr = np.zeros((len(self.times),))
for i, time in enumerate(self.times):
Vxi = self.fields[i].comp_x
Vyi = self.fields[i].comp_y
corr[i] = np.mean(Vx0*Vxi + Vy0*Vyi)/norm
return prof.Profile(self.times, corr, mask=False,
unit_x=self.unit_times,
unit_y=make_unit(''))
[docs] def get_mean_kinetic_energy(self):
"""
Calculate the mean kinetic energy.
"""
final_sf = sf.ScalarField()
mean_vf = self.get_mean_field()
values_x = mean_vf.comp_x_as_sf
values_y = mean_vf.comp_y_as_sf
final_sf = 1./2*(values_x**2 + values_y**2)
return final_sf
[docs] def get_tke(self):
"""
Calculate the turbulent kinetic energy.
"""
mean_field = self.get_mean_field()
mean_x = mean_field.comp_x_as_sf
mean_y = mean_field.comp_y_as_sf
del mean_field
tke = tsf.TemporalScalarFields()
for i in np.arange(len(self.fields)):
comp_x = self.fields[i].comp_x_as_sf - mean_x
comp_y = self.fields[i].comp_y_as_sf - mean_y
tke.add_field(1./2*(comp_x**2 + comp_y**2),
time=self.times[i],
unit_times=self.unit_times)
return tke
[docs] def get_turbulent_intensity(self):
"""
Calculate the turbulent intensity.
TI = sqrt(2/3*k)/sqrt(Vx**2 + Vy**2)
"""
turb_int = (2./3.*self.get_tke())**(.5)/self.magnitude_as_sf
return turb_int
[docs] def get_mean_tke(self):
tke = self.get_tke()
mean_tke = tke.get_mean_field()
return mean_tke
[docs] def get_reynolds_stress(self, nmb_val_min=1):
"""
Calculate the reynolds stress.
Returns
-------
Re_xx, Re_yy, Re_xy : ScalarField objects
Reynolds shear stress
"""
# getting fluctuating velocities
turb_vf = self.get_fluctuant_fields()
u_p = turb_vf.Vx
v_p = turb_vf.Vy
mask = turb_vf.mask
# rs_xx
rs_xx = np.zeros(self.shape, dtype=float)
rs_yy = np.zeros(self.shape, dtype=float)
rs_xy = np.zeros(self.shape, dtype=float)
mask_rs = np.zeros(self.shape, dtype=bool)
# boucle sur les points du champ
for i in np.arange(self.shape[0]):
for j in np.arange(self.shape[1]):
# boucle sur le nombre de champs
nmb_val = 0
for n in np.arange(len(turb_vf.fields)):
# check if masked
if not mask[n][i, j]:
rs_yy[i, j] += v_p[n][i, j]**2
rs_xx[i, j] += u_p[n][i, j]**2
rs_xy[i, j] += u_p[n][i, j]*v_p[n][i, j]
nmb_val += 1
if nmb_val > nmb_val_min:
rs_xx[i, j] /= nmb_val
rs_yy[i, j] /= nmb_val
rs_xy[i, j] /= nmb_val
else:
rs_xx[i, j] = 0
rs_yy[i, j] = 0
rs_xy[i, j] = 0
mask_rs[i, j] = True
# masking and storing
axe_x, axe_y = self.axe_x, self.axe_y
unit_x, unit_y = self.unit_x, self.unit_y
unit_values = self.unit_values
rs_xx_sf = sf.ScalarField()
rs_xx_sf.import_from_arrays(axe_x, axe_y, rs_xx, mask_rs,
unit_x, unit_y, unit_values**2)
rs_yy_sf = sf.ScalarField()
rs_yy_sf.import_from_arrays(axe_x, axe_y, rs_yy, mask_rs,
unit_x, unit_y, unit_values**2)
rs_xy_sf = sf.ScalarField()
rs_xy_sf.import_from_arrays(axe_x, axe_y, rs_xy, mask_rs,
unit_x, unit_y, unit_values**2)
return (rs_xx_sf, rs_yy_sf, rs_xy_sf)
[docs] def get_spectral_filtering(self, fmin, fmax, order=2, inplace=False):
"""
Perform a temporal spectral filtering
Parameters
----------
fmin, fmax : numbers
Minimal and maximal frequencies
order : integer, optional
Butterworth filter order
Returns
-------
filt_tvf : TemporalVectorFields
Filtered temporal field
"""
# prepare
if inplace:
tvf = self
else:
tvf = self.copy()
# make spectral filtering on Vx and Vy
ftsfx = self._get_comp_spectral_filtering('Vx', fmin=fmin,
fmax=fmax, order=order)
ftsfy = self._get_comp_spectral_filtering('Vy', fmin=fmin,
fmax=fmax, order=order)
for i in range(len(self)):
tvf.fields[i].comp_x = ftsfx.fields[i].values
tvf.fields[i].comp_y = ftsfy.fields[i].values
# return
if not inplace:
return tvf
[docs] def fill(self, tof='spatial', kind='linear', value=[0., 0.],
inplace=False, crop=False):
"""
Fill the masked part of the array in place.
Parameters
----------
tof : string
Can be 'temporal' for temporal interpolation, or 'spatial' for
spatial interpolation.
kind : string, optional
Type of algorithm used to fill.
'value' : fill with a given value
'nearest' : fill with nearest available data
'linear' : fill using linear interpolation
'cubic' : fill using cubic interpolation
value : 2x1 array
Value for filling, '[Vx, Vy]' (only usefull with tof='value')
inplace : boolean, optional
.
crop : boolean, optional
If 'True', TVF borders are croped before filling.
"""
# TODO : utiliser prof.Profile.fill au lieu d'une nouvelle méthode de
# filling
# checking parameters coherence
if len(self.fields) < 3 and tof == 'temporal':
raise ValueError("Not enough fields to fill with temporal"
" interpolation")
if not isinstance(tof, STRINGTYPES):
raise TypeError()
if tof not in ['temporal', 'spatial']:
raise ValueError()
if not isinstance(kind, STRINGTYPES):
raise TypeError()
if kind not in ['value', 'nearest', 'linear', 'cubic']:
raise ValueError()
if isinstance(value, NUMBERTYPES):
value = [value, value]
elif not isinstance(value, ARRAYTYPES):
raise TypeError()
value = np.array(value)
if crop:
self.crop_masked_border(hard=False, inplace=True)
# temporal interpolation
if tof == 'temporal':
# getting super mask (0 where no value are masked and where all
# values are masked)
masks = self.mask
sum_masks = np.sum(masks, axis=0)
super_mask = np.logical_and(0 < sum_masks,
sum_masks < len(self.fields) - 2)
# loop on each field position
for i, j in np.argwhere(super_mask):
# get time profiles
prof_x = self.get_time_profile(component='comp_x', pt=[i, j],
ind=True)
prof_y = self.get_time_profile(component='comp_y', pt=[i, j],
ind=True)
# getting masked position on profile
inds_masked_x = np.where(prof_x.mask)[0]
inds_masked_y = np.where(prof_y.mask)[0]
# creating interpolation function
if kind == 'value':
def interp_x(x):
return value[0]
def interp_y(x):
return value[1]
elif kind == 'nearest':
raise Exception("Not implemented yet")
elif kind == 'linear':
prof_filt = np.logical_not(prof_x.mask)
interp_x = spinterp.interp1d(prof_x.x[prof_filt],
prof_x.y[prof_filt],
kind='linear')
prof_filt = np.logical_not(prof_y.mask)
interp_y = spinterp.interp1d(prof_y.x[prof_filt],
prof_y.y[prof_filt],
kind='linear')
elif kind == 'cubic':
prof_filt = np.logical_not(prof_x.mask)
interp_x = spinterp.interp1d(prof_x.x[prof_filt],
prof_x.y[prof_filt],
kind='cubic')
prof_filt = np.logical_not(prof_y.mask)
interp_y = spinterp.interp1d(prof_y.x[prof_filt],
prof_y.y[prof_filt],
kind='cubic')
else:
raise ValueError("Invalid value for 'kind'")
# inplace or not
fields = self.fields.copy()
# loop on all x profile masked points
for ind_masked in inds_masked_x:
try:
interp_val = interp_x(prof_x.x[ind_masked])
except ValueError:
continue
# putting interpolated value in the field
fields[ind_masked].comp_x[i, j] = interp_val
fields[ind_masked].mask[i, j] = False
# loop on all y profile masked points
for ind_masked in inds_masked_y:
try:
interp_val = interp_y(prof_y.x[ind_masked])
except ValueError:
continue
# putting interpolated value in the field
fields[ind_masked].comp_y[i, j] = interp_val
fields[ind_masked].mask[i, j] = False
# spatial interpolation
elif tof == 'spatial':
if inplace:
fields = self.fields
else:
tmp_tvf = self.copy()
fields = tmp_tvf.fields
for i, field in enumerate(fields):
fields[i].fill(kind=kind, value=value, inplace=True)
else:
raise ValueError("Unknown parameter for 'tof' : {}".format(tof))
# returning
if inplace:
self.fields = fields
else:
tmp_tvf = self.copy()
tmp_tvf.fields = fields
return tmp_tvf