Commit 06aff800 authored by hugocruzz's avatar hugocruzz
Browse files

time x depth convention, envass 0.0.8 monotonic check, reordering, mask time_qual ?

parent cf936670
Pipeline #363272 passed with stage
in 4 minutes and 23 seconds
{"time": {"simple": {"numeric": "True", "bounds": [1514764800, "now"]}, "advanced": {}},"temp": {"simple": {"numeric": "True", "bounds": [4, 40]}, "advanced": {}}}
\ No newline at end of file
{"time": {"simple": {"numeric": "True", "bounds": [1514764800, "now"], "monotonic":["strictly_increasing"]}, "advanced": {}},"temp": {"simple": {"numeric": "True", "bounds": [4, 40]}, "advanced": {}}}
\ No newline at end of file
from .main import qualityassurance
\ No newline at end of file
import numpy as np
import warnings
import plotly.graph_objs as go
import plotly.offline as py
from ipywidgets import interactive, HBox, VBox
def check_data(x, time):
    """
    Validate that `x` is a numpy array whose layout is compatible with `time`.

    Parameters:
        x (np.array): 1D data array, or 2D array with time along the columns
        time (np.array): time axis the data must align with
    Raises:
        TypeError: if x is not a numpy array, or if a 1D x does not match
            the length of time and time's length appears in no axis of x.
    """
    # isinstance is the idiomatic type check (the original compared
    # type(x).__module__ against "numpy", which also let scalars through).
    if not isinstance(x, np.ndarray):
        raise TypeError("Input must be a numpy array.")
    if len(x) != len(time):
        if len(time) in x.shape:
            # One axis matches the time length, so treat x as depth x time.
            print("2D array recognized")
            if x.shape[0] > x.shape[1]:
                # Convention in this package: rows = depth, columns = time.
                warnings.warn("Numbers of rows is greater than numbers of columns, rows must be depth and columns is time !")
        else:
            raise TypeError("Time and variable data should be of the same length")
def to_dict(kwargs):
    """
    Normalise the test-configuration dict in place.

    Entries whose value equals False (including 0) are removed; any other
    non-dict value v under key k is wrapped as {k: v}; dict values are kept
    untouched. Returns the same (mutated) dict.
    """
    for name in list(kwargs):
        value = kwargs[name]
        if type(value) is dict:
            continue
        if value == False:
            del kwargs[name]
        else:
            kwargs[name] = {name: value}
    return kwargs
def check_parameters(test, kwargs, parameters):
    """
    Warn about every argument configured for `test` in `kwargs` that is not
    listed among the accepted parameters in `parameters[test]`.
    """
    accepted = parameters[test]
    for name in kwargs[test].keys():
        if name not in accepted:
            warnings.warn(f"argument {name} is not given to the function")
def isnt_number(n):
    """
    Return True when `n` cannot be read as a finite number.

    NaN counts as "not a number", as does anything float() rejects.
    The original version caught only ValueError, so inputs like None or a
    list raised TypeError instead of being flagged — both are caught now.
    """
    try:
        return bool(np.isnan(float(n)))
    except (ValueError, TypeError):
        return True
def init_flag(time, prior_flags):
    """
    Build the initial boolean flag array for a QA test.

    Parameters:
        time (np.array): array whose shape defines the flag array
        prior_flags: either False/None (start all-False) or an array of
            prior flags to copy (cast to bool)
    Returns:
        np.array of bools.

    Fixes two defects of the original: an empty `prior_flags` array left
    `flag` unassigned (UnboundLocalError at return), and a bare `except`
    hid unrelated errors — only TypeError (unsized input) is expected.
    """
    try:
        has_prior = len(prior_flags) > 0
    except TypeError:
        # prior_flags is False/None/unsized: no prior information.
        has_prior = False
    if has_prior:
        return np.array(np.copy(prior_flags), dtype=bool)
    return np.zeros(time.shape, dtype=bool)
def interp_nan(time, y):
    """
    Return a copy of `y` with NaN entries replaced by linear interpolation
    over `time`. The input array is left untouched.
    """
    filled = np.copy(y)
    missing = np.isnan(filled)
    filled[missing] = np.interp(time[missing], time[~missing], filled[~missing])
    return filled
