Source code for OptiDamTool.network

import GeoAnalyze
import geopandas
import shapely
import pandas
import typing
import os
from .watem_sedem import WatemSedem
from . import utility


[docs] class Network: ''' Provide functionality to establish network-based connectivity and operations between dams. '''
[docs] def connectivity_adjacent_downstream_dam( self, stream_file: str, dam_list: list[int], sort_dam: bool = False ) -> dict[int, int]: ''' Generate adjacent downstream connectivity between dams based on the input stream network. Each dam is represented by a unique stream segment identifier. Parameters ---------- stream_file: str Path to the input stream shapefile, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`. dam_list : list List of stream segment identifiers representing dam locations. sort_dam : bool, optional If True, the dam identifiers in the output dictionary keys will be sorted in ascending order. Default is False. Returns ------- dict A dictionary with each key is a dam's stream identifier, and the corresponding value is the stream identifier of the directly connected downstream dam. A value of -1 indicates that the dam has no downstream connectivity. ''' # check static type of input variable origin utility._validate_variable_origin_static_type( vars_types=typing.get_type_hints( obj=self.connectivity_adjacent_downstream_dam ), vars_values=locals() ) # check distinct stream identifiers for dams if len(set(dam_list)) < len(dam_list): raise ValueError('Duplicate stream identifiers found in the input dam list') # connectivity from upstream to downstream connect_dict = GeoAnalyze.Stream()._connectivity_upstream_to_downstream( stream_file=stream_file, stream_col='ws_id' ) # adjacent downstream connectvity adc_dict = {} for i in dam_list: if i not in connect_dict: raise ValueError(f'Invalid stream identifier {i} for a dam') # all dam connectivity towards outlet stream_connect = connect_dict[i] dam_connect = list( filter(lambda x: x in stream_connect, dam_list) ) # if no downstream dam is found if len(dam_connect) == 0: adc_dict[i] = -1 # extract the adjacent downstream dam else: dam_indices = [ stream_connect.index(j) for j in dam_connect ] adc_dict[i] = stream_connect[min(dam_indices)] # filtered connectivity for stream outlet identifiers where key and value are same adc_dict = { k: v if k != v else -1 for k, v in adc_dict.items() } # output dictionary output = dict(sorted(adc_dict.items())) if sort_dam else adc_dict return output
[docs] def connectivity_adjacent_upstream_dam( self, stream_file: str, dam_list: list[int], sort_dam: bool = False ) -> dict[int, list[int]]: ''' Compute adjacent upstream connectivity between dams based on the input stream network. Each dam is represented by a unique stream segment identifier. Parameters ---------- stream_file: str Path to the input stream shapefile, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`. dam_list : list List of stream segment identifiers representing dam locations. sort_dam : bool, optional If True, both the dam identifier keys and their corresponding value lists in the output dictionary will be sorted in ascending order. Default is False. Returns ------- dict A dictionary where each key is a dam's stream identifier, and the corresponding value is a list of adjacent upstream dam identifiers. An empty list indicates no upstream connectivity. ''' # adjacent downstream connectivity dictionary adc_dict = self.connectivity_adjacent_downstream_dam( stream_file=stream_file, dam_list=dam_list ) # DataFrame creation for adjacent downstream connectivity df_dict = { 'dam_id': adc_dict.keys(), 'adc_id': adc_dict.values() } df = pandas.DataFrame( data=df_dict ) # non-empty adjacent upstream connectivity auc_dict = { j: k['dam_id'].tolist() for j, k in df.groupby(by='adc_id') } # adjacent upstream connectivity of all dams auc_dict = { i: auc_dict[i] if i in auc_dict else list() for i in dam_list } # output dictionary output = {k: sorted(v) for k, v in sorted(auc_dict.items())} if sort_dam else auc_dict return output
[docs] def controlled_drainage_area( self, stream_file: str, dam_list: list[int], sort_dam: bool = False ) -> dict[int, float]: ''' Compute the controlled upstream drainage area for selected dams based on their locations within the input stream network. Each dam is represented by a unique stream segment identifier. Parameters ---------- stream_file : str Path to the input stream shapefile, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`. info_file : str Path to the stream information text file ``stream_information.txt``, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`. dam_list : list List of stream segment identifiers representing dam locations. sort_dam : bool, optional If True, the dam identifiers in the output dictionary keys will be sorted in ascending order. Default is False. Returns ------- dict A dictionary where each key is a dam's stream segment identifier, and the corresponding value is the effective upstream drainage area in square meters. ''' # adjacent downstream connectivity dictionary auc_dict = self.connectivity_adjacent_upstream_dam( stream_file=stream_file, dam_list=dam_list ) # cumulative subbasin area dictionary from stream information DataFrame stream_gdf = geopandas.read_file( filename=stream_file ) cumarea_dict = dict( zip(stream_gdf['ws_id'], stream_gdf['csa_m2']) ) # effective drainage area dictionary eda_dict = {} for i in dam_list: eda_dict[i] = cumarea_dict[i] - sum([cumarea_dict[j] for j in auc_dict[i]]) # output dictionary output = dict(sorted(eda_dict.items())) if sort_dam else eda_dict return output
[docs] def sediment_inflow_from_drainage_area( self, stream_file: str, dam_list: list[int], sort_dam: bool = False ) -> dict[int, float]: ''' Compute the sediment inflows in kilograms for selected dams based on their local drainage areas within the input stream network. Each dam is identified by a unique stream segment identifier. Parameters ---------- stream_file : str Path to the input stream GeoJSON file, generated by :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`. dam_list : list List of stream segment identifiers representing dam locations. sort_dam : bool, optional If True, the dam identifiers in the output dictionary keys will be sorted in ascending order. Default is False. Returns ------- dict A dictionary where each key is a dam's stream segment identifier, and the corresponding value is the effective sediment inflow in kilograms. ''' # adjacent downstream connectivity dictionary auc_dict = self.connectivity_adjacent_upstream_dam( stream_file=stream_file, dam_list=dam_list ) # cumulative sediment input from stream information GeoDataFrame stream_gdf = geopandas.read_file( filename=stream_file ) cumsed_dict = dict( zip(stream_gdf['ws_id'], stream_gdf['cumsed_kg']) ) # effective sediment inflow dictionary esi_dict = {} for i in dam_list: esi_dict[i] = cumsed_dict[i] - sum([cumsed_dict[j] for j in auc_dict[i]]) # output dictionary output = dict(sorted(esi_dict.items())) if sort_dam else esi_dict return output
[docs] def upstream_metrics_summary( self, stream_file: str, dam_list: list[int], sort_dam: bool = False ) -> dict[str, dict[int, typing.Any]]: ''' Compute a summary of upstream metrics for selected dams based on their locations within the input stream network. Each dam is identified by a unique stream segment identifier. The function returns a dictionary containing three sub-dictionaries, where each key corresponds to a dam's stream identifier: - **adjacent_upstream_dams**: A dictionary where each value is a list of directly connected upstream dam identifiers. An empty list indicates no upstream connectivity. - **controlled_drainage_m2**: A dictionary mapping each dam to its effective upstream drainage area in square meters, based on upstream dam connections. - **sediment_inflow_kg**: A dictionary mapping each dam to its effective sediment inflow in kilograms, based on upstream dam connections. Parameters ---------- stream_file : str Path to the input stream GeoJSON file, generated by :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`. dam_list : list List of stream segment identifiers representing dam locations. sort_dam : bool, optional If True, the dam identifiers in three sub-dictionaries keys will be sorted in ascending order. Default is False. Returns ------- dict A dictionary with three keys: ``adjacent_upstream_connections``, ``controlled_drainage_m2``, and ``sediment_inflow_kg``. Each key maps to a sub-dictionary where the keys are dam segment identifiers and the values are the computed metrics. ''' # adjacent downstream connectivity dictionary auc_dict = self.connectivity_adjacent_upstream_dam( stream_file=stream_file, dam_list=dam_list ) # cumulative inputs dictionary stream_gdf = geopandas.read_file(stream_file) cumarea_dict = dict(zip(stream_gdf['ws_id'], stream_gdf['csa_m2'])) cumsed_dict = dict(zip(stream_gdf['ws_id'], stream_gdf['cumsed_kg'])) # effective metric dictionaries eda_dict = {} esi_dict = {} for i in dam_list: # effective drainage area for upstream connections eda_dict[i] = cumarea_dict[i] - sum([cumarea_dict[j] for j in auc_dict[i]]) # effective sediment inflow for upstream connections esi_dict[i] = cumsed_dict[i] - sum([cumsed_dict[j] for j in auc_dict[i]]) # output metric dictionary output = { 'adjacent_upstream_dams': {k: sorted(v) for k, v in sorted(auc_dict.items())} if sort_dam else auc_dict, 'controlled_drainage_m2': dict(sorted(eda_dict.items())) if sort_dam else eda_dict, 'sediment_inflow_kg': dict(sorted(esi_dict.items())) if sort_dam else esi_dict } return output
[docs] def trap_efficiency_brown( self, storage_dict: dict[int, float], area_dict: dict[int, float], brown_d: int | float = 0.1 ) -> dict[int, float]: ''' Compute the Trap Efficiency (TE) without percentage conversion of a dam using the Brown (1943) formula. Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_. Parameters ---------- storage_dict : dict A dictionary where each key is a dam identifier, and the corresponding value is the dam's storage capacity in cubic meters. area_dict : dict A dictionary where each key is a dam identifier, and the corresponding value is the dam's catchment area in square meters. The function internally converts the area from square meters to square kilometers. brown_d : float, optional Empirical parameter used in the equation. It ranges from 0.046 to 1, with a mean value of 0.1, which is also the default. Returns ------- dict A dictionary where each key is a dam identifier, and the corresponding value is the computed TE. ''' # check if both dictionaries have same keys if area_dict.keys() != storage_dict.keys(): raise KeyError('Mismatch of keys between storage and area dictionaries') # sediment trapping efficinecy ste_dict = {} for dam_id in storage_dict: area_km2 = area_dict[dam_id] / 10**6 ste_dict[dam_id] = 1 - (1 / (1 + 0.0021 * brown_d * storage_dict[dam_id] / area_km2)) return ste_dict
def _base_storage_df( self, storage_dict: dict[int, float] ) -> pandas.DataFrame: ''' Convert the input storage dictionary of dams into a DataFrame. ''' # DataFrame from dictionary df = pandas.DataFrame.from_dict( data=storage_dict, orient='index', columns=['storage_m3'] ) df = df.reset_index( names=['dam_id'] ) # storage percentage df['storage_%'] = 100 * df['storage_m3'] / df['storage_m3'].sum() # Cummulative storage df['cum_m3'] = df['storage_m3'].cumsum() return df def _validate_stodym_plus_config( self, storage_dict: dict[int, float], year_limit: int, trap_equation: bool, trap_threshold: float, trap_constant: typing.Optional[float] ) -> None: ''' Validate input variables configuration for the method :meth:`OptiDamTool.Network.stodym_plus`. ''' # check intial dam storage capacity cannot be negative for d_id in storage_dict: if storage_dict[d_id] < 0: raise ValueError( f'Invalid negative intial storage volume {storage_dict[d_id]} for the dam identifier {d_id} was received' ) # check that year_limit value is greater than 0 if year_limit <= 0: raise ValueError( f'year_limit must be greater than 0, but received {year_limit}' ) # check trap_equation if not trap_equation: if trap_constant is None or not isinstance(trap_constant, (int, float)): raise TypeError( f'trap_constant must be a numeric value when "trap_equation=False", but got type "{type(trap_constant).__name__}"' ) if not (trap_threshold < trap_constant <= 1): raise ValueError( f'trap_constant {trap_constant} is invalid: must satisfy trap_threshold ({trap_threshold}) < trap_constant <= 1' ) return None
[docs] def stodym_plus( self, stream_file: str, storage_dict: dict[int, float], sediment_density: int | float, year_limit: int, trap_equation: bool = True, trap_threshold: int | float = 0.1, brown_d: int | float = 0.1, trap_constant: typing.Optional[int | float] = None, release_threshold: int | float = 0.95, config_validation: bool = True, folder_path: typing.Optional[str] = None ) -> dict[str, pandas.DataFrame]: ''' Simulate the annual storage dynamics of a dam system influenced by sedimentation, and returns a dictionary containing dam lifespan, system-wide statistics, individual metrics, and simulation parameters. The simulation runs for a user-defined number of years, unless all dams become inactive earlier. Sediment inflow to a dam is calculated as the sum of sediment yield from the dam's controlled drainage area and the sediment outflow from upstream connected dams. The amount of sediment trapped by a dam is calculated as the product of the sediment inflow to the dam and its trap efficiency, which ranges from 0 to 1. At the end of each simulation year, dam storage capacities are updated using a mass balance approach based on the trapped sediment. The simulation ends when either all individual dams become inactive or the entire dam system becomes inactive, whichever occurs first. A dam is considered inactive if: - Its storage capacity becomes non-positive (``<= 0``). - Its sediment trap efficiency falls below or equals the threshold (``<= trap_threshold``). The dam system is considered inactive if: - The maximum number of simulation years (``year_limit``) is reached. - The sediment release fraction of total stream input exceeds the threshold (``> release_threshold``). Each dam in a system is uniquely identified by its stream segment identifier. When a dam becomes inactive, system connectivity, controlled drainage areas, and sediment inflows to the remaining dams are updated accordingly. .. warning:: If a dam retains positive storage and the `trap_threshold` is not set appropriately, it may remain active indefinitely. Since trap efficiency is computed via :meth:`OptiDamTool.Network.trap_efficiency_brown`, which depends on dam storage, using a very small threshold (e.g., 0.0001) may lead to unrealistic system behavior due to prolonged dam lifespan despite minimal sediment trapping. A recommended value is 0.1 based on `Pal et al. (2018) <https://doi.org/10.1016/j.jhydrol.2018.02.051>`_ and `Pal and Galelli (2019) <https://doi.org/10.1016/j.envsoft.2019.05.007>`_. This value may be adjusted to suit user needs. The simulation results are returned as a dictionary and can optionally be saved to the input directory as a set of JSON files. Each JSON file is named after the corresponding dictionary key and contains the related DataFrame. A detailed description of the keys and their corresponding DataFrames is provided below. - **dam_initial_storage**: A DataFrame with the following columns: - ``dam_id``: Stream segment identifier of the dam. - ``storage_m3``: Initial storage capacity of the dam, in cubic meters. - ``storage_%``: Initial storage of the dam as a percentage of the total storage across all dams. - ``cum_m3``: Cumulative initial storage (up to this dam), in cubic meters, to give an idea of the total capacity. - **dam_lifespan**: A DataFrame with the following columns: - ``dam_id``: Stream segment identifier of the dam. - ``life_year``: Number of years the dam was active. - **system_statistics**: A DataFrame that stores system-wide statistics with the following columns: - ``start_year``: Start of each simulation year, beginning from 1. - ``drainage_%``: Percentage of the total controlled drainage area relative to the total stream drainage area, at the beginning of the simulation year. - ``storage_%``: Percentage of total remaining storage across all dams relative to the initial total storage, at the beginning of the simulation year. - ``sedtrap_%``: Percentage of total sediment trapped by all dams relative to the total sediment input across all stream segments, during the simulation year. - ``sedrelease_%``: Percentage of sediment released by terminal dams and drainage areas not covered by the dam system, relative to the total sediment input across all stream segments, during the simulation year. - ``dam_removed``: List of dams removed at the beginning of the year. An empty list implies all dams in the system remain active. - ``dam_active``: List of active dams at the beginning of the year. An empty list in the final simulation year, if it occurs, indicates that all dams in the system became inactive before completing the user-defined number of simulation years. .. note:: The sum of sediment trapping and release percentages for the dam system may not equal 100%. This is because these percentages are calculated relative to the total sediment input across all stream segments, using a fixed base value for consistency. In practice, the actual sediment inflow to the dams is greater than the total sediment input across all stream segments, due to additional contributions from sediment released by upstream dams. Furthermore, a dam's sediment trapping efficiency depends on both its controlled drainage area and remaining storage capacity, making sediment dynamics within the system more complex. However, over time, the sum gradually converges to 100% as more dams become inactive, resulting in reduced sediment contributions from upstream dams. - **dam_drainage_area**: A DataFrame with columns ``start_year`` and dam identifiers. The cell values represent the annual controlled drainage area percentages for each dam, relative to the total stream drainage area at the beginning of the simulation year. - **dam_remaining_storage**: A DataFrame with columns ``start_year`` and dam identifiers. The cell values represent the annual remaining storage percentages, relative to each dam's initial storage, at the beginning of the simulation year. - **dam_trap_efficiency**: A DataFrame with columns ``start_year`` and dam identifiers. The cell values represent the annual trap efficiency percentages at the beginning of the simulation year. - **dam_trapped_sediment**: A DataFrame with columns ``start_year`` and dam identifiers. The cell values represent the annual sediment trapping percentages, relative to the total sediment input across all stream segments, at the end of the simulation year. .. note:: The values in ``dam_drainage_area``, ``dam_remaining_storage``, and ``dam_trap_efficiency`` are recorded at the beginning of each simulation year. This design ensures that the final state of each dam is captured just before its removal from the system, allowing users to understand the conditions that led to its inactivation. Parameters ---------- stream_file : str Path to the input stream GeoJSON file, generated by :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`. storage_dict : dict Dictionary mapping dam identifiers to initial storage capacities in cubic meters. sediment_density : float Density of sediment in kilograms per cubic meter. year_limit : int Maximum number of simulation years. Simulation stops earlier if all dams retire. trap_equation : bool, optional If True (default), trap efficiency is calculated using the Brown (1943) formula. Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_. trap_threshold : float, optional Minimum trap efficiency required to keep a dam active (default 0.1). Dams with efficiency below this value are treated as inactive. brown_d : float, optional Empirical parameter used in the trap efficiency equation. It ranges from 0.046 to 1, with a mean value of 0.1, which is also the default. Refer to `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_. trap_constant : float, optional Fixed trap efficiency value applied when ``trap_equation=False``. Must be greater than ``trap_threshold`` and less than 1. Default is None. release_threshold: float, optional Maximum sediment release fraction threshold of the total stream sediment input to stop the simulation. The default is 0.95, meaning 95% of the total stream sediment input is leaving the study area. If the user does not want to consider this parameter in the simulation, set its value to 1.0. config_validation: bool, optional If ``True`` (default), validates the configuration of input parameters. Users are advised not to modify this parameter. This option is provided to speed up computations in the :class:`OptiDamTool.SystemDesign` class, where configuration checking is not required for every scenario generated by genetic algorithms. folder_path : str, optional Path to the folder where JSON files will be saved. If None (default), no files are saved. Returns ------- dict A dictionary containing detailed information on dam lifespan, system-wide statistics, individual metrics, and simulation parameters. ''' # check static type of input variable origin utility._validate_variable_origin_static_type( vars_types=typing.get_type_hints( obj=self.stodym_plus ), vars_values=locals() ) # check the vailidity of folder path if folder_path is not None: utility._validate_folder_path( folder_path=folder_path ) # validate input variable configuration if config_validation: self._validate_stodym_plus_config( storage_dict=storage_dict, year_limit=year_limit, trap_equation=trap_equation, trap_threshold=trap_threshold, trap_constant=trap_constant ) # input location and storage capacity of dams base_storage = storage_dict.copy() storage_sum = sum(base_storage.values()) dam_ids = list(storage_dict.keys()) # base storage DataFrame bs_df = self._base_storage_df( storage_dict=base_storage ) # stream drainage area and sediment inflow to the stream stream_gdf = geopandas.read_file( filename=stream_file ) stream_area = max(stream_gdf['csa_m2']) stream_sediment = max(stream_gdf['cumsed_kg']) # adjacent downstream connectivity of dams connection_downstream = self.connectivity_adjacent_downstream_dam( stream_file=stream_file, dam_list=dam_ids ) # upstream metrics of dams metric_dict = self.upstream_metrics_summary( stream_file=stream_file, dam_list=dam_ids ) # initial adjacent upstream connectivity of dams connection_upstream = metric_dict['adjacent_upstream_dams'] # intial drainage area of dams drainage_area = metric_dict['controlled_drainage_m2'] # intial sediment inflow to dams from own drainage area sediment_local = metric_dict['sediment_inflow_kg'] # intial sediment release from dams sediment_release = { d_id: 0.0 for d_id in dam_ids } # DataFrames for storing dam-wise parameter in annual time steps drainage_df = pandas.DataFrame( columns=dam_ids ) # controlled drainage area percentage relative to total stream drainage area storage_df = pandas.DataFrame( columns=dam_ids ) # remaining storage percentage relative to own intial storage te_df = pandas.DataFrame( columns=dam_ids ) # trapping efficiency sedtrap_df = pandas.DataFrame( columns=dam_ids ) # trapped sediment percentage relative to total stream sediment input # a DataFrame for storing dam lifespan in years life_df = pandas.DataFrame( index=dam_ids ) life_df['life_year'] = 0 # a DataFrame to store dam-system paramters in annual time steps system_df = pandas.DataFrame( columns=[ 'drainage_%', 'storage_%', 'sedtrap_%', 'sedrelease_%', 'dam_removed', 'dam_active' ] ) # iterate years for year in range(year_limit): # break if all dams are removed if len(dam_ids) == 0: break # trap efficiency of dams if trap_equation: trap_efficiency = self.trap_efficiency_brown( storage_dict=storage_dict, area_dict=drainage_area, brown_d=brown_d ) else: assert isinstance(trap_constant, (int, float)) trap_efficiency = { d_id: trap_constant for d_id in dam_ids } # store dam-wise parameters in annual time steps for d_id in dam_ids: drainage_df.loc[year + 1, d_id] = 100 * drainage_area[d_id] / stream_area storage_df.loc[year + 1, d_id] = 100 * storage_dict[d_id] / base_storage[d_id] te_df.loc[year + 1, d_id] = trap_efficiency[d_id] # check if any dam removal is required dam_removal = [ d_id for d_id in dam_ids if trap_efficiency[d_id] <= trap_threshold or storage_dict[d_id] <= 0 ] # store inactive dams system_df.loc[year + 1, 'dam_removed'] = dam_removal # if any dam is removed if len(dam_removal) > 0: # update dam identifiers dam_ids = [i for i in dam_ids if i not in dam_removal] # update network features connection_downstream = self.connectivity_adjacent_downstream_dam( stream_file=stream_file, dam_list=dam_ids ) metric_dict = self.upstream_metrics_summary( stream_file=stream_file, dam_list=dam_ids ) connection_upstream = metric_dict['adjacent_upstream_dams'] drainage_area = metric_dict['controlled_drainage_m2'] sediment_local = metric_dict['sediment_inflow_kg'] # update storage dictionary storage_dict = { d_id: storage_dict[d_id] for d_id in dam_ids } # released sediment from upstream dams sediment_upstream = { d_id: sum([sediment_release[j] for j in connection_upstream[d_id]]) for d_id in dam_ids } # sediment inflow to dams from own drainage area plus outflow from upstream dams sediment_inflow = { d_id: sediment_local[d_id] + sediment_upstream[d_id] for d_id in dam_ids } # sediment trapping behind dams sediment_trap = { d_id: sediment_inflow[d_id] * trap_efficiency[d_id] for d_id in dam_ids } # sediment trapping percentage relative to total stream sediment input sedtrap_percent = { d_id: 100 * sediment_trap[d_id] / stream_sediment for d_id in dam_ids } # store dam-wise sediment trapping in annual time steps for d_id in dam_ids: sedtrap_df.loc[year + 1, d_id] = sedtrap_percent[d_id] # sediment release from dams sediment_release = { d_id: sediment_inflow[d_id] - sediment_trap[d_id] for d_id in dam_ids } # store dam-system parameters in annual time steps system_df.loc[year + 1, 'dam_active'] = dam_ids system_df.loc[year + 1, 'drainage_%'] = 100 * sum(drainage_area.values()) / stream_area system_df.loc[year + 1, 'storage_%'] = 100 * sum(storage_dict.values()) / storage_sum system_df.loc[year + 1, 'sedtrap_%'] = 100 * sum(sediment_trap.values()) / stream_sediment release_outlets = sum([sediment_release[d_id] for d_id in dam_ids if connection_downstream[d_id] == -1]) release_undrainage = stream_sediment - sum(sediment_local.values()) release_fraction = (release_outlets + release_undrainage) / stream_sediment system_df.loc[year + 1, 'sedrelease_%'] = 100 * release_fraction # break if sediment release cross a certain threshold of total stream sediment inputs if release_fraction > release_threshold: break # annual filling of dam storage due to sediment trapping storage_filling = { d_id: sediment_trap[d_id] / sediment_density for d_id in dam_ids } # remaining storage of dams storage_dict = { d_id: storage_dict[d_id] - storage_filling[d_id] for d_id in dam_ids } # update dam-wise life expectancy for d_id in dam_ids: life_df.loc[d_id, 'life_year'] = life_df.loc[d_id, 'life_year'] + 1 # dam lifespan DataFrame life_df = life_df.reset_index( names=['dam_id'] ) # ouptut dictionary output = { 'dam_initial_storage': bs_df, 'dam_lifespan': life_df, 'system_statistics': system_df, 'dam_drainage_area': drainage_df, 'dam_remaining_storage': storage_df, 'dam_trap_efficiency': 100 * te_df, 'dam_trapped_sediment': sedtrap_df } unchanged_keys = [ 'dam_initial_storage', 'dam_lifespan' ] for key, df in output.items(): if key not in unchanged_keys: df = df.reset_index( names=['start_year'] ) output[key] = df # write output dictionary to JSON files if folder_path is not None: for key, df in output.items(): json_file = os.path.join(folder_path, f'{key}.json') df.to_json( path_or_buf=json_file, orient='records', indent=4 ) return output
[docs] def stodym_plus_drainage_response( self, stream_file: str, flwdir_file: str, storage_dict: dict[int, float], sediment_density: int | float, year_limit: int, folder_path: str, trap_equation: bool = True, trap_threshold: int | float = 0.1, brown_d: int | float = 0.1, trap_constant: typing.Optional[int | float] = None, release_threshold: int | float = 0.95 ) -> pandas.DataFrame: ''' Simulate the detailed annual storage dynamics of a dam system and saves the output to the specified input directory. Refer to :meth:`OptiDamTool.Network.stodym_plus` for full implementation details. Based on the output DataFrame ``system_statistics``, which contains a ``start_year`` column (starting from 1), this method generates GeoDataFrames for dam location points and their controlled drainage polygons. These are saved as: - ``year_<start_year>_dam_location_point.geojson`` - ``year_<start_year>_dam_drainage_polygon.geojson`` Files are created for each year where a dam becomes inactive, along with two additional GeoJSON files for ``start_year = 0``, representing the initial condition. Refer to :meth:`OptiDamTool.WatemSedem.dam_controlled_drainage_polygons` for more details on GeoDataFrame generation. In addition, a file ``all_potential_dam_location.geojson`` is created, which contains potential dam locations. Each point corresponds to a unique stream segment. All GeoJSON files include the ``ws_id`` field to support cross-referencing with the stream network. The ``year_{start_year}_dam_drainage_polygon.geojson`` files also contain an ``area_%`` column, which represents the percentage of the total stream drainage area controlled by each dam. Parameters ---------- stream_file : str Path to the input stream GeoJSON file, generated by :meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`. flwdir_file : str Path to the input flow direction raster file ``flowdir.tif``, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`. storage_dict : dict Dictionary mapping dam identifiers to initial storage capacities in cubic meters. sediment_density : float Density of sediment in kilograms per cubic meter. year_limit : int Maximum number of simulation years. Simulation stops earlier if all dams retire. folder_path : str Path to the folder where JSON and GeoJSON files will be saved. trap_equation : bool, optional If True (default), trap efficiency is calculated using the Brown (1943) formula. Further details can be found in `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_. trap_threshold : float, optional Minimum trap efficiency required to keep a dam active (default 0.1). Dams with efficiency below this value are treated as inactive. brown_d : float, optional Empirical parameter used in the trap efficiency equation. It ranges from 0.046 to 1, with a mean value of 0.1, which is also the default. Refer to `Verstraeten and Poesen (2000) <https://doi.org/10.1177/030913330002400204>`_. trap_constant : float, optional Fixed trap efficiency value applied when ``trap_equation=False``. Must be greater than ``trap_threshold`` and less than 1. Default is None. release_threshold: float, optional Minimum sediment release fraction threshold of the total stream sediment input to stop the simulation. The default is 0.95, meaning 95% of the total stream sediment input is leaving the study area. If the user does not want to consider this parameter in the simulation, set its value to 1.0. Returns ------- DataFrame A DataFrame with columns ``start_year``, ``dam_removed``, and ``dam_active``. The ``start_year`` starts from 0 and helps trace the creation of corresponding GeoJSON files. ''' # dam system storage dynamics and save output stodym_output = self.stodym_plus( stream_file=stream_file, storage_dict=storage_dict, sediment_density=sediment_density, year_limit=year_limit, trap_equation=trap_equation, trap_threshold=trap_threshold, brown_d=brown_d, trap_constant=trap_constant, release_threshold=release_threshold, folder_path=folder_path ) # stream GeoDataFrame stream_gdf = geopandas.read_file( filename=stream_file ) stream_area = max(stream_gdf['csa_m2']) # all dam location Point GeoDataFrame adl_gdf = stream_gdf.copy() adl_gdf = adl_gdf[['ws_id', 'geometry']] adl_gdf['pour_coords'] = adl_gdf.geometry.apply( lambda x: x.coords[-2] ) adl_gdf['geometry'] = adl_gdf.apply( lambda row: shapely.Point(row['pour_coords']), axis=1 ) adl_gdf = adl_gdf.drop( columns=['pour_coords'] ) location_file = os.path.join(folder_path, 'all_potential_dam_location.geojson') adl_gdf.to_file( filename=location_file ) # zipped iterator of year, dam_removed, and dam_active system_df = stodym_output['system_statistics'] variable_list = list( zip( [0] + system_df['start_year'].tolist(), [[]] + system_df['dam_removed'].tolist(), [list(storage_dict.keys())] + system_df['dam_active'].tolist() ) ) # iterate variables for year, dam_removed, dam_active in variable_list: # if dam is active if len(dam_active) > 0: # if year is 0 or dam is removed if year == 0 or len(dam_removed) > 0: # dam location points and their controlled drainage area polygons gdf_dict = WatemSedem().dam_controlled_drainage_polygons( flwdir_file=flwdir_file, location_file=location_file, dam_list=dam_active, folder_path=folder_path ) # area percentage of polygons gdf_dict['dam_drainage_polygon']['area_%'] = 100 * gdf_dict['dam_drainage_polygon']['area_m2'] / stream_area # save the GeoDataFrames for key, gdf in gdf_dict.items(): geojson_file = os.path.join(folder_path, f'year_{year}_{key}.geojson') gdf.to_file( filename=geojson_file ) # delete files that are not required WatemSedem().file.delete_by_name( folder_path=folder_path, file_names=[ 'dam_location_point', 'dam_drainage_polygon' ] ) # output DataFrame of dam scenarios output = pandas.DataFrame( data=variable_list, columns=[ 'start_year', 'dam_removed', 'dam_active' ] ) return output