Source code for playnano.processing.pipeline

"""Module containing the ProcessingPipeline class for AFMImageStack processing.

This module provides ProcessingPipeline, which runs a sequence of
mask/filter/method/plugin steps on an AFMImageStack. Each step's output is stored
in `stack.processed` (for filters) or `stack.masks` (for masks), and detailed
provenance (timestamps, parameters, step type, version info, keys) is recorded in
`stack.provenance["processing"]`. Environment metadata at pipeline start is recorded in
`stack.provenance["environment"]`.
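
Examples
--------
A sketch of typical usage; the mask and filter names below are illustrative
and must be registered with the AFMImageStack::

    pipeline = ProcessingPipeline(stack)
    pipeline.add_mask("threshold_mask", threshold=1.5)
    pipeline.add_filter("gaussian_filter", sigma=2.0)
    result = pipeline.run()  # final array, also written to stack.data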
"""

from __future__ import annotations

import importlib.metadata
import inspect
import logging
from collections import defaultdict
from typing import Any, Optional, Tuple

import numpy as np

from playnano.afm_stack import AFMImageStack
from playnano.utils.system_info import gather_environment_info
from playnano.utils.time_utils import utc_now_iso

logger = logging.getLogger(__name__)


def _get_plugin_version(fn) -> str | None:
    """
    Attempt to determine the package version for a plugin function.

    This inspects the module in which `fn` is defined, extracts the top-level
    package name, and returns its version via importlib.metadata. If any step
    fails (e.g., module not found, package not installed), returns None.

    Parameters
    ----------
    fn : callable
        The function object for which to infer the package version. Typically
        a plugin filter or similar user-provided function.

    Returns
    -------
    str or None
        The version string of the package containing `fn`, or None if it cannot
        be determined.
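
    Examples
    --------
    Illustrative only; the exact version string depends on the installed
    package (NumPy is used here simply because it is already imported)::

        >>> _get_plugin_version(np.mean)  # doctest: +SKIP
        '1.26.4'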
    """
    try:
        module = inspect.getmodule(fn)
        if module and hasattr(module, "__name__"):
            pkg_name = module.__name__.split(".")[0]
            return importlib.metadata.version(pkg_name)
    except Exception:
        return None
    # Module missing or anonymous: make the implicit fallthrough explicit.
    return None


class ProcessingPipeline:
    """
    Orchestrates a sequence of masking and filtering steps on an AFMImageStack.

    This pipeline records outputs and detailed provenance for each step.
    Each step is specified by a name and keyword arguments:

    - ``"clear"``: resets any active mask.
    - Mask steps: compute boolean masks stored in ``stack.masks[...]``.
    - Filter/method/plugin steps: apply to the current data (and mask if
      present), storing results in ``stack.processed[...]``.

    Provenance for each step, including index, name, parameters, timestamp,
    step type, version, keys, and summaries, is appended to
    ``stack.provenance["processing"]["steps"]``. Additionally, a mapping from
    step name to a list of snapshot keys is stored in
    ``stack.provenance["processing"]["keys_by_name"]``.

    The final processed array overwrites ``stack.data``, and environment
    metadata is captured once in ``stack.provenance["environment"]``.
    """

    def __init__(self, stack: AFMImageStack) -> None:
        """
        Initialize the processing pipeline with an AFMImageStack instance.

        Parameters
        ----------
        stack : AFMImageStack
            The AFMImageStack instance to process.
        """
        self.stack = stack
        self.steps: list[tuple[str, dict[str, Any]]] = []

    def add_mask(self, mask_name: str, **kwargs) -> ProcessingPipeline:
        """
        Add a masking step to the pipeline.

        Parameters
        ----------
        mask_name : str
            The name of the registered mask function to apply.
        **kwargs
            Additional parameters passed to the mask function.

        Returns
        -------
        ProcessingPipeline
            The pipeline instance (for method chaining).

        Notes
        -----
        If a mask is currently active (i.e. not cleared), this new mask will
        be logically combined (ORed) with the existing one.
        """
        self.steps.append((mask_name, kwargs))
        return self

    def add_filter(self, filter_name: str, **kwargs) -> ProcessingPipeline:
        """
        Add a filter step to the pipeline.

        Parameters
        ----------
        filter_name : str
            The name of the registered filter function to apply.
        **kwargs
            Additional keyword arguments for the filter function.

        Returns
        -------
        ProcessingPipeline
            The pipeline instance (for method chaining).

        Notes
        -----
        If a mask is currently active, the pipeline will attempt to use a
        masked version of the filter (from `MASK_FILTERS_MAP`) if available.
        Otherwise, the unmasked filter is applied to the whole dataset.
        """
        self.steps.append((filter_name, kwargs))
        return self

    def clear_mask(self) -> ProcessingPipeline:
        """
        Add a step to clear the current mask.

        Returns
        -------
        ProcessingPipeline
            The pipeline instance (for method chaining).

        Notes
        -----
        Calling this resets the masking state, so subsequent filters will be
        applied to the entire dataset unless a new mask is added.
        """
        self.steps.append(("clear", {}))
        return self
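
    # A sketch of mask semantics (step names here are hypothetical and must
    # exist in the stack's registries):
    #
    #     pipeline.add_mask("mask_a")        # mask becomes active
    #     pipeline.add_mask("mask_b")        # ORed with mask_a at run time
    #     pipeline.add_filter("flatten")     # masked variant used if available
    #     pipeline.clear_mask()              # mask reset
    #     pipeline.add_filter("flatten")     # applied to the whole dataset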

    def run(self) -> np.ndarray:
        """
        Execute configured steps on the AFMImageStack, storing outputs and
        provenance.

        The pipeline iterates through all added masks, filters, and plugins
        in order, applying each to the current data. Masks are combined if
        multiple are applied before a filter. Each step's output is stored in
        `stack.processed` (filters) or `stack.masks` (masks), and a detailed
        provenance record is saved in `stack.provenance["processing"]`.

        Behavior
        --------
        1. Record or update environment metadata via
           ``gather_environment_info()`` into
           ``stack.provenance["environment"]``.
        2. Reset previous processing provenance under
           ``stack.provenance["processing"]``, ensuring that keys ``"steps"``
           (a list) and ``"keys_by_name"`` (a dictionary) exist and are
           cleared.
        3. If not already present, snapshot the original data as ``"raw"`` in
           ``stack.processed``.
        4. Iterate over ``self.steps`` in order (1-based index):

           - Resolve the step type via ``stack._resolve_step(step_name)``,
             which returns a tuple of the form (``step_type``, ``fn``).
           - Record a timestamp (from ``utc_now_iso()``), index, name,
             parameters, step type, function version (from
             ``fn.__version__`` or plugin lookup), and module name.
           - If ``step_type`` is ``"clear"``:

             - Reset the current mask to ``None``.
             - Record ``"mask_cleared": True`` in the provenance entry.

           - If ``step_type`` is ``"mask"``:

             - Call ``stack._execute_mask_step(fn, arr, **kwargs)`` to
               compute a boolean mask array.
             - If there is no existing mask, store it under a new key
               ``step_<idx>_<mask_name>`` in ``stack.masks``.
             - Otherwise, overlay it with the previous mask (logical OR)
               under a derived key.
             - Update the current mask and record ``"mask_key"`` and
               ``"mask_summary"`` in provenance.

           - Else (filter/method/plugin):

             - Call ``stack._execute_filter_step(fn, arr, mask, step_name,
               **kwargs)`` to obtain the new array.
             - Store the result under
               ``stack.processed["step_<idx>_<safe_name>"]`` and update
               ``arr``.
             - Record ``"processed_key"`` and ``"output_summary"`` in
               provenance.

        5. After all steps, overwrite ``stack.data`` with ``arr``.
        6. Build ``stack.provenance["processing"]["keys_by_name"]``, mapping
           each step name to the list of stored keys (``processed_key`` or
           ``mask_key``) in order.
        7. Return the final processed array.

        Returns
        -------
        np.ndarray
            The final processed data array, now also stored in `stack.data`.

        Raises
        ------
        RuntimeError
            If a step cannot be resolved or executed due to misconfiguration.
        ValueError
            If overlaying a mask fails due to a missing previous mask key
            (propagated).
        Exception
            Any exception raised by a step function is logged and re-raised.

        Notes
        -----
        - The method ensures a raw copy of the original stack exists under
          `stack.processed["raw"]`.
        - Mask steps may be overlaid with previous masks using logical OR.
        - Non-drop_frames stack_edit steps automatically delegate to
          drop_frames to maintain provenance consistency.
        """
        self._prepare_environment_and_provenance()
        arr = self._snapshot_raw_data()
        mask = None

        for step_idx, (step_name, kwargs) in enumerate(self.steps, start=1):
            arr, mask = self._run_single_step(
                step_idx, step_name, kwargs, arr, mask
            )

        self.stack.data = arr
        self._finalize_provenance()
        logger.info("Processing pipeline completed successfully.")
        return arr
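
    # After run(), provenance is laid out roughly as follows (illustrative
    # values for a one-mask, one-filter pipeline):
    #
    #     stack.masks["step_1_mask_a"]        # boolean mask array
    #     stack.processed["step_2_flatten"]   # filtered snapshot
    #     stack.provenance["processing"]["keys_by_name"]
    #     # -> {"mask_a": ["step_1_mask_a"], "flatten": ["step_2_flatten"]}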

    # -------------------------------------------------------------------------
    # Core setup and teardown helpers
    # -------------------------------------------------------------------------
    def _prepare_environment_and_provenance(self) -> None:
        """
        Record environment metadata and reset processing provenance.

        This method overwrites the previous environment information and
        clears all processing history under `stack.provenance["processing"]`.

        Side Effects
        ------------
        - Modifies `stack.provenance["environment"]`.
        - Clears `stack.provenance["processing"]["steps"]`.
        - Clears `stack.provenance["processing"]["keys_by_name"]`.
        """
        env = gather_environment_info()
        self.stack.provenance["environment"] = env

        proc_prov = self.stack.provenance["processing"]
        proc_prov["steps"].clear()
        proc_prov["keys_by_name"].clear()

    def _snapshot_raw_data(self) -> np.ndarray:
        """
        Ensure a copy of the original stack data exists.

        If 'raw' is not present in `stack.processed`, this method stores a
        copy of the current stack data for provenance purposes.

        Returns
        -------
        np.ndarray
            A reference to the current stack data array for processing.

        Side Effects
        ------------
        - Modifies `stack.processed["raw"]` if it does not already exist.
        """
        if "raw" not in self.stack.processed:
            self.stack.processed["raw"] = self.stack.data.copy()
        return self.stack.data

    def _finalize_provenance(self) -> None:
        """
        Populate `keys_by_name` mapping for processed and mask keys.

        This method iterates over all recorded steps in
        `stack.provenance["processing"]["steps"]` and builds a dictionary
        mapping each step name to a list of processed or mask keys in order
        of execution.

        Side Effects
        ------------
        - Modifies `stack.provenance["processing"]["keys_by_name"]`.
        """
        keys_by_name: dict[str, list[str]] = defaultdict(list)
        for rec in self.stack.provenance["processing"]["steps"]:
            if key := rec.get("processed_key") or rec.get("mask_key"):
                keys_by_name[rec["name"]].append(key)
        self.stack.provenance["processing"]["keys_by_name"] = dict(keys_by_name)

    # -------------------------------------------------------------------------
    # Step dispatch and resolution
    # -------------------------------------------------------------------------
    def _run_single_step(
        self,
        step_idx: int,
        step_name: str,
        kwargs: dict[str, Any],
        arr: np.ndarray,
        mask: Optional[np.ndarray],
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Execute a single pipeline step based on its resolved type.

        Parameters
        ----------
        step_idx : int
            1-based index of the current step.
        step_name : str
            Name of the step to execute.
        kwargs : dict
            Keyword arguments to pass to the step function.
        arr : np.ndarray
            Current working array.
        mask : np.ndarray or None
            Current mask array or None.

        Returns
        -------
        tuple
            Tuple of (updated array, updated mask). Mask steps return the
            updated mask, filter steps leave it unchanged, and 'clear',
            video-filter, and stack-edit steps return None.

        Notes
        -----
        - Resolves the step type using `stack._resolve_step`.
        - Logs execution start and provenance information.
        - Delegates execution to specialized `_handle_*` functions based on
          type.
        """
        logger.info(f"[processing] Step {step_idx}: '{step_name}' with args {kwargs}")
        step_record = self._init_step_record(step_idx, step_name, kwargs)

        step_type, fn = self._resolve_step_with_logging(step_idx, step_name)
        step_record["step_type"] = step_type
        step_record["version"] = self._get_step_version(fn, step_type)
        step_record["function_module"] = getattr(fn, "__module__", None)

        if step_type == "clear":
            return self._handle_clear_step(step_record, arr)
        elif step_type == "mask":
            return self._handle_mask_step(
                step_idx, step_name, fn, arr, mask, step_record
            )
        elif step_type in {"filter", "method", "plugin"}:
            return self._handle_filter_step(
                step_idx, step_name, fn, arr, mask, step_record, kwargs
            )
        elif step_type == "video_filter":
            new_arr, _metadata = self._handle_video_filter_step(
                step_idx, step_name, fn, arr, step_record, kwargs
            )
            # The metadata is already recorded on the step record; returning
            # it as the second element would masquerade as the active mask,
            # so the mask is reset instead.
            return new_arr, None
        elif step_type == "stack_edit":
            return self._handle_stack_edit_step(
                step_idx, step_name, fn, arr, step_record, kwargs
            )
        else:
            logger.warning(f"Unrecognized step_type '{step_type}' for {step_name}")
            return arr, mask

    def _init_step_record(
        self, step_idx: int, step_name: str, kwargs: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Initialize a provenance dictionary for a processing step.

        Parameters
        ----------
        step_idx : int
            The 1-based step index.
        step_name : str
            Name of the step.
        kwargs : dict
            Parameters passed to the step.

        Returns
        -------
        dict
            A dictionary with fields 'index', 'name', 'params', and
            'timestamp'.
        """
        return {
            "index": step_idx,
            "name": step_name,
            "params": kwargs,
            "timestamp": utc_now_iso(),
        }

    def _resolve_step_with_logging(self, step_idx: int, step_name: str):
        """
        Resolve a processing step to its type and callable, with error
        logging.

        This method attempts to determine what kind of processing step is
        requested (mask, filter, method, plugin, video filter, stack edit),
        and returns the step type along with a callable implementing it. If
        the step cannot be resolved, the original exception is logged and
        re-raised, preserving its type.

        Parameters
        ----------
        step_idx : int
            The 1-based index of the step within the pipeline.
        step_name : str
            Name of the step to resolve.

        Returns
        -------
        tuple[str, callable | None]
            - step type: one of "clear", "mask", "method", "plugin",
              "filter", "video_filter", "stack_edit"
            - callable implementing the step, or None if step_type == "clear"

        Raises
        ------
        ValueError
            If the step name is not recognized among masks, filters, methods,
            plugins, video filters, or stack edits.
        """
        try:
            return self.stack._resolve_step(step_name)
        except Exception as e:
            # Log the failure for debugging
            logger.error(f"Failed to resolve step {step_idx}: {step_name}: {e}")
            # Re-raise the original exception so that its type is preserved
            raise

    def _get_step_version(self, fn, step_type: str) -> Optional[str]:
        """
        Get the version of a step function or plugin.

        Parameters
        ----------
        fn : callable
            Step function.
        step_type : str
            Step type ('plugin' triggers special lookup).

        Returns
        -------
        str or None
            Version string if available.
        """
        version = getattr(fn, "__version__", None)
        if version is None and step_type == "plugin":
            version = _get_plugin_version(fn)
        return version

    # -------------------------------------------------------------------------
    # Step handlers (all with detailed docstrings)
    # -------------------------------------------------------------------------
    def _handle_clear_step(
        self, step_record: dict[str, Any], arr: np.ndarray
    ) -> Tuple[np.ndarray, None]:
        """
        Handle a 'clear' step in the processing pipeline safely.

        This step clears the current mask but does not modify the working
        array. Returns the array currently being processed (arr), not
        stack.data.

        Parameters
        ----------
        step_record : dict
            Provenance dictionary for this step.
        arr : np.ndarray
            Current working array in the pipeline.

        Returns
        -------
        tuple
            (arr, None)

            - arr : np.ndarray
                The array currently being processed (unchanged).
            - None :
                Indicates that the mask has been cleared.

        Side Effects
        ------------
        - Appends the step record to
          `stack.provenance["processing"]["steps"]`.
        """
        step_record["mask_cleared"] = True
        self._record_step(step_record)
        return arr, None

    def _handle_mask_step(
        self,
        step_idx: int,
        step_name: str,
        fn,
        arr: np.ndarray,
        mask: Optional[np.ndarray],
        step_record: dict[str, Any],
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute a new mask or overlay it with previous masks.

        Parameters
        ----------
        step_idx : int
            Index of the step.
        step_name : str
            Step name.
        fn : callable
            Mask-generating function.
        arr : np.ndarray
            Current array.
        mask : np.ndarray or None
            Current mask.
        step_record : dict
            Step provenance dictionary.

        Returns
        -------
        tuple
            (arr, new_mask)

        Side Effects
        ------------
        - Updates `stack.masks` with the new mask.
        - Updates the step record with 'mask_key' and 'mask_summary'.
        """
        new_mask = self.stack._execute_mask_step(fn, arr, **step_record["params"])

        if mask is None:
            key = f"step_{step_idx}_{step_name}"
            self.stack.masks[key] = new_mask.copy()
        else:
            combined = np.logical_or(mask, new_mask)
            try:
                last_mask_key = list(self.stack.masks)[-1]
                last_mask_part = "_".join(last_mask_key.split("_")[2:])
            except IndexError:
                last_mask_part = "overlay"
                logger.warning(
                    "No previous mask found when overlaying; using 'overlay'"
                )
            key = f"step_{step_idx}_{last_mask_part}_{step_name}"
            self.stack.masks[key] = combined.copy()
            new_mask = combined

        step_record["mask_key"] = key
        step_record["mask_summary"] = {
            "shape": new_mask.shape,
            "dtype": str(new_mask.dtype),
        }
        self._record_step(step_record)
        return arr, new_mask

    def _handle_filter_step(
        self,
        step_idx: int,
        step_name: str,
        fn,
        arr: np.ndarray,
        mask: Optional[np.ndarray],
        step_record: dict[str, Any],
        kwargs: dict[str, Any],
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Execute a 'filter', 'method', or 'plugin' step.

        Parameters
        ----------
        step_idx : int
            Index of the step within the pipeline.
        step_name : str
            Name of the filter function being executed.
        fn : callable
            The filter function to apply.
        arr : np.ndarray
            Current working array.
        mask : np.ndarray or None
            Current mask, passed through to masked filter variants.
        step_record : dict
            Provenance record to update.
        kwargs : dict
            Additional keyword arguments for the filter function.

        Returns
        -------
        tuple
            (updated array, mask unchanged)
        """
        try:
            new_arr = self.stack._execute_filter_step(
                fn, arr, mask, step_name, **kwargs
            )
        except Exception as e:
            logger.error(f"Failed to apply filter '{step_name}': {e}")
            raise

        safe_name = step_name.replace(" ", "_")
        proc_key = f"step_{step_idx}_{safe_name}"
        self.stack.processed[proc_key] = new_arr.copy()

        step_record["processed_key"] = proc_key
        step_record["output_summary"] = {
            "shape": new_arr.shape,
            "dtype": str(new_arr.dtype),
        }
        self._record_step(step_record)
        return new_arr, mask

    def _handle_video_filter_step(
        self,
        step_idx: int,
        step_name: str,
        fn,
        arr: np.ndarray,
        step_record: dict[str, Any],
        kwargs: dict[str, Any],
    ) -> Tuple[np.ndarray, Optional[dict]]:
        """
        Execute a 'video_filter' step, producing a new array for all frames.

        Parameters
        ----------
        step_idx : int
            Index of the step within the pipeline.
        step_name : str
            Name of the filter function being executed.
        fn : callable
            The filter function to apply.
        arr : np.ndarray
            Input 3D image stack or 2D frame array.
        step_record : dict
            Provenance record to update.
        kwargs : dict
            Additional keyword arguments for the filter function.

        Returns
        -------
        new_arr : np.ndarray
            The filtered array.
        metadata : dict or None
            Metadata returned by the filter function, if available.
        """
        try:
            result = self.stack._execute_video_processing_step(fn, arr, **kwargs)
            if isinstance(result, tuple) and len(result) == 2:
                new_arr, metadata = result
            else:
                new_arr, metadata = result, {}
        except Exception as e:
            logger.error(f"Video filter '{step_name}' failed: {e}")
            raise

        proc_key = f"step_{step_idx}_{step_name}"
        self.stack.processed[proc_key] = new_arr.copy()

        step_record["processed_key"] = proc_key
        step_record["metadata"] = metadata
        step_record["output_summary"] = {
            "shape": new_arr.shape,
            "dtype": str(new_arr.dtype),
        }
        self._record_step(step_record)
        return new_arr, metadata

    def _handle_stack_edit_step(
        self,
        step_idx: int,
        step_name: str,
        fn,
        arr: np.ndarray,
        step_record: dict[str, Any],
        kwargs: dict[str, Any],
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Execute a 'stack_edit' step.

        If the step is not 'drop_frames', it delegates to 'drop_frames' to
        maintain consistent processing provenance.

        Parameters
        ----------
        step_idx : int
            Index of the step within the pipeline.
        step_name : str
            Name of the stack_edit function being executed.
        fn : callable
            The stack_edit function to apply.
        arr : np.ndarray
            Current working array.
        step_record : dict
            Provenance record to update.
        kwargs : dict
            Additional keyword arguments for the stack_edit function.

        Returns
        -------
        tuple
            (arr after edit, None)

        Raises
        ------
        TypeError
            If a non-drop_frames stack_edit does not return a list or array
            of indices.
        """
        try:
            if step_name == "drop_frames":
                new_arr = self.stack._execute_stack_edit_step(fn, arr, **kwargs)
                delegated_to = None
            else:
                indices_to_drop = fn(self.stack.data, **kwargs)
                if not isinstance(indices_to_drop, (list, np.ndarray)):
                    raise TypeError(
                        f"Stack edit '{step_name}' must return list or array of indices, "  # noqa
                        f"got {type(indices_to_drop).__name__}"
                    )
                delegated_to = "drop_frames"
                drop_fn = self.stack._resolve_step(delegated_to)[1]
                new_arr = self.stack._execute_stack_edit_step(
                    drop_fn, arr, indices_to_drop=indices_to_drop
                )
        except Exception as e:
            logger.error(f"Stack edit '{step_name}' failed: {e}")
            raise

        proc_key = f"step_{step_idx}_drop_frames"
        self.stack.processed[proc_key] = new_arr.copy()

        step_record["processed_key"] = proc_key
        step_record["output_summary"] = {
            "stack_edit_function_used": step_name,
            "delegated_to": delegated_to,
            "shape": new_arr.shape,
            "dtype": str(new_arr.dtype),
        }
        self._record_step(step_record)
        return new_arr, None

    # -------------------------------------------------------------------------
    # Provenance recording
    # -------------------------------------------------------------------------
    def _record_step(self, step_record: dict[str, Any]) -> None:
        """
        Append a step record to the processing provenance.

        Parameters
        ----------
        step_record : dict
            Metadata dictionary describing the executed step.

        Side Effects
        ------------
        - Updates `stack.provenance["processing"]["steps"]`.
        """
        self.stack.provenance["processing"]["steps"].append(step_record)
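
# A sketch of stack-edit delegation (the "select_bad_frames" step name is
# hypothetical; steps are resolved by name at run time, so add_filter can
# enqueue any registered step):
#
#     pipeline.add_filter("select_bad_frames", threshold=3.0)
#     pipeline.run()
#     # select_bad_frames returns frame indices, which are forwarded to
#     # drop_frames; the snapshot key is "step_1_drop_frames" and provenance
#     # records {"stack_edit_function_used": "select_bad_frames",
#     #          "delegated_to": "drop_frames"}.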