playnano.analysis.pipeline module¶
Module providing the AnalysisPipeline class for orchestration of analysis workflows.
- class playnano.analysis.pipeline.AnalysisPipeline[source]¶
Bases: object
Orchestrates a sequence of analysis steps on an AFMImageStack.
Each step corresponds to an AnalysisModule (built-in or entry-point), invoked in order with the given parameters. Outputs of each step are stored in stack.analysis under keys 'step_<n>_<module_name>'. Detailed provenance (timestamps, parameters, version, linking keys) is recorded in stack.provenance["analysis"]. The run() method returns a dict containing environment info, the analysis dict, and its provenance.
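A minimal sketch of this workflow (assuming stack is an existing AFMImageStack and "count_nonzero" is a registered module, as in the examples below); after run(), the outputs are also reachable directly on the stack:
>>> pipeline = AnalysisPipeline()
>>> pipeline.add("count_nonzero")
>>> record = pipeline.run(stack)
>>> "step_1_count_nonzero" in stack.analysis
True
>>> stack.provenance["analysis"]["steps"][0]["analysis_key"]
'step_1_count_nonzero'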
- add(module_name: str, **params) None[source]¶
Add an analysis module to the pipeline.
- Parameters:
module_name (str) – The name of the analysis module to add (must be registered).
**params – Keyword arguments passed to the module’s run() method.
- Return type:
None
Examples
>>> pipeline.add("particle_detect", threshold=5, min_size=10) >>> pipeline.add("track_particles", max_jump=3)
- clear() None[source]¶
Remove all scheduled analysis steps and clear the module cache.
This allows reconfiguration of the pipeline without creating a new instance.
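For example, the same instance can be cleared and reconfigured (a sketch assuming stack and the module names used elsewhere on this page):
>>> pipeline.add("count_nonzero")
>>> pipeline.clear()
>>> pipeline.add("feature_detection", min_size=5)
>>> record = pipeline.run(stack)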
- run(stack: AFMImageStack, log_to: str | None = None) dict[str, Any][source]¶
Execute all added analysis steps on the given AFMImageStack.
Each step:
- is resolved to an AnalysisModule instance,
- is invoked with (stack, previous_results=…, **params),
- has its outputs stored under stack.analysis["step_<n>_<module_name>"],
- has its provenance recorded in stack.provenance["analysis"]["steps"].
The overall provenance sub-dict also collects:
- results_by_name: mapping of module name to list of outputs
- frame_times: result of stack.get_frame_times(), or None
The environment info (via gather_environment_info) is stored at stack.provenance["environment"] (if not already set).
- Parameters:
stack (AFMImageStack) – The AFMImageStack to analyze.
log_to (str, optional) – Path to a JSON file where the combined record will be saved.
- Returns:
AnalysisRecord (dict) –
{
    "environment": <dict of environment metadata>,
    "analysis": <dict of outputs per step>,
    "provenance": <dict with keys "steps", "results_by_name", "frame_times">
}
Notes
- Raw outputs: accessible via stack.analysis["step_<n>_<module_name>"].
- Provenance: stored in stack.provenance["analysis"] as a list of step records.
- If stack.provenance or stack.analysis is absent, they are created.
- If log_to is provided, the same record dict is JSON-dumped using NumpyEncoder.
- Raises:
Exception – Propagates any exception from module.run(…), after logging.
Examples
>>> pipeline = AnalysisPipeline()
>>> pipeline.add("count_nonzero")
>>> pipeline.add("feature_detection", mask_fn="threshold_mask", min_size=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access the outputs:
>>> record["analysis"]["step_1_count_nonzero"]
{'counts': [...], ...}
>>> # Inspect provenance:
>>> for step_info in record["provenance"]["steps"]:
...     print(step_info["name"], step_info["analysis_key"])
count_nonzero step_1_count_nonzero
feature_detection step_2_feature_detection
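Since the log_to file holds the same record serialized with NumpyEncoder, it can be reloaded with the standard json module (a sketch; the top-level keys follow the Returns structure above):
>>> import json
>>> with open("out.json") as fh:
...     logged = json.load(fh)
>>> sorted(logged)
['analysis', 'environment', 'provenance']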
- playnano.analysis.pipeline.AnalysisRecord¶
Structured output of an AnalysisPipeline run.
This record contains:
- environment : dict
  Metadata about the runtime environment (e.g. Python version, library versions).
- analysis : dict
  Results of each analysis module run, with keys 'step_<n>_<module_name>'.
- provenance : dict
  Metadata about the provenance of the analysis steps, with keys:
  - steps : list of dict
    Ordered list of executed analysis steps. Each entry contains:
    - index : int
      1-based index of the step in the pipeline.
    - name : str
      The name of the analysis module used.
    - params : dict
      Parameters passed to the module.
    - timestamp : str
      ISO 8601 UTC timestamp when the step was executed.
    - version : str or None
      Optional version string provided by the module instance.
    - analysis_key : str
      Key under which this step's outputs are stored in the analysis dict.
  - results_by_name : dict[str, list]
    Maps module names to lists of outputs from each occurrence.
  - frame_times : list[float] or None
    Timestamps for each frame in the stack, from stack.get_frame_times(), or None if unavailable.
Examples
>>> pipeline = AnalysisPipeline()
>>> pipeline.add("feature_detection", threshold=5)
>>> record = pipeline.run(stack, log_to="out.json")
>>> # Access outputs:
>>> record["analysis"]["step_1_feature_detection"]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
>>> # Inspect provenance:
>>> record["provenance"]["results_by_name"]["feature_detection"][0]["summary"]
{'total_features': 23, 'avg_per_frame': 3.8}
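The per-step provenance fields listed above can also be read from the same record (a sketch; the printed values are illustrative for the single-step pipeline in this example):
>>> for step in record["provenance"]["steps"]:
...     print(step["index"], step["name"], step["params"], step["analysis_key"])
1 feature_detection {'threshold': 5} step_1_feature_detection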