Source code for products.product_tools

"""General helper classes and functions for all products."""
import os
from typing import NamedTuple

import netCDF4
import numpy as np
import requests
from numpy import ma

from cloudnetpy import constants, utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.datasource import DataSource


class IceCoefficients(NamedTuple):
    """Coefficient set for the ice product retrievals.

    ``IceSource`` consumes these values: ``K2liquid0`` goes into the radar
    echo scaling factor, and the remaining four fields are the terms of the
    exponent evaluated in ``IceSource._convert_z``.
    """

    K2liquid0: float  # divided by 0.93 and converted to dB to scale Z
    ZT: float  # multiplies the product (scaled Z) * temperature
    T: float  # multiplies temperature
    Z: float  # multiplies scaled Z
    c: float  # constant term of the exponent
class CategorizeBits:
    """Class holding information about category and quality bits.

    Args:
        categorize_file (str): Categorize file name.

    Attributes:
        category_bits (dict): Dictionary containing boolean fields for `droplet`,
            `falling`, `cold`, `melting`, `aerosol`, `insect`.
        quality_bits (dict): Dictionary containing boolean fields for `radar`,
            `lidar`, `clutter`, `molecular`, `attenuated` and `corrected`.

    Raises:
        KeyError: If the categorize file lacks the ``category_bits`` or
            ``quality_bits`` variable.
    """

    # The position of a key in these tuples is the bit index it decodes.
    category_keys = (
        "droplet",
        "falling",
        "cold",
        "melting",
        "aerosol",
        "insect",
    )
    quality_keys = (
        "radar",
        "lidar",
        "clutter",
        "molecular",
        "attenuated",
        "corrected",
    )

    def __init__(self, categorize_file: str):
        self._categorize_file = categorize_file
        self.category_bits = self._read_bits("category")
        self.quality_bits = self._read_bits("quality")

    def _read_bits(self, bit_type: str) -> dict:
        """Converts bitfield into dictionary.

        Args:
            bit_type: Either ``"category"`` or ``"quality"``; selects both the
                ``<bit_type>_bits`` variable and the ``<bit_type>_keys`` tuple.

        Returns:
            Mapping from key name to the result of ``utils.isbit`` for the
            bit index given by the key's position in the keys tuple.

        Raises:
            KeyError: If the ``<bit_type>_bits`` variable is not in the file.
        """
        variable_name = f"{bit_type}_bits"
        with netCDF4.Dataset(self._categorize_file) as nc:
            try:
                bitfield = nc.variables[variable_name][:]
            except KeyError as err:
                # Same exception type as before, but say what was missing and
                # where instead of raising an empty KeyError.
                msg = (
                    f"Variable '{variable_name}' not found in "
                    f"{self._categorize_file}"
                )
                raise KeyError(msg) from err
        keys = getattr(CategorizeBits, f"{bit_type}_keys")
        return {key: utils.isbit(bitfield, i) for i, key in enumerate(keys)}
class ProductClassification(CategorizeBits):
    """Common base for the classification classes of Cloudnet products.

    Extends CategorizeBits with the rain flags obtained from ``get_is_rain``
    for the same categorize file.

    Args:
        categorize_file (str): Categorize file name.

    Attributes:
        is_rain (ndarray): 1D array denoting rainy profiles.
    """

    def __init__(self, categorize_file: str):
        super().__init__(categorize_file)
        rain_flags = get_is_rain(categorize_file)
        self.is_rain = rain_flags
class IceClassification(ProductClassification):
    """Boolean masks describing the different ice types of a categorize file.

    Child of ProductClassification().
    """

    def __init__(self, categorize_file: str):
        super().__init__(categorize_file)
        # is_ice must be set first: several of the masks below are built on it.
        self.is_ice = self._find_ice()
        self.would_be_ice = self._find_would_be_ice()
        self.corrected_ice = self._find_corrected_ice()
        self.uncorrected_ice = self._find_uncorrected_ice()
        self.ice_above_rain = self._find_ice_above_rain()
        self.cold_above_rain = self._find_cold_above_rain()

    def _find_ice(self) -> np.ndarray:
        """Falling and cold, but neither melting nor insect."""
        bits = self.category_bits
        falling_and_cold = bits["falling"] & bits["cold"]
        return falling_and_cold & ~bits["melting"] & ~bits["insect"]

    def _find_would_be_ice(self) -> np.ndarray:
        """Falling, not cold and not insect; or melting."""
        bits = self.category_bits
        warm_falling = bits["falling"] & ~bits["cold"] & ~bits["insect"]
        return warm_falling | bits["melting"]

    def _find_corrected_ice(self) -> np.ndarray:
        """Ice flagged as both attenuated and corrected."""
        quality = self.quality_bits
        return self.is_ice & quality["attenuated"] & quality["corrected"]

    def _find_uncorrected_ice(self) -> np.ndarray:
        """Ice flagged as attenuated but not corrected."""
        quality = self.quality_bits
        return self.is_ice & quality["attenuated"] & ~quality["corrected"]

    def _find_ice_above_rain(self) -> np.ndarray:
        """Ice in profiles where rain is flagged."""
        rain = utils.transpose(self.is_rain)
        ice_times_rain = self.is_ice * rain
        return ice_times_rain == 1

    def _find_cold_above_rain(self) -> np.ndarray:
        """Cold, non-melting pixels in profiles where rain is flagged."""
        bits = self.category_bits
        rain = utils.transpose(self.is_rain)
        cold_in_rain = (bits["cold"] * rain) == 1
        return cold_in_rain & ~bits["melting"]
