playnano.analysis package

Subpackages

Submodules

playnano.analysis.base module

Module holding the AnalysisModule base class.

class playnano.analysis.base.AnalysisModule

Bases: ABC

Abstract base class for analysis steps.

Subclasses must implement:

  • a name property returning a unique string identifier

  • a run(stack, previous_results=None, **params) -> dict method

Inherits from abc.ABC.

abstract property name: str

Unique name for this analysis module, e.g. “particle_detect”.

Used by the pipeline to identify and refer to the module.

abstractmethod run(stack: AFMImageStack, previous_results: dict[str, Any] | None = None, **params) -> dict[str, Any]

Perform the analysis on the given AFMImageStack.

Parameters:
  • stack (AFMImageStack) – The AFMImageStack instance, containing .data and metadata.

  • previous_results (dict[str, Any] or None, optional) – Outputs from earlier modules in the pipeline, if any.

  • **params (dict) – Module-specific parameters, e.g., threshold, min_size, etc.

Returns:

Dictionary mapping output names (strings) to results. Example:

{
    "coords": numpy.ndarray of shape (N, 3),
    "masks": numpy.ndarray of shape (n_frames, H, W)
}

Return type:

AnalysisOutputs

Notes

Subclasses must implement this method. The returned dictionary can contain any data the analysis module produces, but must be keyed by unique output names.
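
As an illustration of this contract, here is a minimal sketch of a subclass. So that it runs without playnano installed, it defines a local stand-in for the base class that mirrors the documented interface. FakeStack, CountAboveThreshold, and the threshold parameter are hypothetical names made up for the example; real code would presumably derive from playnano.analysis.base.AnalysisModule and receive an AFMImageStack.

```python
from abc import ABC, abstractmethod
from typing import Any


class AnalysisModule(ABC):
    """Local stand-in mirroring the documented base class (assumption)."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def run(self, stack, previous_results=None, **params) -> dict[str, Any]: ...


class FakeStack:
    """Hypothetical stand-in for AFMImageStack: it only exposes .data."""

    def __init__(self, data):
        self.data = data  # nested lists shaped (n_frames, H, W)


class CountAboveThreshold(AnalysisModule):
    """Hypothetical module: count pixels above a threshold in each frame."""

    @property
    def name(self) -> str:
        return "count_above_threshold"

    def run(self, stack, previous_results=None, **params) -> dict[str, Any]:
        threshold = params.get("threshold", 0.0)
        counts = [
            sum(1 for row in frame for value in row if value > threshold)
            for frame in stack.data
        ]
        # Outputs are keyed by unique output names, as the contract requires.
        return {"counts": counts, "threshold": threshold}


stack = FakeStack([[[0.0, 2.0], [3.0, 0.5]], [[0.0, 0.0], [4.0, 0.0]]])
result = CountAboveThreshold().run(stack, threshold=1.0)
print(result)
```

Because both name and run are abstract, a subclass that omits either one cannot be instantiated, which surfaces an incomplete module early.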

playnano.analysis.export module

Functions for exporting analysis results.

playnano.analysis.export.export_analysis_to_json(out_path: str, analysis_record: dict[str, Any]) -> None

Write the analysis_record (returned by AnalysisPipeline.run) to JSON.

Parameters:
  • out_path (str) – Output file path.

  • analysis_record (dict) – Analysis record to serialize.

Return type:

None

Raises:

OSError – If the file cannot be written.
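
The exact layout of the written file is not spelled out here. As a rough sketch, assuming the export produces ordinary JSON, a record can be read back with the standard library alone. The record below is hand-built for illustration, and a plain json.dump call stands in for export_analysis_to_json so that the example runs without playnano.

```python
import json
import os
import tempfile

# Hand-built record for illustration; real records come from AnalysisPipeline.run.
record = {
    "environment": {"python": "3.x"},
    "analysis": {"step_1_count_nonzero": {"counts": [4, 7]}},
    "provenance": {"steps": [], "results_by_name": {}, "frame_times": None},
}

with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = os.path.join(tmp_dir, "analysis.json")

    # Stand-in for export_analysis_to_json(out_path, record).
    with open(out_path, "w", encoding="utf-8") as handle:
        json.dump(record, handle, indent=2)

    # Read the file back while the temporary directory still exists.
    with open(out_path, encoding="utf-8") as handle:
        loaded = json.load(handle)

# None is written as JSON null and comes back as None.
print(loaded["analysis"]["step_1_count_nonzero"]["counts"])
```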

playnano.analysis.pipeline module

Module for the AnalysisPipeline class, which orchestrates analysis workflows.

class playnano.analysis.pipeline.AnalysisPipeline

Bases: object

Orchestrates a sequence of analysis steps on an AFMImageStack.

Each step corresponds to an AnalysisModule (built-in or entry-point), invoked in order with the given parameters. Outputs of each step are stored in stack.analysis under keys ‘step_<n>_<module_name>’. Detailed provenance (timestamps, parameters, version, linking keys) is recorded in stack.provenance[“analysis”]. The run() method returns a dict containing environment info, the analysis dict, and its provenance.

add(module_name: str, **params) -> None

Add an analysis module to the pipeline.

Parameters:
  • module_name (str) – The name of the analysis module to add (must be registered).

  • **params – Keyword arguments passed to the module’s run() method.

Return type:

None

Examples

>>> pipeline.add("particle_detect", threshold=5, min_size=10)
>>> pipeline.add("track_particles", max_jump=3)

clear() -> None

Remove all scheduled analysis steps and clear module cache.

This allows reconfiguration of the pipeline without creating a new instance.

run(stack: AFMImageStack, log_to: str | None = None) -> dict[str, Any]

Execute all added analysis steps on the given AFMImageStack.

For each step:

  • the module name is resolved to an AnalysisModule instance

  • the module is invoked with (stack, previous_results=…, **params)

  • its outputs are stored under stack.analysis[“step_<n>_<module_name>”]

  • its provenance is recorded in stack.provenance[“analysis”][“steps”]

The overall provenance sub-dict also collects:

  • results_by_name: mapping module name to list of outputs

  • frame_times: result of stack.get_frame_times(), or None

The environment info (via gather_environment_info) is stored at stack.provenance[“environment”] (if not already set).

Parameters:
  • stack (AFMImageStack) – The AFMImageStack to analyze.

  • log_to (str, optional) – Path to a JSON file where the combined record will be saved.

Returns:

AnalysisRecord (dict) with the following layout:

{
    "environment": <dict of environment metadata>,
    "analysis": <dict of outputs per step>,
    "provenance": <dict with keys "steps", "results_by_name", "frame_times">
}

Notes

  • Raw outputs: accessible via stack.analysis[“step_<n>_<module_name>”].

  • Provenance: in stack.provenance[“analysis”], with a list of step records.

  • If stack.provenance or stack.analysis is absent, they are created.

  • If log_to is provided, the same record dict is JSON-dumped using NumpyEncoder.

Raises:

Exception – Propagates any exception from module.run(…), after logging.
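
The bookkeeping described for run() can be sketched in plain Python. This is not playnano's implementation: run_steps and count_items are hypothetical helpers, the steps are plain callables rather than AnalysisModule instances, and the stack is a bare list. Only the key naming, the step record fields, and the results_by_name grouping follow the description in this section. What exactly is passed as previous_results is an assumption (here, a copy of the outputs stored so far).

```python
from datetime import datetime, timezone
from typing import Any, Callable


def run_steps(stack, steps: list[tuple[str, Callable, dict]]) -> dict[str, Any]:
    """Hypothetical helper mimicking the documented bookkeeping of run()."""
    analysis: dict[str, Any] = {}
    provenance: dict[str, Any] = {
        "steps": [],
        "results_by_name": {},
        "frame_times": None,
    }

    for index, (name, func, params) in enumerate(steps, start=1):
        key = f"step_{index}_{name}"
        # Assumption: earlier outputs are handed on as previous_results.
        outputs = func(stack, previous_results=dict(analysis), **params)
        analysis[key] = outputs
        provenance["steps"].append(
            {
                "index": index,  # 1-based position in the pipeline
                "name": name,
                "params": params,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "version": None,
                "analysis_key": key,
            }
        )
        # Group outputs by module name, one list entry per occurrence.
        provenance["results_by_name"].setdefault(name, []).append(outputs)

    return {"environment": {}, "analysis": analysis, "provenance": provenance}


def count_items(stack, previous_results=None, **params):
    """Hypothetical analysis step: count entries at or above a minimum."""
    minimum = params.get("minimum", 0)
    return {"count": sum(1 for value in stack if value >= minimum)}


record = run_steps(
    [1, 5, 9],
    [("count_items", count_items, {}), ("count_items", count_items, {"minimum": 5})],
)
print(sorted(record["analysis"]))
print(len(record["provenance"]["results_by_name"]["count_items"]))
```

Including the step index in the key keeps outputs distinct when the same module is added more than once, while results_by_name collects every occurrence under a single name.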

Examples

>>> pipeline = AnalysisPipeline()
>>> pipeline.add("count_nonzero")
>>> pipeline.add("feature_detection", mask_fn="threshold_mask", min_size=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access the outputs:
>>> record["analysis"]["step_1_count_nonzero"]
{'counts': [...], ...}
>>> # Inspect provenance:
>>> for step_info in record["provenance"]["steps"]:
...     print(step_info["name"], step_info["analysis_key"])
count_nonzero step_1_count_nonzero
feature_detection step_2_feature_detection

playnano.analysis.pipeline.AnalysisRecord

Structured output of an AnalysisPipeline run.

This record contains:

  • environment (dict) – Metadata about the runtime environment (e.g. Python version, library versions).

  • analysis (dict) – Results of each analysis module run, with keys ‘step_<n>_<module_name>’.

  • provenance (dict) – Metadata about the provenance of the analysis steps, with keys:

    • steps (list of dict) – Ordered list of executed analysis steps. Each entry contains:

      • index (int) – 1-based index of the step in the pipeline.

      • name (str) – The name of the analysis module used.

      • params (dict) – Parameters passed to the module.

      • timestamp (str) – ISO 8601 UTC timestamp when the step was executed.

      • version (str or None) – Optional version string provided by the module instance.

      • analysis_key (str) – Key under which this step’s outputs are stored in the analysis dict.

    • results_by_name (dict[str, list]) – Maps module names to lists of outputs from each occurrence.

    • frame_times (list[float] or None) – Timestamps for each frame in the stack, from stack.get_frame_times(), or None if unavailable.

Examples

>>> pipeline = AnalysisPipeline()
>>> pipeline.add("feature_detection", threshold=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access outputs:
>>> record["analysis"]["step_1_feature_detection"]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
>>> # Inspect provenance:
>>> record["provenance"]["results_by_name"]["feature_detection"][0]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}

alias of dict[str, Any]
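
Because the record is a plain dict, it can be traversed without any playnano helpers. The sketch below uses a hand-built record that follows the layout described in this section (all values are made up for illustration) and pairs each provenance step with its outputs through analysis_key.

```python
# Hand-built record following the documented layout; values are illustrative only.
record = {
    "environment": {},
    "analysis": {
        "step_1_count_nonzero": {"counts": [4, 7]},
        "step_2_feature_detection": {"summary": {"total_features": 3}},
    },
    "provenance": {
        "steps": [
            {
                "index": 1,
                "name": "count_nonzero",
                "params": {},
                "timestamp": "2024-01-01T00:00:00+00:00",
                "version": None,
                "analysis_key": "step_1_count_nonzero",
            },
            {
                "index": 2,
                "name": "feature_detection",
                "params": {"min_size": 5},
                "timestamp": "2024-01-01T00:00:01+00:00",
                "version": None,
                "analysis_key": "step_2_feature_detection",
            },
        ],
        "results_by_name": {},
        "frame_times": None,
    },
}

# Pair every executed step with the outputs stored under its analysis_key.
outputs_by_step = {
    step["index"]: record["analysis"][step["analysis_key"]]
    for step in record["provenance"]["steps"]
}

for index, outputs in outputs_by_step.items():
    print(index, sorted(outputs))
```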

Module contents

Public package initialization. Analysis modules live here.