"""
QC of sequential reports.
Module containing QC functions for track checking which could be applied on a DataBundle.
"""
from __future__ import annotations
from typing import cast
import numpy as np
import pandas as pd
from scipy.ndimage import label
from .auxiliary import (
SequenceDatetimeType,
SequenceIntType,
SequenceNumberType,
convert_units,
ensure_arrays,
failed,
inspect_arrays,
isvalid,
passed,
post_format_return_type,
)
from .spherical_geometry import sphere_distance
from .time_control import time_difference
from .track_check_utils import (
backward_discrepancy,
calculate_course_parameters,
calculate_midpoint,
calculate_speed_course_distance_time_difference,
check_distance_from_estimate,
direction_continuity,
forward_discrepancy,
modal_speed,
set_speed_limits,
speed_continuity,
)
[docs]
@post_format_return_type(["value"])
@inspect_arrays(["value", "lat", "lon", "date"], sortby="date")
@convert_units(lat="degrees", lon="degrees")
def do_spike_check(
value: SequenceNumberType,
lat: SequenceNumberType,
lon: SequenceNumberType,
date: SequenceDatetimeType,
max_gradient_space: float,
max_gradient_time: float,
delta_t: float,
n_neighbours: int,
) -> SequenceIntType:
"""
Perform IQUAM-like spike check.
Parameters
----------
value : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of values to be analyzed.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lat : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of latitudes in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lon : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of longitudes in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
date : :py:obj:`~marine_qc.SequenceDatetimeType`
One-dimensional array of datetime values.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
max_gradient_space : float, default: 0.5
Maximum allowed spatial gradient.
The unit is "units of value" per kilometer.
max_gradient_time : float, default: 1.0
Maximum allowed temporal gradient.
The unit is "units of value" per hour.
delta_t : float, default: 2.0
Temperature delta used in the comparison.
Typically set to 2.0 for ships and 1.0 for drifting buoys.
n_neighbours : int, default: 5
Number of neighboring points considered in the analysis.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if the spike check fails.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If either input is not 1-dimensional or if their lengths do not match.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
In previous versions, default values for the parameters were:
* max_gradient_space: float = 0.5
* max_gradient_time: float = 1.0
* delta_t: float = 2.0
* n_neighbours: int = 5
"""
value, lat, lon, date = ensure_arrays(value=value, lat=lat, lon=lon, date=date)
value = cast(np.ndarray, value)
lat = cast(np.ndarray, lat)
lon = cast(np.ndarray, lon)
date = cast(np.ndarray, date)
gradient_violations = []
count_gradient_violations = []
number_of_obs = len(value)
spike_qc = np.asarray([passed] * number_of_obs) # type: np.ndarray
for t1 in range(number_of_obs):
violations_for_this_report = []
count_violations_this_report = 0.0
lo = max(0, t1 - n_neighbours)
hi = min(number_of_obs, t1 + n_neighbours + 1)
for t2 in range(lo, hi):
if not isvalid(value[t1]) or not isvalid(value[t2]):
continue
distance = sphere_distance(lat[t1], lon[t1], lat[t2], lon[t2])
delta = pd.Timestamp(date[t2]) - pd.Timestamp(date[t1])
time_diff = abs(delta.days * 24 + delta.seconds / 3600.0)
val_change = abs(value[t2] - value[t1])
iquam_condition = max(
[
delta_t,
abs(distance) * max_gradient_space,
abs(time_diff) * max_gradient_time,
]
)
if val_change > iquam_condition:
violations_for_this_report.append(t2)
count_violations_this_report += 1.0
gradient_violations.append(violations_for_this_report)
count_gradient_violations.append(count_violations_this_report)
while np.sum(count_gradient_violations) > 0.0:
most_fails = int(np.argmax(count_gradient_violations))
spike_qc[most_fails] = failed
for index in gradient_violations[most_fails]:
if most_fails in gradient_violations[index]:
gradient_violations[index].remove(most_fails)
count_gradient_violations[index] -= 1.0
count_gradient_violations[most_fails] = 0
return spike_qc
[docs]
@post_format_return_type(["vsi"])
@inspect_arrays(["vsi", "dsi", "lat", "lon", "date"], sortby="date")
@convert_units(vsi="km/h", dsi="degrees", lat="degrees", lon="degrees")
def do_track_check(
vsi: SequenceNumberType,
dsi: SequenceNumberType,
lat: SequenceNumberType,
lon: SequenceNumberType,
date: SequenceDatetimeType,
max_direction_change: float,
max_speed_change: float,
max_absolute_speed: float,
max_midpoint_discrepancy: float,
) -> SequenceIntType:
"""
Perform one pass of the track check.
This is an implementation of the MDS track check code which was originally written in the 1990s.
I don't know why this piece of historic trivia so exercises my mind, but it does: the 1990s!
I wish my code would last so long.
Parameters
----------
vsi : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional reported speed array in km/h.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
dsi : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional reported heading array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lat : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional latitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lon : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional longitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
date : :py:obj:`~marine_qc.SequenceDatetimeType`
One-dimensional date array.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
max_direction_change : float, default: 60.0
Maximum valid direction change in degrees.
max_speed_change : float, default: 10.0
Maximum valid speed change in km/h.
max_absolute_speed : float, default: 40.0
Maximum valid absolute speed in km/h.
max_midpoint_discrepancy : float, default: 150.0
Maximum valid midpoint discrepancy in meters.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if the track check fails.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If either input is not 1-dimensional or if their lengths do not match.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
If number of observations is less than three, the track check always passes.
In previous versions, the default values of the parameters were:
* max_direction_change = 60.0
* max_speed_change = 10.0
* max_absolute_speed = 40.0
* max_midpoint_discrepancy = 150.0
"""
vsi, dsi, lat, lon, date = ensure_arrays(vsi=vsi, dsi=dsi, lat=lat, lon=lon, date=date)
number_of_obs = len(lat)
# no obs in, no qc outcomes out
if number_of_obs == 0:
return np.asarray([])
# fewer than three obs - set the fewsome flag
if number_of_obs < 3:
return np.asarray([passed] * number_of_obs)
# work out speeds and distances between alternating points
speed_alt, _distance_alt, _course_alt, _timediff_alt = calculate_speed_course_distance_time_difference(
lat=lat,
lon=lon,
date=date,
alternating=True,
)
speed, _distance, course, timediff = calculate_speed_course_distance_time_difference(
lat=lat,
lon=lon,
date=date,
)
# what are the mean and mode speeds?
ms = modal_speed(speed)
# set speed limits based on modal speed
amax, _amaxx, _amin = set_speed_limits(ms)
# compare reported speeds and positions if we have them
forward_diff_from_estimated = forward_discrepancy(
lat=lat,
lon=lon,
date=date,
vsi=vsi,
dsi=dsi,
)
reverse_diff_from_estimated = backward_discrepancy(
lat=lat,
lon=lon,
date=date,
vsi=vsi,
dsi=dsi,
)
midpoint_diff_from_estimated = calculate_midpoint(
lat=lat,
lon=lon,
timediff=timediff,
)
thisqc_a = np.zeros(number_of_obs)
thisqc_b = np.zeros(number_of_obs)
speed_alt_previous = np.roll(speed_alt, 1)
speed_alt_next = np.roll(speed_alt, -1)
speed_next = np.roll(speed, -1)
selection1 = speed > amax
selection2 = speed_alt_previous > amax
selection_a = np.logical_and(selection1, selection2)
selection1 = speed_next > amax
selection2 = speed_alt_next > amax
selection_b = np.logical_and(selection1, selection2)
selection1 = speed > amax
selection2 = speed_next > amax
selection_c = np.logical_and(selection1, selection2)
thisqc_a[selection_c] = thisqc_a[selection_c] + 3.00
thisqc_a[selection_b] = thisqc_a[selection_b] + 2.00
thisqc_a[selection_a] = thisqc_a[selection_a] + 1.00
# Quality-control by examining the distance
# between the calculated and reported second position.
thisqc_b += check_distance_from_estimate(vsi, timediff, forward_diff_from_estimated, reverse_diff_from_estimated)
# Check for continuity of direction
thisqc_b += direction_continuity(dsi, course, max_direction_change=max_direction_change)
# Check for continuity of speed.
thisqc_b += speed_continuity(vsi, speed, max_speed_change=max_speed_change)
thisqc_b[speed > max_absolute_speed] = thisqc_b[speed > max_absolute_speed] + 10.0
fails = (midpoint_diff_from_estimated > max_midpoint_discrepancy) & (thisqc_a > 0) & (thisqc_b > 0)
trk = np.where(fails, failed, passed)
return trk
[docs]
@post_format_return_type(["value"])
@inspect_arrays(["value"])
def do_few_check(
value: SequenceNumberType,
) -> SequenceIntType:
"""
Check if number of observations is less than 3.
Parameters
----------
value : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of values to be analyzed.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if number of observations is less than 3.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If either input is not 1-dimensional.
TypeError
If `inspect_arrays` does not return np.ndarrays.
"""
(value,) = ensure_arrays(value=value)
number_of_obs = len(value)
# no obs in, no qc outcomes out
if number_of_obs == 0:
return [failed] * number_of_obs
# fewer than three obs - set the fewsome flag
if number_of_obs < 3:
return [failed] * number_of_obs
return [passed] * number_of_obs
[docs]
@post_format_return_type(["at"])
@inspect_arrays(["at", "dpt", "lat", "lon", "date"], sortby="date")
@convert_units(at="K", dpt="K", lat="degrees", lon="degrees")
def find_saturated_runs(
at: SequenceNumberType,
dpt: SequenceNumberType,
lat: SequenceNumberType,
lon: SequenceNumberType,
date: SequenceDatetimeType,
min_time_threshold: float,
shortest_run: int,
) -> SequenceIntType:
"""
Perform checks on persistence of 100% rh while going through the voyage.
While going through the voyage repeated strings of 100 %rh (AT == DPT) are noted.
If a string extends beyond 20 reports and two days/48 hrs in time then all values are set to
fail the repsat qc flag.
Parameters
----------
at : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional air temperature array.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
dpt : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional dew point temperature array.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lat : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional latitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lon : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional longitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
date : :py:obj:`~marine_qc.SequenceDatetimeType`
One-dimensional date array.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
min_time_threshold : float, default: 48.0
Minimum time threshold in hours.
shortest_run : int, default: 4
Shortest number of observations.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if a saturated run is found.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If either input is not 1-dimensional or if their lengths do not match.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
In previous version, default values for the parameters were:
* min_time_threshold = 48.0
* shortest_run = 4
"""
at, dpt, lat, lon, date = ensure_arrays(at=at, dpt=dpt, lat=lat, lon=lon, date=date)
saturated = at == dpt
# Label contiguous runs of saturation
labeled_array, num_features = label(saturated)
# Initialize result array
qc_flags = np.zeros_like(at, dtype=int) + passed
for run_id in range(1, num_features + 1):
indices = np.where(labeled_array == run_id)[0]
if len(indices) < shortest_run:
continue
i_start = indices[0]
i_end = indices[-1]
# Time difference in hours
tdiff = time_difference(date[i_start], date[i_end])
if tdiff >= min_time_threshold:
qc_flags[indices] = failed
return qc_flags
[docs]
@post_format_return_type(["value"])
@inspect_arrays(["value"])
def find_multiple_rounded_values(value: SequenceNumberType, min_count: int, threshold: float) -> SequenceIntType:
"""
Find instances when more than "threshold" of the observations are whole numbers and set the 'round' flag.
Used in the humidity QC where there are times when the values are rounded and this may have caused a bias.
Parameters
----------
value : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of values.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
min_count : int, default: 20
Minimum number of rounded figures that will trigger the test.
threshold : float, default: 0.5
Minimum fraction of all observations that will trigger the test.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if the value is a whole number.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If `threshold` is not between 0.0 and 1.0.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
Previous versions had default values for the parameters of:
* min_count = 20
* threshold = 0.5
"""
if not (0.0 <= threshold <= 1.0):
raise ValueError(f"Invalid threshold: {threshold}. Must be between 0.0 and 1.0.")
(value,) = ensure_arrays(value=value)
number_of_obs = len(value)
if number_of_obs == 0:
return [passed] * number_of_obs
rounded = np.asarray([passed] * number_of_obs) # type: np.ndarray
valid_indices = isvalid(value)
allcount = np.count_nonzero(valid_indices)
if allcount <= min_count:
return rounded
# Find rounded values by checking where value mod 1 equals zero and set to failed if they exceed threshold
rounded_values = np.equal(np.mod(value[valid_indices], 1), 0)
cutoff = allcount * threshold
if np.count_nonzero(rounded_values) > cutoff:
rounded[valid_indices & rounded_values] = failed
return rounded
[docs]
@post_format_return_type(["value"])
@inspect_arrays(["value"])
def find_repeated_values(value: SequenceNumberType, min_count: int, threshold: float) -> SequenceIntType:
"""
Find cases where more than a given proportion of SSTs have the same value.
This function goes through a voyage and finds any cases where more than a threshold fraction of
the observations have the same values for a specified variable.
Parameters
----------
value : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional array of values.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
min_count : int, default: 20
Minimum number of repeated values that will trigger the test.
threshold : float, default: 0.7
Smallest fraction of all observations that will trigger the test.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if the value is repeated.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
- If `threshold` is not between 0.0 and 1.0.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
Previous versions had default values for the parameters of:
* min_count = 20
* threshold = 0.7
"""
if not (0.0 <= threshold <= 1.0):
raise ValueError(f"Invalid threshold: {threshold}. Must be between 0.0 and 1.0.")
(value,) = ensure_arrays(value=value)
number_of_obs = len(value)
if number_of_obs == 0:
return [passed] * number_of_obs
rep = np.asarray([passed] * number_of_obs) # type: np.ndarray
valid_indices = isvalid(value)
allcount = np.count_nonzero(valid_indices)
if allcount <= min_count:
return rep
_, unique_inverse, counts = np.unique(value[valid_indices], return_inverse=True, return_counts=True)
cutoff = threshold * allcount
exceedances = counts > cutoff
exceedances = np.where(exceedances, failed, passed)
pass_fail = exceedances[unique_inverse]
rep[valid_indices] = pass_fail
return rep
[docs]
@post_format_return_type(["lat"])
@inspect_arrays(["lat", "lon", "date"], sortby="date")
@convert_units(lat="degrees", lon="degrees")
def do_iquam_track_check(
lat: SequenceNumberType,
lon: SequenceNumberType,
date: SequenceDatetimeType,
speed_limit: float,
delta_d: float,
delta_t: float,
n_neighbours: int,
) -> SequenceIntType:
"""
Perform the IQUAM track check as detailed in Xu and Ignatov 2013.
The track check calculates speeds between pairs of observations and
counts how many exceed a threshold speed. The ob with the most
violations of this limit is flagged as bad and removed from the
calculation. Then the next worst is found and removed until no
violations remain.
Parameters
----------
lat : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional latitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
lon : :py:obj:`~marine_qc.SequenceNumberType`
One-dimensional longitude array in degrees.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
date : :py:obj:`~marine_qc.SequenceDatetimeType`
One-dimensional date array.
Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series.
speed_limit : float
Speed limit of platform in kilometers per hour.
Typically, 60.0 for ships and 15.0 for drifting buoys.
delta_d : float
Latitude tolerance in degrees.
delta_t : float
Time tolerance in hundredths of an hour.
n_neighbours : int
Number of neighbouring points considered in the analysis.
Returns
-------
:py:obj:`~marine_qc.SequenceIntType`
Same type as input, but with integer values
- Returns array/sequence/Series of 1s if the IQUAM QC fails.
- Returns array/sequence/Series of 0s otherwise.
Raises
------
ValueError
If either input is not 1-dimensional or if their lengths do not match.
TypeError
If `inspect_arrays` does not return np.ndarrays.
Notes
-----
Previous versions had default values for the parameters of:
* speed_limit = 60.0 for ships and 15.0 for drifting buoys
* delta_d = 1.11
* delta_t = 0.01
* n_neighbours = 5
"""
lat, lon, date = ensure_arrays(lat=lat, lon=lon, date=date)
number_of_obs = len(lat)
if number_of_obs == 0:
return [passed] * number_of_obs
speed_violations = []
count_speed_violations = []
iquam_track = np.asarray([passed] * number_of_obs) # type: np.ndarray
for t1 in range(0, number_of_obs):
violations_for_this_report = []
count_violations_this_report = 0.0
lo = max(0, t1 - n_neighbours)
hi = min(number_of_obs, t1 + n_neighbours + 1)
for t2 in range(lo, hi):
_, distance, _, timediff = calculate_course_parameters(
lat_later=lat[t2],
lat_earlier=lat[t1],
lon_later=lon[t2],
lon_earlier=lon[t1],
date_later=date[t2],
date_earlier=date[t1],
)
iquam_condition = max([abs(distance) - delta_d, 0.0]) / (abs(timediff) + delta_t)
if iquam_condition > speed_limit:
violations_for_this_report.append(t2)
count_violations_this_report += 1.0
speed_violations.append(violations_for_this_report)
count_speed_violations.append(count_violations_this_report)
while np.sum(count_speed_violations) > 0.0:
most_fails = int(np.argmax(count_speed_violations))
iquam_track[most_fails] = failed
for index in speed_violations[most_fails]:
if most_fails in speed_violations[index]:
speed_violations[index].remove(most_fails)
count_speed_violations[index] -= 1.0
count_speed_violations[most_fails] = 0.0
return iquam_track