Source code for topostats.tracing.splining

"""Order single pixel skeletons with or without NodeStats Statistics."""

import logging
import math

import numpy as np
import numpy.typing as npt
from scipy import interpolate as interp

from topostats.classes import Molecule, TopoStats
from topostats.logs.logs import LOGGER_NAME

LOGGER = logging.getLogger(LOGGER_NAME)

# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
# pylint: disable=too-many-positional-arguments


# pylint: disable=too-many-instance-attributes
[docs] class splineTrace: # pylint: disable=too-many-instance-attributes """ Smooth the ordered trace via an average of splines. Parameters ---------- topostats_object : TopoStats TopoStats object with ordered traces calculated for grains and molecules. grain : int Grain number to process. molecule : int Molecule number to process. spline_step_size : float Step length in meters to use a coordinate for splining. spline_linear_smoothing : float Amount of linear spline smoothing. spline_circular_smoothing : float Amount of circular spline smoothing. spline_degree : int Degree of the spline. Cubic splines are recommended. Even values of k should be avoided especially with a small s-value. """ # pylint: disable=too-many-arguments def __init__( self, topostats_object: TopoStats, grain: int, molecule: int, spline_step_size: float | None = None, spline_linear_smoothing: float | None = None, spline_circular_smoothing: float | None = None, spline_degree: int | None = None, ) -> None: # pylint: disable=too-many-arguments """ Initialise the splineTrace class. Parameters ---------- topostats_object : TopoStats TopoStats object with ordered traces calculated for grains and molecules. grain : int Grain number to process. molecule : int Molecule number to process. spline_step_size : float Step length in meters to use a coordinate for splining. spline_linear_smoothing : float Amount of linear spline smoothing. spline_circular_smoothing : float Amount of circular spline smoothing. spline_degree : int Degree of the spline. Cubic splines are recommended. Even values of k should be avoided especially with a small s-value. """ self.image = topostats_object.image self.number_of_rows, self.number_of_columns = topostats_object.image.shape # Extract ordered trace and boolean for circular from desired grain and molecules self.mol_ordered_trace = topostats_object.grain_crops[grain].ordered_trace.molecule_data[molecule] self.mol_is_circular = topostats_object.grain_crops[grain].ordered_trace.molecule_data[molecule].circular self.pixel_to_nm_scaling = topostats_object.pixel_to_nm_scaling self.spline_step_size = spline_step_size self.spline_linear_smoothing = spline_linear_smoothing self.spline_circular_smoothing = spline_circular_smoothing self.spline_degree = spline_degree self.tracing_stats = { "contour_length": None, "end_to_end_distance": None, }
[docs] def get_splined_traces( self, fitted_trace: npt.NDArray, ) -> npt.NDArray: """ Get a splined version of the fitted trace - useful for finding the radius of gyration etc. This function actually calculates the average of several splines which is important for getting a good fit on the lower resolution data. Parameters ---------- fitted_trace : npt.NDArray Numpy array of the fitted trace. Returns ------- npt.NDArray Splined (smoothed) array of trace. """ # Calculate the step size in pixels from the step size in metres. # Should always be at least 1. # Note that step_size_m is in m and pixel_to_nm_scaling is in m because of the legacy code which seems to almost # always have pixel_to_nm_scaling be set in metres using the flag convert_nm_to_m. No idea why this is the case. step_size_px = max(int(self.spline_step_size / (self.pixel_to_nm_scaling * 1e-9)), 1) # Splines will be totalled and then divived by number of splines to calculate the average spline spline_sum = None # Get the length of the fitted trace fitted_trace_length = fitted_trace.shape[0] # If the fitted trace is less than the degree plus one, then there is no # point in trying to spline it, just return the fitted trace if fitted_trace_length < self.spline_degree + 1: LOGGER.debug( f"Fitted trace for grain {step_size_px} too small ({fitted_trace_length}), returning fitted trace" ) return fitted_trace # There cannot be fewer than degree + 1 points in the spline # Decrease the step size to ensure more than this number of points while fitted_trace_length / step_size_px < self.spline_degree + 1: # Step size cannot be less than 1 if step_size_px <= 1: step_size_px = 1 break step_size_px = -1 # Set smoothness and periodicity appropriately for linear / circular molecules. spline_smoothness, spline_periodicity = ( (self.spline_circular_smoothing, 2) if self.mol_is_circular else (self.spline_linear_smoothing, 0) ) # Create an array of evenly spaced points between 0 and 1 for the splines to be evaluated at. # This is needed to ensure that the splines are all the same length as the number of points # in the spline is controlled by the ev_array variable. ev_array = np.linspace(0, 1, fitted_trace_length * step_size_px) # Find as many splines as there are steps in step size, this allows for a better spline to be obtained # by averaging the splines. Think of this like weaving a lot of splines together along the course of # the trace. Example spline coordinate indexes: [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4], where spline # 1 takes every 4th coordinate, starting at position 0, then spline 2 takes every 4th coordinate # starting at position 1, etc... for i in range(step_size_px): # Sample the fitted trace at every step_size_px pixels sampled = [fitted_trace[j, :] for j in range(i, fitted_trace_length, step_size_px)] # Scipy.splprep cannot handle duplicate consecutive x, y tuples, so remove them. # Get rid of any consecutive duplicates in the sampled coordinates sampled = self.remove_duplicate_consecutive_tuples(tuple_list=sampled) x_sampled = sampled[:, 0] y_sampled = sampled[:, 1] # Use scipy's B-spline functions # tck is a tuple, (t,c,k) containing the vector of knots, the B-spline coefficients # and the degree of the spline. # s is the smoothing factor, per is the periodicity, k is the degree of the spline tck = interp.splprep( [x_sampled, y_sampled], s=spline_smoothness, per=spline_periodicity, # quiet=self.spline_quiet, k=self.spline_degree, )[0] # splev returns a tuple (x_coords ,y_coords) containing the smoothed coordinates of the # spline, constructed from the B-spline coefficients and knots. The number of points in # the spline is controlled by the ev_array variable. # ev_array is an array of evenly spaced points between 0 and 1. # This is to ensure that the splines are all the same length. # Tck simply provides the coefficients for the spline. out = interp.splev(ev_array, tck) splined_trace = np.column_stack((out[0], out[1])) # Add the splined trace to the spline_sum array for averaging later if spline_sum is None: spline_sum = np.array(splined_trace) else: spline_sum = np.add(spline_sum, splined_trace) # Find the average spline between the set of splines # This is an attempt to find a better spline by averaging our candidates return np.divide(spline_sum, [step_size_px, step_size_px])
[docs] @staticmethod # Perhaps we need a module for array functions? def remove_duplicate_consecutive_tuples(tuple_list: list[tuple | npt.NDArray]) -> list[tuple]: """ Remove duplicate consecutive tuples from a list. Parameters ---------- tuple_list : list[tuple | npt.NDArray] List of tuples or numpy ndarrays to remove consecutive duplicates from. Returns ------- list[Tuple] List of tuples with consecutive duplicates removed. Examples -------- For the list of tuples [(1, 2), (1, 2), (1, 2), (2, 3), (2, 3), (3, 4)], this function will return [(1, 2), (2, 3), (3, 4)] """ duplicates_removed = [] for index, tup in enumerate(tuple_list): if index == 0 or not np.array_equal(tuple_list[index - 1], tup): duplicates_removed.append(tup) return np.array(duplicates_removed)
[docs] def run_spline_trace(self) -> tuple[npt.NDArray, dict]: """ Pipeline to run the splining smoothing and obtaining smoothing stats. Returns ------- tuple[npt.NDArray, dict] Tuple of Nx2 smoothed trace coordinates, and smoothed trace statistics. """ # fitted trace # fitted_trace = self.get_fitted_traces(self.ordered_trace, mol_is_circular) # splined trace splined_trace = self.get_splined_traces(self.mol_ordered_trace) # compile CL & E2E distance self.tracing_stats["contour_length"] = ( measure_contour_length(splined_trace, self.mol_is_circular, self.pixel_to_nm_scaling) * 1e-9 ) self.tracing_stats["end_to_end_distance"] = ( measure_end_to_end_distance(splined_trace, self.mol_is_circular, self.pixel_to_nm_scaling) * 1e-9 ) return splined_trace, self.tracing_stats
[docs] class windowTrace: """ Obtain a smoothed trace of a molecule. Parameters ---------- topostats_object : TopoStats TopoStats object with ordered traces calculated for grains and molecules. grain : int Grain number to process. molecule : int Molecule number to process. rolling_window_size : np.float64, optional The length of the rolling window too average over, by default 6.0. rolling_window_resampling : bool, optional Whether to resample the rolling window, by default False. rolling_window_resample_interval_nm : float, optional The regular spatial interval (nm) to resample the rolling window, by default 0.5. """ def __init__( self, topostats_object: TopoStats, grain: int, molecule: int, rolling_window_size: float | None = None, rolling_window_resampling: bool = False, rolling_window_resample_interval_nm: float = 0.5, ) -> None: """ Initialise the windowTrace class. Parameters ---------- topostats_object : TopoStats TopoStats object with ordered traces calculated for grains and molecules. grain : int Grain number to process. molecule : int Molecule number to process. rolling_window_size : np.float64, optional The length of the rolling window too average over, by default 6.0. rolling_window_resampling : bool, optional Whether to resample the rolling window, by default False. rolling_window_resample_interval_nm : float, optional The regular spatial interval (nm) to resample the rolling window, by default 0.5. """ # Extract ordered trace and boolean for circular from desired grain and molecules self.mol_ordered_trace = topostats_object.grain_crops[grain].ordered_trace.molecule_data[molecule] self.mol_is_circular = topostats_object.grain_crops[grain].ordered_trace.molecule_data[molecule].circular self.pixel_to_nm_scaling = topostats_object.pixel_to_nm_scaling self.rolling_window_size = rolling_window_size / 1e-9 # for nm scaling factor self.rolling_window_resampling = rolling_window_resampling self.rolling_window_resample_interval_nm = rolling_window_resample_interval_nm / 1e-9 # for nm scaling factor self.tracing_stats = { "contour_length": None, "end_to_end_distance": None, }
[docs] @staticmethod def pool_trace_circular( molecule: Molecule, rolling_window_size: np.float64 = 6.0, pixel_to_nm_scaling: float = 1, ) -> npt.NDArray[np.float64]: """ Smooth a pixelwise ordered trace of circular molecules via a sliding window. Parameters ---------- molecule : Molecule Molecule object with attribute ``.ordered_coords``, the ordered coordinates to be splined. rolling_window_size : np.float64, optional The length of the rolling window too average over, by default 6.0. pixel_to_nm_scaling : float, optional The pixel to nm scaling factor, by default 1. Returns ------- npt.NDArray[np.float64] MxN Smoothed ordered trace coordinates. """ # Pool the trace points pixel_trace = molecule.ordered_coords pooled_trace = [] len_pixel_trace = len(pixel_trace) for i in range(len_pixel_trace): binned_points = [] current_length = 0 j = 1 # compile rolling window while current_length < rolling_window_size: current_index = i + j previous_index = i + j - 1 while current_index >= len_pixel_trace: current_index -= len_pixel_trace while previous_index >= len_pixel_trace: previous_index -= len_pixel_trace current_length += ( np.linalg.norm(pixel_trace[current_index] - pixel_trace[previous_index]) * pixel_to_nm_scaling ) binned_points.append(pixel_trace[current_index]) j += 1 # Get the mean of the binned points pooled_trace.append(np.mean(binned_points, axis=0)) return np.array(pooled_trace)
[docs] @staticmethod def pool_trace_linear( molecule: Molecule, rolling_window_size: np.float64 = 6.0, pixel_to_nm_scaling: float = 1, ) -> npt.NDArray[np.float64]: """ Smooth a pixelwise ordered trace of linear molecules via a sliding window. Parameters ---------- molecule : Molecule Molecule object with attribute ``.ordered_coords``, the ordered coordinates to be splined. rolling_window_size : np.float64, optional The length of the rolling window too average over, by default 6.0. pixel_to_nm_scaling : float, optional The pixel to nm scaling factor, by default 1. Returns ------- npt.NDArray[np.float64] MxN Smoothed ordered trace coordinates. """ pixel_trace = molecule.ordered_coords pooled_trace = [pixel_trace[0]] # Add first coord as to not cut it off len_pixel_trace = len(pixel_trace) # Get average point for trace in rolling window for i in range(0, len_pixel_trace): binned_points = [] current_length = 0 j = 0 # Compile rolling window while current_length < rolling_window_size: current_index = i + j previous_index = i + j - 1 if current_index >= len_pixel_trace: # exit if exceeding the trace break current_length += ( np.linalg.norm(pixel_trace[current_index] - pixel_trace[previous_index]) * pixel_to_nm_scaling ) binned_points.append(pixel_trace[current_index]) j += 1 else: # Get the mean of the binned points pooled_trace.append(np.mean(binned_points, axis=0)) # Exit if reached the end of the trace if current_index + 1 >= len_pixel_trace: break pooled_trace.append(pixel_trace[-1]) # Add last coord as to not cut it off # Check if the first two points are the same and remove the first point if they are # This can happen if the algorithm happens to add the first point naturally due to having a small # rolling window size. if np.array_equal(pooled_trace[0], pooled_trace[1]): pooled_trace.pop(0) # Check if the last two points are the same and remove the last point if they are # This can happen if the algorithm happens to add the last point naturally due to having a small # rolling window size. if np.array_equal(pooled_trace[-1], pooled_trace[-2]): pooled_trace.pop(-1) return np.array(pooled_trace)
[docs] def run_window_trace(self) -> tuple[npt.NDArray, dict]: """ Pipeline to run the rolling window smoothing and obtaining smoothing stats. Returns ------- tuple[npt.NDArray, dict] Tuple of Nx2 smoothed trace coordinates, and smoothed trace statistics. """ # fitted trace # fitted_trace = self.get_fitted_traces(self.ordered_trace, mol_is_circular) # splined trace if self.mol_is_circular: splined_trace = self.pool_trace_circular( self.mol_ordered_trace, self.rolling_window_size, self.pixel_to_nm_scaling ) if self.rolling_window_resampling: splined_trace = resample_points_regular_interval( points=splined_trace, interval=self.rolling_window_resample_interval_nm / self.pixel_to_nm_scaling, circular=True, ) else: splined_trace = self.pool_trace_linear( self.mol_ordered_trace, self.rolling_window_size, self.pixel_to_nm_scaling ) if self.rolling_window_resampling: splined_trace = resample_points_regular_interval( points=splined_trace, interval=self.rolling_window_resample_interval_nm / self.pixel_to_nm_scaling, circular=False, ) # compile contour length & end-to-end distance self.tracing_stats["contour_length"] = ( measure_contour_length(splined_trace, self.mol_is_circular, self.pixel_to_nm_scaling) * 1e-9 ) self.tracing_stats["end_to_end_distance"] = ( measure_end_to_end_distance(splined_trace, self.mol_is_circular, self.pixel_to_nm_scaling) * 1e-9 ) return splined_trace, self.tracing_stats
[docs] def measure_contour_length(splined_trace: npt.NDArray, mol_is_circular: bool, pixel_to_nm_scaling: float) -> float: """ Contour length for each of the splined traces accounting for whether the molecule is circular or linear. Contour length units are nm. Parameters ---------- splined_trace : npt.NDArray The splined trace. mol_is_circular : bool Whether the molecule is circular or not. pixel_to_nm_scaling : float Scaling factor from pixels to nanometres. Returns ------- float Length of molecule in nanometres (nm). """ if mol_is_circular: for num in range(len(splined_trace)): x1 = splined_trace[num - 1, 0] y1 = splined_trace[num - 1, 1] x2 = splined_trace[num, 0] y2 = splined_trace[num, 1] try: hypotenuse_array.append(math.hypot((x1 - x2), (y1 - y2))) except NameError: hypotenuse_array = [math.hypot((x1 - x2), (y1 - y2))] contour_length = np.sum(np.array(hypotenuse_array)) * pixel_to_nm_scaling del hypotenuse_array else: for num in range(len(splined_trace)): try: x1 = splined_trace[num, 0] y1 = splined_trace[num, 1] x2 = splined_trace[num + 1, 0] y2 = splined_trace[num + 1, 1] try: hypotenuse_array.append(math.hypot((x1 - x2), (y1 - y2))) except NameError: hypotenuse_array = [math.hypot((x1 - x2), (y1 - y2))] except IndexError: # IndexError happens at last point in array contour_length = np.sum(np.array(hypotenuse_array)) * pixel_to_nm_scaling del hypotenuse_array break return contour_length
[docs] def measure_end_to_end_distance(splined_trace, mol_is_circular, pixel_to_nm_scaling: float): """ Euclidean distance between the start and end of linear molecules. The hypotenuse is calculated between the start ([0,0], [0,1]) and end ([-1,0], [-1,1]) of linear molecules. If the molecule is circular then the distance is set to zero (0). Parameters ---------- splined_trace : npt.NDArray The splined trace. mol_is_circular : bool Whether the molecule is circular or not. pixel_to_nm_scaling : float Scaling factor from pixels to nanometres. Returns ------- float Length of molecule in nanometres (nm). """ if not mol_is_circular: return ( math.hypot((splined_trace[0, 0] - splined_trace[-1, 0]), (splined_trace[0, 1] - splined_trace[-1, 1])) * pixel_to_nm_scaling ) return 0
# pylint: disable=too-many-arguments # pylint: disable=too-many-locals
[docs] def splining_image( topostats_object: TopoStats, method: str | None = None, rolling_window_size: float | None = None, spline_step_size: float | None = None, spline_linear_smoothing: float | None = None, spline_circular_smoothing: float | None = None, spline_degree: int | None = None, rolling_window_resampling: bool | None = None, rolling_window_resample_regular_spatial_interval: float | None = None, ) -> None: # pylint: disable=too-many-arguments # pylint: disable=too-many-locals """ Obtain smoothed traces of pixel-wise ordered traces for molecules in an image. Parameters ---------- topostats_object : TopoStats TopoStats object with ordered traces of grain crops to be splined. method : str Method of trace smoothing, options are 'splining' and 'rolling_window'. rolling_window_size : float Length in meters to average coordinates over in the rolling window. spline_step_size : float Step length in meters to use a coordinate for splining. spline_linear_smoothing : float Amount of linear spline smoothing. spline_circular_smoothing : float Amount of circular spline smoothing. spline_degree : int Degree of the spline. Cubic splines are recommended. Even values of k should be avoided especially with a small s-value. rolling_window_resampling : bool, optional Whether to resample the rolling window, by default False. rolling_window_resample_regular_spatial_interval : float, optional The regular spatial interval (nm) to resample the rolling window, by default 0.5. """ config = topostats_object.config["splining"].copy() splining_method = config["method"] if method is None else method rolling_window_size = config["rolling_window_size"] if rolling_window_size is None else rolling_window_size spline_step_size = config["spline_step_size"] if spline_step_size is None else spline_step_size spline_linear_smoothing = ( config["spline_linear_smoothing"] if spline_linear_smoothing is None else spline_linear_smoothing ) spline_circular_smoothing = ( config["spline_circular_smoothing"] if spline_circular_smoothing is None else spline_circular_smoothing ) spline_degree = config["spline_degree"] if spline_degree is None else spline_degree rolling_window_resampling = ( config["rolling_window_resampling"] if rolling_window_resampling is None else rolling_window_resampling ) rolling_window_resample_regular_spatial_interval = ( config["rolling_window_resample_regular_spatial_interval"] if rolling_window_resample_regular_spatial_interval is None else rolling_window_resample_regular_spatial_interval ) # iterate through ordered_trace.molecule_data adding statistics as attributes to the Molecule objects if topostats_object.grain_crops is not None: # pylint: disable=too-many-nested-blocks for grain_no, grain_crop in topostats_object.grain_crops.items(): mol_no = None if grain_crop.ordered_trace is not None and grain_crop.ordered_trace.molecule_data is not None: for mol_no, molecule in grain_crop.ordered_trace.molecule_data.items(): try: LOGGER.info( f"[{topostats_object.filename}] : Splining Grain {grain_no + 1} Molecule {mol_no + 1}" ) # check if want to do nodestats tracing or not if splining_method == "rolling_window": splined_data, tracing_stats = windowTrace( topostats_object=topostats_object, grain=grain_no, molecule=mol_no, rolling_window_size=rolling_window_size, rolling_window_resampling=rolling_window_resampling, rolling_window_resample_interval_nm=rolling_window_resample_regular_spatial_interval, ).run_window_trace() # if not doing nodestats ordering, do original TopoStats ordering elif splining_method == "spline": splined_data, tracing_stats = splineTrace( topostats_object=topostats_object, grain=grain_no, molecule=mol_no, spline_step_size=spline_step_size, spline_linear_smoothing=spline_linear_smoothing, spline_circular_smoothing=spline_circular_smoothing, spline_degree=spline_degree, ).run_spline_trace() else: raise ValueError( f"Invalid parameter for splining.method : {splining_method}\n" "Please change your configuration, valid values are 'rolling_window' or 'splining'." ) # Add statistics to Molecule object molecule.splined_coords = splined_data molecule.end_to_end_distance = tracing_stats["end_to_end_distance"] molecule.contour_length = tracing_stats["contour_length"] molecule.bbox = grain_crop.bbox LOGGER.debug(f"[{topostats_object.filename}] : Finished splining {grain_no} - {mol_no}") except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error( f"[{topostats_object.filename}] : Splining for {grain_no} failed. " "Consider raising an issue on GitHub. Error: ", exc_info=e, ) if mol_no is None: LOGGER.warning(f"[{topostats_object.filename}] : No molecules found for grain {grain_no}") else: LOGGER.warning("f[{topostats_objec.filename}] : No grains to spline.")
[docs] def interpolate_between_two_points_distance( point1: npt.NDArray[np.float32], point2: npt.NDArray[np.float32], distance: np.float32 ) -> npt.NDArray[np.float32]: """ Interpolate between two points to create a new point at a set distance between the two. Parameters ---------- point1 : npt.NDArray[np.float32] The first point. point2 : npt.NDArray[np.float32] The second point. distance : np.float32 The distance to interpolate between the two points. Returns ------- npt.NDArray[np.float32] The new point at the specified distance between the two points. """ distance_between_points = np.linalg.norm(point2 - point1) assert ( distance_between_points > distance ), f"distance between points is less than the desired interval: {distance_between_points} < {distance}" proportion = distance / distance_between_points return point1 + proportion * (point2 - point1)
[docs] def resample_points_regular_interval(points: npt.NDArray, interval: float, circular: bool) -> npt.NDArray: """ Resample a set of points to be at regular spatial intervals. Note: This is NOT intended to be pure interpolation, as interpolated points would not produce uniformly spaced points in cartesian space. Parameters ---------- points : npt.NDArray The points to resample. interval : float The distance that all returned points should be apart. circular : bool If True, the first and last points will be connected to form a closed loop. If False, the first and last points will not be connected. Returns ------- npt.NDArray The resampled points, evenly spaced at the specified interval. """ if circular: points = np.concatenate((points, points[0:1]), axis=0) resampled_points = [] resampled_points.append(points[0]) current_point_index = 1 while True: current_point = resampled_points[-1] next_original_point = points[current_point_index] distance_to_next_splined_point = np.linalg.norm(next_original_point - current_point) # if the distance to the next splined point is less than the interval, then skip to the next point if distance_to_next_splined_point < interval: current_point_index += 1 if current_point_index >= len(points): break continue new_interpolated_point = interpolate_between_two_points_distance( point1=current_point, point2=next_original_point, distance=interval ) resampled_points.append(new_interpolated_point) # if the first and last points are less than 0.5 * the interval apart, then remove the last point if np.linalg.norm(resampled_points[0] - resampled_points[-1]) < 0.5 * interval: resampled_points = resampled_points[:-1] return np.array(resampled_points)