import GeoAnalyze
import geopandas
import pandas
import bs4
import re
import json
import typing
import tempfile
import os
from . import utility
[docs]
class Analysis:
    '''
    Provide methods for analyzing simulation outputs and generating insights.

    The public methods integrate WaTEM/SEDEM sediment outputs into stream
    tables and GeoJSON layers, summarize sediment totals for the model region,
    post-process output rasters (driver, CRS, linear scaling), and sort
    non-dominated solutions stored in JSON files.
    '''
[docs]
def sediment_delivery_to_stream_json(
    self,
    info_file: str,
    segsed_file: str,
    cumsed_file: str,
    json_file: str
) -> pandas.DataFrame:
    '''
    Attach per-segment and cumulative sediment delivery values to the stream
    information table, using WaTEM/SEDEM outputs produced with the extension
    `Output per river segment = 1 <https://watem-sedem.github.io/watem-sedem/model_extensions.html#output-per-river-segment>`_ enabled.

    Stream segments are matched to sediment records through the ``ws_id``
    column of the stream information table. Segments without a matching
    record receive a missing value.

    Parameters
    ----------
    info_file : str
        Path to the input information TXT file ``stream_information.txt``, produced by
        :meth:`OptiDamTool.WatemSedem.dem_to_stream`. The file is read as JSON records.

    segsed_file : str
        Path to the input TXT file ``Total sediment segments.txt``,
        generated by a WaTEM/SEDEM simulation. This file contains sediment
        inflow (in kilograms) to each stream segment from its own subbasin area.
        It is read as a tab-separated table after skipping the first line.

    cumsed_file : str
        Path to the input TXT file ``Cumulative sediment segments.txt``,
        generated by a WaTEM/SEDEM simulation. This file contains sediment
        inflow (in kilograms) to each stream segment from all upstream segments.
        It is read as a tab-separated table after skipping the first line.

    json_file : str
        Path to the output JSON file where the resulting DataFrame is saved.

    Returns
    -------
    DataFrame
        The stream information table with two additional columns: ``sed_kg``,
        the sediment delivery (in kilograms) to the individual stream segment,
        and ``cumsed_kg``, the cumulative sediment delivery (in kilograms)
        including contributions from all upstream segments.
    '''

    # validate argument types first, while locals() still holds only the parameters
    utility._validate_variable_origin_static_type(
        vars_types=typing.get_type_hints(
            obj=self.sediment_delivery_to_stream_json
        ),
        vars_values=locals()
    )

    # the output path must carry a JSON extension
    utility._validate_json_extension(
        json_file=json_file
    )

    def _segment_lookup(txt_file):
        # two-column WaTEM/SEDEM table -> {segment identifier: sediment value}
        table = pandas.read_csv(
            filepath_or_buffer=txt_file,
            skiprows=1,
            sep='\t'
        )
        return dict(zip(*table.values.T))

    # stream information table
    stream_df = pandas.read_json(
        path_or_buf=info_file,
        orient='records'
    )

    # add one sediment column per input file, keyed by the segment identifier
    segment_col = 'ws_id'
    for target_col, txt_file in (('sed_kg', segsed_file), ('cumsed_kg', cumsed_file)):
        lookup = _segment_lookup(txt_file)
        stream_df[target_col] = stream_df[segment_col].apply(lookup.get)

    # write the enriched table and hand it back
    stream_df.to_json(
        path_or_buf=json_file,
        orient='records',
        indent=4
    )

    return stream_df
