Source code for playnano.cli.utils

"""Utility functions for the playNano CLI."""

import inspect
import json
import logging
import numbers
from importlib import metadata
from pathlib import Path
from typing import Any, Mapping, Union, get_args, get_origin

import numpy as np
import yaml

from playnano.analysis import BUILTIN_ANALYSIS_MODULES
from playnano.processing.filters import register_filters
from playnano.processing.mask_generators import register_masking
from playnano.processing.masked_filters import register_mask_filters
from playnano.processing.stack_edit import register_stack_edit_processing
from playnano.processing.video_processing import register_video_processing

# Built-in filters and mask dictionaries
FILTER_MAP = register_filters()
MASK_MAP = register_masking()
MASK_FILTERS_MAP = register_mask_filters()
VIDEO_FILTER_MAP = register_video_processing()
STACK_EDIT_MAP = register_stack_edit_processing()

# Names of all entry-point plugins (if any third-party filters are installed)
_PLUGIN_ENTRYPOINTS = {
    ep.name: ep for ep in metadata.entry_points(group="playnano.filters")
}

# Names of all analysis entry-point plugins (if any third-party analysis modules are installed)
_ANALYSIS_PLUGIN_ENTRYPOINTS = {
    ep.name: ep for ep in metadata.entry_points(group="playnano.analysis")
}

INVALID_CHARS = r'\/:*?"<>|'
INVALID_FOLDER_CHARS = r'*?"<>|'
SKIP_PARAM_NAMES = {"data", "image", "arr", "mask", "stack", "debug"}

logger = logging.getLogger(__name__)


def is_valid_step(name: str) -> bool:
    """Return True if `name` is a built-in filter, mask, plugin or the 'clear' step."""
    return (
        name == "clear"
        or name in FILTER_MAP
        or name in MASK_MAP
        or name in MASK_FILTERS_MAP
        or name in _PLUGIN_ENTRYPOINTS
        or name in VIDEO_FILTER_MAP
        or name in STACK_EDIT_MAP
    )

def is_valid_analysis_step(name: str) -> bool:
    """Return True if `name` is a built-in analysis, plugin or the 'clear' step."""
    return (
        name == "clear"
        or name in BUILTIN_ANALYSIS_MODULES
        or name in _ANALYSIS_PLUGIN_ENTRYPOINTS
    )

def parse_processing_string(
    processing_str: str,
) -> list[tuple[str, dict[str, object]]]:
    """
    Parse a semicolon-delimited string of processing steps into a structured list.

    Each step in the string can optionally include parameters, separated by
    commas. Parameters are specified as key=value pairs.

    Parameters
    ----------
    processing_str : str
        Semicolon-delimited string specifying processing steps. Each step may
        have optional parameters (separated by commas) after a colon, e.g.,
        "remove_plane; gaussian_filter:sigma=2.0; threshold_mask:threshold=2"

    Returns
    -------
    list of tuple
        List of tuples, each containing:

        - step_name (str): the name of the processing step
        - kwargs (dict of str → object): dictionary of parameters for the step

    Examples
    --------
    >>> parse_processing_string("remove_plane")
    [('remove_plane', {})]

    >>> parse_processing_string("gaussian_filter:sigma=2.0,truncate=4.0")
    [('gaussian_filter', {'sigma': 2.0, 'truncate': 4.0})]

    >>> parse_processing_string(
    ...     "remove_plane; gaussian_filter:sigma=2.0; threshold_mask:threshold=2"
    ... )
    [
        ('remove_plane', {}),
        ('gaussian_filter', {'sigma': 2.0}),
        ('threshold_mask', {'threshold': 2})
    ]
    """
    steps: list[tuple[str, dict[str, object]]] = []

    # Split the input string into individual steps using ';' as the delimiter
    for segment in processing_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue  # Skip empty segments

        # Check if the step includes parameters (indicated by ':')
        if ":" in segment:
            step_name, params_part = segment.split(":", 1)
            step_name = step_name.strip()

            # Validate the step name
            if not is_valid_step(step_name):
                raise ValueError(f"Unknown processing step: '{step_name}'")

            kwargs: dict[str, object] = {}

            # Split parameters by ',' and parse each key=value pair
            for pair in params_part.split(","):
                pair = pair.strip()
                if not pair:
                    continue  # Skip empty parameter entries
                if "=" not in pair:
                    raise ValueError(
                        f"Invalid parameter expression '{pair}' in step '{step_name}'"
                    )
                key, val_str = pair.split("=", 1)
                key = key.strip()
                val_str = val_str.strip()

                # Attempt to convert the value to a boolean, int, or float
                if val_str.lower() in ("true", "false"):
                    val = val_str.lower() == "true"
                else:
                    try:
                        val = float(val_str) if "." in val_str else int(val_str)
                    except ValueError:
                        val = val_str  # Leave as string if not numeric
                kwargs[key] = val

            steps.append((step_name, kwargs))
        else:
            # Step without parameters
            step_name = segment
            if not is_valid_step(step_name):
                raise ValueError(f"Unknown processing step: '{step_name}'")
            steps.append((step_name, {}))

    return steps

def parse_processing_file(path: str) -> list[tuple[str, dict[str, object]]]:
    """
    Parse a YAML (or JSON) processing file into a list of (step_name, kwargs) tuples.

    Expected YAML schema:

        filters:
          - name: remove_plane
          - name: gaussian_filter
            sigma: 2.0
          - name: threshold_mask
            threshold: 2
          - name: polynomial_flatten
            order: 2

    Returns a list in the order listed under `filters`.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"processing file not found: {path}")

    text = p.read_text()

    # Attempt to parse YAML first
    try:
        data = yaml.safe_load(text)
    except Exception:
        # If YAML parsing fails, fall back to JSON
        try:
            data = json.loads(text)
        except Exception as e:
            raise ValueError(
                f"Unable to parse processing file as YAML or JSON: {e}"
            ) from e

    if not isinstance(data, dict) or "filters" not in data:
        raise ValueError("processing file must contain top-level key 'filters'")

    filters_list = data["filters"]
    if not isinstance(filters_list, list):
        raise ValueError("'filters' must be a list in the processing file")

    steps: list[tuple[str, dict[str, object]]] = []
    for entry in filters_list:
        if not isinstance(entry, dict) or "name" not in entry:
            raise ValueError(
                "Each entry under 'filters' must be a dict containing 'name'"
            )
        step_name = entry["name"]
        if not is_valid_step(step_name):
            raise ValueError(
                f"Unknown processing step in processing file: '{step_name}'"
            )

        # Build kwargs from all other key/value pairs in the dict
        kwargs: dict[str, object] = {}
        for k, v in entry.items():
            if k == "name":
                continue
            kwargs[k] = v
        steps.append((step_name, kwargs))

    return steps

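# Illustrative sketch (not part of the original module): round-tripping a
# small pipeline definition through parse_processing_file. The helper name,
# file name, and parameter values are assumptions for demonstration; the step
# names must be registered (e.g. in FILTER_MAP) for validation to pass.
def _example_parse_processing_file() -> None:  # pragma: no cover - demo only
    import tempfile

    yaml_text = (
        "filters:\n"
        "  - name: remove_plane\n"
        "  - name: gaussian_filter\n"
        "    sigma: 2.0\n"
    )
    with tempfile.TemporaryDirectory() as d:
        f = Path(d) / "pipeline.yaml"
        f.write_text(yaml_text)
        steps = parse_processing_file(str(f))
    assert steps == [("remove_plane", {}), ("gaussian_filter", {"sigma": 2.0})]
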
def parse_analysis_string(analysis_str: str) -> list[tuple[str, dict[str, object]]]:
    """
    Parse a ';'-delimited analysis string into a list of (analysis_step_name, kwargs) tuples.

    Each segment in `analysis_str` is of the form:

        analysis_module_name
        analysis_module_name:param=value
        analysis_module_name:param1=value1,param2=value2

    Example:
        "log_blob_detection:min_sigma=1.0,max_sigma=5.0;x_means_clustering:time_weight=0.2"

    Returns a list in the order encountered, e.g.:

        [("log_blob_detection", {"min_sigma": 1.0, "max_sigma": 5.0}),
         ("x_means_clustering", {"time_weight": 0.2})]
    """
    steps: list[tuple[str, dict[str, object]]] = []

    # Split the input string into individual steps using ';' as the delimiter
    for segment in analysis_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue

        # If the segment contains ':', separate the name from the params
        if ":" in segment:
            name_part, params_part = segment.split(":", 1)
            step_name = name_part.strip()
            if not is_valid_analysis_step(step_name):
                raise ValueError(f"Unknown analysis step: '{step_name}'")

            # Parse params separated by ',' (';' is tolerated as an alternate)
            kwargs: dict[str, object] = {}
            for pair in params_part.replace(";", ",").split(","):
                pair = pair.strip()
                if not pair:
                    continue
                if "=" not in pair:
                    raise ValueError(
                        f"Invalid parameter expression '{pair}' "
                        f"in analysis step '{step_name}'"
                    )
                key, val_str = pair.split("=", 1)
                key = key.strip()
                val_str = val_str.strip()

                # Convert to a boolean, int, or float if possible
                if val_str.lower() in ("true", "false"):
                    val = val_str.lower() == "true"
                else:
                    try:
                        if "." in val_str:
                            val = float(val_str)
                        else:
                            val = int(val_str)
                    except ValueError:
                        val = val_str  # leave as string if not numeric
                kwargs[key] = val

            steps.append((step_name, kwargs))
        else:
            # No colon → just the analysis step name
            step_name = segment
            if not is_valid_analysis_step(step_name):
                raise ValueError(f"Unknown analysis step: '{step_name}'")
            steps.append((step_name, {}))

    return steps

def parse_analysis_file(path: str) -> list[tuple[str, dict[str, object]]]:
    """
    Parse a YAML or JSON analysis file into a list of (name, parameters) tuples.

    This reads a saved analysis pipeline definition, validates its structure,
    and normalizes any complex types (e.g., tuples) into YAML/JSON-safe forms.

    Parameters
    ----------
    path : str
        Path to the YAML or JSON file containing the analysis definition.

    Returns
    -------
    list of tuple
        A list where each element is ``(analysis_step_name, kwargs_dict)``.
        The ``kwargs_dict`` contains parameters for that analysis step.

    Raises
    ------
    FileNotFoundError
        If the file does not exist.
    ValueError
        If the file cannot be parsed as YAML or JSON, or if the top-level
        key ``analysis`` is missing.

    Examples
    --------
    Example YAML format::

        analysis:
          - name: count_nonzero
          - name: feature_detection
            mask_fn: mask_threshold
            min_size: 10
            remove_edge: true
          - name: particle_tracking
            coord_columns: [centroid_x, centroid_y]
            coord_key: features_per_frame
            detection_module: feature_detection
            max_distance: 5.0

    This would be parsed as::

        [
            ("count_nonzero", {}),
            ("feature_detection", {
                "mask_fn": "mask_threshold",
                "min_size": 10,
                "remove_edge": True
            }),
            ("particle_tracking", {
                "coord_columns": ["centroid_x", "centroid_y"],
                "coord_key": "features_per_frame",
                "detection_module": "feature_detection",
                "max_distance": 5.0
            })
        ]
    """
    p = Path(path)
    text = p.read_text(encoding="utf8")

    # Try JSON first if the file extension suggests it, otherwise YAML
    try:
        if p.suffix.lower() == ".json":
            raw = json.loads(text)
        else:
            raw = yaml.safe_load(text)
    except (json.JSONDecodeError, yaml.YAMLError) as e:
        raise ValueError("Unable to parse analysis file as YAML or JSON") from e

    # Support two styles: {"analysis": [...]} or a bare list [...]
    if isinstance(raw, dict) and "analysis" in raw:
        steps_raw = raw["analysis"]
    elif isinstance(raw, list):
        steps_raw = raw
    else:
        raise ValueError(
            "Invalid analysis file: expected top-level 'analysis' or list of steps"
        )

    out: list[tuple[str, dict[str, object]]] = []
    for i, step in enumerate(steps_raw):
        if not isinstance(step, dict) or "name" not in step:
            raise ValueError(
                f"Invalid step #{i}: each step must be a mapping with a 'name' key"
            )
        name = step["name"]
        if not is_valid_analysis_step(name):
            raise ValueError(f"Unknown analysis step: {name}")

        # Copy kwargs excluding the name to keep the loaded dict intact
        kwargs = {k: v for k, v in step.items() if k != "name"}
        out.append((name, kwargs))

    return out

def _get_analysis_class(module_name: str):
    """
    Retrieve an analysis class by name from built-in modules or registered entry points.

    This function first checks the `BUILTIN_ANALYSIS_MODULES` dictionary. If the
    module is not found there, it attempts to load it from Python package entry
    points registered under the `playnano.analysis` group.

    Parameters
    ----------
    module_name : str
        The name of the analysis module to retrieve.

    Returns
    -------
    type
        The analysis class corresponding to the requested module.

    Raises
    ------
    ValueError
        If the module cannot be found in built-ins or entry points.
    Exception
        If any other error occurs during module loading, the exception is
        logged and re-raised.
    """
    # Mirror pipeline._load_module logic or import from the registry
    try:
        cls = BUILTIN_ANALYSIS_MODULES.get(module_name)
        if cls is None:
            # Try to load from an entry point
            eps = metadata.entry_points().select(
                group="playnano.analysis", name=module_name
            )
            if not eps:
                raise ValueError(f"Analysis module '{module_name}' not found")
            cls = eps[0].load()
        return cls
    except Exception as e:
        logger.exception(f"Failed to load analysis module '{module_name}': {e}")
        raise


def _cast_input(s: str, expected_type: Any, default: Any):
    """
    Convert a string input into a specified Python type, with fallback defaults.

    Handles `Optional` and `Union` annotations, and performs best-effort
    conversion for standard Python types (str, bool, int, float, tuple, list).
    If the conversion fails or the string is empty, returns the provided
    default value.

    Parameters
    ----------
    s : str
        The string to convert.
    expected_type : type | Any
        The Python type or type annotation to convert the string into.
    default : Any
        Value to return if the string is empty or conversion is not possible.

    Returns
    -------
    Any
        The converted value, or the default if conversion fails.

    Notes
    -----
    - Boolean conversion recognizes '1', 'true', 'yes', 'y', 't'
      (case-insensitive) as True.
    - Tuple and list types assume comma-separated values in the string.
    - `Optional[T]` and `Union[T, NoneType]` are treated as `T`.
    - For generic types like `list[int]`, the element type hint is ignored,
      but the container conversion still applies.
    """
    if s == "":
        return default

    # Handle missing annotations
    if expected_type is None or expected_type is inspect._empty:
        return s

    origin = get_origin(expected_type)
    args = get_args(expected_type)

    if origin is None and isinstance(expected_type, type):
        base = expected_type
    else:
        base = None

    # Union / Optional
    if origin is Union:
        # Try each non-None option in order
        non_none = [a for a in args if a is not type(None)]
        for opt in non_none:
            try:
                return _cast_input(s, opt, default)
            except Exception:
                continue
        # Fallback
        return default if type(None) in args else s

    # Handle plain and subscripted tuple/list types (element hints are ignored)
    if expected_type is tuple or origin is tuple:
        return tuple(item.strip() for item in s.split(",") if item.strip())
    if expected_type is list or origin is list:
        return [item.strip() for item in s.split(",") if item.strip()]

    # Simple types
    try:
        if base is bool:
            return s.lower() in ("1", "true", "yes", "y", "t")
        if base is int:
            return int(s)
        if base is float:
            return float(s)
        if base is str:
            return s
    except Exception:
        # Fall through and return the string unchanged
        return s

    # Fallback
    return s

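# Illustrative sketch (not part of the original module): how _cast_input
# behaves for a few common annotations, including Optional. The helper name
# below is hypothetical and exists only for demonstration.
def _example_cast_input() -> None:  # pragma: no cover - demo only
    from typing import Optional

    assert _cast_input("2.5", float, None) == 2.5
    assert _cast_input("yes", bool, False) is True
    assert _cast_input("", int, 7) == 7  # empty input falls back to the default
    assert _cast_input("3", Optional[int], None) == 3
    assert _cast_input("a, b", tuple, ()) == ("a", "b")
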
def ask_for_analysis_params(module_name: str) -> dict[str, Any]:
    """Introspect a module's `run()` or parameter spec and ask for values."""
    cls = _get_analysis_class(module_name)
    if hasattr(cls, "parameters") and callable(cls.parameters):
        spec = cls.parameters()
        return _ask_with_spec(spec)
    return _ask_with_signature(cls)

def _ask_with_spec(spec: list[dict[str, Any]]) -> dict[str, Any]:
    """Prompt user for parameter values based on a module-provided spec."""
    kwargs: dict[str, Any] = {}
    pending = list(spec)
    while pending:
        progressed, to_retry = _process_pending_entries(pending, kwargs)
        if not progressed:
            _prompt_remaining(to_retry, kwargs)
            break
        pending = to_retry
    return kwargs


def _ask_with_signature(cls) -> dict[str, Any]:
    """Prompt user for parameter values based on `run()` signature."""
    sig = inspect.signature(cls.run)
    kwargs: dict[str, Any] = {}
    conds = getattr(cls.run, "_param_conditions", {})
    pending = [
        (name, param)
        for name, param in sig.parameters.items()
        if name not in ("self", "stack", "previous_results")
        and param.kind != inspect.Parameter.VAR_KEYWORD
    ]
    while pending:
        progressed, to_retry = _process_signature_pending(pending, kwargs, conds)
        if not progressed:
            _prompt_signature_remaining(to_retry, kwargs, conds)
            break
        pending = to_retry
    return kwargs


# === Internal shared helpers ===


def _process_pending_entries(pending, kwargs):
    """Process pending spec entries with conditions."""
    progressed = False
    to_retry = []
    for entry in pending:
        name, typ, default, cond = (
            entry["name"],
            entry.get("type", str),
            entry.get("default", ""),
            entry.get("condition"),
        )
        should_ask = _resolve_condition(cond, kwargs)
        if should_ask is None:
            to_retry.append(entry)
            continue
        if not should_ask:
            progressed = True
            continue
        val = _prompt_and_cast(name, typ, default)
        if val is not None:
            kwargs[name] = val
        progressed = True
    return progressed, to_retry


def _process_signature_pending(pending, kwargs, conds):
    """Process pending signature parameters with conditions."""
    progressed = False
    to_retry = []
    for pname, param in pending:
        cond = conds.get(pname, getattr(param, "condition", None))
        should_ask = _resolve_condition(cond, kwargs)
        if should_ask is None:
            to_retry.append((pname, param))
            continue
        if not should_ask:
            progressed = True
            continue
        default = param.default if param.default is not inspect._empty else None
        ann = param.annotation if param.annotation is not inspect._empty else None
        kwargs[pname] = _prompt_and_cast(pname, ann, default)
        progressed = True
    return progressed, to_retry


def _prompt_remaining(to_retry, kwargs):
    """Prompt user for remaining spec entries."""
    for entry in to_retry:
        name, typ, default, cond = (
            entry["name"],
            entry.get("type", str),
            entry.get("default", ""),
            entry.get("condition"),
        )
        if cond and not _resolve_condition(cond, kwargs):
            continue
        val = _prompt_and_cast(name, typ, default)
        if val is not None:
            kwargs[name] = val


def _prompt_signature_remaining(to_retry, kwargs, conds):
    """Prompt user for remaining signature parameters."""
    for pname, param in to_retry:
        cond = conds.get(pname, getattr(param, "condition", None))
        if cond and not _resolve_condition(cond, kwargs):
            continue
        default = param.default if param.default is not inspect._empty else None
        ann = param.annotation if param.annotation is not inspect._empty else None
        val = _prompt_and_cast(pname, ann, default)
        kwargs[pname] = val


def _resolve_condition(cond, kwargs):
    """Safely evaluate a conditional parameter dependency."""
    if cond is None:
        return True
    try:
        return bool(cond(kwargs))
    except KeyError:
        return None
    except Exception:
        return True


def _prompt_and_cast(name, typ, default):
    """Prompt user for a value and cast appropriately."""
    prompt = (
        f"  Enter {name} (type={getattr(typ, '__name__', str(typ))}, "
        f"default={default}): "
    )
    val_str = input(prompt).strip()
    return _cast_input(val_str, typ, default)


def _get_processing_callable(step_name: str):
    """
    Return a processing callable: filter, mask generator, or mask filter.

    These are resolved from built-ins or plugins.
    """
    try:
        if step_name in FILTER_MAP:
            return FILTER_MAP[step_name]
        if step_name in MASK_MAP:
            return MASK_MAP[step_name]
        if step_name in MASK_FILTERS_MAP:
            return MASK_FILTERS_MAP[step_name]
        if step_name in VIDEO_FILTER_MAP:
            return VIDEO_FILTER_MAP[step_name]
        if step_name in STACK_EDIT_MAP:
            return STACK_EDIT_MAP[step_name]
        if step_name in _PLUGIN_ENTRYPOINTS:
            return _PLUGIN_ENTRYPOINTS[step_name].load()
        raise ValueError(f"Processing step '{step_name}' not found")
    except Exception as e:
        logger.exception(f"Failed to load processing step '{step_name}': {e}")
        raise

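# Illustrative sketch (not part of the original module): the three outcomes
# of _resolve_condition. True means ask, False means skip, and None (from a
# KeyError, i.e. the condition depends on an answer not yet collected) means
# defer the parameter to a later pass. The helper name is hypothetical.
def _example_resolve_condition() -> None:  # pragma: no cover - demo only
    def cond(answers: dict) -> bool:
        return answers["use_mask"]  # depends on an earlier answer

    assert _resolve_condition(cond, {"use_mask": True}) is True    # ask
    assert _resolve_condition(cond, {"use_mask": False}) is False  # skip
    assert _resolve_condition(cond, {}) is None                    # defer
    assert _resolve_condition(None, {}) is True                    # unconditional
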
def get_processing_step_type(step_name: str) -> str:
    """Return the type of a processing step."""
    if step_name in FILTER_MAP:
        return "filter"
    if step_name in MASK_MAP:
        return "mask generator"
    if step_name in MASK_FILTERS_MAP:
        return "mask filter"
    if step_name in _PLUGIN_ENTRYPOINTS:
        return "plugin filter"
    if step_name in VIDEO_FILTER_MAP:
        return "video filter"
    if step_name in STACK_EDIT_MAP:
        return "stack edit"
    return "unknown"

def ask_for_processing_params(step_name: str) -> dict[str, Any]:
    """
    Introspect a processing callable's parameters and ask interactively.

    Skips data-like arguments (see ``SKIP_PARAM_NAMES``: data, image, arr,
    mask, stack, debug).
    """
    func = _get_processing_callable(step_name)
    sig = inspect.signature(func)
    conditions = getattr(func, "_param_conditions", {})

    # Parameters in signature order, excluding data-like args
    params = [(n, p) for n, p in sig.parameters.items() if n not in SKIP_PARAM_NAMES]

    kwargs: dict[str, Any] = {}
    pending = params[:]  # list of (name, param)

    # Keep trying until done or no progress is made
    while pending:
        progressed = False
        to_retry = []
        for name, param in pending:
            cond = conditions.get(name)
            # If there is no condition, always ask
            if cond is None:
                should_ask = True
            else:
                try:
                    should_ask = bool(cond(kwargs))
                except KeyError:
                    # Condition depends on answers not collected yet; postpone
                    should_ask = None
                except Exception:
                    # If the condition raises, ask to be safe
                    should_ask = True

            if should_ask is None:
                to_retry.append((name, param))
                continue

            progressed = True
            # If should_ask is False, skip the parameter (do not include it)
            if not should_ask:
                continue

            default = param.default if param.default is not inspect._empty else None
            ann = param.annotation if param.annotation is not inspect._empty else None
            prompt = (
                f"  Enter {name} (type={getattr(ann, '__name__', str(ann))}, "
                f"default={default}): "
            )
            val_str = input(prompt).strip()
            kwargs[name] = _cast_input(val_str, ann, default)

        if not progressed:
            # Nothing progressed – break and ask remaining params defensively
            # (prevents an infinite loop if conditions depend on each other
            # circularly)
            for name, param in to_retry:
                default = (
                    param.default if param.default is not inspect._empty else None
                )
                ann = (
                    param.annotation if param.annotation is not inspect._empty else None
                )
                prompt = (
                    f"  Enter {name} (type={getattr(ann, '__name__', str(ann))}, "
                    f"default={default}): "
                )
                val_str = input(prompt).strip()
                kwargs[name] = _cast_input(val_str, ann, default)
            break

        pending = to_retry

    return kwargs

def _sanitize_for_dump(obj: Any) -> Any:
    """
    Convert Python objects into JSON/YAML-safe types suitable for safe_dump/json.dump.

    - tuple -> list
    - numpy types -> native Python types (numbers, lists)
    - pathlib.Path -> str
    - recursively applied to lists/dicts
    """
    # Paths -> strings
    if isinstance(obj, Path):
        return str(obj)
    # numpy scalars -> Python scalars
    if isinstance(obj, (np.integer, np.floating)):
        return obj.item()
    # numpy arrays -> nested lists
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # numbers (including Python ints/floats), strings, bools, None -> keep
    if (
        isinstance(obj, numbers.Number)
        or isinstance(obj, str)
        or isinstance(obj, bool)
        or obj is None
    ):
        return obj
    # tuple -> list (important to avoid !!python/tuple)
    if isinstance(obj, tuple):
        return [_sanitize_for_dump(x) for x in obj]
    # list -> sanitize recursively
    if isinstance(obj, list):
        return [_sanitize_for_dump(x) for x in obj]
    # dict-like -> sanitize values
    if isinstance(obj, Mapping):
        return {k: _sanitize_for_dump(v) for k, v in obj.items()}
    # Fallback to string representation
    return str(obj)


def _normalize_loaded(obj: Any) -> Any:
    """
    Normalize objects returned by yaml.safe_load / json.load.

    - Convert tuples to lists (some YAML loaders can still produce tuples)
    - Recurse into dicts/lists
    """
    if isinstance(obj, tuple):
        return [_normalize_loaded(x) for x in obj]
    if isinstance(obj, list):
        return [_normalize_loaded(x) for x in obj]
    if isinstance(obj, dict):
        return {k: _normalize_loaded(v) for k, v in obj.items()}
    return obj

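# Illustrative sketch (not part of the original module): sanitizing step
# kwargs before yaml.safe_dump so tuples, numpy scalars, and Paths do not
# emit Python-specific tags, then normalizing after reload. The helper name
# is hypothetical and exists only for demonstration.
def _example_sanitize_roundtrip() -> None:  # pragma: no cover - demo only
    kwargs = {"sigma": np.float64(2.0), "size": (3, 3), "out": Path("results")}
    safe = _sanitize_for_dump(kwargs)
    assert safe == {"sigma": 2.0, "size": [3, 3], "out": "results"}
    loaded = _normalize_loaded(yaml.safe_load(yaml.safe_dump(safe)))
    assert loaded == safe
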