"""Model module, containing the :class:`Model` class."""
import numpy as np
from numpy import ma
from scipy.interpolate import interp1d
from cloudnetpy import utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.datasource import DataSource
from cloudnetpy.exceptions import ModelDataError
[docs]
class Model(DataSource):
"""Model class, child of DataSource.
Args:
model_file: File name of the NWP model file.
alt_site: Altitude of the site above mean sea level (m).
Attributes:
source_type (str): Model type, e.g. 'gdas1' or 'ecwmf'.
model_heights (ndarray): 2-D array of model heights (one for each time
step).
mean_height (ndarray): Mean of *model_heights*.
data_sparse (dict): Model variables in common height grid but without
interpolation in time.
data_dense (dict): Model variables interpolated to Cloudnet's dense
time / height grid.
"""
fields_dense = (
"temperature",
"pressure",
"rh",
"gas_atten",
"specific_gas_atten",
"specific_saturated_gas_atten",
"specific_liquid_atten",
)
fields_sparse = (*fields_dense, "q", "uwind", "vwind")
def __init__(self, model_file: str, alt_site: float):
super().__init__(model_file)
self.source_type = _find_model_type(model_file)
self.model_heights = self._get_model_heights(alt_site)
self.mean_height = _calc_mean_height(self.model_heights)
self.height: np.ndarray
self.data_sparse: dict = {}
self.data_dense: dict = {}
self._append_grid()
[docs]
def interpolate_to_common_height(self, wl_band: int) -> None:
"""Interpolates model variables to common height grid.
Args:
wl_band: Integer denoting the approximate wavelength band of the
cloud radar (0 = ~35.5 GHz, 1 = ~94 GHz).
"""
def _interpolate_variable(data_in: ma.MaskedArray) -> CloudnetArray:
datai = ma.zeros((len(self.time), len(self.mean_height)))
for ind, (alt, prof) in enumerate(
zip(self.model_heights, data_in, strict=True),
):
if prof.mask.all():
datai[ind, :] = ma.masked
else:
fun = interp1d(alt, prof, fill_value="extrapolate")
datai[ind, :] = fun(self.mean_height)
return CloudnetArray(datai, key, units)
for key in self.fields_sparse:
variable = self.dataset.variables[key]
data = variable[:]
units = variable.units
if "atten" in key:
data = data[wl_band, :, :]
self.data_sparse[key] = _interpolate_variable(data)
[docs]
def interpolate_to_grid(
self,
time_grid: np.ndarray,
height_grid: np.ndarray,
) -> list:
"""Interpolates model variables to Cloudnet's dense time / height grid.
Args:
time_grid: The target time array (fraction hour).
height_grid: The target height array (m).
Returns:
Indices fully masked profiles.
"""
for key in self.fields_dense:
array = self.data_sparse[key][:]
valid_profiles = _find_number_of_valid_profiles(array)
if valid_profiles < 2:
raise ModelDataError
self.data_dense[key] = utils.interpolate_2d_mask(
self.time,
self.mean_height,
array,
time_grid,
height_grid,
)
self.height = height_grid
return utils.find_masked_profiles_indices(self.data_dense["temperature"])
[docs]
def calc_wet_bulb(self) -> None:
"""Calculates wet-bulb temperature in dense grid."""
wet_bulb_temp = atmos_utils.calc_wet_bulb_temperature(self.data_dense)
self.append_data(wet_bulb_temp, "Tw", units="K")
[docs]
def screen_sparse_fields(self) -> None:
"""Removes model fields that we don't want to write in the output."""
fields_to_keep = ("temperature", "pressure", "q", "uwind", "vwind")
self.data_sparse = {key: self.data_sparse[key] for key in fields_to_keep}
def _append_grid(self) -> None:
self.append_data(np.array(self.time), "model_time")
self.append_data(self.mean_height, "model_height")
def _get_model_heights(self, alt_site: float) -> np.ndarray:
"""Returns model heights for each time step."""
try:
model_heights = self.dataset.variables["height"]
except KeyError as err:
msg = "No 'height' variable in the model file."
raise ModelDataError(msg) from err
return self.to_m(model_heights) + alt_site
def _calc_mean_height(model_heights: np.ndarray) -> np.ndarray:
mean_height = ma.mean(model_heights, axis=0)
return np.array(mean_height)
def _find_model_type(file_name: str) -> str:
"""Finds model type from the model filename."""
possible_keys = ("gdas1", "icon", "ecmwf", "harmonie", "era5")
for key in possible_keys:
if key in file_name:
return key
msg = "Unknown model type"
raise ValueError(msg)
def _find_number_of_valid_profiles(array: np.ndarray) -> int:
n_good = 0
for row in array:
if not hasattr(row, "mask") or np.sum(row.mask.astype(int)) == 0:
n_good += 1
return n_good