[docs]
def sediment_delivery_to_stream_geojson(
    self,
    stream_file: str,
    sediment_file: str,
    geojson_file: str
) -> geopandas.GeoDataFrame:
    '''
    Build a stream GeoJSON file that carries detailed information for each
    segment, including sediment inflow values.

    The stream geometries are merged with the table produced by
    :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_json` on every
    column the two inputs share. Two columns, ``sed_ton`` and ``cumsed_ton``,
    are then added by dividing ``sed_kg`` and ``cumsed_kg`` by 1000.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile ``stream_lines.shp``, produced by
        :meth:`OptiDamTool.WatemSedem.dem_to_stream`.

    sediment_file : str
        Path to the input JSON file generated by
        :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_json`.

    geojson_file : str
        Path to the output GeoJSON file where the resulting GeoDataFrame is saved.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing detailed information on stream segments.

    Raises
    ------
    TypeError
        If ``geojson_file`` does not end with ``.geojson`` (case-insensitive).
    '''

    # validate argument types against the method annotations
    utility._validate_variable_origin_static_type(
        vars_types=typing.get_type_hints(
            obj=self.sediment_delivery_to_stream_geojson
        ),
        vars_values=locals()
    )

    # the output path must carry a GeoJSON extension
    has_geojson_suffix = geojson_file.lower().endswith('.geojson')
    if not has_geojson_suffix:
        raise TypeError('Output file path must have a valid GeoJSON file extension')

    # load geometries and the sediment table
    stream_gdf = geopandas.read_file(
        filename=stream_file
    )
    sediment_df = pandas.read_json(
        path_or_buf=sediment_file,
        orient='records'
    )

    # join on every column present in both inputs
    shared_cols = [
        name for name in sediment_df.columns if name in stream_gdf.columns
    ]
    stream_gdf = stream_gdf.merge(
        right=sediment_df,
        on=shared_cols
    )

    # kilograms -> tons
    for kg_col, ton_col in (('sed_kg', 'sed_ton'), ('cumsed_kg', 'cumsed_ton')):
        stream_gdf[ton_col] = stream_gdf[kg_col] / 1000

    # write the result and hand it back
    stream_gdf.to_file(
        filename=geojson_file
    )

    return stream_gdf
[docs]
def sediment_summary_dynamics_region(
    self,
    sediment_file: str,
    summary_file: str,
    output_file: str
) -> pandas.DataFrame:
    '''
    Summarize total sediment values for the model region using outputs from a WaTEM/SEDEM simulation
    with the extension `Only Routing = 0 <https://watem-sedem.github.io/watem-sedem/choices.html#onlyrouting>`_
    disabled. The computed totals are used to derive insights on sediment dynamics across the region.

    Only the first four lines of the sediment file are used. Each is expected
    to have the form ``<label>: <value> (<unit>)``. The labels of the last two
    of these lines are replaced by ``Sediment leaving via stream`` and
    ``Sediment leaving via boundary``.

    Parameters
    ----------
    sediment_file : str
        Path to the input TXT file ``Total sediment.txt``, generated by a WaTEM/SEDEM simulation.

    summary_file : str
        Path to the input JSON file ``summary.json``, produced by
        :meth:`OptiDamTool.WatemSedem.dem_to_stream`. The model region area in
        square meters is read from its ``Watershed area (m^2)`` entry.

    output_file : str
        Path to the JSON file where the output DataFrame summarizing sediment dynamics will be saved.

    Returns
    -------
    DataFrame
        A DataFrame with the columns ``features``, ``sed_kg``, ``area_m2``,
        ``kg_per_m2``, ``sed_ton`` and ``ton_per_ha``.
    '''

    # validate argument types against the method annotations
    utility._validate_variable_origin_static_type(
        vars_types=typing.get_type_hints(
            obj=self.sediment_summary_dynamics_region
        ),
        vars_values=locals()
    )

    # the output path must carry a JSON extension
    utility._validate_json_extension(
        json_file=output_file
    )

    # parse "<label>: <value> (<unit>)" from the first four lines
    with open(sediment_file, 'r') as input_sediment:
        leading_lines = input_sediment.readlines()[:4]
    records = []
    for raw_line in leading_lines:
        label, amount = raw_line.strip().split(':')
        # keep only the number that precedes the parenthesized unit
        records.append([label, float(amount.split('(')[0].strip())])

    # total region area from the summary file
    with open(summary_file, 'r') as input_summary:
        summary = json.load(input_summary)
    area_m2 = summary['Watershed area (m^2)']

    # assemble the summary table
    df = pandas.DataFrame(
        data=records,
        columns=['features', 'sed_kg']
    )
    feature_pos = df.columns.get_loc('features')
    df.iloc[-2, feature_pos] = 'Sediment leaving via stream'
    df.iloc[-1, feature_pos] = 'Sediment leaving via boundary'

    # derived metrics; kg/m^2 -> ton/ha is a factor of 10000 / 1000 = 10
    df['area_m2'] = area_m2
    df['kg_per_m2'] = df['sed_kg'] / area_m2
    df['sed_ton'] = df['sed_kg'] / 1000
    df['ton_per_ha'] = df['kg_per_m2'] * 10

    # write the result and hand it back
    df.to_json(
        path_or_buf=output_file,
        orient='records',
        indent=4
    )

    return df
