Source code for eoio.processors.processor_pipeline
"""eoio.processors.processor_pipeline

Returns
-------
xarray.Dataset
    Processed xarray Dataset.

Raises
------
ProcessorPipelineError
    Raised when an invalid processor is specified, or when an error
    occurs in one of the specified processors during the run.
"""
from typing import Any, Dict
from eoio.processors.registry import PROCESSOR_REGISTRY
import xarray as xr
from processor_tools import Context
import logging
logger = logging.getLogger(__name__)
[docs]
class ProcessorPipelineError(ValueError):
    """
    Error raised by the processor pipeline.

    Signals either that a requested processor is invalid or that a
    processor failed while it was being executed.
    """
[docs]
class ProcessorPipeline:
    """
    Sequential processor pipeline that applies registered EOIO processors
    to an xarray.Dataset.

    The pipeline performs the following steps:

    - validates that the requested processors exist in the registry
    - instantiates them with their respective parameters
    - runs each processor in sequence

    Parameters
    ----------
    :param processor_params:
        Mapping of processor names to their specific parameter dictionaries.
        Keys must exist in `PROCESSOR_REGISTRY` (names are matched
        case-insensitively).
        The ``on_missing`` parameter can be used to specify how to handle
        errors for each processor, supported values are:

        - "error" (default): the pipeline stops and raises
          `ProcessorPipelineError`, chained to the original exception
        - "skip": the failing processor is omitted and the pipeline carries
          on with the dataset produced by the last successful processor
    :param context:
        Processing context (reader info, metadata view, logger, etc.).

    Example
    -------
    processor_params = {
        "processor_name_0": {"param_0": ..., "param_1": ..., "on_missing": "skip"},
        "processor_name_1": {"param_0": ..., "param_1": ..., "on_missing": "error"},
    }
    context = {...}
    processed_ds = ProcessorPipeline(processor_params, context).run(ds)
    """

    def __init__(
        self,
        processor_params: Dict[str, Dict[str, Any]],
        context: Dict[str, Any] | Context,
    ) -> None:
        """
        Validate the requested processors and instantiate them.

        :param processor_params:
            Mapping of processor names to their parameter dictionaries.
        :param context:
            Processing context handed unchanged to every processor.
        :raises ProcessorPipelineError:
            If a processor name is not present in `PROCESSOR_REGISTRY`.
        """
        self.processor_params: Dict[str, Dict[str, Any]] = processor_params
        self.context: Dict[str, Any] | Context = context
        self.processors: list[Any] = []
        # Names as supplied by the caller, kept index-aligned with
        # ``self.processors`` so that ``run`` does not depend on
        # ``processor_params`` staying unmodified after construction.
        self._processor_names: list[str] = []
        # Lower-cased name -> registry name, for case-insensitive lookup.
        self.registry_lc = {name.lower(): name for name in PROCESSOR_REGISTRY}
        self._validate_processor_names()
        self._instantiate_processors()

    def _validate_processor_names(self) -> None:
        """Ensure all referenced processors exist in the registry."""
        for processor_name in self.processor_params:
            if processor_name.lower() not in self.registry_lc:
                raise ProcessorPipelineError(
                    f"Unknown processor: {processor_name}, available processors are {sorted(PROCESSOR_REGISTRY)}"
                )

    def _instantiate_processors(self) -> None:
        """Create processor instances with parameters."""
        for processor_name, params in self.processor_params.items():
            registry_name = self.registry_lc[processor_name.lower()]
            processor_cls = PROCESSOR_REGISTRY[registry_name]
            processor_instance = processor_cls(params, self.context)
            # Record the name only once construction succeeded so both
            # lists stay the same length.
            self.processors.append(processor_instance)
            self._processor_names.append(processor_name)

    def _on_missing_policy(self, processor: Any, processor_name: str) -> str:
        """
        Return the lower-cased ``on_missing`` policy for a processor.

        The processor's own ``params`` attribute is consulted first; if it is
        absent, is not mapping-like, or has no ``on_missing`` entry, the
        parameters originally configured for ``processor_name`` are used.
        Falls back to ``"error"`` when neither source defines the policy.
        """
        unset = object()
        candidates = (
            getattr(processor, "params", None),
            self.processor_params.get(processor_name),
        )
        for params in candidates:
            getter = getattr(params, "get", None)
            if not callable(getter):
                # None or a non-mapping value: nothing to read here.
                continue
            policy = getter("on_missing", unset)
            if policy is not unset:
                return str(policy).lower()
        return "error"

    def run(self, ds: xr.Dataset) -> xr.Dataset:
        """
        Run all processors sequentially.

        :param ds:
            Input dataset.
        :return:
            The processed xarray dataset.
        :raises ProcessorPipelineError:
            If a processor fails and its `on_missing` is not `"skip"`
            (the default policy is `"error"`).
        """
        logger.info("Starting processor pipeline...")
        current_ds = ds
        names = self._processor_names
        for index, processor in enumerate(self.processors):
            # Fall back to the class name if ``self.processors`` was extended
            # without a matching configured name.
            name = names[index] if index < len(names) else type(processor).__name__
            logger.info("Running processor %d: %s", index + 1, name)
            try:
                current_ds = processor.run(current_ds)
            except Exception as exc:
                # Broad catch is deliberate: this is the pipeline boundary and
                # the error is either re-raised (chained) or explicitly skipped.
                logger.error("Error in processor %s: %s", name, exc)
                if self._on_missing_policy(processor, name) == "skip":
                    logger.warning(
                        "Skipping processor %s (index %d) due to error: %s",
                        name,
                        index,
                        exc,
                    )
                    # Continue without altering current_ds
                    continue
                # Default behaviour: raise a pipeline error
                raise ProcessorPipelineError(f"Error in {name}: {exc}") from exc
        logger.info("Pipeline completed successfully.")
        return current_ds