Source code for topostats.topotracing

"""Run Topotracing at the command line."""
import argparse as arg
from functools import partial
import logging
from multiprocessing import Pool
from pathlib import Path
from typing import Union, Dict
import warnings

from tqdm import tqdm

from topostats.filters import (
    extract_img_name,
    extract_channel,
    extract_pixels,
    extract_pixel_to_nm_scaling,
    amplify,
    align_rows,
    remove_x_y_tilt,
    average_background,
)
from topostats.grains import (
    gaussian_filter,
    tidy_border,
    remove_objects,
    calc_minimum_grain_size,
    label_regions,
    colour_regions,
    region_properties,
    get_bounding_boxes,
    save_region_stats,
)
from topostats.grainstats import GrainStats
from topostats.io import read_yaml, load_scan
from topostats.plottingfuncs import plot_and_save
from topostats.logs.logs import LOGGER_NAME
from topostats.utils import convert_path, find_images, update_config, get_mask, get_threshold

LOGGER = logging.getLogger(LOGGER_NAME)


[docs]def create_parser() -> arg.ArgumentParser: """Create a parser for reading options.""" parser = arg.ArgumentParser( description="Process AFM images. Additional arguments over-ride those in the configuration file." ) parser.add_argument( "-c", "--config_file", dest="config_file", required=True, help="Path to a YAML configuration file." ) parser.add_argument( "-b", "--base_dir", dest="base_dir", type=str, required=False, help="Base directory to scan for images." ) parser.add_argument( "-o", "--output_dir", dest="output_dir", type=str, required=False, help="Output directory to write results to." ) parser.add_argument("-f", "--file_ext", dest="file_ext", help="File extension to scan for") parser.add_argument( "-a", "--amplify_level", dest="amplify_level", type=float, required=False, help="Amplify signals by the given factor.", ) parser.add_argument("-m", "--mask", dest="mask", type=bool, required=False, help="Mask the image.") parser.add_argument( "-w", "--warnings", dest="warnings", type=str, required=False, help="Whether to ignore warnings (ignore (default); deprecation).", ) parser.add_argument("-q", "--quiet", dest="quiet", type=bool, required=False, help="Toggle verbosity.") return parser
[docs]def process_scan( image_path: Union[str, Path] = None, channel: str = "Height", amplify_level: float = 1.0, gaussian_size: Union[int, float] = 2, mode: str = "nearest", threshold_multiplier: Union[int, float] = 1.7, background: float = 0.0, save_plots: bool = True, output_dir: Union[str, Path] = "output", ) -> None: """Process a single image, filtering, finding grains and calculating their statistics. Parameters ---------- image_path : Union[str, Path] Path to image to process. channel : str Channel to extract and process, default 'height'. amplify_level : float Level to amplify image prior to processing by. gaussian_size : Union[int, float] Minimum grain size in nanometers (nm). mode : str Mode for filtering (default is 'nearest'). threshold_multiplier : Union[int, float] Factor by which lower threshold is to be scaled prior to masking. background : float save_plots : bool Flag as to whether to save plots to PNG files. output_dir : Union[str, Path] Path to output directory for saving results. Examples -------- from topostats.topotracing import process_scan process_scan(image_path='minicircle.spm', save_plots=True, output_dir='output/') """ LOGGER.info(f"Processing : {image_path}") # Create output directory img_name = extract_img_name(image_path) output_dir = Path(output_dir) / f"{img_name}" output_dir.mkdir(parents=True, exist_ok=True) LOGGER.info(f"Created output directory : {output_dir}") # Load image, extract channel and pixel to nm scaling image = load_scan(image_path) LOGGER.info(f"[{img_name}] : Loaded image.") extracted_channel = extract_channel(image, channel) LOGGER.info(f"[{img_name}] : Extracted {channel}.") pixels = extract_pixels(extracted_channel) LOGGER.info(f"[{img_name}] : Pixels extracted.") pixel_nm_scaling = extract_pixel_to_nm_scaling(extracted_channel) LOGGER.info(f"[{img_name}] : Pixel to nanometre scaling extracted from image : {pixel_nm_scaling}") # Amplify image if amplify_level != 1.0: pixels = amplify(pixels, amplify_level) LOGGER.info(f"[{img_name}] : Image amplified by {amplify_level}.") # First pass filtering (no mask) initial_align = align_rows(pixels, mask=None) LOGGER.info(f"[{img_name}] : Initial alignment (unmasked) complete.") initial_tilt_removal = remove_x_y_tilt(initial_align, mask=None) LOGGER.info(f"[{img_name}] : Initial tilt removal (unmasked) complete.") # Create mask threshold = get_threshold(initial_tilt_removal) mask = get_mask(initial_tilt_removal, threshold) LOGGER.info(f"[{img_name}] : Mask created.") # Second pass filtering (with mask based on threshold) second_align = align_rows(initial_tilt_removal, mask=mask) LOGGER.info(f"[{img_name}] : Secondary alignent (masked) complete.") second_tilt_removal = remove_x_y_tilt(second_align, mask=mask) LOGGER.info(f"[{img_name}] : Secondary tilt removal (masked) complete.") # Average background zero_averaged_background = average_background(second_tilt_removal, mask=mask) LOGGER.info(f"[{img_name}] : Background zero-averaged.") # Get threshold lower_threshold = get_threshold(zero_averaged_background) * threshold_multiplier # Find grains # Apply a gaussian filter (using pySPM derived pixel_nm_scaling) gaussian_filtered = gaussian_filter( zero_averaged_background, gaussian_size=gaussian_size, pixel_to_nm_scaling=pixel_nm_scaling, mode=mode ) LOGGER.info( f"[{img_name}] : Gaussian filter applied (size : {gaussian_size}; : Pixel to Nanometer Scaling {pixel_nm_scaling}; mode : {mode})" ) # Create a boolean image boolean_image_mask = get_mask(gaussian_filtered, threshold=lower_threshold) # Tidy borders tidied_borders = tidy_border(boolean_image_mask) LOGGER.info(f"[{img_name}] : Borders tidied.") # Get threshold for small objects, need to first label all regions # Calculate minimum grain size in pixels labelled_regions = label_regions(tidied_borders) minimum_grain_size = calc_minimum_grain_size(labelled_regions, background=background) LOGGER.info(f"[{img_name}] : Minimum grain size in pixels calculated : {minimum_grain_size}.") # Remove objects small_objects_removed = remove_objects( tidied_borders, minimum_grain_size_pixels=minimum_grain_size, pixel_to_nm_scaling=pixel_nm_scaling ) LOGGER.info(f"[{img_name}] : Small objects (< {minimum_grain_size} pixels) removed.") # Label regions after cleaning regions_labelled = label_regions(small_objects_removed, background=background) LOGGER.info(f"[{img_name}] : Regions labelled.") # Colour regions after cleaning coloured_regions = colour_regions(regions_labelled) LOGGER.info(f"[{img_name}] : Regions coloured.") # Extract region properties after cleaning image_region_properties = region_properties(regions_labelled) LOGGER.info(f"[{img_name}] : Properties extracted for regions.") # Derive bounding boxes and save statistics # FIXME : This may well be redundant as bounding boxes are added by grainstats, which to keep? bounding_boxes = get_bounding_boxes(image_region_properties) LOGGER.info(f"[{img_name}] : Extracted bounding boxes") # Calculate grain statistics grainstats = GrainStats( data=zero_averaged_background, labelled_data=regions_labelled, pixel_to_nanometre_scaling=pixel_nm_scaling, img_name=img_name, output_dir=output_dir, ) grain_statistics = grainstats.calculate_stats() # Optionally save images of each stage of processing. # Could perhaps improve to make plots either individual or a faceted single image. # Also saving arrays to a dictionary and having an associated dictionary with the same keys and values containing # filename and title would make this considerably less code. if save_plots: plot_and_save(pixels, output_dir / "01-raw_heightmap.png", title="Raw Height") plot_and_save(initial_align, output_dir / "02-initial_align_unmasked.png", title="Initial Alignment (Unmasked)") plot_and_save( initial_tilt_removal, output_dir / "03-initial_tilt_removal_unmasked.png", title="Initial Tilt Removal (Unmasked)", ) plot_and_save(mask, output_dir / "04-binary_mask.png", title="Binary Mask") plot_and_save(second_align, output_dir / "05-secondary_align_masked.png", title="Secondary Alignment (Masked)") plot_and_save( second_tilt_removal, output_dir / "06-secondary_tilt_removal_masked.png", title="Secondary Tilt Removal (Masked)", ) plot_and_save( zero_averaged_background, output_dir / "07-zero_average_background.png", title="Zero Average Background" ) plot_and_save(gaussian_filtered, output_dir / "08-gaussian_filtered.png", title="Gaussian Filtered") plot_and_save(boolean_image_mask, output_dir / "09-boolean.png", title="Boolean Mask") plot_and_save(tidied_borders, output_dir / "10-tidy_borders.png", title="Tidied Borders") plot_and_save(small_objects_removed, output_dir / "11-small_objects_removed.png", title="Small Objects Removed") plot_and_save(regions_labelled, output_dir / "12-labelled_regions.png", title="Labelled Regions") plot_and_save(coloured_regions, output_dir / "13-coloured_regions.png", title="Coloured Regions") plot_and_save( coloured_regions, output_dir / "14-bounding_boxes.png", region_properties=image_region_properties, title="Bounding Boxes", ) grainstats.save_plot( grain_statistics["plot"], title="Labelled Image with Bounding Boxes", filename="15-labelled_image_bboxes.png", )
[docs]def main(): """Run processing.""" # Parse command line options, load config and update with command line options parser = create_parser() args = parser.parse_args() config = read_yaml(args.config_file) config = update_config(config, args) LOGGER.info(f"Configuration file loaded from : {args.config_file}") LOGGER.info(f'Scanning for images in : {config["base_dir"]}') LOGGER.info(f'Output directory : {config["output_dir"]}') LOGGER.info(f'Looking for images with extension : {config["file_ext"]}') img_files = find_images(config["base_dir"]) LOGGER.info(f'Images with extension {config["file_ext"]} in {config["base_dir"]} : {len(img_files)}') LOGGER.debug(f"Configuration : {config}") # Optionally ignore all warnings or just show deprecation warnings if config["warnings"] == "ignore": warnings.filterwarnings("ignore") LOGGER.info("NB : All warnings have been turned off for this run.") elif config["warnings"] == "deprecated": def fxn(): warnings.warn("deprecated", DeprecationWarning) with warnings.catch_warnings(): warnings.simplefilter("ignore") fxn() if config["quiet"]: LOGGER.setLevel("ERROR") # For debugging (as Pool makes it hard to find things when they go wrong) # for x in img_files: # process_scan(image_path=x, # amplify_level=config['amplify_level'], # channel=config['channel'], # gaussian_size=config['grains']['gaussian_size'], # dx=config['grains']['dx'], # mode=config['grains']['mode'], # lower_threshold_otsu_multiplier=config['grains']['lower_threshold_otsu_multiplier'], # minimum_grain_size=config['grains']['minimum_grain_size'], # background=config['grains']['background'], # save_plots=config['save_plots'], # output_dir=config['output_dir']) # Process all images found in parallel (constrainged by 'cores' option in config). processing_function = partial( process_scan, amplify_level=config["amplify_level"], channel=config["channel"], gaussian_size=config["grains"]["gaussian_size"], mode=config["grains"]["mode"], threshold_multiplier=config["grains"]["threshold_multiplier"], background=config["grains"]["background"], save_plots=config["save_plots"], output_dir=config["output_dir"], ) with Pool(processes=config["cores"]) as pool: with tqdm( total=len(img_files), desc=f'Processing {len(img_files)} images from {config["base_dir"]}, results are under {config["output_dir"]}', ) as pbar: for _ in pool.imap_unordered(processing_function, img_files): pbar.update()
if __name__ == "__main__": main()