# Source code for playnano.analysis.pipeline

"""Module for the AnalysisPipeline class for orchastration of analysis workflows."""

import importlib.metadata as metadata
import logging
from collections import defaultdict
from typing import Any, Optional

from playnano.afm_stack import AFMImageStack
from playnano.analysis import BUILTIN_ANALYSIS_MODULES
from playnano.analysis.base import AnalysisModule
from playnano.analysis.utils.common import sanitize_analysis_for_logging
from playnano.processing.mask_generators import register_masking
from playnano.utils.system_info import gather_environment_info
from playnano.utils.time_utils import utc_now_iso

MASKING_FUNCS = register_masking()

logger = logging.getLogger(__name__)

AnalysisRecord = dict[str, Any]
"""
Structured output of an AnalysisPipeline run.

This record contains:
- environment : dict
    Metadata about the runtime environment (e.g. Python version, library versions).
- analysis : dict
    Results of each analysis module run, with keys 'step_<n>_<module_name>'.
- provenance : dict
    Metadata about the provenance of the analysis steps, with keys:
      - steps : list of dict
          Ordered list of executed analysis steps. Each entry contains:
            - index : int
                1-based index of the step in the pipeline.
            - name : str
                The name of the analysis module used.
            - params : dict
                Parameters passed to the module.
            - timestamp : str
                ISO 8601 UTC timestamp when the step was executed.
            - version : str or None
                Optional version string provided by the module instance.
            - analysis_key : str
                Key under which this step's outputs are stored in the `analysis` dict.
      - results_by_name : dict[str, list]
          Maps module names to lists of outputs from each occurrence.
      - frame_times : list[float] or None
          Timestamps for each frame in the stack, from `stack.get_frame_times()`,
          or None if unavailable.

Examples
--------
>>> pipeline = AnalysisPipeline()
>>> pipeline.add("feature_detection", threshold=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access outputs:
>>> record["analysis"]["step_1_feature_detection"]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
>>> # Inspect provenance:
>>> record["provenance"]["results_by_name"]["feature_detection"][0]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
"""


