Source code for instruments.weather_station

import datetime

import numpy as np
from numpy import ma

from cloudnetpy import output
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.exceptions import ValidTimeStampError, WeatherStationDataError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.cloudnet_instrument import CloudnetInstrument
from cloudnetpy.instruments.toa5 import read_toa5
from cloudnetpy.metadata import MetaData
from cloudnetpy.utils import datetime2decimal_hours


[docs] def ws2nc( weather_station_file: str, output_file: str, site_meta: dict, uuid: str | None = None, date: str | None = None, ) -> str: """Converts weather station data into Cloudnet Level 1b netCDF file. Args: weather_station_file: Filename of weather-station ASCII file. output_file: Output filename. site_meta: Dictionary containing information about the site. Required key is `name`. uuid: Set specific UUID for the file. date: Expected date of the measurements as YYYY-MM-DD. Returns: UUID of the generated file. Raises: WeatherStationDataError : Unable to read the file. ValidTimeStampError: No valid timestamps found. """ try: ws: WS if site_meta["name"] == "Palaiseau": ws = PalaiseauWS(weather_station_file, site_meta) elif site_meta["name"] == "Granada": ws = GranadaWS(weather_station_file, site_meta) else: msg = "Unsupported site" raise ValueError(msg) # noqa: TRY301 if date is not None: ws.screen_timestamps(date) ws.convert_time() ws.add_date() ws.add_site_geolocation() ws.add_data() ws.convert_units() attributes = output.add_time_attribute(ATTRIBUTES, ws.date) output.update_attributes(ws.data, attributes) except ValueError as err: raise WeatherStationDataError from err return output.save_level1b(ws, output_file, uuid)
class WS(CloudnetInstrument): date: list[str] def convert_time(self) -> None: pass def screen_timestamps(self, date: str) -> None: pass def add_date(self) -> None: pass def add_data(self) -> None: pass def convert_units(self) -> None: pass class PalaiseauWS(WS): def __init__(self, filename: str, site_meta: dict): super().__init__() self.filename = filename self.site_meta = site_meta self.instrument = instruments.GENERIC_WEATHER_STATION self._data = self._read_data() def _read_data(self) -> dict: timestamps, values, header = [], [], [] with open(self.filename, encoding="latin-1") as f: data = f.readlines() for row in data: splat = row.split() try: timestamp = datetime.datetime.strptime( splat[0], "%Y-%m-%dT%H:%M:%SZ", ).replace(tzinfo=datetime.timezone.utc) temp: list[str | float] = list(splat) temp[1:] = [float(x) for x in temp[1:]] values.append(temp) timestamps.append(timestamp) except ValueError: header.append("".join(splat)) # Simple validation for now: expected_identifiers = [ "DateTime(yyyy-mm-ddThh:mm:ssZ)", "Windspeed(m/s)", "Winddirection(degres)", "Airtemperature(°C)", "Relativehumidity(%)", "Pressure(hPa)", "Precipitationrate(mm/min)", "24-hrcumulatedprecipitationsince00UT(mm)", ] column_titles = [row for row in header if "Col." in row] error_msg = "Unexpected weather station file format" if len(column_titles) != len(expected_identifiers): raise ValueError(error_msg) for title, identifier in zip(column_titles, expected_identifiers, strict=True): if identifier not in title: raise ValueError(error_msg) return {"timestamps": timestamps, "values": values} def convert_time(self) -> None: decimal_hours = datetime2decimal_hours(self._data["timestamps"]) self.data["time"] = CloudnetArray(decimal_hours, "time") def screen_timestamps(self, date: str) -> None: dates = [str(d.date()) for d in self._data["timestamps"]] valid_ind = [ind for ind, d in enumerate(dates) if d == date] if not valid_ind: raise ValidTimeStampError for key in self._data: self._data[key] = [ x for ind, x in enumerate(self._data[key]) if ind in valid_ind ] def add_date(self) -> None: first_date = self._data["timestamps"][0].date() self.date = [ str(first_date.year), str(first_date.month).zfill(2), str(first_date.day).zfill(2), ] def add_data(self) -> None: keys = ( "wind_speed", "wind_direction", "air_temperature", "relative_humidity", "air_pressure", "rainfall_rate", "rainfall_amount", ) for ind, key in enumerate(keys): array = [row[ind + 1] for row in self._data["values"]] array_masked = ma.masked_invalid(array) self.data[key] = CloudnetArray(array_masked, key) def convert_units(self) -> None: temperature_kelvins = atmos_utils.c2k(self.data["air_temperature"][:]) self.data["air_temperature"].data = temperature_kelvins self.data["relative_humidity"].data = self.data["relative_humidity"][:] / 100 self.data["air_pressure"].data = self.data["air_pressure"][:] * 100 # hPa -> Pa rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = rainfall_rate / 60 / 1000 # mm/min -> m/s self.data["rainfall_amount"].data = ( self.data["rainfall_amount"][:] / 1000 ) # mm -> m class GranadaWS(WS): def __init__(self, filename: str, site_meta: dict): super().__init__() self.filename = filename self.site_meta = site_meta self.instrument = instruments.GENERIC_WEATHER_STATION self._data = self._read_data() def _read_data(self) -> dict: keymap = { "TIMESTAMP": "time", "air_t_Avg": "air_temperature", "rh_Avg": "relative_humidity", "pressure_Avg": "air_pressure", "wind_speed_avg": "wind_speed", "wind_dir_avg": "wind_direction", "rain_Tot": "rainfall_rate", } expected_units = { "air_t_Avg": "degC", "rh_Avg": "%", "pressure_Avg": "hPa", "wind_speed_avg": "m/s", "wind_dir_avg": "Deg", "rain_Tot": "mm", } units, process, rows = read_toa5(self.filename) for key in units: if key in expected_units and expected_units[key] != units[key]: msg = ( f"Expected {key} to have units {expected_units[key]}," f" got {units[key]} instead" ) raise ValueError(msg) data: dict[str, list] = {keymap[key]: [] for key in units if key in keymap} for row in rows: for key, value in row.items(): if key not in keymap: continue parsed = value if keymap[key] != "time": parsed = float(value) data[keymap[key]].append(parsed) return data def convert_time(self) -> None: pass def screen_timestamps(self, date: str) -> None: dates = [str(d.date()) for d in self._data["time"]] valid_ind = [ind for ind, d in enumerate(dates) if d == date] if not valid_ind: raise ValidTimeStampError for key in self._data: self._data[key] = [ x for ind, x in enumerate(self._data[key]) if ind in valid_ind ] def add_date(self) -> None: first_date = self._data["time"][0].date() self.date = [ str(first_date.year), str(first_date.month).zfill(2), str(first_date.day).zfill(2), ] def add_data(self) -> None: for key, value in self._data.items(): parsed = datetime2decimal_hours(value) if key == "time" else np.array(value) self.data[key] = CloudnetArray(parsed, key) self.data["rainfall_amount"] = CloudnetArray( np.cumsum(self._data["rainfall_rate"]), "rainfall_amount" ) def convert_units(self) -> None: temperature_kelvins = atmos_utils.c2k(self.data["air_temperature"][:]) self.data["air_temperature"].data = temperature_kelvins self.data["relative_humidity"].data = self.data["relative_humidity"][:] / 100 self.data["air_pressure"].data = self.data["air_pressure"][:] * 100 # hPa -> Pa rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = rainfall_rate / 60 / 1000 # mm/min -> m/s self.data["rainfall_amount"].data = ( self.data["rainfall_amount"][:] / 1000 ) # mm -> m ATTRIBUTES = { "rainfall_amount": MetaData( long_name="Rainfall amount", standard_name="thickness_of_rainfall_amount", units="m", comment="Cumulated precipitation since 00:00 UTC", ), }