Source code for categorize.droplet

"""This module has functions for liquid layer detection.
"""
import numpy as np
import scipy.signal
from numpy import ma

import cloudnetpy.categorize.atmos
from cloudnetpy import utils
from cloudnetpy.categorize.containers import ClassData


[docs] def correct_liquid_top( obs: ClassData, is_liquid: np.ndarray, is_freezing: np.ndarray, limit: float = 200, ) -> np.ndarray: """Corrects lidar detected liquid cloud top using radar data. Args: obs: The :class:`ClassData` instance. is_liquid: 2-D boolean array denoting liquid clouds from lidar data. is_freezing: 2-D boolean array of sub-zero temperature, derived from the model temperature and melting layer based on radar data. limit: The maximum correction distance (m) above liquid cloud top. Returns: Corrected liquid cloud array. References: Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ. """ is_liquid_corrected = np.copy(is_liquid) liquid_tops = cloudnetpy.categorize.atmos.find_cloud_tops(is_liquid) top_above = utils.n_elements(obs.height, limit) for prof, top in zip(*np.where(liquid_tops), strict=True): ind = _find_ind_above_top(is_freezing[prof, top:], top_above) rad = obs.z[prof, top : top + ind + 1] if not (rad.mask.all() or ~rad.mask.any()): first_masked = ma.where(rad.mask)[0][0] is_liquid_corrected[prof, top : top + first_masked] = True return is_liquid_corrected
def _find_ind_above_top(is_freezing_from_peak: np.ndarray, top_above: int) -> int: first_point_below_zero = np.where(is_freezing_from_peak)[0][0] ind = first_point_below_zero + top_above return min(len(is_freezing_from_peak) - 1, ind)
[docs] def find_liquid( obs: ClassData, peak_amp: float = 1e-6, max_width: float = 300, min_points: int = 3, min_top_der: float = 1e-7, min_lwp: float = 0, min_alt: float = 100, ) -> np.ndarray: """Estimate liquid layers from SNR-screened attenuated backscatter. Args: obs: The :class:`ClassData` instance. peak_amp: Minimum value of peak. Default is 1e-6. max_width: Maximum width of peak. Default is 300 (m). min_points: Minimum number of valid points in peak. Default is 3. min_top_der: Minimum derivative above peak, defined as (beta_peak-beta_top) / (alt_top-alt_peak). Default is 1e-7. min_lwp: Minimum value from linearly interpolated lwp (kg m-2) measured by the mwr. Default is 0. min_alt: Minimum altitude of the peak from the ground. Default is 100 (m). Returns: 2-D boolean array denoting liquid layers. References: The method is based on Tuononen, M. et.al, 2019, https://acp.copernicus.org/articles/19/1985/2019/. """ def _is_proper_peak() -> bool: conditions = ( npoints >= min_points, peak_width < max_width, top_der > min_top_der, is_positive_lwp, peak_alt > min_alt, ) return all(conditions) lwp_int = interpolate_lwp(obs) beta = ma.copy(obs.beta) height = obs.height is_liquid = np.zeros(beta.shape, dtype=bool) base_below_peak = utils.n_elements(height, 200) top_above_peak = utils.n_elements(height, 150) difference = ma.array(np.diff(beta, axis=1)) beta_diff = difference.filled(0) beta = beta.filled(0) peak_indices = _find_strong_peaks(beta, peak_amp) for n, peak in zip(*peak_indices, strict=True): lprof = beta[n, :] dprof = beta_diff[n, :] try: base = ind_base(dprof, peak, base_below_peak, 4) top = ind_top(dprof, peak, height.shape[0], top_above_peak, 4) except IndexError: continue npoints = np.count_nonzero(lprof[base : top + 1]) peak_width = height[top] - height[base] peak_alt = height[peak] - height[0] top_der = (lprof[peak] - lprof[top]) / (height[top] - height[peak]) is_positive_lwp = lwp_int[n] > min_lwp if _is_proper_peak(): is_liquid[n, base : top + 1] = True return is_liquid
[docs] def ind_base(dprof: np.ndarray, ind_peak: int, dist: int, lim: float) -> int: """Finds base index of a peak in profile. Return the lowermost index of profile where 1st order differences below the peak exceed a threshold value. Args: dprof: 1-D array of 1st discrete difference. Masked values should be 0, e.g. dprof = np.diff(masked_prof).filled(0) ind_peak: Index of (possibly local) peak in the original profile. Note that the peak must be found with some other method before calling this function. dist: Number of elements investigated below *p*. If ( *p* - *dist*)<0, search starts from index 0. lim: Parameter for base index. Values greater than 1.0 are valid. Values close to 1 most likely return the point right below the maximum 1st order difference (within *dist* points below *p*). Values larger than 1 more likely accept some other point, lower in the profile. Returns: Base index of the peak. Raises: IndexError: Can't find proper base index (probably too many masked values in the profile). Examples: Consider a profile >>> x = np.array([0, 0.5, 1, -99, 4, 8, 5]) that contains one bad, masked value >>> mx = ma.masked_array(x, mask=[0, 0, 0, 1, 0, 0, 0]) [0, 0.5, 1.0, --, 4.0, 8.0, 5.0] The 1st order difference is now >>> dx = np.diff(mx).filled(0) [0.5, 0.5, 0, 0, 4, -3] From the original profile we see that the peak index is 5. Let's assume our base can't be more than 4 elements below peak and the threshold value is 2. Thus we call >>> ind_base(dx, 5, 4, 2) 4 When x[4] is the lowermost point that satisfies the condition. Changing the threshold value would alter the result >>> ind_base(dx, 5, 4, 10) 1 See Also: droplet.ind_top() """ start = max(ind_peak - dist, 0) # should not be negative diffs = dprof[start:ind_peak] mind = np.argmax(diffs) return start + np.where(diffs > diffs[mind] / lim)[0][0]
[docs] def ind_top(dprof: np.ndarray, ind_peak: int, nprof: int, dist: int, lim: float) -> int: """Finds top index of a peak in profile. Return the uppermost index of profile where 1st order differences above the peak exceed a threshold value. Args: dprof: 1-D array of 1st discrete difference. Masked values should be 0, e.g. dprof = np.diff(masked_prof).filled(0) nprof: Length of the profile. Top index can't be higher than this. ind_peak: Index of (possibly local) peak in the profile. Note that the peak must be found with some other method before calling this function. dist: Number of elements investigated above *p*. If (*p* + *dist*) > *nprof*, search ends to *nprof*. lim: Parameter for top index. Values greater than 1.0 are valid. Values close to 1 most likely return the point right above the maximum 1st order difference (within *dist* points above *p*). Values larger than 1 more likely accept some other point, higher in the profile. Returns: Top index of the peak. Raises: IndexError: Can not find proper top index (probably too many masked values in the profile). See Also: droplet.ind_base() """ end = min(ind_peak + dist, nprof) # should not be greater than len(profile) diffs = dprof[ind_peak:end] mind = np.argmin(diffs) return ind_peak + np.where(diffs < diffs[mind] / lim)[0][-1] + 1
[docs] def interpolate_lwp(obs: ClassData) -> np.ndarray: """Linear interpolation of liquid water path to fill masked values. Args: obs: The :class:`ClassData` instance. Returns: Liquid water path where the masked values are filled by interpolation. """ if obs.lwp.all() is ma.masked: return np.zeros(obs.time.shape) ind = ma.where(obs.lwp) return np.interp(obs.time, obs.time[ind], obs.lwp[ind])
def _find_strong_peaks(data: np.ndarray, threshold: float) -> tuple: """Finds local maximums from data (greater than *threshold*).""" peaks = scipy.signal.argrelextrema(data, np.greater, order=4, axis=1) strong_peaks = np.where(data[peaks] > threshold) return peaks[0][strong_peaks], peaks[1][strong_peaks]