def plot_quality_assurance(df_sub):
    """
    Build an interactive notebook view (scatter plot + linked table) for
    visually inspecting quality-assured data.

    Parameters:
        df_sub: table-like object with `.columns`, `.index`, column access
            `df_sub[name]` and `.loc` — presumably a pandas DataFrame holding
            variables together with matching "<name>_qual" flag columns
            (0 = trusted, 1 = not trusted); TODO confirm with callers.
    Returns:
        ipywidgets VBox combining the axis dropdowns, the scatter figure
        and a table that refreshes from scatter selections.
    """
    variables=df_sub.columns
    # Required so plotly renders offline inside the notebook.
    py.init_notebook_mode()
    f = go.FigureWidget([go.Scatter(y = df_sub.index, x = df_sub.index, mode = 'markers')])
    scatter = f.data[0]
    N = len(df_sub)  # NOTE(review): appears unused; kept as-is
    scatter.marker.opacity = 0.8
    def update_axes(xaxis, yaxis):
        # Dropdown callback: redraw the scatter for the chosen columns.
        scatter = f.data[0]
        scatter.x = df_sub[xaxis]
        scatter.y = df_sub[yaxis]
        with f.batch_update():
            f.layout.xaxis.title = xaxis
            f.layout.yaxis.title = yaxis
            if "_qual" not in yaxis:
                # Split the variable into trusted (flag == 0) and non-trusted
                # (flag == 1) traces using its companion "<yaxis>_qual" column.
                f.add_trace(go.Scatter(y = df_sub[yaxis][df_sub[yaxis+"_qual"]==0], x = df_sub[xaxis][df_sub[yaxis+"_qual"]==0], mode = 'markers', marker = dict(color = 'blue'), name = f'{yaxis} Trusted (=0)'))
                f.add_trace(go.Scatter(y = df_sub[yaxis][df_sub[yaxis+"_qual"]==1], x = df_sub[xaxis][df_sub[yaxis+"_qual"]==1], mode = 'markers', marker = dict(color = 'darkred'), name = f'{yaxis} Not trusted (=1)'))
            else:
                # A quality column itself: plot it as one plain trace.
                scatter.x = df_sub[xaxis]
                scatter.y = df_sub[yaxis]
    axis_dropdowns = interactive(update_axes, yaxis = df_sub.columns, xaxis = df_sub.columns)
    # Create a table FigureWidget that updates on selection from points in the scatter plot of f
    t = go.FigureWidget([go.Table(
        header=dict(values=variables,
        fill = dict(color='#C2D4FF'),
        align = ['left'] * 5),
        cells=dict(values=[df_sub[col] for col in variables],
        fill = dict(color='#F5F8FF'),
        align = ['left'] * 5))])
    def selection_fn(trace,points,selector):
        # Restrict the table to the currently selected scatter points.
        t.data[0].cells.values = [df_sub.loc[points.point_inds][col] for col in variables]
    scatter.on_selection(selection_fn)
    # Put everything together
    return VBox((HBox(axis_dropdowns.children),f,t))
def nan_helper(y):
    """
    Return a (nan_mask, index_extractor) pair used by interpolation code:
    `nan_mask` is a boolean array marking NaNs and `index_extractor(mask)`
    yields the integer positions where `mask` is nonzero.
    """
    mask = np.isnan(y)
    def indices(z):
        return z.nonzero()[0]
    return mask, indices
