import GeoAnalyze
import geopandas
import shapely
import pandas
import typing
import os
from .watem_sedem import WatemSedem
from . import utility
[docs]
class Network:
'''
Provide functionality to establish network-based connectivity and operations between dams.
'''
[docs]
def connectivity_adjacent_downstream_dam(
self,
stream_file: str,
dam_list: list[int],
sort_dam: bool = False
) -> dict[int, int]:
'''
Generate adjacent downstream connectivity between dams based on the input stream network.
Each dam is represented by a unique stream segment identifier.
Parameters
----------
stream_file: str
Path to the input stream shapefile,
generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`.
dam_list : list
List of stream segment identifiers representing dam locations.
sort_dam : bool, optional
If True, the dam identifiers in the output dictionary keys will be sorted in ascending order.
Default is False.
Returns
-------
dict
A dictionary with each key is a dam's stream identifier, and the corresponding value
is the stream identifier of the directly connected downstream dam.
A value of -1 indicates that the dam has no downstream connectivity.
'''
# check static type of input variable origin
utility._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.connectivity_adjacent_downstream_dam
),
vars_values=locals()
)
# check distinct stream identifiers for dams
if len(set(dam_list)) < len(dam_list):
raise ValueError('Duplicate stream identifiers found in the input dam list')
# connectivity from upstream to downstream
connect_dict = GeoAnalyze.Stream()._connectivity_upstream_to_downstream(
stream_file=stream_file,
stream_col='ws_id'
)
# adjacent downstream connectvity
adc_dict = {}
for i in dam_list:
if i not in connect_dict:
raise ValueError(f'Invalid stream identifier {i} for a dam')
# all dam connectivity towards outlet
stream_connect = connect_dict[i]
dam_connect = list(
filter(lambda x: x in stream_connect, dam_list)
)
# if no downstream dam is found
if len(dam_connect) == 0:
adc_dict[i] = -1
# extract the adjacent downstream dam
else:
dam_indices = [
stream_connect.index(j) for j in dam_connect
]
adc_dict[i] = stream_connect[min(dam_indices)]
# filtered connectivity for stream outlet identifiers where key and value are same
adc_dict = {
k: v if k != v else -1 for k, v in adc_dict.items()
}
# output dictionary
output = dict(sorted(adc_dict.items())) if sort_dam else adc_dict
return output
[docs]
def connectivity_adjacent_upstream_dam(
self,
stream_file: str,
dam_list: list[int],
sort_dam: bool = False
) -> dict[int, list[int]]:
'''
Compute adjacent upstream connectivity between dams based on the input stream network.
Each dam is represented by a unique stream segment identifier.
Parameters
----------
stream_file: str
Path to the input stream shapefile,
generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`.
dam_list : list
List of stream segment identifiers representing dam locations.
sort_dam : bool, optional
If True, both the dam identifier keys and their corresponding value lists
in the output dictionary will be sorted in ascending order. Default is False.
Returns
-------
dict
A dictionary where each key is a dam's stream identifier, and the corresponding value
is a list of adjacent upstream dam identifiers. An empty list indicates no upstream connectivity.
'''
# adjacent downstream connectivity dictionary
adc_dict = self.connectivity_adjacent_downstream_dam(
stream_file=stream_file,
dam_list=dam_list
)
# DataFrame creation for adjacent downstream connectivity
df_dict = {
'dam_id': adc_dict.keys(),
'adc_id': adc_dict.values()
}
df = pandas.DataFrame(
data=df_dict
)
# non-empty adjacent upstream connectivity
auc_dict = {
j: k['dam_id'].tolist() for j, k in df.groupby(by='adc_id')
}
# adjacent upstream connectivity of all dams
auc_dict = {
i: auc_dict[i] if i in auc_dict else list() for i in dam_list
}
# output dictionary
output = {k: sorted(v) for k, v in sorted(auc_dict.items())} if sort_dam else auc_dict
return output
[docs]
def controlled_drainage_area(
self,
stream_file: str,
dam_list: list[int],
sort_dam: bool = False
) -> dict[int, float]:
'''
Compute the controlled upstream drainage area for selected dams
based on their locations within the input stream network.
Each dam is represented by a unique stream segment identifier.
Parameters
----------
stream_file : str
Path to the input stream shapefile,
generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`.
info_file : str
Path to the stream information text file ``stream_information.txt``,
generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`.
dam_list : list
List of stream segment identifiers representing dam locations.
sort_dam : bool, optional
If True, the dam identifiers in the output dictionary keys will be sorted in ascending order.
Default is False.
Returns
-------
dict
A dictionary where each key is a dam's stream segment identifier,
and the corresponding value is the effective upstream drainage area in square meters.
'''
# adjacent downstream connectivity dictionary
auc_dict = self.connectivity_adjacent_upstream_dam(
stream_file=stream_file,
dam_list=dam_list
)
# cumulative subbasin area dictionary from stream information DataFrame
stream_gdf = geopandas.read_file(
filename=stream_file
)
cumarea_dict = dict(
zip(stream_gdf['ws_id'], stream_gdf['csa_m2'])
)
# effective drainage area dictionary
eda_dict = {}
for i in dam_list:
eda_dict[i] = cumarea_dict[i] - sum([cumarea_dict[j] for j in auc_dict[i]])
# output dictionary
output = dict(sorted(eda_dict.items())) if sort_dam else eda_dict
return output
[docs]
def sediment_inflow_from_drainage_area(
self,
stream_file: str,
dam_list: list[int],
sort_dam: bool = False
) -> dict[int, float]:
'''
Compute the sediment inflows in kilograms for selected dams based on their
local drainage areas within the input stream network. Each dam is identified by a
unique stream segment identifier.
Parameters
----------
stream_file : str
Path to the input stream GeoJSON file, generated by
:meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`.
dam_list : list
List of stream segment identifiers representing dam locations.
sort_dam : bool, optional
If True, the dam identifiers in the output dictionary keys will be sorted in ascending order.
Default is False.
Returns
-------
dict
A dictionary where each key is a dam's stream segment identifier,
and the corresponding value is the effective sediment inflow in kilograms.
'''
# adjacent downstream connectivity dictionary
auc_dict = self.connectivity_adjacent_upstream_dam(
stream_file=stream_file,
dam_list=dam_list
)
# cumulative sediment input from stream information GeoDataFrame
stream_gdf = geopandas.read_file(
filename=stream_file
)
cumsed_dict = dict(
zip(stream_gdf['ws_id'], stream_gdf['cumsed_kg'])
)
# effective sediment inflow dictionary
esi_dict = {}
for i in dam_list:
esi_dict[i] = cumsed_dict[i] - sum([cumsed_dict[j] for j in auc_dict[i]])
# output dictionary
output = dict(sorted(esi_dict.items())) if sort_dam else esi_dict
return output
[docs]
def upstream_metrics_summary(
self,
stream_file: str,
dam_list: list[int],
sort_dam: bool = False
) -> dict[str, dict[int, typing.Any]]:
'''
Compute a summary of upstream metrics for selected dams based on their
locations within the input stream network. Each dam is identified by a unique stream
segment identifier.
The function returns a dictionary containing three sub-dictionaries, where each key
corresponds to a dam's stream identifier:
- **adjacent_upstream_dams**: A dictionary where each value is a list of directly
connected upstream dam identifiers. An empty list indicates no upstream connectivity.
- **controlled_drainage_m2**: A dictionary mapping each dam to its effective upstream
drainage area in square meters, based on upstream dam connections.
- **sediment_inflow_kg**: A dictionary mapping each dam to its effective sediment
inflow in kilograms, based on upstream dam connections.
Parameters
----------
stream_file : str
Path to the input stream GeoJSON file, generated by
:meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`.
dam_list : list
List of stream segment identifiers representing dam locations.
sort_dam : bool, optional
If True, the dam identifiers in three sub-dictionaries keys will be sorted in ascending order.
Default is False.
Returns
-------
dict
A dictionary with three keys: ``adjacent_upstream_connections``,
``controlled_drainage_m2``, and ``sediment_inflow_kg``.
Each key maps to a sub-dictionary where the keys are dam segment identifiers
and the values are the computed metrics.
'''
# adjacent downstream connectivity dictionary
auc_dict = self.connectivity_adjacent_upstream_dam(
stream_file=stream_file,
dam_list=dam_list
)
# cumulative inputs dictionary
stream_gdf = geopandas.read_file(stream_file)
cumarea_dict = dict(zip(stream_gdf['ws_id'], stream_gdf['csa_m2']))
cumsed_dict = dict(zip(stream_gdf['ws_id'], stream_gdf['cumsed_kg']))
# effective metric dictionaries
eda_dict = {}
esi_dict = {}
for i in dam_list:
# effective drainage area for upstream connections
eda_dict[i] = cumarea_dict[i] - sum([cumarea_dict[j] for j in auc_dict[i]])
# effective sediment inflow for upstream connections
esi_dict[i] = cumsed_dict[i] - sum([cumsed_dict[j] for j in auc_dict[i]])
# output metric dictionary
output = {
'adjacent_upstream_dams': {k: sorted(v) for k, v in sorted(auc_dict.items())} if sort_dam else auc_dict,
'controlled_drainage_m2': dict(sorted(eda_dict.items())) if sort_dam else eda_dict,
'sediment_inflow_kg': dict(sorted(esi_dict.items())) if sort_dam else esi_dict
}
return output
[docs]
def trap_efficiency_brown(
self,
storage_dict: dict[int, float],
area_dict: dict[int, float],
brown_d: int | float = 0.1
) -> dict[int, float]:
'''
Compute the Trap Efficiency (TE) without percentage conversion of a dam using the Brown (1943) formula.
Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_.
Parameters
----------
storage_dict : dict
A dictionary where each key is a dam identifier, and the corresponding value
is the dam's storage capacity in cubic meters.
area_dict : dict
A dictionary where each key is a dam identifier, and the corresponding value
is the dam's catchment area in square meters. The function internally
converts the area from square meters to square kilometers.
brown_d : float, optional
Empirical parameter used in the equation. It ranges from 0.046 to 1,
with a mean value of 0.1, which is also the default.
Returns
-------
dict
A dictionary where each key is a dam identifier, and the corresponding value is the computed TE.
'''
# check if both dictionaries have same keys
if area_dict.keys() != storage_dict.keys():
raise KeyError('Mismatch of keys between storage and area dictionaries')
# sediment trapping efficinecy
ste_dict = {}
for dam_id in storage_dict:
area_km2 = area_dict[dam_id] / 10**6
ste_dict[dam_id] = 1 - (1 / (1 + 0.0021 * brown_d * storage_dict[dam_id] / area_km2))
return ste_dict
def _base_storage_df(
self,
storage_dict: dict[int, float]
) -> pandas.DataFrame:
'''
Convert the input storage dictionary of dams into a DataFrame.
'''
# DataFrame from dictionary
df = pandas.DataFrame.from_dict(
data=storage_dict,
orient='index',
columns=['storage_m3']
)
df = df.reset_index(
names=['dam_id']
)
# storage percentage
df['storage_%'] = 100 * df['storage_m3'] / df['storage_m3'].sum()
# Cummulative storage
df['cum_m3'] = df['storage_m3'].cumsum()
return df
def _validate_stodym_plus_config(
self,
storage_dict: dict[int, float],
year_limit: int,
trap_equation: bool,
trap_threshold: float,
trap_constant: typing.Optional[float]
) -> None:
'''
Validate input variables configuration for
the method :meth:`OptiDamTool.Network.stodym_plus`.
'''
# check intial dam storage capacity cannot be negative
for d_id in storage_dict:
if storage_dict[d_id] < 0:
raise ValueError(
f'Invalid negative intial storage volume {storage_dict[d_id]} for the dam identifier {d_id} was received'
)
# check that year_limit value is greater than 0
if year_limit <= 0:
raise ValueError(
f'year_limit must be greater than 0, but received {year_limit}'
)
# check trap_equation
if not trap_equation:
if trap_constant is None or not isinstance(trap_constant, (int, float)):
raise TypeError(
f'trap_constant must be a numeric value when "trap_equation=False", but got type "{type(trap_constant).__name__}"'
)
if not (trap_threshold < trap_constant <= 1):
raise ValueError(
f'trap_constant {trap_constant} is invalid: must satisfy trap_threshold ({trap_threshold}) < trap_constant <= 1'
)
return None
[docs]
def stodym_plus(
self,
stream_file: str,
storage_dict: dict[int, float],
sediment_density: int | float,
year_limit: int,
trap_equation: bool = True,
trap_threshold: int | float = 0.1,
brown_d: int | float = 0.1,
trap_constant: typing.Optional[int | float] = None,
release_threshold: int | float = 0.95,
config_validation: bool = True,
folder_path: typing.Optional[str] = None
) -> dict[str, pandas.DataFrame]:
'''
Simulate the annual storage dynamics of a dam system influenced by sedimentation,
and returns a dictionary containing dam lifespan, system-wide statistics,
individual metrics, and simulation parameters.
The simulation runs for a user-defined number of years, unless all dams become inactive earlier.
Sediment inflow to a dam is calculated as the sum of sediment yield from the
dam's controlled drainage area and the sediment outflow from upstream connected dams.
The amount of sediment trapped by a dam is calculated as the product of the sediment inflow
to the dam and its trap efficiency, which ranges from 0 to 1. At the end of each simulation year,
dam storage capacities are updated using a mass balance approach based on the trapped sediment.
The simulation ends when either all individual dams become inactive or the entire dam system becomes inactive,
whichever occurs first.
A dam is considered inactive if:
- Its storage capacity becomes non-positive (``<= 0``).
- Its sediment trap efficiency falls below or equals the threshold (``<= trap_threshold``).
The dam system is considered inactive if:
- The maximum number of simulation years (``year_limit``) is reached.
- The sediment release fraction of total stream input exceeds the threshold (``> release_threshold``).
Each dam in a system is uniquely identified by its stream segment identifier.
When a dam becomes inactive, system connectivity, controlled drainage areas,
and sediment inflows to the remaining dams are updated accordingly.
.. warning::
If a dam retains positive storage and the `trap_threshold` is not set appropriately,
it may remain active indefinitely. Since trap efficiency is computed via
:meth:`OptiDamTool.Network.trap_efficiency_brown`, which depends on dam storage,
using a very small threshold (e.g., 0.0001) may lead to unrealistic system behavior
due to prolonged dam lifespan despite minimal sediment trapping.
A recommended value is 0.1 based on `Pal et al. (2018) <https://doi.org/10.1016/j.jhydrol.2018.02.051>`_
and `Pal and Galelli (2019) <https://doi.org/10.1016/j.envsoft.2019.05.007>`_.
This value may be adjusted to suit user needs.
The simulation results are returned as a dictionary and can optionally be saved to the input directory as a set of JSON files.
Each JSON file is named after the corresponding dictionary key and contains the related DataFrame.
A detailed description of the keys and their corresponding DataFrames is provided below.
- **dam_initial_storage**: A DataFrame with the following columns:
- ``dam_id``: Stream segment identifier of the dam.
- ``storage_m3``: Initial storage capacity of the dam, in cubic meters.
- ``storage_%``: Initial storage of the dam as a percentage of the total storage across all dams.
- ``cum_m3``: Cumulative initial storage (up to this dam), in cubic meters, to give an idea of the total capacity.
- **dam_lifespan**: A DataFrame with the following columns:
- ``dam_id``: Stream segment identifier of the dam.
- ``life_year``: Number of years the dam was active.
- **system_statistics**: A DataFrame that stores system-wide statistics with the following columns:
- ``start_year``: Start of each simulation year, beginning from 1.
- ``drainage_%``: Percentage of the total controlled drainage area relative
to the total stream drainage area, at the beginning of the simulation year.
- ``storage_%``: Percentage of total remaining storage across all dams relative
to the initial total storage, at the beginning of the simulation year.
- ``sedtrap_%``: Percentage of total sediment trapped by all dams relative
to the total sediment input across all stream segments, during the simulation year.
- ``sedrelease_%``: Percentage of sediment released by terminal dams and
drainage areas not covered by the dam system, relative to the total sediment input across all stream segments,
during the simulation year.
- ``dam_removed``: List of dams removed at the beginning of the year. An empty list
implies all dams in the system remain active.
- ``dam_active``: List of active dams at the beginning of the year. An empty list
in the final simulation year, if it occurs, indicates that all dams in the system
became inactive before completing the user-defined number of simulation years.
.. note::
The sum of sediment trapping and release percentages for the dam system may not equal 100%.
This is because these percentages are calculated relative to the total sediment input
across all stream segments, using a fixed base value for consistency. In practice,
the actual sediment inflow to the dams is greater than the total sediment input
across all stream segments, due to additional contributions from sediment released by upstream dams.
Furthermore, a dam's sediment trapping efficiency depends on both its controlled drainage area
and remaining storage capacity, making sediment dynamics within the system more complex.
However, over time, the sum gradually converges to 100% as more dams become inactive,
resulting in reduced sediment contributions from upstream dams.
- **dam_drainage_area**: A DataFrame with columns ``start_year`` and dam identifiers.
The cell values represent the annual controlled drainage area percentages for each dam,
relative to the total stream drainage area at the beginning of the simulation year.
- **dam_remaining_storage**: A DataFrame with columns ``start_year`` and dam identifiers.
The cell values represent the annual remaining storage percentages, relative to each dam's initial storage,
at the beginning of the simulation year.
- **dam_trap_efficiency**: A DataFrame with columns ``start_year`` and dam identifiers.
The cell values represent the annual trap efficiency percentages at the beginning of the simulation year.
- **dam_trapped_sediment**: A DataFrame with columns ``start_year`` and dam identifiers.
The cell values represent the annual sediment trapping percentages, relative to the total sediment input
across all stream segments, at the end of the simulation year.
.. note::
The values in ``dam_drainage_area``, ``dam_remaining_storage``, and ``dam_trap_efficiency`` are recorded
at the beginning of each simulation year. This design ensures that the final state of each dam is captured
just before its removal from the system, allowing users to understand the conditions that led to its inactivation.
Parameters
----------
stream_file : str
Path to the input stream GeoJSON file, generated by
:meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`.
storage_dict : dict
Dictionary mapping dam identifiers to initial storage capacities in cubic meters.
sediment_density : float
Density of sediment in kilograms per cubic meter.
year_limit : int
Maximum number of simulation years. Simulation stops earlier if all dams retire.
trap_equation : bool, optional
If True (default), trap efficiency is calculated using the Brown (1943) formula.
Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_.
trap_threshold : float, optional
Minimum trap efficiency required to keep a dam active (default 0.05).
Dams with efficiency below this value are treated as inactive.
brown_d : float, optional
Empirical parameter used in the trap efficiency equation. It ranges from 0.046 to 1,
with a mean value of 0.1, which is also the default. Refer to
`Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_.
trap_constant : float, optional
Fixed trap efficiency value applied when ``trap_equation=False``.
Must be greater than ``trap_threshold`` and less than 1. Default is None.
release_threshold: float, optional
Maximum sediment release fraction threshold of the total stream sediment input to stop the simulation.
The default is 0.95, meaning 95% of the total stream sediment input is leaving the study area.
If the user does not want to consider this parameter in the simulation, set its value to 1.0.
config_validation: bool, optional
If ``True`` (default), validates the configuration of input parameters. Users are advised not to modify this parameter.
This option is provided to speed up computations in the :class:`OptiDamTool.SystemDesign` class,
where configuration checking is not required for every scenario generated by genetic algorithms.
folder_path : str, optional
Path to the folder where JSON files will be saved. If None (default), no files are saved.
Returns
-------
dict
A dictionary containing detailed information on dam lifespan, system-wide statistics,
individual metrics, and simulation parameters.
'''
# check static type of input variable origin
utility._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.stodym_plus
),
vars_values=locals()
)
# check the vailidity of folder path
if folder_path is not None:
utility._validate_folder_path(
folder_path=folder_path
)
# validate input variable configuration
if config_validation:
self._validate_stodym_plus_config(
storage_dict=storage_dict,
year_limit=year_limit,
trap_equation=trap_equation,
trap_threshold=trap_threshold,
trap_constant=trap_constant
)
# input location and storage capacity of dams
base_storage = storage_dict.copy()
storage_sum = sum(base_storage.values())
dam_ids = list(storage_dict.keys())
# base storage DataFrame
bs_df = self._base_storage_df(
storage_dict=base_storage
)
# stream drainage area and sediment inflow to the stream
stream_gdf = geopandas.read_file(
filename=stream_file
)
stream_area = max(stream_gdf['csa_m2'])
stream_sediment = max(stream_gdf['cumsed_kg'])
# adjacent downstream connectivity of dams
connection_downstream = self.connectivity_adjacent_downstream_dam(
stream_file=stream_file,
dam_list=dam_ids
)
# upstream metrics of dams
metric_dict = self.upstream_metrics_summary(
stream_file=stream_file,
dam_list=dam_ids
)
# initial adjacent upstream connectivity of dams
connection_upstream = metric_dict['adjacent_upstream_dams']
# intial drainage area of dams
drainage_area = metric_dict['controlled_drainage_m2']
# intial sediment inflow to dams from own drainage area
sediment_local = metric_dict['sediment_inflow_kg']
# intial sediment release from dams
sediment_release = {
d_id: 0.0 for d_id in dam_ids
}
# DataFrames for storing dam-wise parameter in annual time steps
drainage_df = pandas.DataFrame(
columns=dam_ids
) # controlled drainage area percentage relative to total stream drainage area
storage_df = pandas.DataFrame(
columns=dam_ids
) # remaining storage percentage relative to own intial storage
te_df = pandas.DataFrame(
columns=dam_ids
) # trapping efficiency
sedtrap_df = pandas.DataFrame(
columns=dam_ids
) # trapped sediment percentage relative to total stream sediment input
# a DataFrame for storing dam lifespan in years
life_df = pandas.DataFrame(
index=dam_ids
)
life_df['life_year'] = 0
# a DataFrame to store dam-system paramters in annual time steps
system_df = pandas.DataFrame(
columns=[
'drainage_%',
'storage_%',
'sedtrap_%',
'sedrelease_%',
'dam_removed',
'dam_active'
]
)
# iterate years
for year in range(year_limit):
# break if all dams are removed
if len(dam_ids) == 0:
break
# trap efficiency of dams
if trap_equation:
trap_efficiency = self.trap_efficiency_brown(
storage_dict=storage_dict,
area_dict=drainage_area,
brown_d=brown_d
)
else:
assert isinstance(trap_constant, (int, float))
trap_efficiency = {
d_id: trap_constant for d_id in dam_ids
}
# store dam-wise parameters in annual time steps
for d_id in dam_ids:
drainage_df.loc[year + 1, d_id] = 100 * drainage_area[d_id] / stream_area
storage_df.loc[year + 1, d_id] = 100 * storage_dict[d_id] / base_storage[d_id]
te_df.loc[year + 1, d_id] = trap_efficiency[d_id]
# check if any dam removal is required
dam_removal = [
d_id for d_id in dam_ids if trap_efficiency[d_id] <= trap_threshold or storage_dict[d_id] <= 0
]
# store inactive dams
system_df.loc[year + 1, 'dam_removed'] = dam_removal
# if any dam is removed
if len(dam_removal) > 0:
# update dam identifiers
dam_ids = [i for i in dam_ids if i not in dam_removal]
# update network features
connection_downstream = self.connectivity_adjacent_downstream_dam(
stream_file=stream_file,
dam_list=dam_ids
)
metric_dict = self.upstream_metrics_summary(
stream_file=stream_file,
dam_list=dam_ids
)
connection_upstream = metric_dict['adjacent_upstream_dams']
drainage_area = metric_dict['controlled_drainage_m2']
sediment_local = metric_dict['sediment_inflow_kg']
# update storage dictionary
storage_dict = {
d_id: storage_dict[d_id] for d_id in dam_ids
}
# released sediment from upstream dams
sediment_upstream = {
d_id: sum([sediment_release[j] for j in connection_upstream[d_id]]) for d_id in dam_ids
}
# sediment inflow to dams from own drainage area plus outflow from upstream dams
sediment_inflow = {
d_id: sediment_local[d_id] + sediment_upstream[d_id] for d_id in dam_ids
}
# sediment trapping behind dams
sediment_trap = {
d_id: sediment_inflow[d_id] * trap_efficiency[d_id] for d_id in dam_ids
}
# sediment trapping percentage relative to total stream sediment input
sedtrap_percent = {
d_id: 100 * sediment_trap[d_id] / stream_sediment for d_id in dam_ids
}
# store dam-wise sediment trapping in annual time steps
for d_id in dam_ids:
sedtrap_df.loc[year + 1, d_id] = sedtrap_percent[d_id]
# sediment release from dams
sediment_release = {
d_id: sediment_inflow[d_id] - sediment_trap[d_id] for d_id in dam_ids
}
# store dam-system parameters in annual time steps
system_df.loc[year + 1, 'dam_active'] = dam_ids
system_df.loc[year + 1, 'drainage_%'] = 100 * sum(drainage_area.values()) / stream_area
system_df.loc[year + 1, 'storage_%'] = 100 * sum(storage_dict.values()) / storage_sum
system_df.loc[year + 1, 'sedtrap_%'] = 100 * sum(sediment_trap.values()) / stream_sediment
release_outlets = sum([sediment_release[d_id] for d_id in dam_ids if connection_downstream[d_id] == -1])
release_undrainage = stream_sediment - sum(sediment_local.values())
release_fraction = (release_outlets + release_undrainage) / stream_sediment
system_df.loc[year + 1, 'sedrelease_%'] = 100 * release_fraction
# break if sediment release cross a certain threshold of total stream sediment inputs
if release_fraction > release_threshold:
break
# annual filling of dam storage due to sediment trapping
storage_filling = {
d_id: sediment_trap[d_id] / sediment_density for d_id in dam_ids
}
# remaining storage of dams
storage_dict = {
d_id: storage_dict[d_id] - storage_filling[d_id] for d_id in dam_ids
}
# update dam-wise life expectancy
for d_id in dam_ids:
life_df.loc[d_id, 'life_year'] = life_df.loc[d_id, 'life_year'] + 1
# dam lifespan DataFrame
life_df = life_df.reset_index(
names=['dam_id']
)
# ouptut dictionary
output = {
'dam_initial_storage': bs_df,
'dam_lifespan': life_df,
'system_statistics': system_df,
'dam_drainage_area': drainage_df,
'dam_remaining_storage': storage_df,
'dam_trap_efficiency': 100 * te_df,
'dam_trapped_sediment': sedtrap_df
}
unchanged_keys = [
'dam_initial_storage',
'dam_lifespan'
]
for key, df in output.items():
if key not in unchanged_keys:
df = df.reset_index(
names=['start_year']
)
output[key] = df
# write output dictionary to JSON files
if folder_path is not None:
for key, df in output.items():
json_file = os.path.join(folder_path, f'{key}.json')
df.to_json(
path_or_buf=json_file,
orient='records',
indent=4
)
return output
[docs]
def stodym_plus_drainage_response(
self,
stream_file: str,
flwdir_file: str,
storage_dict: dict[int, float],
sediment_density: int | float,
year_limit: int,
folder_path: str,
trap_equation: bool = True,
trap_threshold: int | float = 0.1,
brown_d: int | float = 0.1,
trap_constant: typing.Optional[int | float] = None,
release_threshold: int | float = 0.95
) -> pandas.DataFrame:
'''
Simulate the detailed annual storage dynamics of a dam system and saves the output
to the specified input directory. Refer to :meth:`OptiDamTool.Network.stodym_plus`
for full implementation details.
Based on the output DataFrame ``system_statistics``, which contains a ``start_year`` column
(starting from 1), this method generates GeoDataFrames for dam location points and their
controlled drainage polygons. These are saved as:
- ``year_<start_year>_dam_location_point.geojson``
- ``year_<start_year>_dam_drainage_polygon.geojson``
Files are created for each year where a dam becomes inactive, along with two additional
GeoJSON files for ``start_year = 0``, representing the initial condition.
Refer to :meth:`OptiDamTool.WatemSedem.dam_controlled_drainage_polygons` for more details
on GeoDataFrame generation.
In addition, a file ``all_potential_dam_location.geojson`` is created, which contains potential dam locations.
Each point corresponds to a unique stream segment.
All GeoJSON files include the ``ws_id`` field to support cross-referencing with the stream network.
The ``year_{start_year}_dam_drainage_polygon.geojson`` files also contain an ``area_%`` column,
which represents the percentage of the total stream drainage area controlled by each dam.
Parameters
----------
stream_file : str
Path to the input stream GeoJSON file, generated by
:meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`.
flwdir_file : str
Path to the input flow direction raster file ``flowdir.tif``,
generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`.
storage_dict : dict
Dictionary mapping dam identifiers to initial storage capacities in cubic meters.
sediment_density : float
Density of sediment in kilograms per cubic meter.
year_limit : int
Maximum number of simulation years. Simulation stops earlier if all dams retire.
folder_path : str
Path to the folder where JSON and GeoJSON files will be saved.
trap_equation : bool, optional
If True (default), trap efficiency is calculated using the Brown (1943) formula.
Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_.
trap_threshold : float, optional
Minimum trap efficiency required to keep a dam active (default 0.1). Dams with efficiency below this value are treated as inactive.
brown_d : float, optional
Empirical parameter used in the trap efficiency equation. It ranges from 0.046 to 1,
with a mean value of 0.1, which is also the default. Refer to
`Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_.
trap_constant : float, optional
Fixed trap efficiency value applied when ``trap_equation=False``.
Must be greater than ``trap_threshold`` and less than 1. Default is None.
release_threshold: float, optional
Minimum sediment release fraction threshold of the total stream sediment input to stop the simulation.
The default is 0.95, meaning 95% of the total stream sediment input is leaving the study area.
If the user does not want to consider this parameter in the simulation, set its value to 1.0.
Returns
-------
DataFrame
A DataFrame with columns ``start_year``, ``dam_removed``, and ``dam_active``.
The ``start_year`` starts from 0 and helps trace the creation of corresponding GeoJSON files.
'''
# dam system storage dynamics and save output
stodym_output = self.stodym_plus(
stream_file=stream_file,
storage_dict=storage_dict,
sediment_density=sediment_density,
year_limit=year_limit,
trap_equation=trap_equation,
trap_threshold=trap_threshold,
brown_d=brown_d,
trap_constant=trap_constant,
release_threshold=release_threshold,
folder_path=folder_path
)
# stream GeoDataFrame
stream_gdf = geopandas.read_file(
filename=stream_file
)
stream_area = max(stream_gdf['csa_m2'])
# all dam location Point GeoDataFrame
adl_gdf = stream_gdf.copy()
adl_gdf = adl_gdf[['ws_id', 'geometry']]
adl_gdf['pour_coords'] = adl_gdf.geometry.apply(
lambda x: x.coords[-2]
)
adl_gdf['geometry'] = adl_gdf.apply(
lambda row: shapely.Point(row['pour_coords']),
axis=1
)
adl_gdf = adl_gdf.drop(
columns=['pour_coords']
)
location_file = os.path.join(folder_path, 'all_potential_dam_location.geojson')
adl_gdf.to_file(
filename=location_file
)
# zipped iterator of year, dam_removed, and dam_active
system_df = stodym_output['system_statistics']
variable_list = list(
zip(
[0] + system_df['start_year'].tolist(),
[[]] + system_df['dam_removed'].tolist(),
[list(storage_dict.keys())] + system_df['dam_active'].tolist()
)
)
# iterate variables
for year, dam_removed, dam_active in variable_list:
# if dam is active
if len(dam_active) > 0:
# if year is 0 or dam is removed
if year == 0 or len(dam_removed) > 0:
# dam location points and their controlled drainage area polygons
gdf_dict = WatemSedem().dam_controlled_drainage_polygons(
flwdir_file=flwdir_file,
location_file=location_file,
dam_list=dam_active,
folder_path=folder_path
)
# area percentage of polygons
gdf_dict['dam_drainage_polygon']['area_%'] = 100 * gdf_dict['dam_drainage_polygon']['area_m2'] / stream_area
# save the GeoDataFrames
for key, gdf in gdf_dict.items():
geojson_file = os.path.join(folder_path, f'year_{year}_{key}.geojson')
gdf.to_file(
filename=geojson_file
)
# delete files that are not required
WatemSedem().file.delete_by_name(
folder_path=folder_path,
file_names=[
'dam_location_point',
'dam_drainage_polygon'
]
)
# output DataFrame of dam scenarios
output = pandas.DataFrame(
data=variable_list,
columns=[
'start_year',
'dam_removed',
'dam_active'
]
)
return output