playNano.analysis package

Subpackages

Submodules

playNano.analysis.base module

Module holding the AnalysisModule base class.

class playNano.analysis.base.AnalysisModule[source]

Bases: ABC

Abstract base class for analysis steps.

Subclasses must implement:

  • a name property returning a unique string identifier

  • a run(stack, previous_results=None, **params) -> dict method

abstract property name: str

Unique name for this analysis module, e.g. “particle_detect”.

Used by pipeline to identify and refer to the module.

abstract run(stack: AFMImageStack, previous_results: dict[str, Any] | None = None, **params) dict[str, Any][source]

Perform the analysis on the given AFMImageStack.

Parameters:
  • stack (AFMImageStack) – The AFMImageStack instance, containing .data and metadata.

  • previous_results (dict or None) – Outputs from earlier modules in the pipeline, if any.

  • **params (dict) – Module-specific parameters, e.g., threshold, min_size, etc.

Returns:

A dictionary mapping output names (strings) to results. Example:

{
    "coords": numpy array of shape (N, 3),
    "masks": numpy array of shape (n_frames, H, W),
}

Return type:

AnalysisOutputs

playNano.analysis.export module

Functions for exporting ananlysis results.

playNano.analysis.export.export_analysis_to_json(out_path: str, analysis_record: dict[str, Any]) None[source]

Write the analysis_record (returned by AnalysisPipeline.run) to JSON.

Parameters:
  • out_path (str) – Output file path.

  • analysis_record (dict) – Analysis record to serialize.

Return type:

None

Raises:

OSError – If the file cannot be written.

playNano.analysis.pipeline module

Module for the AnalysisPipeline class for orchastration of analysis workflows.

class playNano.analysis.pipeline.AnalysisPipeline[source]

Bases: object

Orchestrates a sequence of analysis steps on an AFMImageStack.

Each step corresponds to an AnalysisModule (built-in or entry-point), invoked in order with the given parameters. Outputs of each step are stored in stack.analysis under keys ‘step_<n>_<module_name>’. Detailed provenance (timestamps, parameters, version, linking keys) is recorded in stack.provenance[“analysis”]. The run() method returns a dict containing environment info, the analysis dict, and its provenance.

add(module_name: str, **params) None[source]

Add an analysis module to the pipeline.

Parameters:
  • module_name (str) – The name of the analysis module to add (must be registered).

  • **params – Keyword arguments passed to the module’s run() method.

Return type:

None

Examples

>>> pipeline.add("particle_detect", threshold=5, min_size=10)
>>> pipeline.add("track_particles", max_jump=3)
clear() None[source]

Remove all scheduled analysis steps and clear module cache.

This allows reconfiguration of the pipeline without creating a new instance.

run(stack: AFMImageStack, log_to: str | None = None) dict[str, Any][source]

Execute all added analysis steps on the given AFMImageStack.

Each step:

  • is resolved to an AnalysisModule instance

  • invoked with (stack, previous_results=…, **params)

  • its outputs are stored under stack.analysis[“step_<n>_<module_name>”]

  • provenance is recorded in stack.provenance[“analysis”][“steps”]

The overall provenance sub-dict also collects:

  • results_by_name: mapping module name to list of outputs

  • frame_times: result of stack.get_frame_times(), or None

The environment info (via gather_environment_info) is stored at stack.provenance[“environment”] (if not already set).

Parameters:
  • stack (AFMImageStack) – The AFMImageStack to analyze.

  • log_to (str, optional) – Path to a JSON file where the combined record will be saved.

Returns:

  • AnalysisRecord (dict)

  • { – “environment”: <dict of environment metadata>, “analysis”: <dict of outputs per step>, “provenance”: <dict with keys “steps”, “results_by_name”, “frame_times”>

  • }

Notes

  • Raw outputs: accessible via stack.analysis[“step_<n>_<module_name>”].

  • Provenance: in stack.provenance[“analysis”], with a list of step records.

  • If stack.provenance or stack.analysis is absent, they are created.

  • If log_to is provided, the same record dict is JSON-dumped using NumpyEncoder.

Raises:

Exception – Propagates any exception from module.run(…), after logging.

Examples

>>> pipeline = AnalysisPipeline()
>>> pipeline.add("count_nonzero")
>>> pipeline.add("feature_detection", mask_fn="threshold_mask", min_size=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access the outputs:
>>> record["analysis"]["step_1_count_nonzero"]
{'counts': [...], ...}
>>> # Inspect provenance:
>>> for step_info in record["provenance"]["steps"]:
...     print(step_info["name"], step_info["analysis_key"])
count_nonzero step_1_count_nonzero
feature_detection step_2_feature_detection
playNano.analysis.pipeline.AnalysisRecord

Structured output of an AnalysisPipeline run.

This record contains: - environment : dict

Metadata about the runtime environment (e.g. Python version, library versions).

  • analysisdict

    Results of each analysis module run, with keys ‘step_<n>_<module_name>’.

  • provenancedict
    Metadata about the provenance of the analysis steps, with keys:
    • stepslist of dict
      Ordered list of executed analysis steps. Each entry contains:
      • indexint

        1-based index of the step in the pipeline.

      • namestr

        The name of the analysis module used.

      • paramsdict

        Parameters passed to the module.

      • timestampstr

        ISO 8601 UTC timestamp when the step was executed.

      • versionstr or None

        Optional version string provided by the module instance.

      • analysis_keystr

        Key under which this step’s outputs are stored in the analysis dict.

    • results_by_namedict[str, list]

      Maps module names to lists of outputs from each occurrence.

    • frame_timeslist[float] or None

      Timestamps for each frame in the stack, from stack.get_frame_times(), or None if unavailable.

Examples

>>> pipeline = AnalysisPipeline()
>>> pipeline.add("feature_detection", threshold=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access outputs:
>>> record["analysis"]["step_1_feature_detection"]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
>>> # Inspect provenance:
>>> record["provenance"]["results_by_name"]["feature_detection"][0]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}

alias of dict[str, Any]

Module contents

Public package initialization. Analysis modules live here.