Source code for categorize.melting

"""Functions to find melting layer from data."""
import numpy as np
from numpy import ma
from scipy.ndimage import gaussian_filter

from cloudnetpy import utils
from cloudnetpy.categorize import droplet
from cloudnetpy.categorize.containers import ClassData
from cloudnetpy.constants import T0


[docs] def find_melting_layer(obs: ClassData, *, smooth: bool = True) -> np.ndarray: """Finds melting layer from model temperature, ldr, and velocity. Melting layer is detected using linear depolarization ratio, *ldr*, Doppler velocity, *v*, and wet-bulb temperature, *Tw*. The algorithm is based on *ldr* having a clear Gaussian peak around the melting layer. This signature is caused by the growth of ice crystals into snowflakes that are much larger. In addition, when snow and ice melt, emerging heavy water droplets start to drop rapidly towards ground. Thus, there is also a similar positive peak in the first difference of *v*. The peak in *ldr* is the primary parameter we analyze. If *ldr* has a proper peak, and *v* < -1 m/s in the base, melting layer has been found. If *ldr* is missing we only analyze the behaviour of *v*, which is always present, to detect the melting layer. Model temperature is used to limit the melting layer search to a certain temperature range around 0 C. For ECMWF the range is -4..+3, and for the rest -8..+6. Notes: This melting layer detection method is novel and needs to be validated. Also note that there might be some detection problems with strong updrafts of air. In these cases the absolute values for speed do not make sense (rain drops can even move upwards instead of down). Args: obs: The :class:`ClassData` instance. smooth: If True, apply a small Gaussian smoother to the melting layer. Default is True. Returns: 2-D boolean array denoting the melting layer. """ melting_layer = np.zeros(obs.tw.shape, dtype=bool) ldr_prof: np.ndarray | None = None ldr_dprof: np.ndarray | None = None ldr_diff: np.ndarray | None = None width_prof = None if hasattr(obs, "ldr"): # Required for peak detection diffu = ma.array(np.diff(obs.ldr, axis=1)) ldr_diff = diffu.filled(0) t_range = _find_model_temperature_range(obs.model_type) for ind, t_prof in enumerate(obs.tw): temp_indices = _get_temp_indices(t_prof, t_range) if len(temp_indices) <= 1: continue z_prof = obs.z[ind, temp_indices] v_prof = obs.v[ind, temp_indices] if ldr_diff is not None: if not hasattr(obs, "ldr"): msg = "ldr_diff is not None but obs.ldr does not exist" raise RuntimeError(msg) ldr_prof = obs.ldr[ind, temp_indices] ldr_dprof = ldr_diff[ind, temp_indices] if ma.count(ldr_prof) > 3 or ma.count(v_prof) > 3: try: if ldr_prof is None or ldr_dprof is None: msg = "ldr_prof or ldr_dprof is None" raise AssertionError(msg) # noqa: TRY301 indices = _find_melting_layer_from_ldr( ldr_prof, ldr_dprof, v_prof, z_prof, ) except (ValueError, IndexError, AssertionError): height = obs.height[temp_indices] if hasattr(obs, "width"): width_prof = obs.width[ind, temp_indices] indices = _find_melting_layer_from_v(v_prof, width_prof, height) if indices is not None: melting_layer[ind, temp_indices[indices]] = True if smooth: smoothed_layer = gaussian_filter(np.array(melting_layer, dtype=float), (2, 0.1)) melting_layer = (smoothed_layer > 0.2).astype(bool) return melting_layer
def _find_melting_layer_from_ldr( ldr_prof: np.ndarray, ldr_dprof: np.ndarray, v_prof: np.ndarray, z_prof: np.ndarray, ) -> np.ndarray | None: if ldr_prof is None: raise ValueError peak = int(np.argmax(ldr_prof)) base, top = _basetop(ldr_dprof, peak) conditions = ( ldr_prof[peak] - ldr_prof[base] > 4, ldr_prof[peak] > -30, z_prof[base] > -25, v_prof[base] < -1.5, ) if all(conditions): base = int(np.floor(base + (peak - base) / 2)) return np.arange(base, top) return None def _find_melting_layer_from_v( v_prof: np.ndarray, width_prof: np.ndarray | None, height: np.ndarray, ) -> np.ndarray | None: v = np.copy(v_prof[:-1]) v_diff = np.diff(v_prof) v[v_diff < 0] = 0 v[v_diff > 0] = 1 n_increasing = utils.cumsumr(v) try: top = int(np.argmax(n_increasing)) base = np.where(n_increasing[:top] == 0)[0][-1] except IndexError: return None if width_prof is not None: conditions = [ width_prof[base] - width_prof[top] > 0.2, v_prof[top] - v_prof[base] > 0.5, 50 < (height[top] - height[base]) < 1000, v_prof[base] < -2, ] else: conditions = [ v_prof[top] - v_prof[base] > 2, 50 < (height[top] - height[base]) < 1000, v_prof[base] < -2, ] if all(conditions): base = int(round(top - (top - base) / 2)) return np.arange(base, top) return None def _basetop(dprof: np.ndarray, pind: int) -> tuple[int, int]: """Finds the base and top of ldr peak.""" top = droplet.ind_top(dprof, pind, len(dprof), 10, 2) base = droplet.ind_base(dprof, pind, 10, 2) return base, top def _get_temp_indices(t_prof: np.ndarray, t_range: tuple) -> np.ndarray: """Finds indices of temperature profile covering the given range.""" ind = np.where((t_prof > min(t_range) + T0) & (t_prof < max(t_range) + T0))[0] return np.array([]) if len(ind) == 0 else np.arange(min(ind), max(ind) + 1) def _find_model_temperature_range(model_type: str) -> tuple[float, float]: """Returns temperature range around 0C for given model type.""" if "gdas1" in model_type.lower(): return -8, 6 return -4, 3