From cbd2837dbe82ad2289b1a6331f22c79f54525557 Mon Sep 17 00:00:00 2001 From: "Andrew L. Mullen" Date: Mon, 3 Jun 2024 16:08:58 -0400 Subject: [PATCH 1/4] change mosaicking to gdal and account for images with different projections within the same composite group --- .../pipelines/composite_pipeline_v2.py | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 dl_water_bodies/pipelines/composite_pipeline_v2.py diff --git a/dl_water_bodies/pipelines/composite_pipeline_v2.py b/dl_water_bodies/pipelines/composite_pipeline_v2.py new file mode 100644 index 0000000..eb0d8b7 --- /dev/null +++ b/dl_water_bodies/pipelines/composite_pipeline_v2.py @@ -0,0 +1,197 @@ +# Composite pipeline +import os +import rasterio +import logging +import numpy as np +import numba as nb +import pandas as pd +from osgeo import osr +import rioxarray as rxr +from multiprocessing import Pool +from glob import glob +from dl_water_bodies.utils.gs_utils import write_geotiff +import re +from datetime import datetime +from pympler.tracker import SummaryTracker +import gc +import time +import subprocess +from dask.distributed import Client, LocalCluster, Lock +import xarray as xr + +# ----------------------------------------------------------------------------- +# class Composite +# ----------------------------------------------------------------------------- +class Composite(object): + + def __init__(self, prediction_dir: str, mask_dir: str, years: str, months: str, + output_dir: str, composite_name: str, nodata: int = 0, + logger: logging.Logger = None) -> None: + + self._prediction_dir = prediction_dir + self._mask_dir = mask_dir + + self._years = years + self._months = months + + self._output_dir = output_dir + self._composite_name = composite_name + + self._nodata = nodata + + self._logger = logger + + self._prediction_paths = self.get_prediction_filepaths() + self._mask_paths = self.get_corresponding_mask_filepaths() + self._crs = None + # ------------------------------------------------------------------------- + # build_composite + # ------------------------------------------------------------------------- + def get_prediction_filepaths(self) -> list: + """i + Get filename from list of regexes + """ + # get the paths/filenames of the regex + filenames = [] + print(self._years) + print(self._months) + for im in os.listdir(self._prediction_dir): + + if im.endswith('.tif'): + date=re.search(r'\d{8}', im) + + timestamp = datetime.strptime(date.group(), '%Y%m%d').date() + + if timestamp.month in self._months: + if timestamp.year in self._years: + + if (timestamp.month == 5) and (timestamp.day<15): + print(f'{timestamp} during snow period, skipping') + pass + elif (timestamp.month == 9) and (timestamp.day>15): + print(f'{timestamp} during snow period, skipping') + pass + else: + print(timestamp) + filenames.append(os.path.join(self._prediction_dir, im)) + + return sorted(filenames) + + def get_corresponding_mask_filepaths(self) -> list: + """ + get masks corresponding to predictions, if no mask, will be None + """ + #get all mask dates + mask_dates = [re.search(r'\d{8}', d).group() for d in os.listdir(self._mask_dir)] + + #get full mask paths + mask_paths = [os.path.join(self._mask_dir, m) for m in os.listdir(self._mask_dir)] + + #empty array to fill with mask paths corresponding to prediction paths + corresponding_paths = [] + + #iterate predictions + for p in self._prediction_paths: + date=re.search(r'\d{8}', p).group() + + if date in mask_dates: + idx = mask_dates.index(date) + corresponding_paths.append(mask_paths[idx]) + + else: + corresponding_paths.append(None) + + return corresponding_paths + + def mask_pl_im_with_UDM2(self, pl_pred_path, pl_mask_path): + + pl_pred = rxr.open_rasterio(pl_pred_path) + + pl_mask = rxr.open_rasterio(pl_mask_path) + + pl_pred = pl_pred.where((pl_mask[0]==1) & (pl_mask[1]==0) & (pl_mask[2]==0) + & (pl_mask[3]==0) & (pl_mask[4]==0) & (pl_mask[5]==0), 255) + return pl_pred + + def mosaic_gdal(self, infiles, outfile): + + #infiles = ' '.join(infiles) + + if os.path.exists(outfile): + os.remove(outfile) + + #gdal_command = ['gdal_merge.py', '-o', outfile, '-ot', 'UInt16', '-init', '255', '-n', '255', '-separate'] + infiles + gdal_command = ['gdal_merge.py', '-o', outfile, '-ot', 'UInt16', '-init', '255', '-a_nodata', '255', '-separate', '-co', 'BIGTIFF=IF_SAFER'] + infiles + print(gdal_command) + + result = subprocess.run(gdal_command, capture_output=True, text=True) + + if result.returncode == 0: + print("Merging successful") + else: + print("Error occurred:", result.stderr) + + def prep_prediction_for_composite(self, pl_pred_path, pl_mask_path, out_path, reproject='ESRI:102001'): + + print('masking Planet with UDM2') + pl_im_masked = self.mask_pl_im_with_UDM2(pl_pred_path, pl_mask_path) + + pl_im_masked = pl_im_masked.rio.write_nodata(255) + + if not reproject is None: + pl_im_masked = pl_im_masked.rio.reproject(reproject, nodata=255) + + print(f'saving to: {out_path}') + pl_im_masked.rio.to_raster(out_path, tiled=True, compress='deflate') + + del pl_im_masked + + return + + + def composite_image_group(self, temp_dir='temp/'): + t1_start = time.time() + + if not os.path.isdir(temp_dir): + os.mkdir(temp_dir) + + if not os.path.isdir(self._output_dir): + os.mkdir(self._output_dir) + + temp_output_paths = [os.path.join(temp_dir, p.split('/')[-1][:-4]+'_temp.tif') for p in self._prediction_paths] + + with Pool() as pool: + pool.starmap(self.prep_prediction_for_composite, zip(self._prediction_paths, self._mask_paths, temp_output_paths)) + + print('mosaicking') + self.mosaic_gdal(temp_output_paths, os.path.join(temp_dir, 'temp_mosaic.tif')) + + with LocalCluster(n_workers=10, threads_per_worker=1, memory_limit='64GB') as cluster, Client(cluster) as client: + + #mosaic = rxr.open_rasterio(os.path.join(temp_dir, 'temp_mosaic.tif'), masked=True, chunks={'x':1000, 'y': 1000}, lock=False) + mosaic = rxr.open_rasterio(os.path.join(temp_dir, 'temp_mosaic.tif'), masked=True) + + print('compositing') + mean = mosaic.mean(dim='band')#.round() + mean = mean.fillna(255) + mean = mean.where((mean<0.4) | (mean==255), 1) + mean = mean.where(mean>=0.4, 0) + + mean.rio.write_crs(mosaic.rio.crs, inplace=True) + mean.rio.write_transform(mosaic.rio.transform(), inplace=True) + mean.rio.write_nodata(255, inplace=True) + + #mean.rio.to_raster(os.path.join(self._output_dir, self._composite_name), tiled=True, compress='deflate', dtype=np.uint8, lock=Lock("rio", client=client), compute=True) + + mean.rio.to_raster(os.path.join(self._output_dir, self._composite_name), tiled=True, compress='deflate', dtype=np.uint8) + + del mean + del mosaic + + t1_stop = time.time() + + for f in glob(os.path.join(temp_dir,'*')): + os.remove(f) + + print("Elapsed time (s):", t1_stop-t1_start) + From 53123debc8bae7adf7c81899ec902f2c5964ada9 Mon Sep 17 00:00:00 2001 From: "Andrew L. Mullen" Date: Mon, 3 Jun 2024 16:12:59 -0400 Subject: [PATCH 2/4] change composite pipeline to use composite v2 --- dl_water_bodies/view/composite_pipeline_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dl_water_bodies/view/composite_pipeline_cli.py b/dl_water_bodies/view/composite_pipeline_cli.py index 2d8f600..c328df5 100644 --- a/dl_water_bodies/view/composite_pipeline_cli.py +++ b/dl_water_bodies/view/composite_pipeline_cli.py @@ -4,7 +4,7 @@ import sys import os -from dl_water_bodies.pipelines.composite_pipeline import Composite +from dl_water_bodies.pipelines.composite_pipeline_v2 import Composite # ----------------------------------------------------------------------------- # main @@ -74,7 +74,7 @@ def main(): nodata=args.nodata, logger=logger) - compositePipeline.build_composites() + compositePipeline.composite_image_group() # ----------------------------------------------------------------------------- From 93c7942e513fee0942e99f793b25c8b0b27e5c63 Mon Sep 17 00:00:00 2001 From: "Andrew L. Mullen" Date: Mon, 3 Jun 2024 16:14:07 -0400 Subject: [PATCH 3/4] use image and mask directories instead of regex --- dl_water_bodies/view/dlwater_pipeline_cli.py | 32 ++++++-------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/dl_water_bodies/view/dlwater_pipeline_cli.py b/dl_water_bodies/view/dlwater_pipeline_cli.py index 7cf21f3..760900c 100644 --- a/dl_water_bodies/view/dlwater_pipeline_cli.py +++ b/dl_water_bodies/view/dlwater_pipeline_cli.py @@ -4,15 +4,13 @@ import argparse from dl_water_bodies.pipelines.dlwater_pipeline import WaterMaskPipeline - # ----------------------------------------------------------------------------- # main # # python dlwater_pipeline_cli.py -c config.yaml -d config.csv -s preprocess -# ----------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- def main(): - # Process command-line args. desc = 'Use this application to perform CNN segmentation.' parser = argparse.ArgumentParser(description=desc) @@ -24,14 +22,6 @@ def main(): dest='config_file', help='Path to the configuration file') - parser.add_argument('-d', - '--data-csv', - type=str, - required=False, - default=None, - dest='data_csv', - help='Path to the data configuration file') - parser.add_argument('-s', '--step', type=str, @@ -59,21 +49,21 @@ def main(): help='Path to output directory') parser.add_argument('-r', - '--regex-list', + '--image-dir', type=str, nargs='*', required=False, - dest='inference_regex_list', - help='Inference regex list', + dest='image_dir', + help='directory with images to predict', default=['*.tif']) parser.add_argument('-mr', - '--mask-regex-list', + '--mask-dir', type=str, nargs='*', required=False, - dest='mask_regex_list', - help='Mask regex list', + dest='mask_dir', + help='directory with corresponding image masks', default=['*.tif']) parser.add_argument('-f', @@ -83,7 +73,6 @@ def main(): dest='force_delete', action='store_true', help='Force the deletion of lock files') - args = parser.parse_args() # Setup timer to monitor script execution time @@ -92,11 +81,10 @@ def main(): # Initialize pipeline object pipeline = WaterMaskPipeline( args.config_file, - args.data_csv, args.model_filename, args.output_dir, - args.inference_regex_list, - args.mask_regex_list, + args.image_dir, + args.mask_dir, args.force_delete ) @@ -108,7 +96,7 @@ def main(): if "predict" in args.pipeline_step: pipeline.predict() - logging.info(f'Took {(time.time()-timer)/60.0:.2f} min.') + logging.info('Took {} min.'.format((time.time()-timer)/60.0)) return From 324db79572b0bd2731a7a4676749c3369a46647c Mon Sep 17 00:00:00 2001 From: "Andrew L. Mullen" Date: Mon, 3 Jun 2024 16:15:39 -0400 Subject: [PATCH 4/4] smarter association of planet images with UDM2 masks --- dl_water_bodies/pipelines/dlwater_pipeline.py | 117 ++++++++++++------ 1 file changed, 78 insertions(+), 39 deletions(-) diff --git a/dl_water_bodies/pipelines/dlwater_pipeline.py b/dl_water_bodies/pipelines/dlwater_pipeline.py index c188b3d..12b6ab7 100644 --- a/dl_water_bodies/pipelines/dlwater_pipeline.py +++ b/dl_water_bodies/pipelines/dlwater_pipeline.py @@ -47,6 +47,7 @@ __status__ = "Production" __all__ = ["cp"] +os.environ["SM_FRAMEWORK"] = "tf.keras" # ----------------------------------------------------------------------------- # class WaterMaskPipeline @@ -59,11 +60,10 @@ class WaterMaskPipeline(object): def __init__( self, config_filename: str = None, - data_csv: str = None, model_filename: str = None, output_dir: str = None, - inference_regex_list: str = None, - mask_regex_list: str = None, + image_dir: str = None, + mask_dir: str = None, force_delete: bool = False, default_config: str = 'templates/watermask_default.yaml', logger=None @@ -74,7 +74,8 @@ def __init__( # Set logger self.logger = logger if logger is not None else self._set_logger() - + self.mask_dir=mask_dir + self.image_dir=image_dir # Configuration file intialization if config_filename is None: config_filename = os.path.join( @@ -95,30 +96,20 @@ def __init__( if output_dir is not None: self.conf.inference_save_dir = output_dir os.makedirs(self.conf.inference_save_dir, exist_ok=True) - + # rewrite inference regex list - if inference_regex_list is not None: - self.conf.inference_regex_list = inference_regex_list + #if image_dir is not None: + #self.conf.inference_regex_list = self.get_image_filepaths(image_dir) + #self.conf.inference_regex_list = self.get_filenames(image_dir) # rewrite mask regex list - if mask_regex_list is not None: - self.conf.mask_regex_list = mask_regex_list - - # Set Data CSV - self.data_csv = data_csv + #if mask_dir is not None: + #self.conf.mask_regex_list = self.get_corresponding_mask_filepaths(mask_dir) # Force deletion self.force_delete = force_delete - # Set output directories and locations - self.images_dir = os.path.join(self.conf.data_dir, 'images') - os.makedirs(self.images_dir, exist_ok=True) - self.logger.info(f'Images dir: {self.images_dir}') - - self.labels_dir = os.path.join(self.conf.data_dir, 'labels') - os.makedirs(self.labels_dir, exist_ok=True) - self.logger.info(f'Images dir: {self.labels_dir}') - + #Set model path self.model_dir = self.conf.model_dir os.makedirs(self.model_dir, exist_ok=True) @@ -179,6 +170,44 @@ def get_filenames(self, data_regex: str) -> list: filenames = glob(data_regex) assert len(filenames) > 0, f'No files under {data_regex}' return sorted(filenames) + + def get_image_filepaths(self, image_dir: str) -> list: + """i + Get filename from list of regexes + """ + # get the paths/filenames of the regex + filenames = [] + + for im in os.listdir(image_dir): + filenames.append(os.path.join(image_dir, im)) + + return sorted(filenames) + + def get_corresponding_mask_filepaths(self, data_filenames, mask_dir: str) -> list: + """ + get masks corresponding to predictions, if no mask, will be None + """ + #get all mask dates + mask_dates = [re.search(r'\d{8}', d).group() for d in os.listdir(mask_dir)] + + #get full mask paths + mask_paths = [os.path.join(mask_dir, m) for m in os.listdir(mask_dir)] + + #empty array to fill with mask paths corresponding to prediction paths + corresponding_paths = [] + + #iterate predictions + for p in data_filenames: + date=re.search(r'\d{8}', p).group() + + if date in mask_dates: + idx = mask_dates.index(date) + corresponding_paths.append(mask_paths[idx]) + + else: + corresponding_paths.append("") + + return corresponding_paths # ------------------------------------------------------------------------- # Utils @@ -218,20 +247,16 @@ def predict(self) -> None: preprocess_input = sm.get_preprocessing(BACKBONE) # Gather filenames to predict - if len(self.conf.inference_regex_list) > 0: - data_filenames = sorted( - self.get_filenames(self.conf.inference_regex_list)) - else: - data_filenames = sorted( - self.get_filenames(self.conf.inference_regex)) + + data_filenames = sorted(self.get_filenames(self.image_dir)) + + logging.info(f'{len(data_filenames)} files to predict') # Gather filenames to mask predictions - if len(self.conf.mask_regex_list) > 0: - mask_filenames = sorted( - self.get_filenames(self.conf.mask_regex_list)) - else: - mask_filenames = sorted(self.get_filenames(self.conf.mask_regex)) + + mask_filenames = self.get_corresponding_mask_filepaths(data_filenames, self.mask_dir) + logging.info(f'{len(mask_filenames)} masks for prediction') # iterate files, create lock file to avoid predicting the same file @@ -243,13 +268,13 @@ def predict(self) -> None: # set output directory basename = os.path.basename(os.path.dirname(filename)) output_directory = os.path.join( - self.conf.inference_save_dir, basename) + self.conf.inference_save_dir)#, basename) os.makedirs(output_directory, exist_ok=True) # set prediction output filename output_filename = os.path.join( output_directory, - f'{Path(filename).stem}-{self.conf.experiment_type}.tif') + f'{Path(filename).stem}_{self.conf.experiment_type}.tif') # lock file for multi-node, multi-processing lock_filename = f'{output_filename}.lock' @@ -274,8 +299,9 @@ def predict(self) -> None: logging.info(f'Prediction shape: {image.shape}') # open mask filename - mask = rxr.open_rasterio(mask_filename) - logging.info(f'Mask shape: {mask.shape}') + if mask_filename: + mask = rxr.open_rasterio(mask_filename) + logging.info(f'Mask shape: {mask.shape}') # check bands in imagery, do not proceed if one band if image.shape[0] == 1: @@ -294,6 +320,17 @@ def predict(self) -> None: # scale image values, PlanetScope specific normalization image = (image / 10000.0) * 255 + + #get metadata for later + crs=image.rio.crs + transform=image.rio.transform() + + + # set nodata regions to 0.5 + #image_zero_mask = image==0 + + image = xr.where(image==0, 0.5, image) + temporary_tif = preprocess_input(image.values) # Sliding window prediction @@ -320,8 +357,9 @@ def predict(self) -> None: nodata = 0 # prediction.rio.nodata # Mask out using the Planet quota - mask = np.squeeze(mask[0, :, :].values) - prediction[mask == 0] = nodata + if mask_filename: + mask = np.squeeze(mask[0, :, :].values) + prediction[mask == 0] = nodata # TODO: Fix no-data from corners to 255 # nodata_mask = np.squeeze(image[:, :, 0].values) @@ -353,7 +391,8 @@ def predict(self) -> None: # prediction.rio.write_nodata( # self.conf.prediction_nodata, encoded=True, inplace=True) prediction.rio.write_nodata(nodata, encoded=True, inplace=True) - + prediction.rio.write_crs(crs, inplace=True) + prediction.rio.write_transform(transform, inplace=True) # Save output raster file to disk prediction.rio.to_raster( output_filename,