Source code for instruments.disdrometer.thies

import datetime
from collections import defaultdict
from os import PathLike
from typing import Any

import numpy as np

from cloudnetpy import output
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.constants import MM_TO_M, SEC_IN_HOUR
from cloudnetpy.exceptions import DisdrometerDataError, ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.toa5 import read_toa5

from .common import ATTRIBUTES, Disdrometer

TELEGRAM4 = [
    (1, "_serial_number"),
    (2, "_software_version"),
    (3, "_date"),
    (4, "_time"),
    (5, "_synop_5min_ww"),
    (6, "_synop_5min_WaWa"),
    (7, "_metar_5min_4678"),
    (8, "_rainfall_rate_5min"),
    (9, "synop_WW"),  # 1min
    (10, "synop_WaWa"),  # 1min
    (11, "_metar_1_min_4678"),
    (12, "rainfall_rate_1min_total"),
    (13, "rainfall_rate"),  # liquid, mm h-1
    (14, "rainfall_rate_1min_solid"),
    (15, "_precipition_amount"),  # mm
    (16, "visibility"),
    (17, "radar_reflectivity"),
    (18, "measurement_quality"),
    (19, "maximum_hail_diameter"),
    (20, "status_laser"),
    (21, "static_signal"),
    (22, "status_T_laser_analogue"),
    (23, "status_T_laser_digital"),
    (24, "status_I_laser_analogue"),
    (25, "status_I_laser_digital"),
    (26, "status_sensor_supply"),
    (27, "status_laser_heating"),
    (28, "status_receiver_heating"),
    (29, "status_temperature_sensor"),
    (30, "status_heating_supply"),
    (31, "status_heating_housing"),
    (32, "status_heating_heads"),
    (33, "status_heating_carriers"),
    (34, "status_laser_power"),
    (35, "_status_reserve"),
    (36, "T_interior"),
    (37, "T_laser_driver"),  # 0-80 C
    (38, "I_mean_laser"),
    (39, "V_control"),  # mV 4005-4015
    (40, "V_optical_output"),  # mV 2300-6500
    (41, "V_sensor_supply"),  # 1/10V
    (42, "I_heating_laser_head"),  # mA
    (43, "I_heating_receiver_head"),  # mA
    (44, "T_ambient"),  # C
    (45, "_V_heating_supply"),
    (46, "_I_housing"),
    (47, "_I_heating_heads"),
    (48, "_I_heating_carriers"),
    (49, "n_particles"),
]


