Source code for marine_qc.track_check_utils

"""
The New Track Check QC module provides the functions needed to perform the track check.

The main routine is mds_full_track_check which takes a
list of class`.MarineReport` from a single ship and runs the track check on them.
This is an update of the MDS system track check in that it assumes the Earth is a
sphere. In practice, it gives similar results to the cylindrical earth formerly
assumed.
"""

from __future__ import annotations
from datetime import datetime

import numpy as np
import pandas as pd

from . import spherical_geometry as sg
from . import spherical_geometry as sph
from . import time_control
from .auxiliary import (
    SequenceDatetimeType,
    SequenceFloatType,
    SequenceNumberType,
    convert_to,
    convert_units,
    ensure_arrays,
    inspect_arrays,
    isvalid,
    post_format_return_type,
)
from .spherical_geometry import (
    course_between_points,
    intermediate_point,
    sphere_distance,
)
from .time_control import time_difference






[docs] def set_speed_limits(amode: float) -> tuple[float, float, float]: """ Take a modal speed and calculate speed limits for the track checker. Parameters ---------- amode : float Modal speed in km/h. Returns ------- (float, float, float) Max speed, maximum max speed and min speed. """ amax = float(np.asarray(convert_to(15.0, "knots", "km/h"), dtype=float)) amaxx = float(np.asarray(convert_to(20.0, "knots", "km/h"), dtype=float)) amin = 0.00 if not isvalid(amode): return amax, amaxx, amin if amode <= float(np.asarray(convert_to(8.51, "knots", "km/h"), dtype=float)): return amax, amaxx, amin return amode * 1.25, float(np.asarray(convert_to(30.0, "knots", "km/h"), dtype=float)), amode * 0.75
[docs] @post_format_return_type(["alat1"], dtype=float, multiple=True) @inspect_arrays(["alat1", "alon1", "avs", "ads", "timediff"]) def increment_position( alat1: SequenceNumberType, alon1: SequenceNumberType, avs: SequenceNumberType, ads: SequenceNumberType, timediff: SequenceNumberType, ) -> tuple[np.ndarray, np.ndarray]: """ Compute latitude and longitude increments over half a time interval. This function takes latitudes and longitude, a speed, a direction and a time difference and returns increments of latitude and longitude which correspond to half the time difference. Parameters ---------- alat1 : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of Latitude at starting point in degrees. alon1 : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of Longitude at starting point in degrees. avs : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of speed of ship in km/h. ads : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of heading of ship in degrees. timediff : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of time difference between the points in hours. Returns ------- 1D np.ndarray of float Returns latitude and longitude increment or None and None if timediff is None. """ alat1 = np.array(alat1, dtype=float) alon1 = np.array(alon1, dtype=float) avs = np.array(avs, dtype=float) ads = np.array(ads, dtype=float) timediff = np.array(timediff, dtype=float) distance = avs * timediff / 2.0 lat, lon = sph.lat_lon_from_course_and_distance(alat1, alon1, ads, distance) lat = lat - alat1 lon = lon - alon1 return lat, lon
[docs] @post_format_return_type(["dsi"], dtype=float) @inspect_arrays(["dsi", "directions"]) def direction_continuity( dsi: SequenceNumberType, directions: SequenceNumberType, dsi_previous: SequenceNumberType = None, max_direction_change: float = 60.0, ) -> np.ndarray: """ Check if reported and calculated directions are within the allowed change. This function compares the heading at the previous time step with the calculated ship direction from reported positions, flagging differences that exceed the maximum allowed direction change. Parameters ---------- dsi : :py:obj:`~marine_qc.SequenceNumberType` Heading at current time step in degrees. directions : :py:obj:`~marine_qc.SequenceNumberType` Calculated ship direction from reported positions in degrees. dsi_previous : :py:obj:`~marine_qc.SequenceNumberType`, optional Heading at previous time step in degrees. If None, get dsi_previous from dsi. max_direction_change : float Largest deviations that will not be flagged in degrees. Returns ------- np.ndarray Returned array elements are 10.0 if the difference between reported and calculated direction is greater than the max_direction_change (default, 60 degrees), 0.0 otherwise. """ dsi = np.array(dsi, dtype=float) directions = np.array(directions, dtype=float) allowed_list = [0, 45, 90, 135, 180, 225, 270, 315, 360] result = np.zeros(len(dsi)) if not isvalid(max_direction_change): return result valid = np.isin(dsi, allowed_list) if dsi_previous is None: dsi_previous = np.roll(dsi, 1) dsi_previous = dsi_previous.astype(float) dsi_previous[0] = np.nan else: dsi_previous = np.array(dsi_previous, dtype=float) valid &= np.isin(dsi_previous, allowed_list) dsi_previous = np.atleast_1d(dsi_previous) selection1 = max_direction_change < abs(dsi - directions) selection2 = abs(dsi - directions) < (360 - max_direction_change) selection3 = max_direction_change < abs(dsi_previous - directions) selection4 = abs(dsi_previous - directions) < (360 - max_direction_change) result[np.logical_and(selection1, selection2)] = 10.0 result[np.logical_and(selection3, selection4)] = 10.0 result[~valid] = np.nan return result
[docs] @post_format_return_type(["vsi"], dtype=float) @inspect_arrays(["vsi", "speeds"]) def speed_continuity( vsi: SequenceNumberType, speeds: SequenceNumberType, vsi_previous: SequenceNumberType = None, max_speed_change: float | None = 10.0, ) -> np.ndarray: """ Check if reported speeds are within the allowed change from calculated speeds. This function compares the reported speed at the current and previous time steps with the speed calculated from positions. Flags positions where the change exceeds the maximum allowed speed change. Parameters ---------- vsi : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of reported speed in km/h at current time step. speeds : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional array of speed of ship calculated from locations at current and previous time steps in km/h. vsi_previous : :py:obj:`~marine_qc.SequenceNumberType`, optional One-dimensional array of reported speed in km/h at previous time step. If None, get vsi_previous from vsi. max_speed_change : float, optional Largest change of speed that will not raise flag in km/h, default 10 (km/h). Returns ------- np.ndarray Returned array elements are 10 if the reported and calculated speeds differ by more than 10 knots, 0 otherwise. """ vsi = np.array(vsi, dtype=float) speeds = np.array(speeds, dtype=float) result = np.zeros(len(vsi)) if not isvalid(max_speed_change): return result valid = isvalid(vsi) if vsi_previous is None: vsi_previous = np.roll(vsi, 1) vsi_previous = vsi_previous.astype(float) vsi_previous[0] = np.nan else: vsi_previous = np.array(vsi_previous, dtype=float) valid &= isvalid(vsi_previous) vsi_previous = np.atleast_1d(vsi_previous) selection1 = abs(vsi - speeds) > max_speed_change selection2 = abs(vsi_previous - speeds) > max_speed_change result[np.logical_and(selection1, selection2)] = 10.0 result[~valid] = np.nan return result
[docs] @post_format_return_type(["vsi"], dtype=float) @inspect_arrays(["vsi", "time_differences", "fwd_diff_from_estimated", "rev_diff_from_estimated"]) def check_distance_from_estimate( vsi: SequenceNumberType, time_differences: SequenceNumberType, fwd_diff_from_estimated: SequenceNumberType, rev_diff_from_estimated: SequenceNumberType, vsi_previous: SequenceNumberType = None, ) -> np.ndarray: """ Check that distances from estimated positions are less than calculated distance. The estimated positions are calculated forward and backwards in time. The calculated distance is the time difference multiplied by the average reported speeds. Parameters ---------- vsi : :py:obj:`~marine_qc.SequenceNumberType` Reported speed in km/h at current time step. time_differences : :py:obj:`~marine_qc.SequenceNumberType` Calculated time differences between reports in hours. fwd_diff_from_estimated : :py:obj:`~marine_qc.SequenceNumberType` Distance in km from estimated position, estimates made forward in time. rev_diff_from_estimated : :py:obj:`~marine_qc.SequenceNumberType` Distance in km from estimated position, estimates made backward in time. vsi_previous : :py:obj:`~marine_qc.SequenceNumberType`, optional One-dimensional array of reported speed in km/h at previous time step. If None, get vsi_previous from vsi. Returns ------- np.ndarray Returned array elements set to 10 if estimated and reported positions differ by more than the reported speed multiplied by the calculated time difference, 0 otherwise. Raises ------ TypeError If `inspect_arrays` does not return np.ndarrays. """ vsi = np.array(vsi, dtype=float) time_differences = np.array(time_differences, dtype=float) fwd_diff_from_estimated = np.array(fwd_diff_from_estimated, dtype=float) rev_diff_from_estimated = np.array(rev_diff_from_estimated, dtype=float) valid = isvalid(vsi) if vsi_previous is None: vsi_previous = np.roll(vsi, 1) vsi_previous[0] = np.nan else: vsi_previous = np.array(vsi_previous, dtype=float) valid &= isvalid(vsi_previous) vsi_previous = np.atleast_1d(vsi_previous) alwdis = time_differences * ((vsi + vsi_previous) / 2.0) selection = fwd_diff_from_estimated > alwdis selection = np.logical_and(selection, rev_diff_from_estimated > alwdis) selection = np.logical_and(selection, vsi > 0) selection = np.logical_and(selection, vsi_previous > 0) selection = np.logical_and(selection, time_differences > 0) result = np.zeros(len(vsi)) result[selection] = 10.0 return result
[docs] @convert_units( lat_later="degrees", lat_earlier="degrees", lon_later="degrees", lon_earlier="degrees", ) def calculate_course_parameters( lat_later: float, lat_earlier: float, lon_later: float, lon_earlier: float, date_later: datetime, date_earlier: datetime, ) -> tuple[float, float, float, float]: """ Calculate course parameters. Parameters ---------- lat_later : float Latitude in degrees of later timestamp. lat_earlier : float Latitude in degrees of earlier timestamp. lon_later : float Longitude in degrees of later timestamp. lon_earlier : float Longitude in degrees of earlier timestamp. date_later : datetime Date of later timestamp. date_earlier : datetime Date of earlier timestamp. Returns ------- tuple of float A tuple of four floats representing the speed, distance, course and time difference. """ distance = sg.sphere_distance(lat_later, lon_later, lat_earlier, lon_earlier) date_earlier = pd.Timestamp(date_earlier) date_later = pd.Timestamp(date_later) timediff = time_control.time_difference( date_earlier, date_later, ) if timediff != 0 and isvalid(timediff): speed = distance / abs(timediff) else: timediff = 0.0 speed = distance course = sg.course_between_points(lat_earlier, lon_earlier, lat_later, lon_later) return speed, distance, course, timediff
[docs] @post_format_return_type(["lat"], dtype=float, multiple=True) @inspect_arrays(["lat", "lon", "date"]) @convert_units(lat="degrees", lon="degrees") def calculate_speed_course_distance_time_difference( lat: SequenceNumberType, lon: SequenceNumberType, date: SequenceDatetimeType, alternating: bool = False, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Calculate speeds, courses, distances and time differences using consecutive reports. Parameters ---------- lat : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional latitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. lon : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional longitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. date : :py:obj:`~marine_qc.SequenceDatetimeType` One-dimensional date array. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. alternating : bool, default: False Whether to use alternating reports for calculation. Returns ------- tuple of np.ndarray, each with float values, shape (n,) A tuple containing four one-dimensional arrays representing: speed, distance, course, and time difference. """ lat = np.array(lat, dtype=float) lon = np.array(lon, dtype=float) date = np.array(date, dtype="datetime64[ns]") if alternating: distance = sphere_distance(np.roll(lat, 1), np.roll(lon, 1), np.roll(lat, -1), np.roll(lon, -1)) timediff = time_difference(np.roll(date, 1), np.roll(date, -1)) course = course_between_points(np.roll(lat, 1), np.roll(lon, 1), np.roll(lat, -1), np.roll(lon, -1)) # Alternating estimates are unavailable for the first and last elements distance[0] = np.nan distance[-1] = np.nan timediff[0] = np.nan timediff[-1] = np.nan course[0] = np.nan course[-1] = np.nan else: distance = sphere_distance(np.roll(lat, 1), np.roll(lon, 1), lat, lon) timediff = time_difference(np.roll(date, 1), date) course = course_between_points(np.roll(lat, 1), np.roll(lon, 1), lat, lon) # With the regular first differences, we don't have anything for the first element distance[0] = np.nan timediff[0] = np.nan course[0] = np.nan speed = np.zeros_like(timediff) valid = timediff != 0.0 speed[valid] = distance[valid] / timediff[valid] return speed, distance, course, timediff
[docs] @post_format_return_type(["vsi"], dtype=float) @inspect_arrays(["lat", "lon", "date", "vsi", "dsi"], sortby="date") @convert_units(vsi="km/h", dsi="degrees", lat="degrees", lon="degrees") def forward_discrepancy( lat: SequenceNumberType, lon: SequenceNumberType, date: SequenceDatetimeType, vsi: SequenceNumberType, dsi: SequenceNumberType, ) -> SequenceFloatType: """ Calculate the distance between the projected position and the actual position. The projected position is based on the reported speed and heading at the current and previous time steps. The observations are taken in time order. This takes the speed and direction reported by the ship and projects it forwards half a time step, it then projects it forwards another half time-step using the speed and direction for the next report, to which the projected location is then compared. The distances between the projected and actual locations is returned Parameters ---------- lat : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional latitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. lon : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional longitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. date : :py:obj:`~marine_qc.SequenceDatetimeType` One-dimensional date array. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. vsi : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional reported speed array in km/h. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. dsi : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional reported heading array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. Returns ------- :py:obj:`~marine_qc.SequenceFloatType` Same type as input, but with float values, shape (n,) One-dimensional array, sequence, or pandas Series containing distances from estimated positions. Raises ------ ValueError If either input is not 1-dimensional or if their lengths do not match. TypeError If decorator `inspect_arrays` does not return np.ndarrays. """ lat, lon, date, vsi, dsi = ensure_arrays(lat=lat, lon=lon, date=date, vsi=vsi, dsi=dsi) timediff = time_difference(np.roll(date, 1), date) lat1, lon1 = increment_position(np.roll(lat, 1), np.roll(lon, 1), np.roll(vsi, 1), dsi, timediff) lat2, lon2 = increment_position(lat, lon, vsi, dsi, timediff) updated_latitude = np.roll(lat, 1) + lat1 + lat2 updated_longitude = np.roll(lon, 1) + lon1 + lon2 # calculate distance between calculated position and the second reported position distance_from_est_location = sphere_distance(lat, lon, updated_latitude, updated_longitude) distance_from_est_location[0] = np.nan return distance_from_est_location
[docs] @post_format_return_type(["vsi"], dtype=float) @inspect_arrays(["lat", "lon", "date", "vsi", "dsi"], sortby="date") @convert_units(vsi="km/h", dsi="degrees", lat="degrees", lon="degrees") def backward_discrepancy( lat: SequenceNumberType, lon: SequenceNumberType, date: SequenceDatetimeType, vsi: SequenceNumberType, dsi: SequenceNumberType, ) -> SequenceFloatType: """ Calculate the distance between the projected position and the actual position. The projected position is based on the reported speed and heading at the current and previous time steps. The calculation proceeds from the final, later observation to the first (in contrast to distr1 which runs in time order) This takes the speed and direction reported by the ship and projects it forwards half a time step, it then projects it forwards another half-time step using the speed and direction for the next report, to which the projected location is then compared. The distances between the projected and actual locations is returned Parameters ---------- lat : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional latitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. lon : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional longitude array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. date : :py:obj:`~marine_qc.SequenceDatetimeType` One-dimensional date array. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. vsi : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional reported speed array in km/h. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. dsi : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional reported heading array in degrees. Can be a sequence (e.g., list or tuple), a one-dimensional NumPy array, or a pandas Series. Returns ------- :py:obj:`~marine_qc.SequenceFloatType` Same type as input, but with float values, shape (n,) One-dimensional array, sequence, or pandas Series containing distances from estimated positions. Raises ------ ValueError If either input is not 1-dimensional or if their lengths do not match. TypeError If decorator `inspect_arrays` does not return np.ndarrays. """ lat, lon, date, vsi, dsi = ensure_arrays(lat=lat, lon=lon, date=date, vsi=vsi, dsi=dsi) timediff = time_difference(np.roll(date, 1), date) lat2, lon2 = increment_position( np.roll(lat, 1), np.roll(lon, 1), np.roll(vsi, 1), np.roll(dsi, 1) - 180, timediff, ) lat1, lon1 = increment_position(lat, lon, vsi, dsi - 180, timediff) updated_latitude = lat + lat1 + lat2 updated_longitude = lon + lon1 + lon2 # calculate distance between calculated position and the second reported position distance_from_est_location = sphere_distance(np.roll(lat, 1), np.roll(lon, 1), updated_latitude, updated_longitude) distance_from_est_location[-1] = np.nan return distance_from_est_location
[docs] @post_format_return_type(["lat"], dtype=float) @inspect_arrays(["lat", "lon", "timediff"]) @convert_units(lat="degrees", lon="degrees") def calculate_midpoint( lat: SequenceNumberType, lon: SequenceNumberType, timediff: SequenceNumberType, ) -> np.ndarray: """ Interpolate between alternate reports and compare the interpolated location to the actual location. E.g. take difference between reports 2 and 4 and interpolate to get an estimate for the position at the time of report 3. Then compare the estimated and actual positions at the time of report 3. The calculation linearly interpolates the latitudes and longitudes (allowing for wrapping around the dateline and so on). Parameters ---------- lat : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional latitude array in degrees. lon : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional longitude array in degrees. timediff : :py:obj:`~marine_qc.SequenceNumberType` One-dimensional time difference array. Returns ------- 1D np.ndarray of float One-dimensional array of distances from estimated positions in kilometers. Raises ------ ValueError If either input is not 1-dimensional or if their lengths do not match. """ lat = np.array(lat, dtype=float) lon = np.array(lon, dtype=float) timediff = np.array(timediff, dtype=float) number_of_obs = len(lat) midpoint_discrepancies = np.asarray([np.nan] * number_of_obs) # type: np.ndarray t0 = timediff t1 = np.roll(timediff, -1) fraction_of_time_diff = np.zeros_like(t0) valid = (t0 + t1 != 0) & isvalid(t0) & isvalid(t1) fraction_of_time_diff[valid] = t0[valid] / (t0[valid] + t1[valid]) est_midpoint_lat, est_midpoint_lon = intermediate_point( np.roll(lat, 1), np.roll(lon, 1), np.roll(lat, -1), np.roll(lon, -1), fraction_of_time_diff, ) est_midpoint_lat[0] = np.nan est_midpoint_lat[-1] = np.nan est_midpoint_lon[0] = np.nan est_midpoint_lon[-1] = np.nan midpoint_discrepancies = sphere_distance(lat, lon, est_midpoint_lat, est_midpoint_lon) return midpoint_discrepancies