import numpy as np
from .methods import *
from .functions import check_data, to_dict
def qualityassurance(variable, time, **kwargs):
    """
    Quality assurance for timeseries data.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array for the variable
        kwargs (dictionary): Maps a test name to its parameter dict, e.g.
            {"numeric": True, "IQR": {"factor": 3}}.
            Supported tests: numeric, bounds, edges, monotonic, IQR,
            variation_rate, IQR_moving, IQR_window, kmeans, kmeans_threshold,
            maintenance, individual_check, flag_value.
    Returns:
        qa (np.array): An array of ints where > 0 means non-trusted data.
    """
    check_data(variable, time)
    qa = np.zeros(variable.shape, dtype=int)
    kwargs = to_dict(kwargs)
    # Dispatch table: (test name, runner taking that test's options dict).
    # "numeric" ignores its options, mirroring the original behaviour.
    checks = (
        ("numeric", lambda opts: qa_numeric(variable)),
        ("bounds", lambda opts: qa_bounds(variable, **opts)),
        ("edges", lambda opts: qa_edges(variable, time, **opts)),
        ("monotonic", lambda opts: qa_monotonic(time, **opts)),
        ("IQR", lambda opts: qa_iqr(variable, time, **opts)),
        ("variation_rate", lambda opts: qa_variation_rate(variable, time, **opts)),
        ("IQR_moving", lambda opts: qa_iqr_moving(variable, time, **opts)),
        ("IQR_window", lambda opts: qa_max(variable, time, **opts)),
        ("kmeans", lambda opts: qa_kmeans(variable, time, **opts)),
        ("kmeans_threshold", lambda opts: qa_kmeans_threshold(variable, time, **opts)),
        ("maintenance", lambda opts: qa_maintenance(time, **opts)),
        ("individual_check", lambda opts: qa_individual(time, **opts)),
        ("flag_value", lambda opts: qa_flagvalue(variable, **opts)),
    )
    for name, run in checks:
        if name in kwargs:
            qa[run(kwargs[name])] = 1
    return qa
import numpy as np
from .functions import isnt_number, interp_nan, init_flag, nan_helper
import pandas as pd
from sklearn.cluster import KMeans
from datetime import datetime
def qa_numeric(variable, prior_flags=False):
    """
    Flag entries that cannot be interpreted as (finite) numbers.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    flags = init_flag(variable, prior_flags)
    non_numeric = np.vectorize(isnt_number, otypes=[bool])(variable)
    flags[non_numeric] = True
    return flags
def qa_bounds(variable, bounds, prior_flags=False):
    """
    Flag values outside the closed interval [bounds[0], bounds[1]].

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        bounds (sequence): two-element (lower, upper) limits
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data

    Non-numeric entries are coerced to NaN first and left unflagged here
    (qa_numeric is responsible for them).
    """
    # `np.float` was deprecated in NumPy 1.20 and removed in 1.24; the
    # builtin `float` is the supported spelling and behaves identically.
    data = np.squeeze(np.array(pd.DataFrame(variable).apply(pd.to_numeric, errors='coerce').astype(float)))
    data[qa_numeric(data)] = np.nan
    flags = init_flag(variable, prior_flags)
    valid = ~np.isnan(data)
    flags[valid] = np.logical_or(data[valid] < float(bounds[0]), data[valid] > float(bounds[1]))
    return flags
def qa_edges(variable, time, edges, prior_flags=False):
    """
    Flag samples within `edges` seconds of either end of the record.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array corresponding to the data, in seconds
        edges (int): span (s) cut away at both the start and the end
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    flags = np.atleast_2d(init_flag(variable, prior_flags))
    near_end = time > time[-1] - edges
    near_start = time < time[0] + edges
    flags[:, near_end] = True
    flags[:, near_start] = True
    return np.squeeze(flags)
def qa_monotonic(time, monotonic, prior_flags=False):
    """
    Flag time stamps that break the requested monotonicity.

    Parameters:
        time (np.array): Time array, in seconds
        monotonic (list/str): any of "strictly_increasing",
            "strictly_decreasing", "increasing", "decreasing"
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data

    Note: membership is tested with `in`, so a plain string such as
    "strictly_increasing" also matches "increasing" (same as the original).
    """
    flags = init_flag(time, prior_flags)
    steps = np.diff(time)
    rules = (
        ("strictly_increasing", steps <= 0),
        ("strictly_decreasing", steps >= 0),
        ("increasing", steps < 0),
        ("decreasing", steps > 0),
    )
    for name, violated in rules:
        if name in monotonic:
            # +1: the sample *after* a bad step is the one flagged.
            flags[np.where(violated)[0] + 1] = 1
    return flags
