"""
Threshold-based feature detection for AFM image stacks.
This is a playNano analysis module implementing a concrete subclass of the
abstract `AnalysisModule` base class. It detects contiguous features in each
frame of an AFM image stack using user-provided or precomputed masks, with
optional hole filling, morphological separation, size and edge filtering, and
per-feature statistics extraction.
See Also
--------
playnano.analysis.modules.log_blob_detection : LoG-based alternative detection method.
playnano.analysis.modules.particle_tracking : Links detected features across frames.
.. versionadded:: 0.2.0
Author
------
Daniel E. Rollins (d.e.rollins@leeds.ac.uk) / GitHub: derollins
AI Transparency Note
--------------------
AI-based tools were used for limited typing/formatting assistance
and for debugging, refactoring, and documentation suggestions. All code paths,
algorithms, and final behaviour were reviewed and validated by the author.
"""
import inspect
from typing import Any, Optional
import numpy as np
from scipy.ndimage import binary_fill_holes
from skimage.measure import label, regionprops
from skimage.morphology import disk, opening, remove_small_holes
from playnano.analysis.base import AnalysisModule
from playnano.processing.mask_generators import register_masking
from playnano.utils.param_utils import param_conditions
# Registry of named mask generators. FeatureDetectionModule._get_mask_array
# looks a string `mask_fn` up in this mapping and calls the value it finds as
# `mask_fn(frame, **mask_kwargs)`, so masks can be selected by name.
# NOTE(review): presumably register_masking() returns a plain dict of
# name -> callable; confirm in playnano.processing.mask_generators.
MASK_MAP = register_masking()
[docs]
class FeatureDetectionModule(AnalysisModule):
    """
    Detect contiguous features in each frame of an AFM image stack.

    A boolean mask is resolved for every frame, either by calling a
    mask-generating function or by looking up a boolean mask array stored in
    `previous_results`. Connected components are then identified per frame,
    optionally separated by morphological opening, filtered by size and edge
    contact, and returned along with feature statistics and labeled masks.

    The parameters below are the keyword arguments accepted by :meth:`run`.

    Parameters
    ----------
    mask_fn : callable or str, optional
        Function of the form `frame -> bool_2D_array` used to generate a mask
        for each frame, or the name of a mask registered in ``MASK_MAP``.
        Required if `mask_key` is not provided.
    mask_key : str, optional
        Key referencing a boolean mask array stored in `previous_results`.
        The array must have the same shape as the stack data.
        Required if `mask_fn` is not provided.
    morph_opening : bool, default False
        If True, apply morphological opening to each labeled object to separate
        touching features. An object that would lose more than 60 % of its
        area to the opening is kept unchanged.
    sep_radius : int, default 6
        Radius of the disk structuring element used for morphological opening.
        A value <= 0 disables the separation step.
    min_size : int, default 10
        Minimum pixel area for a region to be retained.
    remove_edge : bool, default True
        If True, discard any region touching the frame boundary.
    fill_holes : bool, default False
        If True, fill holes within each mask before labeling.
    hole_area : int or None, optional
        Only used when `fill_holes` is True. Holes with an area smaller than
        this many pixels are filled. If None, fill all holes.
    **mask_kwargs : Any
        Additional keyword arguments passed to `mask_fn(frame, **mask_kwargs)`.
        If that call raises ``TypeError``, `mask_fn(frame)` is called instead.

    Returns
    -------
    dict[str, Any]
        - **features_per_frame** : list[list[dict]]
            Per-frame list of feature stats dicts, each with:

            - `"frame_timestamp"` : float
            - `"label"` : int
            - `"area"` : float
            - `"min"`, `"max"`, `"mean"` : float
            - `"bbox"` : (min_row, min_col, max_row, max_col)
            - `"centroid"` : (row, col)
        - **labeled_masks** : list[np.ndarray]
            Integer-labeled masks for each frame.
        - **summary** : dict
            Aggregate values: total frames, total features, average features
            per frame.

    Raises
    ------
    ValueError
        If the stack has no 3D data, the mask configuration is invalid, or a
        mask has incorrect shape/dtype.
    KeyError
        If `mask_key` is supplied but missing from `previous_results`.

    Version
    -------
    0.2.0
        Version 0.2.0 adds morphological opening for separating close or
        touching particles.

    Examples
    --------
    >>> pipeline.add("feature_detection", mask_fn=mask_mean_offset,
    ...              min_size=20, fill_holes=True, hole_area=50)
    >>> result = pipeline.run(stack)
    >>> result["summary"]["total_features"]
    123
    """

    # Version string of this module's implementation.
    version = "0.2.0"
