Source code for OptiDamTool.analysis

import GeoAnalyze
import geopandas
import pandas
import bs4
import re
import json
import typing
import tempfile
import os
from . import utility


class Analysis:

    '''
    Provide methods for analyzing simulation outputs
    and generating insights.
    '''

    def sediment_delivery_to_stream_json(
        self,
        info_file: str,
        segsed_file: str,
        cumsed_file: str,
        json_file: str
    ) -> pandas.DataFrame:

        '''
        Integrate sediment delivery data into stream segments using WaTEM/SEDEM outputs,
        with the extension `Output per river segment = 1
        <https://watem-sedem.github.io/watem-sedem/model_extensions.html#output-per-river-segment>`_
        enabled.

        Parameters
        ----------
        info_file : str
            Path to the input information TXT file ``stream_information.txt``, produced by
            :meth:`OptiDamTool.WatemSedem.dem_to_stream`. The file is read as JSON records.

        segsed_file : str
            Path to the input TXT file ``Total sediment segments.txt``, generated by a
            WaTEM/SEDEM simulation. This file contains sediment inflow (in kilograms)
            to each stream segment from its own subbasin area.

        cumsed_file : str
            Path to the input TXT file ``Cumulative sediment segments.txt``, generated by a
            WaTEM/SEDEM simulation. This file contains sediment inflow (in kilograms)
            to each stream segment from all upstream segments.

        json_file : str
            Path to the output JSON file to save the output DataFrame after integrating
            sediment delivery into stream segments.

        Returns
        -------
        DataFrame
            A DataFrame containing stream information with two additional columns.
            The ``sed_kg`` column represents the sediment delivery (in kilograms) to
            individual stream segments. The ``cumsed_kg`` column represents the cumulative
            sediment delivery (in kilograms), including contributions from all upstream
            segments. Segments that are absent from a sediment file receive a missing value.
        '''

        # check static type of input variable origin;
        # must stay the first statement so that locals() holds only the arguments
        utility._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.sediment_delivery_to_stream_json
            ),
            vars_values=locals()
        )

        # check JSON extension of output file
        utility._validate_json_extension(
            json_file=json_file
        )

        # stream information DataFrame
        stream_df = pandas.read_json(
            path_or_buf=info_file,
            orient='records'
        )

        # integrating individual and cumulative sediment delivery to stream segments
        stream_col = 'ws_id'
        sediment_sources = (
            ('sed_kg', segsed_file),
            ('cumsed_kg', cumsed_file)
        )
        for target_col, txt_file in sediment_sources:
            lookup = self._segment_sediment_lookup(txt_file)
            stream_df[target_col] = stream_df[stream_col].apply(lookup.get)

        # saving output stream DataFrame
        stream_df.to_json(
            path_or_buf=json_file,
            orient='records',
            indent=4
        )

        return stream_df

    @staticmethod
    def _segment_sediment_lookup(
        txt_file: str
    ) -> dict:

        '''
        Read a tab-separated WaTEM/SEDEM segment file, skipping its first line,
        and return a dictionary that maps the first column (segment identifier)
        to the second column (sediment value).
        '''

        segment_df = pandas.read_csv(
            filepath_or_buffer=txt_file,
            skiprows=1,
            sep='\t'
        )

        # the first two columns are addressed explicitly so that identifiers keep
        # their own dtype and any additional column does not break the mapping
        lookup = dict(
            zip(segment_df.iloc[:, 0], segment_df.iloc[:, 1])
        )

        return lookup

    def sediment_delivery_to_stream_geojson(
        self,
        stream_file: str,
        sediment_file: str,
        geojson_file: str
    ) -> geopandas.GeoDataFrame:

        '''
        Generate a stream GeoJSON file containing detailed information for each segment,
        including sediment inflow values. The output GeoDataFrame includes all columns from the
        DataFrame produced by :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_json`,
        along with two additional columns ``sed_ton`` and ``cumsed_ton``, which represent
        sediment inflow to stream segments converted from kilograms to tons.

        Parameters
        ----------
        stream_file : str
            Path to the input stream shapefile ``stream_lines.shp``, produced by
            :meth:`OptiDamTool.WatemSedem.dem_to_stream`.

        sediment_file : str
            Path to the input JSON file generated by
            :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_json`.

        geojson_file : str
            Path to the output GeoJSON file to save the output GeoDataFrame.

        Returns
        -------
        GeoDataFrame
            A GeoDataFrame containing detailed information on stream segments.

        Raises
        ------
        TypeError
            If ``geojson_file`` does not end with the ``.geojson`` extension.

        ValueError
            If the stream shapefile and the sediment JSON file share no column names.
        '''

        # check static type of input variable origin;
        # must stay the first statement so that locals() holds only the arguments
        utility._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.sediment_delivery_to_stream_geojson
            ),
            vars_values=locals()
        )

        # check GeoJSON extension of output file
        if not geojson_file.lower().endswith('.geojson'):
            raise TypeError('Output file path must have a valid GeoJSON file extension')

        # stream GeoDataFrame
        stream_gdf = geopandas.read_file(
            filename=stream_file
        )

        # sediment information DataFrame
        sediment_df = pandas.read_json(
            path_or_buf=sediment_file,
            orient='records'
        )

        # columns present in both inputs are used as merge keys
        common_cols = [
            col for col in sediment_df.columns if col in stream_gdf.columns
        ]
        if not common_cols:
            raise ValueError(
                'Stream shapefile and sediment JSON file share no columns to merge on'
            )

        # merging stream GeoDataFrame with information DataFrame
        stream_gdf = stream_gdf.merge(
            right=sediment_df,
            on=common_cols
        )

        # kilograms to tons
        for kg_col, ton_col in (('sed_kg', 'sed_ton'), ('cumsed_kg', 'cumsed_ton')):
            stream_gdf[ton_col] = stream_gdf[kg_col] / 1000

        # saving stream GeoDataFrame
        stream_gdf.to_file(
            filename=geojson_file
        )

        return stream_gdf

    def sediment_summary_dynamics_region(
        self,
        sediment_file: str,
        summary_file: str,
        output_file: str
    ) -> pandas.DataFrame:

        '''
        Summarize total sediment values for the model region using outputs from a
        WaTEM/SEDEM simulation with the extension `Only Routing = 0
        <https://watem-sedem.github.io/watem-sedem/choices.html#onlyrouting>`_ disabled.
        The computed totals are used to derive insights on sediment dynamics across the region.

        Parameters
        ----------
        sediment_file : str
            Path to the input TXT file ``Total sediment.txt``, generated by a
            WaTEM/SEDEM simulation. Only its first four ``<label>: <value> (<unit>)``
            lines are used.

        summary_file : str
            Path to the input JSON file ``summary.json``, produced by
            :meth:`OptiDamTool.WatemSedem.dem_to_stream`. This file provides the
            total model region area in square meters under the key ``Watershed area (m^2)``.

        output_file : str
            Path to the JSON file where the output DataFrame summarizing
            sediment dynamics will be saved.

        Returns
        -------
        DataFrame
            A DataFrame containing summary metrics on sediment dynamics for the model region,
            with columns ``features``, ``sed_kg``, ``area_m2``, ``kg_per_m2``,
            ``sed_ton`` and ``ton_per_ha``.
        '''

        # check static type of input variable origin;
        # must stay the first statement so that locals() holds only the arguments
        utility._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.sediment_summary_dynamics_region
            ),
            vars_values=locals()
        )

        # check JSON extension of output file
        utility._validate_json_extension(
            json_file=output_file
        )

        # extract label and value from the first four lines of the TXT file;
        # zip with range stops reading once four lines have been consumed
        with open(sediment_file, 'r') as input_sediment:
            txt_lines = [
                line.strip().split(':') for _, line in zip(range(4), input_sediment)
            ]
        value_lines = [
            [label, float(value.split('(')[0].strip())] for label, value in txt_lines
        ]

        # area from summary file
        with open(summary_file, 'r') as input_summary:
            area_m2 = json.load(input_summary)['Watershed area (m^2)']

        # sediment summary DataFrame
        df = pandas.DataFrame(
            data=value_lines,
            columns=['features', 'sed_kg']
        )

        # shorter labels for the last two features
        df.iloc[-2, 0] = 'Sediment leaving via stream'
        df.iloc[-1, 0] = 'Sediment leaving via boundary'

        # derived metrics; 1 kg/m^2 equals 10 ton/ha
        df['area_m2'] = area_m2
        df['kg_per_m2'] = df['sed_kg'] / area_m2
        df['sed_ton'] = df['sed_kg'] / 1000
        df['ton_per_ha'] = df['kg_per_m2'] * 10

        # save the DataFrame
        df.to_json(
            path_or_buf=output_file,
            orient='records',
            indent=4
        )

        return df

    def raster_features_retrieve(
        self,
        input_file: str,
        crs_code: int,
        output_file: str,
        target_driver: str = 'GTiff',
        scale: int | float = 1,
        offset: int | float = 0
    ) -> str:

        '''
        Assign a default ``GTiff`` driver and a Coordinate Reference System (CRS) to the
        input raster generated from a WaTEM/SEDEM simulation using the default `Idrisi raster format
        <https://watem-sedem.github.io/watem-sedem/choices.html#saga-grids>`_,
        which does not include CRS information.

        The function also applies a linear transformation to raster values :math:`x`
        using the formula :math:`y = ax + b`, where :math:`a` and :math:`b` are the
        ``scale`` and ``offset`` input variables, respectively.

        Parameters
        ----------
        input_file : str
            Path to the input raster file.

        crs_code : int
            EPSG code of the projected CRS to assign (e.g., 32638).

        output_file : str
            Path to save the output raster file.

        target_driver : str, optional
            GDAL-compatible name of the target driver. Default is 'GTiff'.

        scale : int or float, optional
            Scaling factor to apply to the raster values. Default is 1.

        offset : int or float, optional
            Offset value to add to the scaled raster values. Default is 0.

        Returns
        -------
        str
            A message confirming that all geoprocessing steps are complete.
        '''

        # check static type of input variable origin;
        # must stay the first statement so that locals() holds only the arguments
        utility._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.raster_features_retrieve
            ),
            vars_values=locals()
        )

        # class object
        raster = GeoAnalyze.Raster()

        # intermediate rasters live in a temporary directory removed on exit
        with tempfile.TemporaryDirectory() as tmp_dir:
            converted_file = os.path.join(tmp_dir, 't1.tif')
            projected_file = os.path.join(tmp_dir, 't2.tif')
            # raster driver conversion
            raster.driver_convert(
                input_file=input_file,
                target_driver=target_driver,
                output_file=converted_file
            )
            # assigning Coordinate Reference System
            raster.crs_assign(
                input_file=converted_file,
                crs_code=crs_code,
                output_file=projected_file
            )
            # linear scaling of raster
            raster.value_scale_and_offset(
                input_file=projected_file,
                output_file=output_file,
                scale=scale,
                offset=offset
            )

        return 'All geoprocessing steps are complete'

    def nondominated_solution_sorting(
        self,
        input_file: str,
        sorting_by: str,
        output_file: str
    ) -> pandas.DataFrame:

        '''
        Sort non-dominated solutions produced by the :class:`OptiDamTool.SystemDesign` class
        according to one of the available sorting methods: dam identifiers, Euclidean distance,
        or objective directions.

        Parameters
        ----------
        input_file : str
            Path to the input JSON file ``solutions_nondominated.json`` generated by
            :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.

        sorting_by : str
            Method used to sort the output DataFrame of non-dominated solutions.

            - ``dam_identifiers``
                Sort the DataFrame by the ``d_<i>`` columns in ascending order.

            - ``metric_euclidean``
                Sort the DataFrame by the column ``euclidean_metric(<ideal_solution>)``,
                which represents the Euclidean distance of normalized solutions
                to the ideal solution, in ascending order. Columns starting with
                ``metric_euclidean`` are accepted as well.

            - ``objective_directions``
                Sort the DataFrame by the ``<obj>(<dir>)`` columns according to the
                specified objective directions (``min`` for ascending and ``max`` for descending).

        output_file : str
            Path to the JSON file where the sorted DataFrame will be saved.

        Returns
        -------
        pandas.DataFrame
            A sorted DataFrame with an additional 1-based ``count`` column.

        Raises
        ------
        ValueError
            If ``sorting_by`` is not a valid option, or if the input file contains
            no column that matches the selected sorting method.
        '''

        # check static type of input variable origin;
        # must stay the first statement so that locals() holds only the arguments
        utility._validate_variable_origin_static_type(
            vars_types=typing.get_type_hints(
                obj=self.nondominated_solution_sorting
            ),
            vars_values=locals()
        )

        # check JSON extension of output file
        utility._validate_json_extension(
            json_file=output_file
        )

        # check validity of input sorting option
        valid_options = [
            'dam_identifiers',
            'metric_euclidean',
            'objective_directions'
        ]
        if sorting_by not in valid_options:
            raise ValueError(
                f'Invalid solution_sorting name "{sorting_by}"; valid names are {valid_options}'
            )

        # DataFrame from JSON file
        df = pandas.read_json(
            path_or_buf=input_file,
            orient='records'
        )

        # select DataFrame columns for sorting
        df_columns = list(df.columns)
        if sorting_by == 'dam_identifiers':
            sort_cols = [
                col for col in df_columns if col.startswith('d_')
            ]
            ascending = [True] * len(sort_cols)
        elif sorting_by == 'metric_euclidean':
            # NOTE(review): the documented column name is 'euclidean_metric(<ideal_solution>)'
            # while the earlier filter used the prefix 'metric_euclidean'; both are accepted
            # until the name written by SystemDesign is confirmed
            sort_cols = [
                col for col in df_columns
                if col.startswith(('metric_euclidean', 'euclidean_metric'))
            ]
            ascending = [True] * len(sort_cols)
        else:
            sort_cols = [
                col for col in df_columns if col.endswith(('(min)', '(max)'))
            ]
            # minimized objectives ascend, maximized objectives descend
            ascending = [
                col.endswith('(min)') for col in sort_cols
            ]

        # an empty column list would leave the solutions silently unsorted
        if not sort_cols:
            raise ValueError(
                f'No columns found in the input file for sorting by "{sorting_by}"'
            )

        # DataFrame sorting
        df = df.sort_values(
            by=sort_cols,
            ascending=ascending,
            ignore_index=True
        )

        # 1-based rank of each solution after sorting
        df['count'] = list(range(1, len(df) + 1))

        # save output DataFrame
        df.to_json(
            path_or_buf=output_file,
            orient='records',
            indent=4
        )

        return df

    def _dam_features_extraction(
        self,
        input_file: str,
        output_file: str
    ) -> geopandas.GeoDataFrame:

        '''
        Extract dam features in the Kingdom of Saudi Arabia from the input file
        and translate Arabic text to English where applicable.

        Each entry of the ``Description`` column is parsed as HTML; the two-cell rows of
        the nested table become feature columns. A dam whose description holds no such
        table keeps its row with missing feature values, so features always stay
        attached to the dam they were read from.

        This private utility function saves the processed GeoDataFrame to ``output_file``
        and returns it.
        '''

        # Arabic to English translation dictionary
        arabic_dict = {
            'added_columns': {
                'المنطقة': 'region',
                'المحافظة': 'governorate',
                'اسم_السد': 'dam_name',
                'الحالة': 'status',
                'الغرض__الاساسي': 'original_purpose',
                'الغرض_الحالي': 'current_purpose',
                'النوع': 'dam_type',
                'طول_السد__م': 'dam_length_m',
                'إرتفاع_السد__م': 'dam_height_m',
                'إرتفاع_المفيض__م': 'spillway_height_m',
                'السعة_التخزينية__م3': 'storage_capacity_m3',
                'تاريخ_التنفيذ': 'construction_year_hijri',
                'شمال': 'latitude',
                'شرق': 'longitude',
                'Dam_Area': 'drainage_area',
                'اسم_الوادي': 'wadi_name',
                'اسم_الوادي_2': 'wadi_name_alternative'
            },
            'row_region': {
                'الرياض': 'Riyadh',
                'مكة المكرمة': 'Makkah',
                'المدينة المنورة': 'Madinah',
                'القصيم': 'Qassim',
                'الشرقية': 'Eastern Province',
                'الحدود الشمالية': 'Northern Borders',
                'عسير': 'Asir',
                'الباحة': 'Al-Baha',
                'حائل': 'Hail',
                'تبوك': 'Tabuk',
                'الجوف': 'Al-Jawf',
                'جازان': 'Jazan',
                'نجران': 'Najran'
            },
            'row_status': {
                'منفذ': 'Completed',
                'تحت التنفيذ': 'Under construction',
                'تحت التصميم': 'Under design',
                'تحت الطرح': 'Under tendering'
            },
            'row_original_purpose': {
                'استعاضة': 'Replacement',
                'تحكم': 'Control',
                'حماية': 'Protection',
                'شرب': 'Drinking'
            },
            'row_current_purpose': {
                'حماية': 'Protection',
                'استعاضة': 'Replacement',
                'شرب': 'Drinking'
            },
            'row_dam_type': {
                'ترابي': 'Earthen',
                'خرساني': 'Concrete',
                'ركامي': 'Rockfill',
                'جوفي': 'Subsurface',
                'حجري': 'Stone'
            },
            'row_drainage_area': {
                'الرف الرسوبي': 'Sedimentary shelf',
                'الدرع العربي': 'Arabian shield'
            },
            'row_wadi_name': {
                'حوض لمنطقة الشرقية': 'Eastern Region Basin',
                'حوض وادي حنيفة': 'Wadi Hanifa Basin',
                'حوض وادي الرمة': 'Wadi Al-Rummah Basin',
                'حوض وادي السهباء': 'Wadi Al-Sahba Basin',
                'حوض وادي الدواسر': 'Wadi Al-Dawasir Basin',
                'ج': 'J',
                'حوض وادي الخرمة': 'Wadi Al-Khurmah Basin',
                'حوض وادي الحمض': 'Wadi Al-Hamad Basin',
                'ب': 'B',
                'حوض وادي الاخضر': 'Wadi Al-Akhdar Basin',
                'حوض وادي عرعر': 'Wadi Arar Basin',
                'حوض وادي السرحان': 'Wadi Al-Sarhan Basin',
                'ا': 'A',
                'حوض الربع الخالي': 'Rub" Al-Khali Basin'
            },
            'row_wadi_name_alternative': {
                '<Null>': 'N/A',
                'Rainfall Area': 'Rainfall area'
            }
        }

        # dam GeoDataFrame
        dam_gdf = geopandas.read_file(
            filename=input_file
        )

        # one feature record per dam, empty when the description holds no table;
        # skipping such rows would shift later features onto the wrong dams
        column_names = arabic_dict['added_columns']
        dam_features = []
        for html_str in dam_gdf['Description']:
            raw_features = self._description_table_to_dict(html_str)
            dam_features.append(
                {column_names[k]: v for k, v in raw_features.items()}
            )

        # combine feature DataFrame with dam GeoDataFrame on the shared index
        feature_df = pandas.DataFrame(
            data=dam_features,
            index=dam_gdf.index
        )
        dam_gdf = pandas.concat(
            objs=[dam_gdf, feature_df],
            axis=1
        )

        # drop columns that are not required
        dam_gdf = dam_gdf.drop(
            columns=['layer', 'Name', 'Description']
        )

        # convert string number to float in the applicable columns
        float_cols = [
            'dam_length_m',
            'dam_height_m',
            'storage_capacity_m3',
            'latitude',
            'longitude'
        ]
        for col in float_cols:
            dam_gdf[col] = dam_gdf[col].astype(float)

        # convert Arabic row entries to English; missing values are left untouched,
        # while an unknown non-missing entry still raises KeyError
        for col in list(dam_gdf.columns):
            mapping = arabic_dict.get('row_' + col)
            if mapping is None:
                continue
            dam_gdf[col] = dam_gdf[col].apply(
                lambda x, mapping=mapping: mapping[x] if pandas.notna(x) else x
            )

        # get approximate Gregorian construction year
        dam_gdf['construction_year'] = dam_gdf['construction_year_hijri'].apply(
            self._hijri_to_gregorian_year
        )

        # saving dam GeoDataFrame
        dam_gdf.to_file(
            filename=output_file
        )

        return dam_gdf

    @staticmethod
    def _description_table_to_dict(
        html_str: typing.Any
    ) -> dict[str, str]:

        '''
        Parse an HTML description and return the key-value pairs found in the two-cell
        rows of the table nested inside its first table. An empty dictionary is returned
        when the input is not a string or when the nested table is absent.
        '''

        if not isinstance(html_str, str):
            return {}

        soup = bs4.BeautifulSoup(html_str, 'html.parser')
        table = soup.find('table')
        if not isinstance(table, bs4.Tag):
            return {}
        inner_table = table.find('table')
        if not isinstance(inner_table, bs4.Tag):
            return {}

        pairs = {}
        for row in inner_table.find_all('tr'):
            if not isinstance(row, bs4.Tag):
                continue
            cells = row.find_all('td')
            # only rows of exactly two cells are treated as key and value
            if len(cells) == 2:
                pairs[cells[0].get_text(strip=True)] = cells[1].get_text(strip=True)

        return pairs

    @staticmethod
    def _hijri_to_gregorian_year(
        hijri_text: typing.Any
    ) -> int | None:

        '''
        Return the approximate Gregorian year for the first run of digits found in the
        Hijri year text, or ``None`` when the input is not a string or holds no digits.
        '''

        if not isinstance(hijri_text, str):
            return None
        year_match = re.search(r'(\d+)', hijri_text)
        if year_match is None:
            return None

        # linear approximation: Gregorian = Hijri * 0.97 + 622
        return int(float(year_match.group(1)) * 0.97 + 622)