"""Module for creating classification file."""
import numpy as np
from numpy import ma
from cloudnetpy import output, utils
from cloudnetpy.categorize import atmos
from cloudnetpy.datasource import DataSource
from cloudnetpy.metadata import MetaData
from cloudnetpy.products.product_tools import CategorizeBits
[docs]
def generate_classification(
    categorize_file: str,
    output_file: str,
    uuid: str | None = None,
) -> str:
    """Generates Cloudnet classification product.

    Reads the categorize bit masks, derives the target classification and
    the detection status from them, computes cloud base and top heights
    (both above mean sea level and above ground level), and writes
    everything to a netCDF file.

    Args:
        categorize_file: Categorize file name.
        output_file: Output file name.
        uuid: Set specific UUID for the file.

    Returns:
        str: UUID of the generated file.

    Examples:
        >>> from cloudnetpy.products import generate_classification
        >>> generate_classification('categorize.nc', 'classification.nc')

    """
    with DataSource(categorize_file) as product_container:
        categorize_bits = CategorizeBits(categorize_file)

        # Per-pixel products derived directly from the bit masks.
        target = _get_target_classification(categorize_bits)
        product_container.append_data(target, "target_classification")
        detection = _get_detection_status(categorize_bits)
        product_container.append_data(detection, "detection_status")

        # Cloud boundaries above mean sea level ...
        base_amsl, top_amsl = _get_cloud_base_and_top_heights(
            target,
            product_container,
        )
        product_container.append_data(base_amsl, "cloud_base_height_amsl")
        product_container.append_data(top_amsl, "cloud_top_height_amsl")

        # ... and above ground level, i.e. with the altitude subtracted.
        product_container.append_data(
            base_amsl - product_container.altitude,
            "cloud_base_height_agl",
        )
        product_container.append_data(
            top_amsl - product_container.altitude,
            "cloud_top_height_agl",
        )

        # Attach the variable metadata before the file is written.
        file_date = product_container.get_date()
        attributes = output.add_time_attribute(CLASSIFICATION_ATTRIBUTES, file_date)
        output.update_attributes(product_container.data, attributes)

        file_uuid = output.save_product_file(
            "classification",
            product_container,
            output_file,
            uuid,
        )
        return file_uuid
def _get_target_classification(
    categorize_bits: CategorizeBits,
) -> ma.MaskedArray:
    """Combine the category bits into one integer target class per pixel.

    Args:
        categorize_bits: Object exposing ``category_bits`` and
            ``quality_bits`` mappings of boolean arrays.

    Returns:
        Integer masked array shaped like the ``cold`` bit, holding the
        codes 0-10 described in ``DEFINITIONS["target_classification"]``.

    """
    category = categorize_bits.category_bits
    clutter = categorize_bits.quality_bits["clutter"]
    cold = category["cold"]
    droplet = category["droplet"]
    falling = category["falling"]
    melting = category["melting"]
    aerosol = category["aerosol"]
    insect = category["insect"]

    # The rules are applied top to bottom, so a later rule overwrites an
    # earlier one wherever their masks overlap. The order is significant.
    rules = (
        (droplet & ~falling, 1),  # Cloud droplets
        (~droplet & falling, 2),  # Drizzle or rain
        (droplet & falling, 3),  # Drizzle or rain and droplets
        (~droplet & falling & cold, 4),  # ice
        (droplet & falling & cold, 5),  # ice + supercooled
        (melting, 6),  # melting layer
        (melting & droplet, 7),  # melting + droplets
        (aerosol, 8),  # aerosols
        (insect & ~clutter, 9),  # insects
        (aerosol & insect & ~clutter, 10),  # insects + aerosols
        (clutter & ~aerosol, 0),  # clutter without aerosol is reset to 0
    )

    classification = ma.zeros(cold.shape, dtype=int)
    for mask, code in rules:
        classification[mask] = code
    return classification
def _get_detection_status(categorize_bits: CategorizeBits) -> np.ndarray:
    """Combine the quality bits into one integer detection status per pixel.

    Args:
        categorize_bits: Object exposing a ``quality_bits`` mapping of
            boolean arrays.

    Returns:
        Integer array shaped like the ``radar`` bit, holding the codes 0-9
        described in ``DEFINITIONS["detection_status"]``.

    """
    quality = categorize_bits.quality_bits
    radar = quality["radar"]
    lidar = quality["lidar"]
    attenuated = quality["attenuated"]
    corrected = quality["corrected"]

    # The rules are applied top to bottom, so a later rule overwrites an
    # earlier one wherever their masks overlap. The order is significant.
    rules = (
        (lidar & ~radar, 1),  # lidar echo only
        (radar & lidar, 3),  # radar and lidar echoes
        (~radar & attenuated & ~corrected, 4),  # no radar, uncorrected attenuation
        (radar & ~lidar & ~attenuated, 5),  # radar echo only
        (~radar & attenuated & corrected, 6),  # no radar, corrected attenuation
        (radar & corrected, 7),  # radar echo, corrected
        (radar & attenuated & ~corrected, 2),  # radar echo, uncorrected attenuation
        (quality["clutter"], 8),  # clutter
        (quality["molecular"] & ~radar, 9),  # molecular scattering, no radar
    )

    status = np.zeros(radar.shape, dtype=int)
    for mask, code in rules:
        status[mask] = code
    return status
