Source code for eoio.readers.landsat.data_io
"""eoio.readers.landsat.data_io - data reading functionality for Landsat meas vars"""
from __future__ import annotations
from typing import Dict, List, Optional
import warnings
import numpy as np
import xarray as xr
from eoio.deps import lazy_rioxarray
from eoio.readers.landsat.layout import LandsatLayout
from eoio.readers.subset.roi_subset import ResolvedROISubset
from eoio.readers.landsat.metadata import LSMetadataExtractor
from eoio.utils.rasterio_utils import suggest_raster_chunks
[docs]
def read_bands_into_dataset(
*,
ds: xr.Dataset,
layout: LandsatLayout,
meas_vars: List[str],
subset: None | ResolvedROISubset,
mtd: LSMetadataExtractor,
chunks: Optional[Dict[str, int]] = None,
use_chunks: bool = False,
mask_zero_as_nodata: bool = True,
) -> xr.Dataset:
"""
Reads Landsat bands into the dataset.
Minimal example:
- Opens each TIF file via rioxarray.
- Applies clip_box/geometries if present.
- Stores each band as ds[varname].
:param ds: xarray.Dataset to populate.
:param layout: LandsatLayout object defining the product structure.
:param meas_vars: List of measurement variables to read, e.g. ["B2", "B3"].
:param meas_var_res: Dictionary mapping measurement variables to their resolutions.
:param subset: Subset information for ROI and geometries.
:param chunks:
Optional mapping of dimension names to chunk sizes passed directly to
``rioxarray.open_rasterio``. If provided, this overrides any
automatically derived chunk sizes.
:param use_chunks:
If ``True`` and ``chunks`` is not provided, enable Dask-backed lazy
reading using block-aligned chunk sizes inferred from the raster
metadata.
:param mask_zero_as_nodata:
If ``True``, values equal to zero are masked as missing data after
scaling.
:return: Populated xarray.Dataset.
"""
# lazy import of rioxarray
rxr = lazy_rioxarray()
# get band paths from layout
band_paths = layout.tif_band_files(meas_vars)
# set up chunking if requested
if use_chunks and chunks is None:
first_path = next(iter(band_paths.values()))
chunks = suggest_raster_chunks(str(first_path), target_mb=32.0)
# loop through bands, open variable and attach to ds
for band, path in band_paths.items():
# unpack required variable metadata
band_mtd = mtd.get_variable_product_metadata(band)
# read band data
da = rxr.open_rasterio(path, chunks=chunks)
if "band" in da.dims:
da = da.isel(band=0, drop=True)
da = da.astype("float32")
# set no data as nan
try:
encoded_nodata = da.rio.encoded_nodata
except Exception:
encoded_nodata = None
if encoded_nodata is not None:
da = da.where(da != encoded_nodata)
if mask_zero_as_nodata:
da = da.where(da != 0)
# apply subset clipping if provided
if subset is not None:
if subset.clip_box:
x_min, y_min, x_max, y_max = subset.clip_box
da = da.rio.clip_box(x_min, y_min, x_max, y_max)
if subset.geometries:
da = da.rio.clip(subset.geometries, from_disk=True)
# Apply radiometric scaling to reflectance for visible bands, and radiance for thermal bands
if band in ["B10", "B11"]:
# Radiance (float) = (RADIANCE_MULT_BAND_x * DN) + RADIANCE_ADD_BAND_x
da *= band_mtd.get("radiance_mult", 1.0)
da += band_mtd.get("radiance_add", 0.0)
else:
# Reflectance (float) = ((REFLECTANCE_MULT_BAND_x * DN) + REFLECTANCE_ADD_BAND_x)/sin(SUN_ELEVATION)
da *= band_mtd.get("reflectance_mult", 1.0)
da += band_mtd.get("reflectance_add", 0.0)
# try per-pixel solar angles if available & if not in ds, use sun_elevation from metadata
if "solar_zenith_angle" in ds:
# todo: handle case where grid coords do not align - for non-30m band
try:
da /= np.sin(np.deg2rad(90.0 - ds["solar_zenith_angle"].values)) # per-pixel SZA
except Exception:
warnings.warn(
"Per-pixel solar zenith angle correction failed; using metadata sun elevation instead."
)
da /= np.sin(
np.deg2rad(mtd.get_product_metadata().get("sun_elevation")) # type: ignore[arg-type]
) # Perhaps a warning should be raised here as SZA used is less accurate as not per pixel ?
else:
warnings.warn(
"Per-pixel solar zenith angle correction not available; using metadata sun elevation instead."
)
da /= np.sin(np.deg2rad(mtd.get_product_metadata().get("sun_elevation"))) # type: ignore[arg-type]
# name dimensions according to geometry
geom = band_mtd.get("geometry_id")
if geom:
rename_map = {}
if "x" in da.dims:
rename_map["x"] = f"x_{geom}"
if "y" in da.dims:
rename_map["y"] = f"y_{geom}"
if rename_map:
da = da.rename(rename_map)
# Strip attributes attached by rioxarray clipping
for coord_name in da.coords:
if coord_name == "x" or coord_name == "y" or coord_name.startswith("x_") or coord_name.startswith("y_"):
da.coords[coord_name].attrs = {}
ds[band] = da
return ds
if __name__ == "__main__":
pass