"""Module containing low-level functions to classify gridded
radar / lidar measurements.
"""
import numpy as np
import skimage
from numpy import ma
import cloudnetpy.categorize.atmos
from cloudnetpy import utils
from cloudnetpy.categorize import droplet, falling, freezing, insects, melting
from cloudnetpy.categorize.containers import ClassData, ClassificationResult
[docs]
def classify_measurements(data: dict) -> ClassificationResult:
"""Classifies radar/lidar observations.
This function classifies atmospheric scatterers from the input data.
The input data needs to be averaged or interpolated to the common
time / height grid before calling this function.
Args:
data: Containing :class:`Radar`, :class:`Lidar`, :class:`Model`
and :class:`Mwr` instances.
Returns:
A :class:`ClassificationResult` instance.
References:
The Cloudnet classification scheme is based on methodology proposed by
Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ and its
proprietary Matlab implementation.
Notes:
Some individual classification methods are changed in this Python
implementation compared to the original Cloudnet methodology.
Especially methods classifying insects, melting layer and liquid droplets.
"""
obs = ClassData(data)
bits: list[np.ndarray] = [np.array([])] * 6
bits[3] = melting.find_melting_layer(obs)
bits[2] = freezing.find_freezing_region(obs, bits[3])
liquid_from_lidar = droplet.find_liquid(obs)
if obs.lv0_files is not None and len(obs.lv0_files) > 0:
if "rpg-fmcw-94" not in obs.radar_type.lower():
msg = "VoodooNet is only implemented for RPG-FMCW-94 radar."
raise NotImplementedError(msg)
import voodoonet
from voodoonet.utils import VoodooOptions
options = VoodooOptions(progress_bar=False)
target_time = voodoonet.utils.decimal_hour2unix(obs.date, obs.time)
liquid_prob = voodoonet.infer(
obs.lv0_files, target_time=target_time, options=options
)
liquid_from_radar = liquid_prob > 0.55
liquid_from_radar = _remove_false_radar_liquid(
liquid_from_radar,
liquid_from_lidar,
)
liquid_from_radar[~bits[2]] = 0
is_liquid = liquid_from_radar | liquid_from_lidar
else:
is_liquid = liquid_from_lidar
liquid_prob = None
bits[0] = droplet.correct_liquid_top(obs, is_liquid, bits[2], limit=500)
bits[5], insect_prob = insects.find_insects(obs, bits[3], bits[0])
bits[1] = falling.find_falling_hydrometeors(obs, bits[0], bits[5])
bits, filtered_ice = _filter_falling(bits)
for _ in range(5):
bits[3] = _fix_undetected_melting_layer(bits)
bits = _filter_insects(bits)
bits[4] = _find_aerosols(obs, bits[1], bits[0])
bits[4][filtered_ice] = False
return ClassificationResult(
category_bits=_bits_to_integer(bits),
is_rain=obs.is_rain,
is_clutter=obs.is_clutter,
insect_prob=insect_prob,
liquid_prob=liquid_prob,
)
def _remove_false_radar_liquid(
liquid_from_radar: np.ndarray,
liquid_from_lidar: np.ndarray,
) -> np.ndarray:
"""Removes radar-liquid below lidar-detected liquid bases."""
lidar_liquid_bases = cloudnetpy.categorize.atmos.find_cloud_bases(liquid_from_lidar)
for prof, base in zip(*np.where(lidar_liquid_bases), strict=True):
liquid_from_radar[prof, 0:base] = 0
return liquid_from_radar
[docs]
def fetch_quality(
data: dict,
classification: ClassificationResult,
attenuations: dict,
) -> dict:
"""Returns Cloudnet quality bits.
Args:
data: Containing :class:`Radar` and :class:`Lidar` instances.
classification: A :class:`ClassificationResult` instance.
attenuations: Dictionary containing keys `liquid_corrected`,
`liquid_uncorrected`.
Returns:
Dictionary containing `quality_bits`, an integer array with the bits:
- bit 0: Pixel contains radar data
- bit 1: Pixel contains lidar data
- bit 2: Pixel contaminated by radar clutter
- bit 3: Molecular scattering present (currently not implemented!)
- bit 4: Pixel was affected by liquid attenuation
- bit 5: Liquid attenuation was corrected
- bit 6: Data gap in radar or lidar data
"""
bits: list[np.ndarray] = [np.ndarray([])] * 7
radar_echo = data["radar"].data["Z"][:]
bits[0] = ~radar_echo.mask
bits[1] = ~data["lidar"].data["beta"][:].mask
bits[2] = classification.is_clutter
bits[4] = attenuations["liquid_corrected"] | attenuations["liquid_uncorrected"]
bits[5] = attenuations["liquid_corrected"]
qbits = _bits_to_integer(bits)
return {"quality_bits": qbits}
def _find_aerosols(
obs: ClassData,
is_falling: np.ndarray,
is_liquid: np.ndarray,
) -> np.ndarray:
"""Estimates aerosols from lidar backscattering.
Aerosols are lidar signals that are: a) not falling, b) not liquid droplets.
Args:
obs: A :class:`ClassData` instance.
is_falling: 2-D boolean array of falling hydrometeors.
is_liquid: 2-D boolean array of liquid droplets.
Returns:
2-D boolean array containing aerosols.
"""
is_beta = ~obs.beta.mask
return is_beta & ~is_falling & ~is_liquid
def _fix_undetected_melting_layer(bits: list) -> np.ndarray:
melting_layer = bits[3]
drizzle_and_falling = _find_drizzle_and_falling(*bits[:3])
transition = ma.diff(drizzle_and_falling, axis=1) == -1
melting_layer[:, 1:][transition] = True
return melting_layer
def _find_drizzle_and_falling(
is_liquid: np.ndarray,
is_falling: np.ndarray,
is_freezing: np.ndarray,
) -> np.ndarray:
"""Classifies pixels as falling, drizzle and others.
Args:
is_liquid: 2D boolean array denoting liquid layers.
is_falling: 2D boolean array denoting falling pixels.
is_freezing: 2D boolean array denoting subzero temperatures.
Returns:
2D array where values are 1 (falling, drizzle, supercooled liquids),
2 (drizzle), and masked (all others).
"""
falling_dry = is_falling & ~is_liquid
supercooled_liquids = is_liquid & is_freezing
drizzle = falling_dry & ~is_freezing
drizzle_and_falling = falling_dry.astype(int) + drizzle.astype(int)
drizzle_and_falling = ma.copy(drizzle_and_falling)
drizzle_and_falling[supercooled_liquids] = 1
drizzle_and_falling[drizzle_and_falling == 0] = ma.masked
return drizzle_and_falling
def _bits_to_integer(bits: list) -> np.ndarray:
"""Creates array of integers from individual boolean arrays.
Args:
bits: List of bit fields (of similar sizes) to be saved in the resulting
array of integers. bits[0] is saved as bit 0, bits[1] as bit 1, etc.
Returns:
Array of integers containing the information of the individual boolean arrays.
"""
int_array = np.zeros_like(bits[0], dtype=int)
for n, bit in enumerate(bits):
ind = np.where(bit) # works also if bit is None
int_array[ind] = utils.setbit(int_array[ind].astype(int), n)
return int_array
def _filter_insects(bits: list) -> list:
is_melting_layer = bits[3]
is_insects = bits[5]
is_falling = bits[1]
# Remove above melting layer
above_melting = utils.ffill(is_melting_layer)
ind = np.where(is_insects & above_melting)
is_falling[ind] = True
is_insects[ind] = False
# remove around melting layer:
original_insects = np.copy(is_insects)
n_gates = 5
for x, y in zip(*np.where(is_melting_layer), strict=True):
try:
# change insects to drizzle below melting layer pixel
ind1 = np.arange(y - n_gates, y)
ind11 = np.where(original_insects[x, ind1])[0]
n_drizzle = sum(is_falling[x, :y])
if n_drizzle > 5:
is_falling[x, ind1[ind11]] = True
is_insects[x, ind1[ind11]] = False
else:
continue
# change insects on the right and left of melting layer pixel to drizzle
ind1 = np.arange(x - n_gates, x + n_gates + 1)
ind11 = np.where(original_insects[ind1, y])[0]
is_falling[ind1[ind11], y - 1 : y + 2] = True
is_insects[ind1[ind11], y - 1 : y + 2] = False
except IndexError:
continue
bits[1] = is_falling
bits[5] = is_insects
return bits
def _filter_falling(bits: list) -> tuple:
# filter falling ice speckle noise
is_freezing = bits[2]
is_falling = bits[1]
is_falling_filtered = skimage.morphology.remove_small_objects(
is_falling,
10,
connectivity=1,
)
is_filtered = is_falling & ~np.array(is_falling_filtered)
ice_ind = np.where(is_freezing & is_filtered)
is_falling[ice_ind] = False
# in warm these are (probably) insects
insect_ind = np.where(~is_freezing & is_filtered)
is_falling[insect_ind] = False
bits[1] = is_falling
bits[5][insect_ind] = True
return bits, ice_ind