Source code for datasource

"""Datasource module, containing the :class:`DataSource class.`"""
import logging
import os
from import Callable
from datetime import datetime, timezone

import netCDF4
import numpy as np

from cloudnetpy import utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.exceptions import ValidTimeStampError

[docs] class DataSource: """Base class for all Cloudnet measurements and model data. Args: full_path: Calibrated instrument / model NetCDF file. radar: Indicates if data is from cloud radar. Default is False. Attributes: filename (str): Filename of the input file. dataset (netCDF4.Dataset): A netCDF4 Dataset instance. source (str): Global attribute `source` read from the input file. time (ndarray): Time array of the instrument. altitude (float): Altitude of instrument above mean sea level (m). data (dict): Dictionary containing :class:`CloudnetArray` instances. """ calc_wet_bulb: Callable add_meta: Callable rebin_to_grid: Callable interpolate_to_grid: Callable interpolate_to_common_height: Callable filter_stripes: Callable calc_errors: Callable remove_incomplete_pixels: Callable filter_1st_gate_artifact: Callable screen_sparse_fields: Callable filter_speckle_noise: Callable correct_atten: Callable radar_frequency: float data_dense: dict data_sparse: dict source_type: str def __init__(self, full_path: os.PathLike | str, *, radar: bool = False): self.filename = os.path.basename(full_path) self.dataset = netCDF4.Dataset(full_path) self.source = getattr(self.dataset, "source", "") self.time: np.ndarray = self._init_time() self.altitude = self._init_altitude() self.height = self._init_height() dict = {} self._is_radar = radar
[docs] def getvar(self, *args) -> np.ndarray: """Returns data array from the source file variables. Returns just the data (and no attributes) from the original variables dictionary, fetched from the input netCDF file. Args: *args: possible names of the variable. The first match is returned. Returns: ndarray: The actual data. Raises: RuntimeError: The variable is not found. """ for arg in args: if arg in self.dataset.variables: return self.dataset.variables[arg][:] msg = f"Missing variable {args[0]} in the input file." raise RuntimeError(msg)
[docs] def append_data( self, variable: netCDF4.Variable | np.ndarray | float, key: str, name: str | None = None, units: str | None = None, dtype: str | None = None, ) -> None: """Adds new CloudnetVariable or RadarVariable into `data` attribute. Args: variable: netCDF variable or data array to be added. key: Key used with *variable* when added to `data` attribute (dictionary). name: attribute. Default value is *key*. units: CloudnetArray.units attribute. """[key] = CloudnetArray(variable, name or key, units, data_type=dtype)
[docs] def get_date(self) -> list: """Returns date components. Returns list: Date components [YYYY, MM, DD]. Raises RuntimeError: Not found or invalid date. """ try: year = str(self.dataset.year) month = str(self.dataset.month).zfill(2) day = str( datetime.strptime(f"{year}{month}{day}", "%Y%m%d").replace( tzinfo=timezone.utc, ) except (AttributeError, ValueError) as read_error: msg = "Missing or invalid date in global attributes." raise RuntimeError(msg) from read_error return [year, month, day]
[docs] def close(self) -> None: """Closes the open file.""" self.dataset.close()
[docs] @staticmethod def to_m(var: netCDF4.Variable) -> np.ndarray: """Converts km to m.""" alt = var[:] if var.units == "km": alt *= 1000 elif var.units not in ("m", "meters"): msg = f"Unexpected unit: {var.units}" raise ValueError(msg) return alt
[docs] @staticmethod def to_km(var: netCDF4.Variable) -> np.ndarray: """Converts m to km.""" alt = var[:] if var.units == "m": alt /= 1000 elif var.units != "km": msg = f"Unexpected unit: {var.units}" raise ValueError(msg) return alt
def _init_time(self) -> np.ndarray: time = self.getvar("time") if len(time) == 0: msg = "Empty time vector" raise ValidTimeStampError(msg) if max(time) > 25: logging.debug("Assuming time as seconds, converting to fraction hour") time = utils.seconds2hours(time) return time def _init_altitude(self) -> float | None: """Returns altitude of the instrument (m).""" if "altitude" in self.dataset.variables: var = self.dataset.variables["altitude"] if utils.is_all_masked(var[:]): return None altitude_above_sea = self.to_m(var) return float( altitude_above_sea if utils.isscalar(altitude_above_sea) else np.mean(altitude_above_sea), ) return None def _init_height(self) -> np.ndarray | None: """Returns height array above mean sea level (m).""" if "height" in self.dataset.variables: return self.to_m(self.dataset.variables["height"]) if "range" in self.dataset.variables and self.altitude is not None: range_instrument = self.to_m(self.dataset.variables["range"]) return np.array(range_instrument + self.altitude) return None def _variables_to_cloudnet_arrays(self, keys: tuple) -> None: """Transforms netCDF4-variables into CloudnetArrays. Args: keys: netCDF4-variables to be converted. The results are saved in ** dictionary with *fields* strings as keys. Notes: The attributes of the variables are not copied. Just the data. """ for key in keys: self.append_data(self.dataset.variables[key], key) def _unknown_variable_to_cloudnet_array( self, possible_names: tuple, key: str, units: str | None = None, *, ignore_mask: bool = False, ) -> None: """Transforms single netCDF4 variable into CloudnetArray. Args: possible_names: Tuple of strings containing the possible names of the variable in the input NetCDF file. key: Key for dictionary and name-attribute for the saved CloudnetArray object. units: Units attribute for the CloudnetArray object. ignore_mask: If true, always writes an ordinary numpy array. Raises: RuntimeError: No variable found. """ for name in possible_names: if name in self.dataset.variables: array = self.dataset.variables[name] if ignore_mask is True: array = np.array(array) self.append_data(array, key, units=units) return msg = f"Missing variable {possible_names[0]} in the input file." raise RuntimeError(msg) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()