import geopandas
import pandas
import scipy
import numpy
import platypus
import functools
import inspect
import typing
import os
import time
import datetime
import concurrent.futures
from .network import Network
from . import utility
[docs]
class SystemDesign:
'''
Provide functionality to optimize dam systems by evolutionary computing framework
`platypus-opt <https://github.com/Project-Platypus/Platypus>`_.
'''
def __init__(
self
) -> None:
'''
Initialize the class instance with a default attribute ``lifespan_epsilon`` to :math:`10^{-6}`.
This value serves as a lower bound for dam lifespans (in years) when calculating the
summation of logarithms of lifespans. It prevents undefined behavior from computing :math:`\\log(0)`
and ensures that the logarithmic penalty correctly applies to dams with zero or negligible lifespans.
'''
self.lifespan_epsilon = pow(10, - 6)
def _validate_preliminary_config(
self,
stream_file: str,
dam_number: int,
storage_bounds: tuple[int, int],
storage_multiplier: float,
seeds: int
) -> list[int]:
'''
Validate the input configuration of dams and their storage volumes.
'''
# check that storage_bounds has exactly two elements
if len(storage_bounds) != 2:
raise ValueError(
f'storage_bounds must contain exactly 2 integers, but received {len(storage_bounds)} elements'
)
# check that each element is a positive integer
for i in storage_bounds:
if not isinstance(i, int):
raise TypeError(
f'Each value in storage_bounds must be an integer, but got {i} of type "{type(i).__name__}"'
)
if i < 1:
raise ValueError(
f'Each value in storage_bounds must be greater than or equal to 1, but received {i}'
)
# Check that the lower bound is strictly less than the upper bound
if storage_bounds[0] >= storage_bounds[1]:
raise ValueError(
f'The lower bound {storage_bounds[0]} must be strictly less than the upper bound {storage_bounds[1]} in storage_bounds'
)
# check that storage_multiplier is greater than 0
if storage_multiplier <= 0:
raise ValueError(
f'storage_multiplier must be greater than 0, but received {storage_multiplier}'
)
# stream GeoDataFrame
stream_gdf = geopandas.read_file(
filename=stream_file
)
# list of stream identifiers for dams
stream_ids = list(stream_gdf['ws_id'])
# check that dam number is less than length of stream identifiers
if dam_number >= len(stream_ids):
raise ValueError(
f'dam_number {dam_number} is out of range; expected 1 <= dam_number < {len(stream_ids)}'
)
# check validity of seeds number
if seeds <= 0:
raise ValueError(
f'Input seeds must be greater than or equal to 1, but received {seeds}'
)
return stream_ids
def _validate_stodym_kwargs(
self,
stodym_config: dict[str, typing.Any]
) -> dict[str, typing.Any]:
'''
Validate the input configuration for the method :meth:`OptiDamTool.Network.stodym_plus`.
'''
# dictionary of valid keys
vars_valid = [
'sediment_density',
'year_limit',
'trap_equation',
'trap_threshold',
'brown_d',
'trap_constant',
'release_threshold'
]
# check validty of key names and their value types
for m_key, m_val in stodym_config.items():
if not isinstance(m_key, str):
raise TypeError(
f'Key "{m_key}" in stodym_config must be a string, but got type "{type(m_key).__name__}"'
)
if m_key not in vars_valid:
raise NameError(
f'Invalid key "{m_key}" in stodym_config; valid names are {vars_valid}'
)
# check required keys
required_keys = [
'sediment_density',
'year_limit'
]
for r_key in required_keys:
if r_key not in stodym_config:
raise KeyError(
f'Required key "{r_key}" is missing from stodym_config'
)
# add config_validation key to speed up computation
stodym_config['config_validation'] = False
return stodym_config
def _validate_algorithm_config(
self,
algorithm_name: str,
algorithm_config: dict[str, typing.Any]
) -> dict[str, typing.Any]:
'''
Validate the genetic algorithm configuration.
'''
# dictionary mapping between required key and algorithms
alg_rk = {
'population_size': [
'EpsNSGAII',
'EpsMOEA',
'IBEA',
'MOEAD',
'NSGAII',
'PESA2',
'SPEA2'
],
'epsilons': [
'EpsMOEA',
'EpsNSGAII'
],
'divisions_outer': [
'NSGAIII'
]
}
# supported algorithms
alg_support = sorted(set(j for i in alg_rk.values() for j in i))
# check valid algorithm_name
if algorithm_name not in alg_support:
raise NameError(
f'Invalid algorithm name "{algorithm_name}"; valid names are {alg_support}'
)
# list of valid input arguments name
alg_args = []
args_types = inspect.signature(getattr(platypus, algorithm_name))
for param_name in args_types.parameters.keys():
if param_name in ['problem', 'kwargs']:
continue
else:
alg_args.append(param_name)
# add special argument if any
special_args = {
'MOEAD': ['population_size']
}
if algorithm_name in special_args:
alg_args.extend(special_args[algorithm_name])
# check input keyword in algorithm_config
for a_arg in algorithm_config:
if a_arg not in alg_args:
raise NameError(
f'Invalid key "{a_arg}" in algorithm_config; valid names are {alg_args}'
)
# check required key in algorithm_config
for req_key in alg_rk:
if algorithm_name in alg_rk[req_key]:
if req_key not in algorithm_config:
raise KeyError(
f'Missing required key "{req_key}" in algorithm_config'
)
# check required_key value in algorithm_config
for req_key in alg_rk:
if req_key in algorithm_config:
if req_key != 'epsilons':
if not isinstance(algorithm_config[req_key], int):
raise TypeError(
f'Value for "{req_key}" must be an integer, but got type "{type(algorithm_config[req_key]).__name__}"'
)
else:
if not 0 < algorithm_config[req_key] < 1:
raise ValueError(
f'Value for "{req_key}" must be between 0 and 1, but got "{algorithm_config[req_key]}"'
)
# default mixed variator will be added in algorithm_kwargs
if 'variator' not in algorithm_config:
default_variator = platypus.CompoundOperator(
platypus.SSX(),
platypus.Replace(),
platypus.HUX(),
platypus.BitFlip(),
)
algorithm_config['variator'] = default_variator
return algorithm_config
def _validate_objectives(
self,
objectives: list[str],
objs_dirs: dict[str, str]
) -> list[str]:
'''
Validate the list of objectives.
'''
# check validity of objective names
for obj in objectives:
if obj not in objs_dirs:
raise ValueError(
f'Invalid objective "{obj}"; valid names are {list(objs_dirs.keys())}'
)
# check non-empty objective list
if len(objectives) == 0:
raise ValueError(
'"objectives" cannot be an empty list'
)
# check objective names cannot be repeated in the list
if len(objectives) != len(set(objectives)):
raise ValueError(
f'Duplicate names found in objective list: {objectives}'
)
return objectives
def _validate_constraints(
self,
constraints: dict[str, float],
constrs_ops: dict[str, str]
) -> dict[str, float]:
'''
Validate the constratint dictionary.
'''
# check non-empty dictionary
if len(constraints) == 0:
raise ValueError(
'"constraints" cannot be an empty dictionary'
)
# check validity of constraint key and and their value type
for constr, value in constraints.items():
if constr not in constrs_ops:
raise NameError(
f'Invalid constraint "{constr}"; valid names are {list(constrs_ops.keys())}'
)
if not isinstance(value, (int, float)):
raise TypeError(
f'Value of key "{constr}" in "constraints" must be numeric, but got type "{type(value).__name__}"'
)
return constraints
def _scenario_nondominated(
self,
scenario: platypus.Solution,
dam_number: int,
storage_vars: platypus.Integer,
storage_multiplier: float,
objectives: list[str],
constraints: dict[str, float],
stream_file: str,
stodym_config: dict[str, typing.Any],
objs_bounds: dict[str, list[float]],
objs_dirs: dict[str, str],
constrs_ops: dict[str, str]
) -> pandas.DataFrame:
'''
Simulate a non-dominated scenario obtained using :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
'''
# Empty dictionary to store output of non-dominated scenario simulation
scen_out = {}
# scenario dam locations and storage volumes
dam_ids = scenario.variables[0]
dam_storage = [
storage_vars.decode(i) * storage_multiplier for i in scenario.variables[1:]
]
dam_sort = sorted(
zip(dam_ids, dam_storage), key=lambda x: x[0]
)
# store dam locations
for i in range(dam_number):
scen_out[f'd_{i + 1}'] = dam_sort[i][0]
# store dam storage volumes
for i in range(dam_number):
scen_out[f'sv_{i + 1}'] = dam_sort[i][1]
# scenario storage dynamics simulation
scenario_stodym = Network().stodym_plus(
stream_file=stream_file,
storage_dict=dict(dam_sort),
**stodym_config
)
# store dam lifespan
for i in range(dam_number):
scen_out[f'ls_{i + 1}'] = scenario_stodym['dam_lifespan']['life_year'].iloc[i]
# objective lower bounds
objs_lb = [
objs_bounds[obj][0] for obj in objectives
]
# objective upper bounds
objs_ub = [
objs_bounds[obj][1] for obj in objectives
]
# normalized objective values
objs_normalize = [
val if objs_dirs[obj] == 'min' else - val for obj, val in zip(objectives, scenario.objectives)
]
# store objectives
for i, obj in enumerate(objectives):
actual_val = objs_normalize[i] * (objs_ub[i] - objs_lb[i]) + objs_lb[i]
scen_out[f'{obj}({objs_dirs[obj]})'] = actual_val
# store constraints
for constr, val in zip(constraints, scenario.constraints):
scen_out[f'{constr}{constrs_ops[constr]}{constraints[constr]}'] = val
# store normalized objective list
scen_out['obj_normalize'] = objs_normalize
return scen_out
def _cpu_number(
self,
processes: typing.Optional[int]
) -> int:
'''
Determine the number of CPUs to use for computation.
'''
cpu_num = os.cpu_count()
if processes is None:
if cpu_num is None:
raise OSError(
'Provide an integer value of "processes" as "os.cpu_count()" returned None'
)
else:
cpu_num = processes
return cpu_num
[docs]
def compute_curve_pairwise_deviation(
self,
df: pandas.DataFrame
) -> float:
'''
Compute a scalar metric representing the pairwise variability among
annual remaining storage percentage curves of dams.
The method processes the input DataFrame as follows:
- Selects the columns containing remaining storage percentages of dams.
- Replaces NaN and negative values with 0.
- Normalizes percentages to the [0, 1] range by dividing by 100.
- Computes pairwise Euclidean distances between the normalized curves.
- Returns the standard deviation of these pairwise distances.
Parameters
----------
df : DataFrame
DataFrame corresponding to the ``dam_remaining_storage`` key from the output dictionary
generated by :meth:`OptiDamTool.Network.stodym_plus`.
Returns
-------
float
Standard deviation of the normalized pairwise Euclidean distances between dam storage curves.
A smaller value indicates that dams follow more similar patterns in their remaining storage dynamics.
'''
# conside the positive values only
df = df.where(
cond=df > 0,
other=0
)
# transpose DataFrame and divide by 100
t_df = df.iloc[:, 1:].T / 100
t_arr = t_df.values.astype(float)
# pairwise Euclidean distances
pairwise_dist = scipy.spatial.distance.pdist(
X=t_arr,
metric='euclidean'
)
# normalized pairwise distances
norm_dist = pairwise_dist / pow(t_df.shape[1], 0.5)
# standard deviation of Euclidean distances
std_dist = float(norm_dist.std())
return std_dist
[docs]
def compute_curve_mean_deviation(
self,
df: pandas.DataFrame
) -> float:
'''
Compute a scalar metric representing the maximum deviation of annual
remaining storage percentage curves of dams from their mean curve.
The method processes the input DataFrame as follows:
- Selects columns containing the remaining storage percentages of dams.
- Replaces NaN and negative values with 0.
- Normalizes percentages to the [0, 1] range by dividing by 100.
- Computes the mean curve across all dams.
- Calculates Euclidean distances between each normalized curve and the mean curve.
- Returns the maximum of these normalized Euclidean distances.
Parameters
----------
df : DataFrame
DataFrame corresponding to the ``dam_remaining_storage`` key from the output dictionary
generated by :meth:`OptiDamTool.Network.stodym_plus`.
Returns
-------
float
Maximum normalized Euclidean distance between annual remaining storage percentage curves of dams
and their mean curve. A smaller value indicates that dams follow more similar patterns relative
to the mean remaining storage dynamics.
'''
# conside the positive values only
df = df.where(
cond=df > 0,
other=0
)
# transpose DataFrame and divide by 100
t_df = df.iloc[:, 1:].T / 100
t_arr = t_df.values.astype(float)
# mean of rows
mean_row = t_arr.mean(
axis=0
)
# Euclidean distances from mean row
euclidean_dist = numpy.linalg.norm(
x=t_arr - mean_row,
axis=1
)
# normalized Euclidean distances
norm_dist = euclidean_dist / pow(t_df.shape[1], 0.5)
# maximum normalized Euclidean distances
max_dist = float(norm_dist.max())
return max_dist
def _objective_bounds(
self,
dam_number: int,
storage_vars: platypus.Integer,
storage_multiplier: float,
objectives: list[str],
stodym_config: dict[str, typing.Any]
) -> dict[str, list[float]]:
'''
Construct a dictionary where each key is an objective name from
:attr:`OptiDamTool.SystemDesign.mapping_objective_direction`, and each value
is a two-element list representing the lower and upper bounds.
'''
# objectives lower and upper bounds
objs_bounds = {}
for obj in objectives:
if obj == 'lifespan':
lb_val = dam_number * numpy.log(self.lifespan_epsilon)
ub_val = dam_number * numpy.log(stodym_config['year_limit'])
if obj == 'lifespan_std':
lb_val = 0
ub_val = stodym_config['year_limit'] / 2
if obj == 'storage_sum':
lb_val = dam_number * storage_vars.min_value * storage_multiplier
ub_val = dam_number * storage_vars.max_value * storage_multiplier
if obj == 'curve_pairwise_deviation':
lb_val = 0
ub_val = 0.5
if obj == 'curve_mean_deviation':
lb_val = 0
ub_val = 1
if obj in ['sediment_trapped_initial', 'sediment_released_median', 'drainage_area']:
lb_val = 0
ub_val = 100
objs_bounds[obj] = [lb_val, ub_val]
return objs_bounds
def _scenario_sediment_control(
self,
variables: list[list[int] | int],
storage_multiplier: float,
stream_file: str,
stodym_config: dict[str, typing.Any],
objectives: list[str],
objs_bounds: dict[str, list[float]],
objs_dirs: dict[str, str],
constraints: dict[str, float]
) -> tuple[list[float], list[float]]:
'''
Generate objective and constraint values for the scenario produced by
:meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
'''
# selected list of dams
selected_dam = typing.cast(list[int], variables[0])
# selcted sotrage volumns
selected_storage = typing.cast(list[int], variables[1:])
storage_dict = {
k: v * storage_multiplier for k, v in zip(selected_dam, selected_storage)
}
# storage dynamics simulation from selected dam and storage volumes
vars_stodym = Network().stodym_plus(
stream_file=stream_file,
storage_dict=storage_dict.copy(),
**stodym_config
)
# compute objective
sim_objs = typing.cast(list[float], [])
for obj in objectives:
if obj == 'lifespan':
# dam lifespan with small positive value s a lower bound to penalize low lifespans
dam_lifespan = vars_stodym['dam_lifespan']['life_year'].clip(
lower=self.lifespan_epsilon
)
obj_val = sum(numpy.log(dam_lifespan))
if obj == 'lifespan_std':
lifespan_std = vars_stodym['dam_lifespan']['life_year'].std(
ddof=0
)
obj_val = float(lifespan_std)
if obj == 'storage_sum':
obj_val = sum(storage_dict.values())
if obj == 'curve_pairwise_deviation':
obj_val = self.compute_curve_pairwise_deviation(
df=vars_stodym['dam_remaining_storage']
)
if obj == 'curve_mean_deviation':
obj_val = self.compute_curve_mean_deviation(
df=vars_stodym['dam_remaining_storage']
)
if obj == 'sediment_trapped_initial':
sediment_trapped = vars_stodym['system_statistics']['sedtrap_%']
obj_val = float(sediment_trapped.iloc[0])
if obj == 'drainage_area':
drainage_area = vars_stodym['system_statistics']['drainage_%']
obj_val = float(drainage_area.iloc[0])
if obj == 'sediment_released_median':
sediment_released = vars_stodym['system_statistics']['sedrelease_%']
median_year = len(sediment_released) // 2
obj_val = float(sediment_released.iloc[median_year:].mean())
# normalized objective
obj_norm = (obj_val - objs_bounds[obj][0]) / (objs_bounds[obj][1] - objs_bounds[obj][0])
# maximum to minimum conversion if any
obj_min = obj_norm if objs_dirs[obj] == 'min' else - obj_norm
# sim_objs.append(obj_norm)
sim_objs.append(obj_min)
# compute constraint values
sim_constrs = typing.cast(list[float], [])
for constr in constraints:
if constr == 'lb_lifespan':
constr_val = min(vars_stodym['dam_lifespan']['life_year'])
if constr == 'ub_storage_sum':
constr_val = sum(storage_dict.values())
sim_constrs.append(constr_val)
# scenario output
scenario_output = (sim_objs, sim_constrs)
return scenario_output
@property
def mapping_constraint_operator(
self
) -> dict[str, str]:
'''
Provide a dictionary mapping optimization constraints
to their corresponding operators. Each constraint is listed with its
operator and an explanatory remark.
- ``lb_lifespan``
- **Operator**: ``>=``
- **Remark**: Lower bound on dam lifespan, ensuring that each dam remains operates
for at least the minimum required number of years.
- ``ub_storage_sum``
- **Operator**: ``<=``
- **Remark**: Upper bound on the total storage volume across all dams, used to prevent
unrealistic designs with excessively large reservoirs or prohibitively high deployment costs.
'''
constrs_ops = {
'lb_lifespan': '>=',
'ub_storage_sum': '<='
}
return constrs_ops
@property
def mapping_objective_direction(
self
) -> dict[str, str]:
'''
Provide a dictionary mapping optimization objectives to their respective directions.
Each objective is listed with its direction and an explanatory remark justifying the choice.
- ``lifespan``
- **Direction**: Maximize
- **Remark**: Extend the functional lifespan of dams. The objective is computed as the summation
of logarithms of lifespans, with :math:`10^{-6}` as a lower bound to avoid undefined behavior
from computing :math:`\\log(0)`. This ensures that dams with zero lifespans are penalized appropriately.
- ``lifespan_std``
- **Direction**: Minimize
- **Remark**: Reduce variability in dam lifespans so benefits are distributed more evenly across all dams.
The objective is computed as the standard deviation of lifespans.
- ``sediment_trapped_initial``
- **Direction**: Maximize
- **Remark**: Retain more sediment within the watershed during the initial year, when trapping is most effective.
This supports dam performance early in the system’s lifespan, before sedimentation reduces trapping capacity.
- ``sediment_released_median``
- **Direction**: Minimize
- **Remark**: Reduce the average sediment release from the median lifetime of the dam system onward to capture long-term effectiveness.
For example, if the system has a 10–11 year lifespan, the average sediment release from year 6 to the end of operation is used as the metric.
- ``storage_sum``
- **Direction**: Minimize
- **Remark**: Promote cost-efficient deployment by reducing the total storage volume of the dam system.
- ``curve_pairwise_deviation``
- **Direction**: Minimize
- **Remark**: Limit variability among the annual remaining storage curves of individual dams, ensuring consistency
in how dams respond to sediment inflow over time. Computed via :meth:`OptiDamTool.SystemDesign.compute_curve_pairwise_deviation`.
- ``curve_mean_deviation``
- **Direction**: Minimize
- **Remark**: Reduce deviation of each dam’s annual remaining storage curve from the overall mean curve, promoting uniform
storage behavior across the dam system. Computed via :meth:`OptiDamTool.SystemDesign.compute_curve_mean_deviation`.
'''
objs_dirs = {
'lifespan': 'max',
'lifespan_std': 'min',
'sediment_trapped_initial': 'max',
'sediment_released_median': 'min',
'drainage_area': 'max',
'storage_sum': 'min',
'curve_pairwise_deviation': 'min',
'curve_mean_deviation': 'min'
}
return objs_dirs
[docs]
def sediment_control_by_fixed_dams(
self,
dam_number: int,
storage_bounds: tuple[int, int],
storage_multiplier: int | float,
stream_file: str,
stodym_config: dict[str, typing.Any],
objectives: list[str],
algorithm_name: str,
algorithm_config: dict[str, typing.Any],
seeds: int,
nfe: int,
folder_path: str,
constraints: dict[str, float] = {'lb_lifespan': 0},
processes: typing.Optional[int] = None
) -> pandas.DataFrame:
'''
Optimize dam locations and storage volumes using a multi-objective evolutionary algorithm,
based on annual sediment inflow to watershed drainage pathways.
This function uses built-in `evolutionary algorithms <https://platypus.readthedocs.io/en/latest/api/platypus.algorithms.html>`_
from the ``platypus-opt`` Python package. Users can perform multiple experiments for
the same problem, with each run starting from a different initial population.
The function returns a dictionary with two keys, where each value is a DataFrame.
The DataFrames are also saved to the input directory as JSON files, with filenames
corresponding to the dictionary keys.
- ``solutions_nondominated``
DataFrame of non-dominated solutions extracted from merged feasible solutions across
multiple experiments. Columns include:
- ``count``
Sequential index (starting from 0) for solution numbering.
- ``d_<i>``
Stream identifiers for dams (``<i>`` ranges from 1 to ``dam_number``).
- ``sv_<i>``
Initial storage volume of dams in cubic meters (``<i>`` ranges from 1 to ``dam_number``).
- ``ls_<i>``
Lifespan of dams in years (``<i>`` ranges from 1 to ``dam_number``).
- ``<obj>(<dir>)``
Objective values, where ``<obj>`` is the objective name from the list of input ``objectives`` and
``<dir>`` is its direction (``min`` or ``max``).
- ``<constr><op><val>``
Constraint values, where ``<constr>`` is the constraint name from the dictionary of input ``constraints``,
``<op>`` is the operator, and ``<val>`` is the bound.
- ``obj_normalize``
List of normalized objective values, computed as ``(obj_val - obj_lb) / (obj_ub - obj_lb)``,
where ``obj_val`` is the objective value, and ``obj_lb`` and ``obj_ub`` are the lower and upper bounds of the objectives.
- ``metric_euclidean(<solution_ideal>)``
Euclidean distance of normalized solutions to the ideal solution list ``<solution_ideal>``.
- ``computation_statistics``
DataFrame with two columns, ``feature`` and ``value``. The ``feature`` column includes:
- ``total_execution_time (s)``: Total execution time in seconds.
- ``total_execution_timedelta``: Total execution time in `datetime.timedelta <https://docs.python.org/3/library/datetime.html#timedelta-objects>`_
string format (HH:MM:SS).
- ``CPU_number``: Number of CPU cores used for computation (either `processes` or `os.cpu_count()`).
- ``experiment_number``: Number of experiments specified by the user.
- ``batch_run_number``: Number of batch runs across CPUs, defined as the ceiling division of the experiment number by the CPU count.
- ``experiment_time (s)``: Total experiment time in seconds.
- ``experiment_timedelta``: Total experiment time in ``datetime.timedelta`` string format (HH:MM:SS).
- ``average_experiment_time (s)``: Average execution time per experiment in seconds.
- ``average_experiment_timedelta``: Average execution time per experiment in ``datetime.timedelta`` string format (HH:MM:SS).
- ``total_function_evaluations``: Total function evaluations, calculated as the product of the number of experiments and the function evaluations per experiment,
both specified by the user.
- ``total_nondominated_solutions``: Number of solutions in the ``solutions_nondominated`` DataFrame.
Parameters
----------
dam_number : int
Number of dams to deploy in the watershed area.
storage_bounds : tuple[int, int]
Tuple of two integers specifying the lower and upper bounds of storage volumes.
Both values must be greater than 0.
storage_multiplier : float
Multiplier applied to the values within the storage bounds to obtain the actual storage volumes in cubic meters.
For example, if ``storage_bounds = (1, 5)`` and ``storage_multiplier = 1000``,
the resulting storage volumes for the dams will be [1000, 2000, 3000, 4000, 5000].
stream_file : str
Path to the input stream GeoJSON file, generated by
:meth:`OptiDamTool.Analysis.sediment_delivery_to_stream_geojson`.
stodym_config : dict
Dictionary specifying input variable configurations for :meth:`OptiDamTool.Network.stodym_plus`.
Each key corresponds to the name of an input variable, and the value is the parameter supplied to the method.
- **Required keys**: ``sediment_density`` and ``year_limit``.
- **Optional keys**: ``trap_equation``, ``trap_threshold``, ``brown_d``, ``trap_constant``, and ``release_threshold``.
Example
-------
.. code-block:: python
# for dynamic sediment trapping efficiency of dams
stodym_config = {
'sediment_density': 1300,
'year_limit': 100,
'trap_threshold': 0.05,
'brown_d': 1,
'release_threshold': 0.9
}
# for constant sediment trapping efficiency of dams
stodym_config = {
'sediment_density': 1300,
'year_limit': 100,
'trap_equation': False,
'trap_constant': 0.8,
'release_threshold': 0.9
}
objectives : list
List of valid objective names, which can be obtained from the keys of the output dictionary
defined in the attribute :attr:`OptiDamTool.SystemDesign.mapping_objective_direction`.
algorithm_name : str
Name of the evolutionary algorithm to use. Supported options:
- ``PESA2``: Pareto Envelope-based Selection Algorithm that divides the objective space into regions and
maintains diversity by regulating density across those regions.
- ``SPEA2``: Strength Pareto Evolutionary Algorithm that assigns fitness based on
dominance strength and incorporates density estimation for better selection pressure.
- ``IBEA``: Indicator-Based Evolutionary Algorithm that uses a ``Hypervolume`` indicator
to guide the search toward well-distributed Pareto-optimal solutions.
- ``MOEAD``: Multi-Objective Evolutionary Algorithm with Decomposition, which transforms a multi-objective
problem into multiple scalar subproblems and solves them cooperatively.
- ``NSGAII``: Classic Non-dominated Sorting Genetic Algorithm (NSGA-II) that ensures convergence toward the Pareto
front while maintaining diversity using crowding distance.
- ``NSGAIII``: Extension of NSGA-II designed for many-objective problems, using a set of reference directions
to maintain diversity across high-dimensional objective spaces.
- ``EpsMOEA``: ε-dominance Multi-Objective Evolutionary Algorithm that employs an ε-box archive to
control archive size and promote solution diversity.
- ``EpsNSGAII``: Variant of NSGA-II that integrates ε-dominance into selection to maintain a bounded,
diverse set of non-dominated solutions.
.. note::
- ``MOEAD`` and ``NSGAIII`` are particularly suited for many-objective optimization problems
(more than three objectives). For problems with fewer objectives, ``EpsNSGAII`` is widely adopted.
- The mapping of required keys to algorithms is as follows:
.. code-block:: python
{
'population_size': ['EpsNSGAII', 'EpsMOEA', 'IBEA', 'MOEAD', 'NSGAII', 'PESA2', 'SPEA2'],
'epsilons': ['EpsMOEA', 'EpsNSGAII'],
'divisions_outer': ['NSGAIII']
}
- The required key ``epsilons`` must be set between 0 and 1, consistent with the assumption
that all objective values are normalized to the [0, 1] range.
- For ``NSGAIII``, the required key ``divisions_outer`` determines the number of reference
directions and implicitly controls the population size. The population size is calculated
as the combination ``C(O + D - 1, D)``, where ``O`` is the number of objectives and ``D`` is ``divisions_outer``.
algorithm_config : dict
Dictionary of algorithm parameters. Must include ``population_size`` along with
other valid keyword arguments supported by the chosen algorithm.
Example
-------
.. code-block:: python
algorithm_config={
'population_size': 10
}
seeds : int
Number of independent experiments for the same problem,
with each run starting from a different initial population.
nfe : int
Maximum number of function evaluations per experiment. This parameter controls how many generations
the algorithm will execute. For example, if ``nfe`` is set to 1000 and the population size is 100,
the algorithm will run for at least ``1000 // 100 = 10`` generations. For best results,
``nfe`` should be a positive integer multiple of the population size,ensuring a more consistent
and effective optimization process.
folder_path : str
Path to the folder where the JSON files will be saved.
constraints : dict, optional
Dictionary containing valid constraint names as keys and their corresponding values.
The valid names can be obtained from the keys of the output dictionary defined in the
attribute :attr:`OptiDamTool.SystemDesign.mapping_constraint_operator`. The default value is
``{'lb_lifespan': 0}``, which is used to run the algorithm but has no effect on the optimization.
Example
-------
.. code-block:: python
constraints={
'lb_lifespan': 1
}
processes : int, optional
Number of logical CPUs to use for running ``seeds`` in parallel. Defaults to all available CPUs.
Returns
-------
dict
Dictionary with two keys:
- ``solutions_nondominated``: DataFrame of non-dominated solutions.
- ``computation_statistics``: DataFrame of computation statistics.
'''
# time start form entire execution
excecution_start = time.time()
# check static type of input variable origin
utility._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.sediment_control_by_fixed_dams
),
vars_values=locals()
)
# check the vailidity of folder_path
utility._validate_folder_path(
folder_path=folder_path
)
# check the folder_path is empty
utility._validate_empty_folder(
folder_path=folder_path
)
# check validity of variable configuration for optimization
stream_ids = self._validate_preliminary_config(
stream_file=stream_file,
dam_number=dam_number,
storage_bounds=storage_bounds,
storage_multiplier=storage_multiplier,
seeds=seeds
)
# check keyword validity of stodym_plus input arguments
self._validate_stodym_kwargs(
stodym_config=stodym_config
)
# check genetic algorithm name and keywords
algorithm_kwargs = self._validate_algorithm_config(
algorithm_name=algorithm_name,
algorithm_config=algorithm_config
)
# mapping between objectives and their directions
objs_dirs = self.mapping_objective_direction
# mapping between constraints and their operators
constrs_ops = self.mapping_constraint_operator
# check validity of objectives
self._validate_objectives(
objectives=objectives,
objs_dirs=objs_dirs
)
# check validity of constraints
self._validate_constraints(
constraints=constraints,
constrs_ops=constrs_ops
)
# location variables of dams
location_vars = platypus.Subset(
elements=stream_ids,
size=dam_number
)
# storage variables of dams
storage_vars = platypus.Integer(
min_value=storage_bounds[0],
max_value=storage_bounds[1]
)
# objective bounds
objs_bounds = self._objective_bounds(
dam_number=dam_number,
storage_vars=storage_vars,
storage_multiplier=storage_multiplier,
objectives=objectives,
stodym_config=stodym_config
)
# problem definition
problem = platypus.Problem(
nvars=dam_number + 1,
nobjs=len(objectives),
nconstrs=len(constraints)
)
# problem variable types
problem.types[0] = location_vars
problem.types[1:] = storage_vars
# problem objective directions
problem.directions[:] = platypus.Problem.MINIMIZE
# problem constraints
for idx, constr in enumerate(constraints):
problem.constraints[idx] = platypus.Constraint(
op=constrs_ops[constr],
value=constraints[constr]
)
# problem function
problem.function = functools.partial(
self._scenario_sediment_control,
storage_multiplier=storage_multiplier,
stream_file=stream_file,
stodym_config=stodym_config,
objectives=objectives,
objs_bounds=objs_bounds,
objs_dirs=objs_dirs,
constraints=constraints
)
# number of batch run from seeds and CPUs
cpu_num = self._cpu_number(
processes=processes
)
batch_number = seeds // cpu_num if seeds % cpu_num == 0 else seeds // cpu_num + 1
# time start for simulating the problem by multiple experiment
experiment_start = time.time()
# run algorithm with parallel computing support
with platypus.ProcessPoolEvaluator(processes=cpu_num) as evaluator:
simulation = platypus.experiment(
algorithms=[
(getattr(platypus, algorithm_name), algorithm_kwargs)
],
problems=[problem],
seeds=seeds,
nfe=nfe,
evaluator=evaluator,
display_stats=True
)
# time for experiment
time_experiment = round(time.time() - experiment_start)
# average time to run per seed
time_seed = round(time_experiment / batch_number)
# merge fesible scenario from seeds
scenario_seed = []
for s_key in simulation.keys():
for p_key in simulation[s_key].keys():
for seed_solution in simulation[s_key][p_key]:
seed_feasible = [
fs for fs in seed_solution if fs.feasible
]
scenario_seed.extend(seed_feasible)
# non-dominated scenarios
scenario_nd = platypus.nondominated(
solutions=scenario_seed
)
# simulation of individual non-ominated scenario in separate CPU
cpu_sim = functools.partial(
self._scenario_nondominated,
dam_number=dam_number,
storage_vars=storage_vars,
storage_multiplier=storage_multiplier,
objectives=objectives,
constraints=constraints,
stream_file=stream_file,
stodym_config=stodym_config,
objs_bounds=objs_bounds,
objs_dirs=objs_dirs,
constrs_ops=constrs_ops
)
# save CPU simulation output in a list
cpu_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_num) as executor:
# Multicore simulation
futures = [
executor.submit(cpu_sim, sol) for sol in scenario_nd
]
for future in concurrent.futures.as_completed(futures):
cpu_list.append(future.result())
# DataFrame from non-dominated solutions
solution_df = pandas.DataFrame.from_records(
data=cpu_list
)
df_columns = list(solution_df.columns)
# remove duplicate rows from DataFrame
solution_df = solution_df.drop_duplicates(
subset=df_columns[-1],
ignore_index=True
)
# Ideal utopian objective array
objs_utopian = []
for obj in objectives:
utopian_val = 0 if objs_dirs[obj] == 'min' else 1
objs_utopian.append(utopian_val)
utopian_array = numpy.array([objs_utopian])
# array from normalized solutions
normalized_array = numpy.array(
solution_df[df_columns[-1]].values.tolist(),
dtype=float
)
# Eucliean distance from normalized solutions to utopian point
solution_df[f'metric_euclidean({objs_utopian})'] = numpy.linalg.norm(
x=normalized_array - numpy.array(objs_utopian),
axis=1
)
# insert 'count' column to the DataFrame
solution_df = solution_df.reset_index(
names=['count']
)
# save the DataFrame of non-dominated solutions
solution_df.to_json(
path_or_buf=os.path.join(folder_path, 'solutions_nondominated.json'),
orient='records',
indent=4
)
# total time for execution
time_excecution = round(time.time() - excecution_start)
# statistic of computation
computation_stats = {
'total_execution_time (s)': time_excecution,
'total_execution_timedelta': f'{datetime.timedelta(seconds=time_excecution)}',
'CPU_number': cpu_num,
'experiment_number': seeds,
'batch_run_number': batch_number,
'experiment_time (s)': time_experiment,
'experiment_timedelta': f'{datetime.timedelta(seconds=time_experiment)}',
'average_experiment_time (s)': time_seed,
'average_experiment_timedelta': f'{datetime.timedelta(seconds=time_seed)}',
'algorithm_name': algorithm_name,
'total_function_evaluations': seeds * nfe,
'total_nondominated_solutions': len(solution_df)
}
# DataFrame of computation statistics
computation_df = pandas.DataFrame()
for k, v in computation_stats.items():
computation_df.loc['value', k] = v
# save computation statistics DataFrame
computation_df.to_json(
path_or_buf=os.path.join(folder_path, 'computation_statistics.json'),
orient='records',
indent=4
)
# output dictionary
output = {
'solutions_nondominated': solution_df,
'computation_statistics': computation_df.T.reset_index(
names=['feature']
)
}
return output
def _storage_dict_from_solution_file(
self,
solution_file: str,
count: int
) -> dict[int, float]:
'''
Get storage dictionary for a specific solution scenario obtained via
:meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
'''
# DataFrame of non-dominated solutions
df = pandas.read_json(
path_or_buf=solution_file,
orient='records'
)
# scenario from count
scenario = df.iloc[count]
# number of dams
dam_number = len(
[i for i in scenario.index if i.startswith('d_')]
)
# storage dictionary
storage_dict = {}
for i in range(1, dam_number + 1):
storage_dict[int(scenario[f'd_{i}'])] = float(scenario[f'sv_{i}'])
return storage_dict
[docs]
def scenario_stodym_plus(
self,
solution_file: str,
count: int,
stream_file: str,
stodym_config: dict[str, typing.Any]
) -> dict[str, pandas.DataFrame]:
'''
Retrieve detailed simulation outputs from :meth:`OptiDamTool.Network.stodym_plus`
for a specific solution scenario obtained via :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
Parameters
----------
solution_file : str
Path to the ``solutions_nondominated.json`` file generated by :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`,
containing non-dominated solution scenarios.
count : int
Integer value of ```count`` key of the selected scenario in ``solutions_nondominated.json`` for which detailed simulation
results are to be retrieved.
stream_file : str
Path to the input stream GeoJSON file passed to :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
stodym_config : dict
Same dictionary passed to :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
To store the results, include an additional key ``folder_path`` specifying the output directory where JSON files will be saved.
.. warning::
If the ``stream_file`` or the ``stodym_config`` dictionary does not match the input used in
:meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`, the retrieved
output parameter values will not correspond to those in ``solutions_nondominated.json``.
Returns
-------
dict
Dictionary produced by :meth:`OptiDamTool.Network.stodym_plus`,
containing detailed simulation results for the selected solution scenario.
'''
# check static type of input variable origin
utility._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.scenario_stodym_plus
),
vars_values=locals()
)
# storage dictionary
storage_dict = self._storage_dict_from_solution_file(
solution_file=solution_file,
count=count
)
# scenario storage dynamics simulation
scenario_stodym = Network().stodym_plus(
stream_file=stream_file,
storage_dict=storage_dict,
**stodym_config
)
return scenario_stodym
[docs]
def scenario_stodym_plus_drainage_response(
self,
solution_file: str,
count: int,
stream_file: str,
flwdir_file: str,
folder_path: str,
stodym_config: dict[str, typing.Any]
) -> pandas.DataFrame:
'''
Retrieve detailed simulation outputs from :meth:`OptiDamTool.Network.stodym_plus_drainage_response`
for a specific solution scenario obtained via :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
Parameters
----------
solution_file : str
Path to the ``solutions_nondominated.json`` file generated by :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`,
containing non-dominated solution scenarios.
count : int
Integer value of ```count`` key of the selected scenario in ``solutions_nondominated.json`` for which detailed simulation
results are to be retrieved.
stream_file : str
Path to the input stream GeoJSON file passed to :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`.
flwdir_file : str
Path to the input flow direction raster file ``flowdir.tif``, generated by :meth:`OptiDamTool.WatemSedem.dem_to_stream`
during the creation of ``stream_file``.
folder_path : str
Path to the folder where JSON and GeoJSON files will be saved.
stodym_config : dict
Dictionary of input configuration parameters passed to :meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`
to obtain the solution scenario.
.. warning::
If the ``stream_file``, ``flwdir_file``, or the ``stodym_config`` dictionary does not match the input used in
:meth:`OptiDamTool.SystemDesign.sediment_control_by_fixed_dams`, the retrieved
output parameter values will not correspond to those in ``solutions_nondominated.json``.
Returns
-------
DataFrame
Dataframe produced by :meth:`OptiDamTool.Network.stodym_plus_drainage_response`,
containing detailed information on the generated GeoJSON files.
'''
# check static type of input variable origin
utility._validate_variable_origin_static_type(
vars_types=typing.get_type_hints(
obj=self.scenario_stodym_plus_drainage_response
),
vars_values=locals()
)
# storage dictionary
storage_dict = self._storage_dict_from_solution_file(
solution_file=solution_file,
count=count
)
# scenario storage dynamics simulation
scenario_stodym = Network().stodym_plus_drainage_response(
stream_file=stream_file,
flwdir_file=flwdir_file,
storage_dict=storage_dict,
folder_path=folder_path,
**stodym_config
)
return scenario_stodym