def qa_iqr(variable, time, factor=3, prior_flags=False):
    """
    Flag values outside `factor` interquartile ranges from the quartiles.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array used to interpolate NaNs before the test
        factor (int): threshold for the outlier labelling rule
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    q25 = np.quantile(data, 0.25)
    q75 = np.quantile(data, 0.75)
    spread = q75 - q25
    if spread != 0:
        upper = q75 + factor * spread
        lower = q25 - factor * spread
        outliers = np.r_[np.where(data >= upper)[0], np.where(data <= lower)[0]]
    else:
        # Degenerate distribution: no IQR, nothing to flag.
        outliers = np.array([])
    flags[outliers] = True
    return flags
def qa_variation_rate(variable, time, prior_flags=False):
    """
    Flag values whose step-to-step variation lies in the extreme tail AND
    whose magnitude sits above an adaptively chosen upper quantile.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array corresponding to the data
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    vec_diff = abs(np.diff(data))
    if len(vec_diff) > 0:
        vecdiff_quan = np.quantile(vec_diff, 0.999)
        idx_vecdiff = np.where(vec_diff >= vecdiff_quan)[0]
        vec_max = np.max(data)
        # Pick the loosest quantile whose value is within a factor 2 of the max.
        if vec_max / np.quantile(data, 0.99) < 2:
            quantile_threshold = 0.99
        elif vec_max / np.quantile(data, 0.999) < 2:
            quantile_threshold = 0.999
        elif vec_max / np.quantile(data, 0.9999) < 2:
            quantile_threshold = 0.9999
        else:
            # BUG FIX: the original chain left quantile_threshold unbound
            # (NameError) when even the 0.99999-quantile ratio was >= 2;
            # fall back to the tightest quantile in every remaining case.
            quantile_threshold = 0.99999
        vec_quan = np.quantile(data, quantile_threshold)
        idx_vec = np.where(data >= vec_quan)[0]
        # Flag only points failing both the step test and the magnitude test.
        idx = list(set(idx_vecdiff) & set(idx_vec))
    else:
        idx = []
    flags = np.array(flags, dtype=bool)
    flags[idx] = True
    return flags
def qa_iqr_moving(variable, time, window_size=15, factor=3, prior_flags=False):
    """
    Flag outliers with an IQR test applied on every sliding window.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array used to interpolate NaNs before the test
        window_size (int): length of the sliding window
        factor (int): threshold for the outlier labelling rule
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    if len(data) < window_size:
        print("ERROR! Window size is larger than array length.")
    else:
        for start in range(len(data) - window_size + 1):
            window = np.copy(data[start:start + window_size])
            q25 = np.quantile(window, 0.25)
            q75 = np.quantile(window, 0.75)
            spread = q75 - q25
            hits = np.r_[np.where(window >= q75 + factor * spread)[0],
                         np.where(window <= q25 - factor * spread)[0]]
            if len(hits) != 0:
                # Accumulate per-window votes at absolute positions.
                flags[start + hits] = flags[start + hits] + 1
        flags[flags > 1] = 1
    flags = np.array(flags, dtype=bool)
    return flags
def qa_max(variable, time, factor=3, semiwindow=1000, prior_flags=False):
    """
    Run an IQR outlier test on a window centred on the global maximum.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array corresponding to the data
        factor (int): threshold for the outlier labelling rule
        semiwindow (int): half-width of the window around the maximum
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    peak = np.where(data == np.nanmax(data))[0][0]
    lo = max(peak - semiwindow, 0)
    # NOTE(review): upper edge is clamped to len-1, so the very last sample
    # is never inside the window — kept as in the original.
    hi = min(peak + semiwindow, len(data) - 1)
    window_flags = qa_iqr(data[lo:hi], time[lo:hi], factor)
    if sum(window_flags) > 0:
        flags[lo:hi] = flags[lo:hi] + window_flags
    flags = np.array(flags, dtype=bool)
    return flags
