# -*- coding: utf-8 -*-
#!/bin/env python3
# Copyright (C) 2003-2007 Gaby Launay
# Author: Gaby Launay <gaby.launay@tutanota.com>
# URL: https://framagit.org/gabylaunay/IMTreatment
# Version: 1.0
# This file is part of IMTreatment.
# IMTreatment is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# IMTreatment is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
import psutil
import warnings
import matplotlib.pyplot as plt
from .. import file_operation as imtio
from .. import pod as imtpod
from .. import vortex_detection as imtvod
from os.path import join
import os
[docs]class POD_CP_protocol(object):
def __init__(self, name, imtpath, respath, crop_x=[-np.inf, np.inf],
crop_y=[-np.inf, np.inf], crop_t=[-np.inf, np.inf],
hard_crop=True,
pod_coh=0.05, mirroring=[[2, 0], [1, 0]], eps_traj=15.,
red_fact=1., nmb_min_in_traj=1, det_fact=1,
thread='all', remove_weird=False):
self.name = name
self.imtpath = imtpath
self.respath = respath
self.crop_x = crop_x
self.crop_y = crop_y
self.crop_t = crop_t
self.hard_crop = hard_crop
self.pod = None
self.pod_coh = pod_coh
self.mirroring = mirroring
self.eps_traj = eps_traj
if det_fact > 1:
self.detailled = True
else:
self.detailled = False
self.det_fact = det_fact
self.red_fact = red_fact
self.nmb_min_in_traj = nmb_min_in_traj
self.len_data = None
if thread == "all":
self.thread = psutil.cpu_coun()
else:
self.thread = thread
self.remove_weird = remove_weird
[docs] def prepare_data(self):
print(" Preparing data ")
# import data
tvf = imtio.import_from_file(join(self.imtpath,
"{}.cimt".format(self.name)))
# crop
if self.hard_crop:
tvf.crop_masked_border(hard=True, inplace=True)
else:
tvf.crop_masked_border(inplace=True)
tvf.crop(intervx=self.crop_x, intervy=self.crop_y, intervt=self.crop_t,
inplace=True)
# adjust temporal resolution
if self.red_fact < 1:
tvf.reduce_temporal_resolution(int(np.round(1./self.red_fact)),
mean=False, inplace=True)
# check for weird fields
if self.remove_weird:
tvf.remove_weird_fields(std_coef=3., treatment='interpolate',
inplace=True)
# save a display
mean_field = tvf.get_mean_field()
mean_field.crop_masked_border(hard=False, inplace=True)
plt.figure()
mean_field.display(kind='stream', density=3)
plt.savefig(join(self.respath, "{}/mean_field.png".format(self.name)))
plt.close(plt.gcf())
# store
imtio.export_to_file(tvf,
join(self.imtpath,
"{}_cln.cimt".format(self.name)))
imtio.export_to_file(mean_field,
join(self.imtpath,
"{}_mean.cimt".format(self.name)))
del tvf
[docs] def pod_decomp(self):
print(" Making POD decomposition ")
# improt data
tvf = imtio.import_from_file(join(self.imtpath, "{}_cln.cimt"
.format(self.name)))
# make the decomposition
pod = imtpod.modal_decomposition(tvf, kind='pod',
wanted_modes='all',
max_vecs_per_node=len(tvf) + 1,
verbose=False)
# save a display
pod.display()
plt.savefig(join(self.respath, "{}/pod.png".format(self.name)))
plt.close(plt.gcf())
# save data
imtio.export_to_file(pod, join(self.imtpath, "{}_pod.cimt"
.format(self.name)))
del pod
[docs] def pod_reconstr(self, detailled=False):
if self.detailled:
print(" Making detailled POD reconstruction ")
else:
print(" Making POD reconstruction ")
# improt data
pod = imtio.import_from_file(join(self.imtpath, "{}_pod.cimt"
.format(self.name)))
# make reconstruction
if detailled:
pod.augment_temporal_resolution(fact=self.det_fact,
interp='linear',
inplace=True)
wanted_modes = pod.modes_nmb[pod.get_spatial_coherence(raw=True) >
self.pod_coh]
wanted_modes = np.array(wanted_modes)
wanted_modes = wanted_modes[wanted_modes < len(pod.modes)/2.]
rec = pod.reconstruct(wanted_modes=wanted_modes)
coh = pod.get_spatial_coherence()
del pod
# save a display
plt.figure()
coh.display(color='k')
plt.xlim(0, np.max(wanted_modes)*2)
plt.axhline(self.pod_coh, linestyle='--', color='r')
plt.plot(wanted_modes, coh.y[wanted_modes], 'or')
if detailled:
plt.savefig(join(self.respath, "{}/rec_det.png".format(self.name)))
else:
plt.savefig(join(self.respath, "{}/rec.png".format(self.name)))
plt.close(plt.gcf())
# save data
if detailled:
imtio.export_to_file(rec, join(self.imtpath, "{}_rec_det.cimt"
.format(self.name)))
else:
imtio.export_to_file(rec, join(self.imtpath, "{}_rec.cimt"
.format(self.name)))
del rec
[docs] def CP_detection(self, detailled=False):
# improt data
if self.detailled:
print(" Making detailled CP detection")
else:
print(" Making CP detection")
mem_available = psutil.virtual_memory().free
if detailled:
rec = imtio.import_from_file(join(self.imtpath, "{}_rec_det.cimt"
.format(self.name)))
else:
rec = imtio.import_from_file(join(self.imtpath, "{}_rec.cimt"
.format(self.name)))
# Check if there is enough memory to use
filesize = mem_available - psutil.virtual_memory().free
possible_nmb_of_copies = int(mem_available/(filesize*2))
if possible_nmb_of_copies < 1:
possible_nmb_of_copies = 1
if possible_nmb_of_copies < self.thread:
warnings.warn("Use {} instead of {} threads because of "
"memory limitation"
.format(possible_nmb_of_copies, self.thread))
self.thread = possible_nmb_of_copies
# getting cp positions
traj = imtvod.get_critical_points(rec, kind='pbi',
mirroring=self.mirroring,
verbose=False, thread=self.thread)
traj.compute_traj(epsilon=self.eps_traj)
traj.clean_traj(self.nmb_min_in_traj)
# save display
plt.figure()
traj.display_traj('x', filt=[True, True, False, False, False],
marker=None)
if detailled:
plt.savefig(join(self.respath, "{}/traj_x_cln_det.png"
.format(self.name)))
else:
plt.savefig(join(self.respath, "{}/traj_x_cln.png"
.format(self.name)))
plt.close(plt.gcf())
plt.figure()
traj.display_traj('y', filt=[True, True, False, False, False],
marker=None)
if detailled:
plt.savefig(join(self.respath, "{}/traj_y_cln_det.png"
.format(self.name)))
else:
plt.savefig(join(self.respath, "{}/traj_y_cln.png"
.format(self.name)))
plt.close(plt.gcf())
# save
if detailled:
imtio.export_to_file(traj, join(self.imtpath, "{}_traj_det.cimt"
.format(self.name)))
else:
imtio.export_to_file(traj, join(self.imtpath, "{}_traj.cimt"
.format(self.name)))
del traj
[docs] def compute_everything(self):
if not os.path.exists(join(self.respath, "{}".format(self.name))):
os.mkdir(join(self.respath, "{}".format(self.name)))
if not os.path.exists(join(self.imtpath, "{}_cln.cimt"
.format(self.name))):
self._display_heading()
self.prepare_data()
self.pod_decomp()
self.pod_reconstr()
self.CP_detection()
elif not os.path.exists(join(self.imtpath, "{}_pod.cimt"
.format(self.name))):
self._display_heading()
self.pod_decomp()
self.pod_reconstr()
self.CP_detection()
elif not os.path.exists(join(self.imtpath, "{}_rec.cimt"
.format(self.name))):
self._display_heading()
self.pod_reconstr()
self.CP_detection()
elif not os.path.exists(join(self.imtpath, "{}_traj.cimt"
.format(self.name))):
self._display_heading()
self.CP_detection()
else:
pass
# detailled study
if self.detailled:
if not os.path.exists(join(self.imtpath, "{}_rec_det.cimt"
.format(self.name))):
self._display_heading()
self.pod_reconstr(detailled=True)
self.CP_detection(detailled=True)
elif not os.path.exists(join(self.imtpath, "{}_traj_det.cimt"
.format(self.name))):
self._display_heading()
self.CP_detection(detailled=True)
else:
pass
[docs] def recompute_everything(self):
if not os.path.exists(join(self.respath, "{}".format(self.name))):
os.mkdir(join(self.respath, "{}".format(self.name)))
self.prepare_data()
self.pod_decomp()
self.pod_reconstr()
self.CP_detection()
def _display_heading(self):
title = "= {} =".format(self.name)
print(("\n" + "="*len(title)))
print(title)
print(("="*len(title)))