[docs] def thies2nc( disdrometer_file: str, output_file: str, site_meta: dict, uuid: str | None = None, date: str | datetime.date | None = None, ) -> str: """Converts Thies-LNM disdrometer data into Cloudnet Level 1b netCDF file. Args: disdrometer_file: Filename of disdrometer .log file. output_file: Output filename. site_meta: Dictionary containing information about the site. Required key is `name`. uuid: Set specific UUID for the file. date: Expected date of the measurements as YYYY-MM-DD. Returns: UUID of the generated file. Raises: DisdrometerDataError: Timestamps do not match the expected date, or unable to read the disdrometer file. Examples: >>> from cloudnetpy.instruments import thies2nc >>> site_meta = {'name': 'Lindenberg', 'altitude': 104, 'latitude': 52.2, 'longitude': 14.1} >>> uuid = thies2nc('thies-lnm.log', 'thies-lnm.nc', site_meta) """ if isinstance(date, str): date = datetime.date.fromisoformat(date) try: disdrometer = Thies(disdrometer_file, site_meta, date) except (ValueError, IndexError) as err: msg = "Unable to read disdrometer file" raise DisdrometerDataError(msg) from err disdrometer.sort_timestamps() disdrometer.remove_duplicate_timestamps() disdrometer.add_meta() disdrometer.convert_units() attributes = output.add_time_attribute(ATTRIBUTES, disdrometer.date) output.update_attributes(disdrometer.data, attributes) return output.save_level1b(disdrometer, output_file, uuid)
class Thies(Disdrometer): def __init__( self, filename: str | PathLike, site_meta: dict, expected_date: datetime.date | None = None, ): super().__init__() self.instrument = instruments.THIES self.n_velocity = 20 self.n_diameter = 22 self.site_meta = site_meta self.raw_data: dict[str, Any] = defaultdict(list) self._read_data(filename) self._screen_time(expected_date) self.data = {} self._append_data() self._create_velocity_vectors() self._create_diameter_vectors() def convert_units(self) -> None: mmh_to_ms = SEC_IN_HOUR / MM_TO_M c_to_k = 273.15 self._convert_data(("rainfall_rate_1min_total",), mmh_to_ms) self._convert_data(("rainfall_rate",), mmh_to_ms) self._convert_data(("rainfall_rate_1min_solid",), mmh_to_ms) self._convert_data(("diameter", "diameter_spread", "diameter_bnds"), 1e3) self._convert_data(("V_sensor_supply",), 10) self._convert_data(("I_mean_laser",), 100) self._convert_data(("T_interior",), c_to_k, method="add") self._convert_data(("T_ambient",), c_to_k, method="add") self._convert_data(("T_laser_driver",), c_to_k, method="add") def _read_data(self, filename: str | PathLike) -> None: with open(filename) as file: first_line = file.readline() if "TOA5" in first_line: units, process, rows = read_toa5(filename) for row in rows: self._read_line(row["RawString"], row["TIMESTAMP"]) else: with open(filename) as file: for line in file: self._read_line(line) if len(self.raw_data["time"]) == 0: raise ValidTimeStampError for key, value in self.raw_data.items(): array = np.array(value) if key == "time": array = array.astype("datetime64[s]") self.raw_data[key] = array def _append_data(self) -> None: for key, values in self.raw_data.items(): if key.startswith("_"): continue name_out = key values_out = values match key: case "spectrum": name_out = "data_raw" dimensions = ["time", "diameter", "velocity"] case "time": dimensions = [] base = values[0].astype("datetime64[D]") values_out = (values - base) / np.timedelta64(1, "h") case _: dimensions = ["time"] self.data[name_out] = CloudnetArray( values_out, name_out, dimensions=dimensions ) first_id = self.raw_data["_serial_number"][0] for sensor_id in self.raw_data["_serial_number"]: if sensor_id != first_id: msg = "Multiple serial numbers are not supported" raise DisdrometerDataError(msg) self.serial_number = first_id def _read_line(self, line: str, timestamp: datetime.datetime | None = None): raw_values = line.split(";") # Support custom truncated format used in Leipzig LIM. expected_columns = self.site_meta.get("truncate_columns", 521) if len(raw_values) != expected_columns: return for i, key in TELEGRAM4: if i >= expected_columns - 1: break value: Any if key == "_date": value = _parse_date(raw_values[i]) elif key == "_time": value = _parse_time(raw_values[i]) elif key in ( "I_heating", "T_ambient", "T_interior", "T_laser_driver", "V_power_supply", "_precipition_amount", "_rainfall_rate_5min", "maximum_hail_diameter", "radar_reflectivity", "rainfall_rate", "rainfall_rate_1min_solid", "rainfall_rate_1min_total", ): value = float(raw_values[i]) elif key in ( "_serial_number", "_software_version", "_metar_5min_4678", "_metar_1_min_4678", ): value = raw_values[i] else: value = int(raw_values[i]) self.raw_data[key].append(value) if expected_columns > 79: self.raw_data["spectrum"].append( np.array(list(map(int, raw_values[79:-2])), dtype="i2").reshape( self.n_diameter, self.n_velocity ) ) if timestamp is not None: self.raw_data["time"].append(timestamp) else: self.raw_data["time"].append( datetime.datetime.combine( self.raw_data["_date"][-1], self.raw_data["_time"][-1] ) ) def _screen_time(self, expected_date: datetime.date | None = None) -> None: if expected_date is None: self.date = self.raw_data["time"][0].astype(object).date() return self.date = expected_date valid_mask = self.raw_data["time"].astype("datetime64[D]") == self.date if np.count_nonzero(valid_mask) == 0: msg = f"No data found on {expected_date}" raise DisdrometerDataError(msg) for key in self.raw_data: self.raw_data[key] = self.raw_data[key][valid_mask] def _create_velocity_vectors(self) -> None: n_values = [5, 6, 7, 1, 1] spreads = [0.2, 0.4, 0.8, 1, 10] self.store_vectors(n_values, spreads, "velocity") def _create_diameter_vectors(self) -> None: n_values = [3, 6, 13] spreads = [0.125, 0.25, 0.5] self.store_vectors(n_values, spreads, "diameter", start=0.125) def _parse_date(date: str) -> datetime.date: day, month, year = map(int, date.split(".")) if year < 100: year += 2000 return datetime.date(year, month, day) def _parse_time(time: str) -> datetime.time: hour, minute, second = map(int, time.split(":")) return datetime.time(hour, minute, second)