def qa_kmeans(variable, time, ncluster=2, prior_flags=False):
    """
    Flag one of two k-means clusters when its mean is far above the other.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array used to interpolate NaNs before clustering
        ncluster (int): number of clusters (>=2); only labels 0 and 1 are used
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data

    NOTE(review): KMeans label numbering is arbitrary, so which group is
    flagged depends on the fitted label assignment — confirm this is intended.
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    model = KMeans(n_clusters=ncluster)
    model.fit(data.reshape(-1, 1))
    labels = model.predict(data.reshape(-1, 1))
    group0 = np.where(labels == 0)[0]
    group1 = np.where(labels == 1)[0]
    separation = (np.mean(data[group1]) - np.mean(data[group0])) / np.quantile(data, 0.90)
    if separation >= 5:  # groups clearly separated: distrust group 1
        flags[group1] = True
    flags = np.array(flags, dtype=bool)
    return flags
def qa_kmeans_threshold(variable, time, ncluster=2, threshold=1.2, prior_flags=False):
    """
    Flag the smaller of two k-means clusters when the normalised distance
    between the cluster means reaches `threshold`.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        time (np.array): Time array used to interpolate NaNs before clustering
        ncluster (int): number of clusters (>=2); only labels 0 and 1 are used
        threshold (float): minimum normalised separation for flagging
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    data = interp_nan(time, np.copy(variable))
    flags = init_flag(time, prior_flags)
    model = KMeans(n_clusters=ncluster)
    model.fit(data.reshape(-1, 1))
    labels = model.predict(data.reshape(-1, 1))
    group0 = np.where(labels == 0)[0]
    group1 = np.where(labels == 1)[0]
    # The minority cluster is the flagging candidate (ties go to label 0).
    minority = group0 if len(group0) <= len(group1) else group1
    separation = abs((np.mean(data[group1]) - np.mean(data[group0]))) / np.quantile(data, 0.90)
    if separation >= threshold:  # two clearly separated groups exist
        flags[minority] = True
    flags = np.array(flags, dtype=bool)
    return flags
def qa_maintenance(time, path='maintenance_log.csv', prior_flags=False):
    """
    Flag samples that fall inside maintenance intervals from a logbook.

    Parameters:
        time (np.array): Time array (epoch seconds) to check
        path (str): ';'-separated CSV with 'start' and 'end' columns
            formatted '%Y-%m-%d %H:%M:%S.%f'
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    maintenance_log = pd.read_csv(path, sep=';')
    flags = init_flag(time, prior_flags)
    fmt = '%Y-%m-%d %H:%M:%S.%f'
    starts = [datetime.timestamp(datetime.strptime(s, fmt)) for s in maintenance_log.start]
    ends = [datetime.timestamp(datetime.strptime(s, fmt)) for s in maintenance_log.end]
    for begin, finish in zip(starts, ends):
        # Strict inequalities: samples exactly on an interval edge stay trusted.
        flags[(time > begin) & (time < finish)] = True
    return flags
def qa_individual(time, individual_check, prior_flags=False):
    """
    Flag individually listed time stamps.

    Parameters:
        time (np.array): Time array to which to apply the quality assurance
        individual_check (list): time values whose samples must be removed
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    flags = init_flag(time, prior_flags)
    for stamp in individual_check:
        flags[np.where(stamp == time)[0]] = True
    return flags