@property
def name(self) -> str:
    """
    Identify this analysis module.

    Returns
    -------
    str
        Always the fixed identifier ``"feature_detection"``.
    """
    return "feature_detection"
def _get_mask_array(
    self,
    data: np.ndarray,
    previous_results: Optional[dict[str, Any]],
    mask_fn: Optional[callable],
    mask_key: Optional[str],
    **mask_kwargs,
) -> np.ndarray:
    """
    Resolve the boolean mask array for the stack.

    Either retrieves a precomputed mask from `previous_results` using
    `mask_key` or computes a mask for each frame using `mask_fn`. When both
    are supplied, `mask_key` is checked first and `mask_fn` is not used.

    Parameters
    ----------
    data : np.ndarray
        3D array of shape (n_frames, H, W) representing the stack data.
    previous_results : dict[str, Any] or None
        Mapping of previously computed analysis outputs.
    mask_fn : callable, str or None
        Function that produces a boolean (H, W) mask for a single frame, or
        the name of a mask registered in ``MASK_MAP``.
    mask_key : str or None
        Key referencing a boolean array in `previous_results`.
    **mask_kwargs : Any
        Passed directly to `mask_fn`. If `mask_fn` raises ``TypeError`` when
        called with them, it is retried as `mask_fn(frame)` and a single
        ``RuntimeWarning`` reports that the keyword arguments were dropped.

    Returns
    -------
    np.ndarray
        Boolean array of shape matching `data`. For the `mask_key` path this
        is the stored array itself, not a copy.

    Raises
    ------
    ValueError
        If neither `mask_fn` nor `mask_key` is given, a string `mask_fn` is
        not registered, the stored mask is not a boolean array of the right
        shape, or `mask_fn` returns an invalid mask.
    KeyError
        If `mask_key` is provided but missing in `previous_results`.
    TypeError
        If `mask_fn` is not callable, or raises ``TypeError`` itself when
        called without keyword arguments.
    """
    # Local import: only the kwargs-fallback path needs it.
    import warnings

    n_frames, H, W = data.shape

    if mask_key is not None:
        if not previous_results or mask_key not in previous_results:
            raise KeyError(f"mask_key '{mask_key}' not found in previous_results")
        mask_arr = previous_results[mask_key]
        if not (
            isinstance(mask_arr, np.ndarray)
            and mask_arr.dtype == bool
            and mask_arr.shape == data.shape
        ):
            raise ValueError(
                f"previous_results[{mask_key}] must be a boolean ndarray of shape {data.shape}"  # noqa
            )
        return mask_arr

    if mask_fn is None:
        raise ValueError("Either mask_fn or mask_key must be provided")

    # Resolve mask_fn if it's a registered string
    if isinstance(mask_fn, str):
        if mask_fn not in MASK_MAP:
            raise ValueError(
                f"mask_fn '{mask_fn}' is not a known registered mask. "
                f"Available: {list(MASK_MAP.keys())}"
            )
        mask_fn = MASK_MAP[mask_fn]

    # Fail early with a clear message instead of a "'X' object is not
    # callable" error surfacing from inside the frame loop.
    if not callable(mask_fn):
        raise TypeError(
            "mask_fn must be callable or a registered mask name, "
            f"got {type(mask_fn).__name__}"
        )

    # Compute mask frame-by-frame
    mask_arr = np.zeros_like(data, dtype=bool)
    fallback_reported = False
    for i in range(n_frames):
        frame = data[i]
        if mask_kwargs:
            try:
                mf = mask_fn(frame, **mask_kwargs)
            except TypeError as err:
                # Best-effort support for mask functions that take no extra
                # keyword arguments. The retry is kept, but it is reported
                # once so dropped parameters do not go unnoticed.
                if not fallback_reported:
                    warnings.warn(
                        "mask_fn raised TypeError with mask_kwargs "
                        f"{sorted(mask_kwargs)} ({err}); retrying as "
                        "mask_fn(frame) without them",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                    fallback_reported = True
                mf = mask_fn(frame)
        else:
            # Nothing to drop: a retry would repeat the identical call, so
            # let any TypeError from mask_fn propagate directly.
            mf = mask_fn(frame)

        if not (
            isinstance(mf, np.ndarray) and mf.dtype == bool and mf.shape == (H, W)
        ):
            raise ValueError(f"mask_fn returned invalid mask for frame {i}")
        mask_arr[i] = mf
    return mask_arr
def _separate_touching_by_opening(
    self,
    labeled_mask: np.ndarray,
    selem_radius: int = 6,
    max_area_loss: float = 0.6,
) -> np.ndarray:
    """
    Split labeled objects by opening each one on its own, then relabel.

    Every non-zero label is turned into a binary mask and opened with a disk
    footprint. The connected pieces of the result receive new consecutive
    labels. When the opening would erase the object or shrink it too much,
    the untouched object is used instead, which protects thin shapes from
    being eroded away.

    Parameters
    ----------
    labeled_mask : np.ndarray
        Integer-labeled mask (0 = background).
    selem_radius : int
        Radius of the disk footprint used for the opening. For values <= 0
        the input is returned unchanged.
    max_area_loss : float
        Largest tolerated fractional area loss. If the opened area is below
        ``(1 - max_area_loss) * original area``, the original object is kept.

    Returns
    -------
    np.ndarray
        ``int32`` labeled mask whose objects may have been split into several
        labels (or `labeled_mask` itself when `selem_radius` <= 0).
    """
    if selem_radius <= 0:
        return labeled_mask

    footprint = disk(selem_radius)
    result = np.zeros_like(labeled_mask, dtype=np.int32)
    labels_assigned = 0  # number of output labels handed out so far

    object_ids = np.unique(labeled_mask)
    for object_id in object_ids[object_ids != 0]:
        region = labeled_mask == object_id
        area_before = int(region.sum())
        if area_before == 0:
            continue

        candidate = opening(region, footprint)
        area_after = int(candidate.sum())
        minimum_area = (1.0 - max_area_loss) * area_before
        # Opening was too destructive: fall back to the original object.
        if area_after == 0 or area_after < minimum_area:
            candidate = region

        pieces = label(candidate)
        n_pieces = int(pieces.max())
        if n_pieces == 0:
            continue

        # Piece k (1-based) becomes output label labels_assigned + k.
        foreground = pieces > 0
        result[foreground] = pieces[foreground] + labels_assigned
        labels_assigned += n_pieces

    return result
def _process_frame(
    self,
    frame: np.ndarray,
    mask_frame: np.ndarray,
    frame_ts: float,
    *,
    morph_opening: bool,
    sep_radius: int,
    min_size: int,
    remove_edge: bool,
    fill_holes: bool,
    hole_area: Optional[int],
) -> tuple[list[dict[str, Any]], np.ndarray]:
    """
    Process a single frame: hole fill, labeling, filtering, stats.

    Parameters
    ----------
    frame : np.ndarray
        2D image whose values are measured inside each feature.
    mask_frame : np.ndarray
        2D boolean mask for `frame`. It is not modified in place.
    frame_ts : float
        Timestamp stored with every feature of this frame.
    morph_opening : bool
        If True (and `sep_radius` > 0), split touching objects with
        `_separate_touching_by_opening` before filtering.
    sep_radius : int
        Disk radius handed to the separation step.
    min_size : int
        Regions with an area below this value are discarded.
    remove_edge : bool
        If True, regions whose bounding box touches the frame border are
        discarded.
    fill_holes : bool
        If True, fill holes in the mask before labeling.
    hole_area : int or None
        With `fill_holes`, only holes with area strictly below this value
        are filled. None fills every hole.

    Returns
    -------
    features : list[dict[str, Any]]
        One dict per retained feature with the keys ``"frame_timestamp"``,
        ``"label"``, ``"area"``, ``"min"``, ``"max"``, ``"mean"``,
        ``"bbox"`` and ``"centroid"``.
    labeled : np.ndarray
        Labeled mask of the retained features (0 = background).
    """
    H, W = frame.shape

    # Optionally fill holes
    if fill_holes:
        if hole_area is not None:
            # Normalize semantics to "fill holes with area < hole_area"
            # across skimage versions.
            # New API (0.26+): prefers `max_size` which fills holes with
            # area <= max_size.
            # Old API: `area_threshold` fills holes with area < area_threshold.
            sig = inspect.signature(remove_small_holes)
            if "max_size" in sig.parameters:
                # Emulate strict "< hole_area" by using <= hole_area-1
                max_size = max(hole_area - 1, 0)  # guard against negatives
                mask_frame = remove_small_holes(
                    mask_frame, max_size=max_size, connectivity=1
                )
            else:
                mask_frame = remove_small_holes(
                    mask_frame, area_threshold=hole_area, connectivity=1
                )
        else:
            mask_frame = binary_fill_holes(mask_frame)
        mask_frame = mask_frame.astype(bool)

    # Label connected regions
    initial_labeled = label(mask_frame)

    # Morphological separation (per object), before filtering
    if morph_opening and sep_radius > 0:
        initial_labeled = self._separate_touching_by_opening(
            initial_labeled, selem_radius=sep_radius
        )

    # Decide which labels survive the size/edge filters first, then build
    # the filtered mask in a single pass. This avoids comparing the whole
    # frame against each label separately.
    kept_labels = []
    for prop in regionprops(initial_labeled):
        if prop.area < min_size:
            continue
        minr, minc, maxr, maxc = prop.bbox
        # bbox upper bounds are exclusive, so == H / == W means "touches".
        if remove_edge and (minr == 0 or minc == 0 or maxr == H or maxc == W):
            continue
        kept_labels.append(prop.label)

    if kept_labels:
        filtered_mask = np.isin(initial_labeled, kept_labels)
    else:
        filtered_mask = np.zeros_like(mask_frame, dtype=bool)

    # Relabel after filtering so labels are consecutive from 1
    labeled = label(filtered_mask)
    props = regionprops(labeled, intensity_image=frame)

    # Collect stats
    features: list[dict[str, Any]] = []
    for prop in props:
        # Index only inside the region's bounding box. `prop.image` is the
        # region's boolean footprint within `prop.slice`, so this selects
        # the same pixels, in the same row-major order, as
        # `frame[labeled == prop.label]` would.
        vals = frame[prop.slice][prop.image]
        if vals.size == 0:
            continue
        features.append(
            {
                "frame_timestamp": frame_ts,
                "label": int(prop.label),
                "area": float(prop.area),
                "min": float(vals.min()),
                "max": float(vals.max()),
                "mean": float(vals.mean()),
                "bbox": tuple(map(int, prop.bbox)),  # (minr, minc, maxr, maxc)
                "centroid": tuple(map(float, prop.centroid)),
            }
        )

    return features, labeled
def _summarize(self, n_frames: int, total_features: int) -> dict[str, Any]:
    """
    Build the aggregate summary for a processed stack.

    Parameters
    ----------
    n_frames : int
        Number of frames that were processed.
    total_features : int
        Number of features found across all frames.

    Returns
    -------
    dict[str, Any]
        ``"total_frames"``, ``"total_features"`` and
        ``"avg_features_per_frame"``; the average is ``0.0`` when
        `n_frames` is not positive.
    """
    if n_frames > 0:
        mean_per_frame = total_features / n_frames
    else:
        mean_per_frame = 0.0

    return {
        "total_frames": n_frames,
        "total_features": total_features,
        "avg_features_per_frame": mean_per_frame,
    }
[docs]
# NOTE(review): param_conditions presumably declares when each parameter is
# relevant (e.g. hole_area only when fill_holes is set) - confirm against
# playnano.utils.param_utils.
@param_conditions(
    mask_fn=lambda p: not p.get("mask_key"),
    mask_key=lambda p: not p.get("mask_fn"),
    hole_area=lambda p: p.get("fill_holes", False),
)
def run(
    self,
    stack,
    previous_results: Optional[dict[str, Any]] = None,
    *,
    # Mask source: a mask function, or a mask already in previous_results
    mask_fn: Optional[callable] = None,
    mask_key: Optional[str] = None,
    # Morphological separation of touching objects
    morph_opening: bool = False,
    sep_radius: int = 6,
    # Region filters
    min_size: int = 10,
    remove_edge: bool = True,
    # Hole filling
    fill_holes: bool = False,
    hole_area: Optional[int] = None,
    # Forwarded to mask_fn(frame, **mask_kwargs)
    **mask_kwargs,
) -> dict[str, Any]:
    """
    Detect contiguous features on each frame of stack.data.

    Parameters
    ----------
    stack : AFMImageStack
        The AFM stack whose `.data` (3D array) and `.time_for_frame()` are used.
    previous_results : dict[str, Any], optional
        Mapping of earlier analysis outputs. If `mask_key` is given,
        must contain a boolean mask array under that key.
    mask_fn : callable or str, optional
        Function frame->bool array for masking, or a registered mask name.
        Required if `mask_key` is None.
    mask_key : str, optional
        Key of a precomputed boolean mask array in `previous_results`.
        Required if `mask_fn` is None.
    morph_opening : bool, default False
        Split touching objects by per-object morphological opening.
    sep_radius : int, default 6
        Disk radius used for that opening.
    min_size : int, default 10
        Minimum pixel area for a region to be kept.
    remove_edge : bool, default True
        Discard regions that touch the frame boundary.
    fill_holes : bool, default False
        Fill holes in each mask before labeling.
    hole_area : int or None, optional
        Size limit for the holes that get filled; None fills all holes.
    **mask_kwargs : Any
        Extra keyword arguments forwarded to `mask_fn`.

    Returns
    -------
    dict[str, Any]
        Dictionary containing:

        - features_per_frame : list of lists of dict
        - labeled_masks : list of np.ndarray
        - summary : dict with total_features, total_frames, avg_features_per_frame

    Raises
    ------
    ValueError
        If `stack.data` is None or not 3D, or mask array invalid,
        or neither `mask_fn` nor `mask_key` provided.
    KeyError
        If `mask_key` not found in `previous_results`.

    Examples
    --------
    >>> pipeline.add("feature_detection", mask_fn=mask_mean_offset, min_size=20)
    >>> result = pipeline.run(stack)
    """
    data = stack.data
    if data is None:
        raise ValueError("AFMImageStack has no data")
    is_3d_array = isinstance(data, np.ndarray) and data.ndim == 3
    if not is_3d_array:
        raise ValueError("stack.data must be a 3D numpy array (n_frames, H, W)")

    n_frames = data.shape[0]
    mask_arr = self._get_mask_array(
        data, previous_results, mask_fn, mask_key, **mask_kwargs
    )

    # The per-frame options are identical for every frame; bundle them once.
    frame_options = {
        "morph_opening": morph_opening,
        "sep_radius": sep_radius,
        "min_size": min_size,
        "remove_edge": remove_edge,
        "fill_holes": fill_holes,
        "hole_area": hole_area,
    }

    features_per_frame: list[list[dict[str, Any]]] = []
    labeled_masks: list[np.ndarray] = []
    for index in range(n_frames):
        # Best effort: fall back to the frame index when no timestamp can
        # be obtained from the stack.
        try:
            timestamp = float(stack.time_for_frame(index))
        except Exception:
            timestamp = float(index)

        frame_features, frame_labels = self._process_frame(
            data[index],
            mask_arr[index].copy(),
            timestamp,
            **frame_options,
        )
        features_per_frame.append(frame_features)
        labeled_masks.append(frame_labels)

    total_features = sum(len(found) for found in features_per_frame)

    return {
        "features_per_frame": features_per_frame,
        "labeled_masks": labeled_masks,
        "summary": self._summarize(n_frames, total_features),
    }