[docs] class AnalysisPipeline: """ Orchestrates a sequence of analysis steps on an AFMImageStack. Each step corresponds to an AnalysisModule (built-in or entry-point), invoked in order with the given parameters. Outputs of each step are stored in `stack.analysis` under keys 'step_<n>_<module_name>'. Detailed provenance (timestamps, parameters, version, linking keys) is recorded in `stack.provenance["analysis"]`. The run() method returns a dict containing environment info, the `analysis` dict, and its `provenance`. """ def __init__(self): """ Initialize an empty analysis pipeline. Steps are stored as a list of (module_name, params) tuples. Modules are loaded on demand using an internal registry or entry points. """ # Each entry: (module_name: str, params: dict) self.steps: list[tuple[str, dict[str, Any]]] = [] # Cache instantiated modules: name -> instance self._module_cache: dict[str, AnalysisModule] = {}
[docs] def add(self, module_name: str, **params) -> None: """ Add an analysis module to the pipeline. Parameters ---------- module_name : str The name of the analysis module to add (must be registered). **params Keyword arguments passed to the module's `run()` method. Returns ------- None Examples -------- >>> pipeline.add("particle_detect", threshold=5, min_size=10) >>> pipeline.add("track_particles", max_jump=3) """ self.steps.append((module_name, params))
[docs] def clear(self) -> None: """ Remove all scheduled analysis steps and clear module cache. This allows reconfiguration of the pipeline without creating a new instance. """ self.steps.clear() self._module_cache.clear()
def _load_module(self, module_name: str) -> AnalysisModule: """ Load and instantiate an analysis module given its name. Modules are first looked up in a built-in registry, then via entry points registered under the group 'playnano.analysis'. Loaded modules are cached to avoid re-instantiation on repeated `run()` calls. Parameters ---------- module_name : str The name of the analysis module to load. Returns ------- AnalysisModule The loaded and initialized module instance. Raises ------ ValueError If the module name cannot be resolved from the registry or entry points. TypeError If the loaded module is not an instance of `AnalysisModule`. """ # Fast path: cached if module_name in self._module_cache: return self._module_cache[module_name] # 1) Internal registry cls = None try: cls = BUILTIN_ANALYSIS_MODULES[module_name] except Exception: cls = None # 2) Entry points (plugin mechanism) if cls is None: # Modern API available on Python 3.10–3.12 eps = metadata.entry_points().select( group="playnano.analysis", name=module_name ) ep = next(iter(eps), None) if ep is None: raise ValueError( f"Analysis module '{module_name}' not found in registry or entry points" # noqa ) try: cls = ep.load() except Exception as e: raise RuntimeError( f"Failed to load entry point for analysis module '{module_name}': {e}" # noqa ) from e # 3) Instantiate and type-check # First: check class/type without instantiating if not callable(cls): raise TypeError( f"Loaded analysis module '{module_name}' is not instantiable " f"(got {cls!r})" ) # Create the instance try: instance = cls() except Exception as e: raise RuntimeError( f"Failed to instantiate analysis module '{module_name}' ({cls}): {e}" ) from e # Now validate that instance is correct type if not isinstance(instance, AnalysisModule): raise TypeError( f"Loaded module, '{module_name}', is not an AnalysisModule subclass;" f"got {type(instance)!r}" ) # Cache and return self._module_cache[module_name] = instance return instance def 
_resolve_mask_fn(self, params: dict[str, Any]) -> dict[str, Any]: """ Resolve a string-based 'mask_fn' to its registered callable, if applicable. If `params["mask_fn"]` is a string matching a registered masking function, replaces it with the corresponding callable. Returns a shallow copy of `params` with the resolved function. Parameters ---------- params : dict of str to Any Dictionary of parameters passed to an analysis module. Must include 'mask_fn'. Returns ------- dict A shallow copy of `params` with 'mask_fn' resolved to a callable. Raises ------ ValueError If the provided string does not correspond to a registered masking function. """ mask_key = params["mask_fn"] if mask_key in MASKING_FUNCS: params = params.copy() params["mask_fn"] = MASKING_FUNCS[mask_key] return params else: raise ValueError( f"mask_fn '{mask_key}' not found in registered masking functions" )
[docs] def run(self, stack: AFMImageStack, log_to: Optional[str] = None) -> AnalysisRecord: """ Execute all added analysis steps on the given AFMImageStack. Each step: - is resolved to an AnalysisModule instance - invoked with `(stack, previous_results=..., **params)` - its outputs are stored under `stack.analysis["step_<n>_<module_name>"]` - provenance is recorded in `stack.provenance["analysis"]["steps"]` The overall provenance sub-dict also collects: - results_by_name: mapping module name to list of outputs - frame_times: result of `stack.get_frame_times()`, or None The environment info (via gather_environment_info) is stored at `stack.provenance["environment"]` (if not already set). Parameters ---------- stack : AFMImageStack The AFMImageStack to analyze. log_to : str, optional Path to a JSON file where the combined record will be saved. Returns ------- AnalysisRecord : dict { "environment": <dict of environment metadata>, "analysis": <dict of outputs per step>, "provenance": <dict with keys "steps", "results_by_name", "frame_times"> } Notes ----- - Raw outputs: accessible via `stack.analysis["step_<n>_<module_name>"]`. - Provenance: in `stack.provenance["analysis"]`, with a list of step records. - If stack.provenance or stack.analysis is absent, they are created. - If log_to is provided, the same record dict is JSON-dumped using NumpyEncoder. Raises ------ Exception Propagates any exception from module.run(...), after logging. Examples -------- >>> pipeline = AnalysisPipeline() >>> pipeline.add("count_nonzero") >>> pipeline.add("feature_detection", mask_fn="threshold_mask", min_size=5) >>> record = pipeline.run(stack, log_to="out.json") >>> # Access the outputs: >>> record["analysis"]["step_1_count_nonzero"] {'counts': [...], ...} >>> # Inspect provenance: >>> for step_info in record["provenance"]["steps"]: ... 
print(step_info["name"], step_info["analysis_key"]) count_nonzero step_1_count_nonzero feature_detection step_2_feature_detection """ # Ensure stack.provenance is a dict if not hasattr(stack, "provenance") or not isinstance(stack.provenance, dict): stack.provenance = {} # Ensure 'analysis' sub-dict exists if "analysis" not in stack.provenance or not isinstance( stack.provenance["analysis"], dict ): stack.provenance["analysis"] = { "steps": [], "results_by_name": {}, "frame_times": None, } if not hasattr(stack, "analysis") or not isinstance(stack.analysis, dict): stack.analysis = {} env = gather_environment_info() # If provenance already has environment from processing, you may choose to # merge or keep first; # For simplicity, you can overwrite or check if empty. if not stack.provenance.get("environment"): stack.provenance["environment"] = env # Clear analysis provenance record stack.provenance["analysis"]["steps"].clear() stack.provenance["analysis"]["results_by_name"].clear() stack.provenance["analysis"]["frame_times"] = None step_results: list[dict[str, Any]] = [] results_by_name: defaultdict[str, list] = defaultdict(list) previous_latest: dict[str, dict[str, Any]] = {} # module cache unchanged for idx, (module_name, params) in enumerate(self.steps, start=1): logger.info( f"Running analysis step {idx}: {module_name} with params {params!r}" ) module = self._load_module(module_name) # check any declared requirements reqs = getattr(module, "requires", ()) if reqs: if not any(r in previous_latest for r in reqs): raise RuntimeError( f"Analysis step '{module_name}' requires one of {reqs!r}; " f"make sure to add at least one before '{module_name}'." 
) # timestamp timestamp = utc_now_iso() if "mask_fn" in params and isinstance(params["mask_fn"], str): params = self._resolve_mask_fn(params) # run; pass in previous_latest so module can read latest outputs by name try: outputs = module.run(stack, previous_results=previous_latest, **params) except Exception as e: logger.error( f"Module '{module_name}' failed at step {idx}: {e}", exc_info=True ) raise # Store snapshot under unique key safe_name = module_name.replace(" ", "_") analysis_key = f"step_{idx}_{safe_name}" stack.analysis[analysis_key] = outputs # record this step step_record: dict[str, Any] = { "index": idx, "name": module_name, "params": params, "timestamp": timestamp, "version": getattr(module, "version", None), "analysis_key": analysis_key, } step_results.append(step_record) # update previous_results structures results_by_name[module_name].append( {"analysis_key": analysis_key, "outputs": outputs} ) # allow downstream modules to use latest result previous_latest[module_name] = outputs # Build the updated provedence record stack.provenance["analysis"]["steps"] = step_results stack.provenance["analysis"]["results_by_name"] = dict(results_by_name) frame_times = ( stack.get_frame_times() if hasattr(stack, "get_frame_times") else None ) stack.provenance["analysis"]["frame_times"] = frame_times record = { "environment": stack.provenance["environment"], "analysis": stack.analysis, "provenance": stack.provenance["analysis"], } # write to file if requested # Swapped from using NumpyEncoder because it could not handle the size # of the full analysis record. if log_to: import json safe_record = { "environment": sanitize_analysis_for_logging(record["environment"]), "analysis": sanitize_analysis_for_logging(record["analysis"]), "provenance": sanitize_analysis_for_logging(record["provenance"]), } with open(log_to, "w") as file: json.dump(safe_record, file, indent=2) # Add record return record