[docs]
def raster_features_retrieve(
    self,
    input_file: str,
    crs_code: int,
    output_file: str,
    target_driver: str = 'GTiff',
    scale: int | float = 1,
    offset: int | float = 0
) -> str:
    '''
    Convert a raster generated by a WaTEM/SEDEM simulation in the default
    `Idrisi raster format <https://watem-sedem.github.io/watem-sedem/choices.html#saga-grids>`_,
    which does not include CRS information, to the target driver (``GTiff`` by
    default), assign a Coordinate Reference System (CRS) to it, and apply a
    linear transformation to the raster values :math:`x` using the formula
    :math:`y = ax + b`, where :math:`a` and :math:`b` are the ``scale`` and
    ``offset`` input variables, respectively.

    The two intermediate rasters are written to a temporary directory that is
    removed when the method finishes.

    Parameters
    ----------
    input_file : str
        Path to the input raster file.

    crs_code : int
        EPSG code of the projected CRS to assign (e.g., 32638).

    output_file : str
        Path to save the output raster file.

    target_driver : str, optional
        GDAL-compatible name of the target driver. Default is 'GTiff'.

    scale : float, optional
        Scaling factor to apply to the raster values. Default is 1.

    offset : float, optional
        Offset value to add to the scaled raster values. Default is 0.

    Returns
    -------
    str
        A message confirming that all geoprocessing steps are complete.
    '''

    # validate argument types against the method annotations
    utility._validate_variable_origin_static_type(
        vars_types=typing.get_type_hints(
            obj=self.raster_features_retrieve
        ),
        vars_values=locals()
    )

    # raster toolbox
    raster = GeoAnalyze.Raster()

    with tempfile.TemporaryDirectory() as tmp_dir:
        # intermediate products live only inside the temporary directory
        converted_file = os.path.join(tmp_dir, 't1.tif')
        projected_file = os.path.join(tmp_dir, 't2.tif')
        # step 1: driver conversion
        raster.driver_convert(
            input_file=input_file,
            target_driver=target_driver,
            output_file=converted_file
        )
        # step 2: CRS assignment
        raster.crs_assign(
            input_file=converted_file,
            crs_code=crs_code,
            output_file=projected_file
        )
        # step 3: linear scaling, written to the final destination
        raster.value_scale_and_offset(
            input_file=projected_file,
            output_file=output_file,
            scale=scale,
            offset=offset
        )

    return 'All geoprocessing steps are complete'