class IceSource(DataSource):
    """Base class for different ice products."""

    def __init__(self, categorize_file: str, product: str):
        super().__init__(categorize_file)
        radar_frequency = float(self.getvar("radar_frequency"))
        self.wl_band = utils.get_wl_band(radar_frequency)
        self.temperature = get_temperature(categorize_file)
        self.product = product
        # Depends on both wl_band and product, so it has to come last.
        self.coefficients = self._get_coefficients()

    def append_main_variable_including_rain(
        self,
        ice_classification: IceClassification,
    ) -> None:
        """Adds the main variable (including ice above rain).

        Everything that is not classified as ice is masked; the result is
        stored under ``<product>_inc_rain``.
        """
        values = self._convert_z()
        not_ice = ~ice_classification.is_ice
        values[not_ice] = ma.masked
        self.append_data(values, f"{self.product}_inc_rain")

    def append_main_variable(self, ice_classification: IceClassification) -> None:
        """Adds the main variable (excluding rain).

        Works on a copy of ``<product>_inc_rain`` with ice above rain masked.
        """
        source_key = f"{self.product}_inc_rain"
        values = ma.copy(self.data[source_key][:])
        values[ice_classification.ice_above_rain] = ma.masked
        self.append_data(values, self.product)

    def append_status(self, ice_classification: IceClassification) -> None:
        """Adds the status of retrieval.

        The codes are written in sequence and later assignments overwrite
        earlier ones, so the statement order below is significant:

            1: data present
            3: data present and corrected ice
            2: data present and uncorrected ice
            4: no data although classified as ice
            6: cold above rain
            5: ice above rain
            7: would-be ice, only where the status is still 0
        """
        values = self.data[self.product][:]
        status = np.zeros(values.shape, dtype=int)
        has_value = ~values.mask
        status[has_value] = 1
        status[has_value & ice_classification.corrected_ice] = 3
        status[has_value & ice_classification.uncorrected_ice] = 2
        status[~has_value & ice_classification.is_ice] = 4
        status[ice_classification.cold_above_rain] = 6
        status[ice_classification.ice_above_rain] = 5
        still_unset = status == 0
        status[ice_classification.would_be_ice & still_unset] = 7
        self.append_data(status, f"{self.product}_retrieval_status")

    def _get_coefficients(self) -> IceCoefficients:
        """Returns the retrieval coefficients for this product and band.

        One set is used when the product is ``"ier"`` and another for any
        other product; within each, the choice depends on whether
        ``wl_band`` equals 0.

        References:
            Hogan et.al. 2006, https://doi.org/10.1175/JAM2340.1
        """
        band_is_zero = self.wl_band == 0
        if self.product == "ier":
            if band_is_zero:
                values = (0.878, -0.000205, -0.0015, 0.0016, -1.52)
            else:
                values = (0.669, -0.000296, -0.00193, -0.000, -1.502)
        elif band_is_zero:
            values = (0.878, 0.000242, -0.0186, 0.0699, -1.63)
        else:
            values = (0.669, 0.000580, -0.00706, 0.0923, -0.992)
        return IceCoefficients(*values)

    def _convert_z(self, z_variable: str = "Z") -> np.ndarray:
        """Converts a reflectivity variable into the ice product.

        Evaluates ``10 ** (ZT*z*T + T*T_ + Z*z + c) * scale`` where ``z`` is
        the chosen variable plus the empirical z factor, and the temperature
        is the full field for ``"Z"`` or its mean over axis 0 for
        ``"Z_sensitivity"``.

        Args:
            z_variable: ``"Z"`` or ``"Z_sensitivity"``.

        Raises:
            ValueError: If the product is not ``"iwc"`` or ``"ier"``, or if
                `z_variable` is not one of the two accepted names.
        """
        if self.product not in ("iwc", "ier"):
            msg = f"Invalid product: {self.product}"
            raise ValueError(msg)
        if z_variable not in ("Z", "Z_sensitivity"):
            msg = f"Invalid z_variable: {z_variable}"
            raise ValueError(msg)

        if z_variable == "Z":
            temperature = self.temperature
        else:
            temperature = ma.mean(self.temperature, axis=0)

        z_scaled = self.getvar(z_variable) + self._get_z_factor()

        # Unit factors, named after the conversions they stand for.
        g_to_kg = 0.001
        m_to_mu = 1e6
        if self.product == "iwc":
            scale = g_to_kg
        else:
            scale = 3 / (2 * constants.RHO_ICE) * m_to_mu

        coeffs = self.coefficients
        exponent = (
            coeffs.ZT * z_scaled * temperature
            + coeffs.T * temperature
            + coeffs.Z * z_scaled
            + coeffs.c
        )
        return 10**exponent * scale

    def _get_z_factor(self) -> float:
        """Returns empirical scaling factor for radar echo."""
        ratio = np.array(self.coefficients.K2liquid0) / 0.93
        return float(utils.lin2db(ratio))