def qa_moving_average_limit(variable, window=100, limit=5, prior_flags=False):
    """
    Flag values that deviate from a moving average by more than `limit`.

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        window (int): size of the moving-average window
        limit (float): allowed absolute distance from the moving average
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    flags = init_flag(variable, prior_flags)
    moving_avg = np.convolve(variable, np.ones(window), 'same') / window
    # Any NaNs in the average (from NaNs in the input) are interpolated away.
    gaps, positions = nan_helper(moving_avg)
    moving_avg[gaps] = np.interp(positions(gaps), positions(~gaps), moving_avg[~gaps])
    flags[np.abs(moving_avg - variable) > limit] = True
    return flags
def qa_flagvalue(variable, value, prior_flags=False):
    """
    Flag every entry equal to `value` (e.g. a sensor sentinel/error code).

    Parameters:
        variable (np.array): Data array to which to apply the quality assurance
        value: sentinel to match exactly
        prior_flags (np.array): Optional array of bools where True means non-trusted data
    Returns:
        flags (np.array): An array of bools where True means non-trusted data
    """
    flags = init_flag(variable, prior_flags)
    matches = np.where(variable == value)[0]
    flags[matches] = 1
    return flags
......@@ -6,6 +6,17 @@ import os
from shutil import move
import copy
from scipy.interpolate import griddata
from scipy import stats
import xarray as xr
def xr_interp(grid_time, grid_depth, time, depth, temp):
    """
    Fill NaNs in a depth x time temperature field and regrid it onto the
    target time/depth axes using xarray interpolation.

    Parameters:
        grid_time, grid_depth: target coordinate vectors
        time, depth: source coordinate vectors (temp is depth x time)
        temp: 2D temperature array indexed [depth, time]
    Returns:
        np.array: interpolated temperature on the (grid_depth, grid_time) grid
    """
    dataset = xr.Dataset(
        data_vars={"temp": (["depth", "time"], temp)},
        coords={"time": (["time"], time), "depth": (["depth"], depth)},
    )
    # Fill gaps along each axis first, then move onto the target grid.
    filled = dataset.interpolate_na(dim="time").interpolate_na(dim="depth")
    regridded = filled.interp(depth=grid_depth).interp(time=grid_time)
    return regridded["temp"].values
def interp_nan_grid(time,depth,temp, method='linear'):
......@@ -31,15 +42,25 @@ def interp_rescale(time, depth, time_grid, depth_grid, temp, method='linear'):
temp_rescaled = griddata((time_mesh.ravel(),depth_mesh.ravel()),temp.ravel(),(time_mesh_grid,depth_mesh_grid),method=method)
return temp_rescaled
def low_pass_hypolimnion(vec, vec_depth, vec_qual, meta_bot, threshold=1.4, method="z_score"):
    """
    Quality-check hypolimnion temperatures (samples deeper than `meta_bot`).

    method "quantile": flag values above np.quantile(hypo, threshold);
    any other method: flag values whose z-score exceeds `threshold`.
    Writes 0/1 into the hypolimnion slots of `vec_qual` (mutated in place)
    and returns it as int.

    NOTE(review): despite the name, *high* values are flagged, not low ones —
    confirm the intended semantics with the callers.
    """
    below_meta = vec_depth > meta_bot
    hypo_values = vec[below_meta]
    if len(hypo_values):
        if method == "quantile":
            hits = np.array(hypo_values > np.quantile(hypo_values, threshold), dtype=int)
        else:
            hits = (stats.zscore(hypo_values) > threshold).astype(int)
        vec_qual[below_meta] = hits
    return vec_qual.astype(int)
def quality_assurance_tchain(depth_vec, df, df_qual, time_epilimnion_grad_threshold = 1.5,
time_hypolimnion_grad_threshold = 0.4 ,depth_grad_threshold = 1, perc_good = 0.8):
time_hypolimnion_grad_threshold = 0.4 ,depth_grad_threshold = 1, low_pass_threshold=1.3, perc_good = 0.8, meta_bot=30):
log("Performing gradient check along time and depth")
dfplot = df.copy()
dfqual = df_qual.copy()
ndepth = dfplot.shape[0]
for depth in range(0, 30):
for depth in range(0, meta_bot):
vec = np.copy(dfplot[depth, :])
vec_qual = np.copy(dfqual[depth, :])
vec_index = np.array(range(0, len(vec)))
......@@ -74,7 +95,7 @@ def quality_assurance_tchain(depth_vec, df, df_qual, time_epilimnion_grad_thresh
dfqual[depth, :] = 1
dfqual[depth, vec_index] = 0
for depth in range(31, ndepth):
for depth in range(meta_bot+1, ndepth):
vec = np.copy(dfplot[depth, :])
vec_qual = np.copy(dfqual[depth, :])
vec_index = np.array(range(0, len(vec)))
......@@ -126,6 +147,16 @@ def quality_assurance_tchain(depth_vec, df, df_qual, time_epilimnion_grad_thresh
vec_depth = np.copy(depth_vec)[vec_index]
vec_qual = np.zeros(len(vec))
vec_qual = low_pass_hypolimnion(vec, vec_depth, vec_qual, meta_bot, threshold=low_pass_threshold)
if sum(vec_qual):