[docs]
def nondominated_solution_sorting(
    self,
    input_file: str,
    sorting_by: str,
    output_file: str
) -> pandas.DataFrame:
    '''
    Sort non-dominated solutions produced by the :class:`OptiDamTool.SystemDesign`
    class according to one of the available sorting methods: dam identifiers,
    Euclidean distance, or objective directions.

    Parameters
    ----------
    input_file : str
        Path to the input JSON file ``solutions_nondominated.json`` generated by
        :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.

    sorting_by : str
        Method used to sort the output DataFrame of non-dominated solutions.

        - ``dam_identifiers``
            Sort the DataFrame by the columns whose names start with ``d_``,
            in ascending order.

        - ``metric_euclidean``
            Sort the DataFrame by the columns whose names start with
            ``metric_euclidean`` (the Euclidean distance of normalized
            solutions to the ideal solution), in ascending order.

        - ``objective_directions``
            Sort the DataFrame by the columns whose names end with ``(min)``
            or ``(max)``, according to the objective direction (``min`` for
            ascending and ``max`` for descending).

    output_file : str
        Path to the JSON file where the sorted DataFrame will be saved.

    Returns
    -------
    pandas.DataFrame
        The sorted DataFrame with a re-numbered index and a ``count`` column
        holding the 1-based position of each solution after sorting.

    Raises
    ------
    ValueError
        If ``sorting_by`` is not one of the valid option names.
    '''

    # check static type of input variable origin
    utility._validate_variable_origin_static_type(
        vars_types=typing.get_type_hints(
            obj=self.nondominated_solution_sorting
        ),
        vars_values=locals()
    )

    # check JSON extension of output file
    utility._validate_json_extension(
        json_file=output_file
    )

    # check validity of input sorting option
    valid_options = [
        'dam_identifiers',
        'metric_euclidean',
        'objective_directions'
    ]
    if sorting_by not in valid_options:
        raise ValueError(
            f'Invalid solution_sorting name "{sorting_by}"; valid names are {valid_options}'
        )

    # DataFrame from JSON file
    df = pandas.read_json(
        path_or_buf=input_file,
        orient='records'
    )

    # select DataFrame columns and directions for sorting
    df_columns = list(df.columns)
    if sorting_by == 'dam_identifiers':
        sort_cols = [col for col in df_columns if col.startswith('d_')]
        ascending = [True] * len(sort_cols)
    elif sorting_by == 'metric_euclidean':
        sort_cols = [col for col in df_columns if col.startswith('metric_euclidean')]
        ascending = [True] * len(sort_cols)
    else:
        # 'objective_directions': minimized objectives ascend, maximized ones descend
        sort_cols = [col for col in df_columns if col.endswith(('(min)', '(max)'))]
        ascending = [col.endswith('(min)') for col in sort_cols]

    # DataFrame sorting
    df = df.sort_values(
        by=sort_cols,
        ascending=ascending,
        ignore_index=True
    )

    # 1-based position of each solution in the sorted order
    df['count'] = list(range(1, len(df) + 1))

    # save output DataFrame
    df.to_json(
        path_or_buf=output_file,
        orient='records',
        indent=4
    )

    return df