def get_is_rain(filename: str) -> np.ndarray:
    """Returns rain flags read from a netCDF file.

    The ``rain_detected`` variable is used when present. Otherwise the flag
    is derived as "rate differs from zero" from ``rainfall_rate``, or from
    ``rain_rate`` when that is missing too. Masked entries are set to True.

    Args:
        filename: netCDF file name.

    Returns:
        Plain numpy array built from the rain flags.

    Raises:
        KeyError: If none of the three variables is available.
    """
    try:
        rain_flag = read_nc_field(filename, "rain_detected")
    except KeyError:
        try:
            rate = read_nc_field(filename, "rainfall_rate")
        except KeyError:
            rate = read_nc_field(filename, "rain_rate")
        rain_flag = rate != 0
    masked_entries = rain_flag.mask
    rain_flag[masked_entries] = True
    return np.array(rain_flag)


def read_nc_field(nc_file: str, name: str) -> ma.MaskedArray:
    """Reads one variable from a netCDF file.

    Args:
        nc_file: netCDF file name.
        name: Name of the variable to be read.

    Returns:
        All values of the variable.

    Raises:
        KeyError: If the file has no variable called `name`.
    """
    with netCDF4.Dataset(nc_file) as nc:
        variable = nc.variables[name]
        return variable[:]
def read_nc_fields(nc_file: str, names: list[str]) -> list[ma.MaskedArray]:
    """Reads selected variables from a netCDF file.

    Args:
        nc_file: netCDF file name.
        names: Variables to be read, e.g. ['ldr', 'lwp'].

    Returns:
        List of arrays, one per requested name and in the same order.
    """
    with netCDF4.Dataset(nc_file) as nc:
        variables = nc.variables
        return [variables[name][:] for name in names]
[docs] def interpolate_model(cat_file: str, names: str | list) -> dict[str, np.ndarray]: """Interpolates 2D model field into dense Cloudnet grid. Args: cat_file: Categorize file name. names: Model variable to be interpolated, e.g. 'temperature' or ['temperature', 'pressure']. Returns: dict: Interpolated variables. """ def _interp_field(var_name: str) -> np.ndarray: values = read_nc_fields( cat_file, ["model_time", "model_height", var_name, "time", "height"], ) return utils.interpolate_2d(*values) names = [names] if isinstance(names, str) else names return {name: _interp_field(name) for name in names}
def get_temperature(categorize_file: str) -> np.ndarray:
    """Returns interpolated temperatures in Celsius.

    The model ``temperature`` field is interpolated with
    ``interpolate_model`` and then passed through ``atmos_utils.k2c``.
    """
    model_temperature = interpolate_model(categorize_file, "temperature")[
        "temperature"
    ]
    return atmos_utils.k2c(model_temperature)
def get_mwrpy_coeffs(nc_file: str) -> list[str]:
    """Returns the entries of the file's ``mwrpy_coefficients`` attribute.

    Args:
        nc_file: netCDF file name.

    Returns:
        The global attribute ``mwrpy_coefficients`` split on ``", "``.
    """
    with netCDF4.Dataset(nc_file) as nc:
        return nc.mwrpy_coefficients.split(", ")


def get_read_mwrpy_coeffs(mwr_l1c_file: str, folder: str) -> list[str]:
    """Downloads the coefficient files referenced by an MWR L1C file.

    Each entry returned by ``get_mwrpy_coeffs`` is treated as a link: it is
    fetched and stored in `folder` under the last ``/``-separated component
    of the link.

    Args:
        mwr_l1c_file: netCDF file whose ``mwrpy_coefficients`` attribute lists
            the links.
        folder: Directory where the downloaded files are written.

    Returns:
        Paths of the written files, in the order of the links.

    Raises:
        requests.HTTPError: If a download answers with an error status.
    """
    coeffs = []
    for link in get_mwrpy_coeffs(mwr_l1c_file):
        full_path = os.path.join(folder, link.split("/")[-1])
        # Download and validate before opening the target, so that a failed
        # request neither leaves an empty file behind nor stores an HTTP
        # error page as if it were a coefficient file.
        response = requests.get(link, timeout=10)
        response.raise_for_status()
        with open(full_path, "wb") as f:
            f.write(response.content)
        coeffs.append(full_path)
    return coeffs