def _get_cloud_base_and_top_heights(
    classification: np.ndarray,
    product_container: DataSource,
) -> tuple[np.ndarray, np.ndarray]:
    """Find the lowest cloud base and the highest cloud top.

    Args:
        classification: Target classification array.
        product_container: Data source providing the ``height`` variable.

    Returns:
        Tuple ``(lowest_bases, highest_tops)``. When the classification
        contains no cloud at all, both items are fully masked 1-D arrays
        whose length equals the first dimension of the classification.

    Raises:
        ValueError: If any cloud base lies above its cloud top.

    """
    height = product_container.getvar("height")
    cloud_mask = _find_cloud_mask(classification)

    if cloud_mask.any():
        lowest_bases = atmos.find_lowest_cloud_bases(cloud_mask, height)
        highest_tops = atmos.find_highest_cloud_tops(cloud_mask, height)
        # Sanity check: a top must never be below its base.
        if (highest_tops - lowest_bases >= 0).all():
            return lowest_bases, highest_tops
        msg = "Cloud base higher than cloud top!"
        raise ValueError(msg)

    # No cloud anywhere: return two independent, fully masked arrays.
    n_rows = cloud_mask.shape[0]
    return ma.masked_all(n_rows), ma.masked_all(n_rows)
def _find_cloud_mask(classification: np.ndarray) -> np.ndarray:
    """Flag the pixels whose target class counts as cloud.

    Args:
        classification: Target classification array.

    Returns:
        Integer array of the same shape with 1 where the class is
        1 (droplets), 3 (drizzle or rain with droplets), 4 (ice) or
        5 (ice with supercooled droplets), and 0 elsewhere.

    """
    cloud_classes = (1, 3, 4, 5)
    return np.isin(classification, cloud_classes).astype(int)
# Free-text ``comment`` attribute for each classification variable. Every
# text starts with a newline and is broken into lines with "\n".
COMMENTS = {
    "target_classification": "\n".join(
        (
            "",
            "This variable provides the main atmospheric target classifications",
            "that can be distinguished by radar and lidar.",
        ),
    ),
    "detection_status": "\n".join(
        (
            "",
            "This variable reports on the reliability of the radar and lidar data",
            "used to perform the classification.",
        ),
    ),
}
# Human-readable meaning of every integer code stored in the
# "target_classification" and "detection_status" variables. Each code-to-text
# mapping is passed through utils.status_field_definition, and the result is
# used as the ``definition`` argument of the matching MetaData entry in
# CLASSIFICATION_ATTRIBUTES.
DEFINITIONS = {
# Codes 0-10, as assigned by _get_target_classification.
"target_classification": utils.status_field_definition(
{
0: "Clear sky.",
1: "Cloud liquid droplets only.",
2: "Drizzle or rain.",
3: "Drizzle or rain coexisting with cloud liquid droplets.",
4: "Ice particles.",
5: "Ice coexisting with supercooled liquid droplets.",
6: "Melting ice particles.",
7: "Melting ice particles coexisting with cloud liquid droplets.",
8: "Aerosol particles, no cloud or precipitation.",
9: "Insects, no cloud or precipitation.",
10: "Aerosol coexisting with insects, no cloud or precipitation.",
}
),
# Codes 0-9, as assigned by _get_detection_status.
# NOTE(review): the multi-line literals below embed their line breaks (and
# any leading whitespace) in the string itself; presumably
# status_field_definition normalizes that -- confirm before reformatting.
"detection_status": utils.status_field_definition(
{
0: """Clear sky.""",
1: """Lidar echo only.""",
2: """Radar echo but reflectivity may be unreliable as attenuation
by rain, melting ice or liquid cloud has not been
corrected.""",
3: """Good radar and lidar echos.""",
4: """No radar echo but rain or liquid cloud beneath mean that
attenuation that would be experienced is unknown.""",
5: """Good radar echo only.""",
6: """No radar echo but known attenuation.""",
7: """Radar echo corrected for liquid attenuation using microwave
radiometer data.""",
8: """Radar ground clutter.""",
9: """Lidar clear-air molecular scattering.""",
}
),
}
# Variable metadata that generate_classification hands to
# output.add_time_attribute / output.update_attributes before saving.
CLASSIFICATION_ATTRIBUTES = {
    # Dimensionless status fields; each one carries a comment and a
    # definition looked up under its own key.
    **{
        name: MetaData(
            long_name=title,
            comment=COMMENTS[name],
            definition=DEFINITIONS[name],
            units="1",
        )
        for name, title in (
            ("target_classification", "Target classification"),
            ("detection_status", "Radar and lidar detection status"),
        )
    },
    # Height fields in metres. Keys are generated in this order:
    # cloud_top_height_amsl, cloud_base_height_amsl,
    # cloud_top_height_agl, cloud_base_height_agl.
    **{
        f"cloud_{edge}_height_{suffix}": MetaData(
            long_name=f"Height of cloud {edge} above {reference}",
            units="m",
        )
        for suffix, reference in (
            ("amsl", "mean sea level"),
            ("agl", "ground level"),
        )
        for edge in ("top", "base")
    },
}