def _dam_features_extraction(
    self,
    input_file: str,
    output_file: str
) -> geopandas.GeoDataFrame:
    '''
    Extract dam features in the Kingdom of Saudi Arabia from the input file and
    translate Arabic text to English where applicable.

    Each record's ``Description`` field is parsed as HTML. The two-cell rows of
    the table nested inside its first table are read as key/value pairs, and
    the Arabic keys are translated into English column names. Selected columns
    are cast to float, categorical Arabic entries are translated to English,
    and an approximate Gregorian ``construction_year`` is derived from the
    first run of digits in ``construction_year_hijri``.

    Parameters
    ----------
    input_file : str
        Path to the input vector file containing a ``Description`` column, along
        with ``layer`` and ``Name`` columns that are dropped from the output.

    output_file : str
        Path to the file where the processed GeoDataFrame is saved.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the processed and translated dam features.
    '''

    # Arabic to English translation dictionary
    arabic_dict = {
        'added_columns': {
            'المنطقة': 'region',
            'المحافظة': 'governorate',
            'اسم_السد': 'dam_name',
            'الحالة': 'status',
            'الغرض__الاساسي': 'original_purpose',
            'الغرض_الحالي': 'current_purpose',
            'النوع': 'dam_type',
            'طول_السد__م': 'dam_length_m',
            'إرتفاع_السد__م': 'dam_height_m',
            'إرتفاع_المفيض__م': 'spillway_height_m',
            'السعة_التخزينية__م3': 'storage_capacity_m3',
            'تاريخ_التنفيذ': 'construction_year_hijri',
            'شمال': 'latitude',
            'شرق': 'longitude',
            'Dam_Area': 'drainage_area',
            'اسم_الوادي': 'wadi_name',
            'اسم_الوادي_2': 'wadi_name_alternative'
        },
        'row_region': {
            'الرياض': 'Riyadh',
            'مكة المكرمة': 'Makkah',
            'المدينة المنورة': 'Madinah',
            'القصيم': 'Qassim',
            'الشرقية': 'Eastern Province',
            'الحدود الشمالية': 'Northern Borders',
            'عسير': 'Asir',
            'الباحة': 'Al-Baha',
            'حائل': 'Hail',
            'تبوك': 'Tabuk',
            'الجوف': 'Al-Jawf',
            'جازان': 'Jazan',
            'نجران': 'Najran'
        },
        'row_status': {
            'منفذ': 'Completed',
            'تحت التنفيذ': 'Under construction',
            'تحت التصميم': 'Under design',
            'تحت الطرح': 'Under tendering'
        },
        'row_original_purpose': {
            'استعاضة': 'Replacement',
            'تحكم': 'Control',
            'حماية': 'Protection',
            'شرب': 'Drinking'
        },
        'row_current_purpose': {
            'حماية': 'Protection',
            'استعاضة': 'Replacement',
            'شرب': 'Drinking'
        },
        'row_dam_type': {
            'ترابي': 'Earthen',
            'خرساني': 'Concrete',
            'ركامي': 'Rockfill',
            'جوفي': 'Subsurface',
            'حجري': 'Stone'
        },
        'row_drainage_area': {
            'الرف الرسوبي': 'Sedimentary shelf',
            'الدرع العربي': 'Arabian shield'
        },
        'row_wadi_name': {
            'حوض لمنطقة الشرقية': 'Eastern Region Basin',
            'حوض وادي حنيفة': 'Wadi Hanifa Basin',
            'حوض وادي الرمة': 'Wadi Al-Rummah Basin',
            'حوض وادي السهباء': 'Wadi Al-Sahba Basin',
            'حوض وادي الدواسر': 'Wadi Al-Dawasir Basin',
            'ج': 'J',
            'حوض وادي الخرمة': 'Wadi Al-Khurmah Basin',
            'حوض وادي الحمض': 'Wadi Al-Hamad Basin',
            'ب': 'B',
            'حوض وادي الاخضر': 'Wadi Al-Akhdar Basin',
            'حوض وادي عرعر': 'Wadi Arar Basin',
            'حوض وادي السرحان': 'Wadi Al-Sarhan Basin',
            'ا': 'A',
            'حوض الربع الخالي': 'Rub" Al-Khali Basin'
        },
        'row_wadi_name_alternative': {
            '<Null>': 'N/A',
            'Rainfall Area': 'Rainfall area'
        }
    }

    # source records
    dam_gdf = geopandas.read_file(
        filename=input_file
    )

    # parse the key/value table embedded in each HTML description
    # NOTE(review): a record skipped by one of the `continue` guards contributes
    # no entry, so the feature table becomes shorter than dam_gdf before the
    # column-wise concat below - confirm every description holds a nested table.
    column_names = arabic_dict['added_columns']
    dam_features = []
    for html_str in dam_gdf['Description']:
        outer_table = bs4.BeautifulSoup(html_str, 'html.parser').find('table')
        if not isinstance(outer_table, bs4.Tag):
            continue
        inner_table = outer_table.find('table')
        if not isinstance(inner_table, bs4.Tag):
            continue
        translated = {}
        for table_row in inner_table.find_all('tr'):
            if not isinstance(table_row, bs4.Tag):
                continue
            cells = table_row.find_all('td')
            if len(cells) != 2:
                continue
            key = cells[0].get_text(strip=True)
            value = cells[1].get_text(strip=True)
            # an unknown Arabic key raises KeyError
            translated[column_names[key]] = value
        dam_features.append(translated)

    # place the parsed features next to the source records
    feature_df = pandas.DataFrame(dam_features)
    dam_gdf = pandas.concat(
        objs=[dam_gdf, feature_df],
        axis=1
    )

    # columns that are no longer needed
    dam_gdf = dam_gdf.drop(
        columns=['layer', 'Name', 'Description']
    )

    # numeric columns arrive as text
    numeric_cols = (
        'dam_length_m',
        'dam_height_m',
        'storage_capacity_m3',
        'latitude',
        'longitude'
    )
    for name in numeric_cols:
        dam_gdf[name] = dam_gdf[name].astype(float)

    # translate categorical entries; an unknown entry raises KeyError
    for name in dam_gdf.columns:
        value_map = arabic_dict.get('row_' + name)
        if value_map is None:
            continue
        dam_gdf[name] = dam_gdf[name].apply(value_map.__getitem__)

    def _approximate_gregorian_year(hijri_text):
        # first run of digits is taken as the Hijri year; None when there is none
        found = re.search(r'(\d+)', hijri_text)
        if not found:
            return None
        return int(float(found.group(1)) * 0.97 + 622)

    # approximate Gregorian construction year
    dam_gdf['construction_year'] = dam_gdf['construction_year_hijri'].apply(
        _approximate_gregorian_year
    )

    # write the result and hand it back
    dam_gdf.to_file(
        filename=output_file
    )

    return dam_gdf