import warnings
import numpy as np
from scipy.optimize import linprog
from relsad.network.systems import PowerSystem, Transmission
from relsad.Time import Time
from relsad.utils import INF
np.set_printoptions(suppress=True, linewidth=np.nan)
warnings.filterwarnings("ignore")
# flake8: noqa: C901
[docs]def shed_energy(
power_system: PowerSystem,
dt: Time,
alpha: float = 1e-4,
):
"""
Sheds the unsupplied loads of the power system over the
time period, dt, using a linear minimization
problem solved with linear programming
See :doc:`/theory/opt` for more details.
Parameters
----------
power_system : PowerSystem
The power system that is under consideration
dt : Time
The current time step
alpha : float
Slack variable to cope with numerical noise
Problem formulation
--------------------
minimize sum(P_shed_n)
such that
sum(P_shed_n) - sum(P_line_nl) + sum(P_gen_n) = sum(P_load_n) for all buses n
0 <= P_shed_n <= P_load_n
-P_line_nl_max <= P_line_nl <= P_line_nl_max
P_gen_n_min <= P_gen_n <= P_gen_n_max
The problem is solved with the following modification to prevent numerical difficulties
such that
sum(P_shed_n) - sum(P_line_nl) + sum(P_gen_n) + alpha = sum(P_load_n) for all buses n
0 <= P_shed_n <= P_load_n
-P_line_nl_max <= P_line_nl <= P_line_nl_max
P_gen_n_min <= P_gen_n <= P_gen_n_max
alpha_min <= alpha <= alpha_max
Returns
-------
None
"""
buses = list(power_system.buses)
lines = [x for x in power_system.lines if x.connected]
N_D = len(buses)
N_L = len(lines)
# Define cost function
c = [x.get_cost() for x in buses] + [0] * N_L + [0] * N_D + [0]
# Get loads
p_b = [max(0, bus.pload) for bus in buses] # Active bus load
q_b = [max(0, bus.qload) for bus in buses] # Reactive bus load
# Building lhs contraint matrix
A = _build_A_matrix(power_system)
# Gather bounds
p_bounds, q_bounds = _gather_bounds(power_system, alpha)
if sum(p_b) > alpha:
# Shed active loads
shedded_active_bus_loads = _shed_active_loads(
c=c,
A=A,
p_b=p_b,
p_bounds=p_bounds,
buses=buses,
lines=lines,
alpha=alpha,
)
if shedded_active_bus_loads is not None:
# Shed active energy
_shed_active_energy(
buses=buses,
shedded_active_bus_loads=shedded_active_bus_loads,
alpha=alpha,
dt=dt,
)
if sum(q_b) > alpha:
# Shed reactive loads
shedded_reactive_bus_loads = _shed_reactive_loads(
c=c,
A=A,
q_b=q_b,
q_bounds=q_bounds,
buses=buses,
lines=lines,
alpha=alpha,
)
if shedded_reactive_bus_loads is not None:
# Shed reactive energy
_shed_reactive_energy(
buses=buses,
shedded_reactive_bus_loads=shedded_reactive_bus_loads,
alpha=alpha,
dt=dt,
)
def _build_A_matrix(power_system: PowerSystem):
"""
Builds the LHS constraint matrix
Parameters
----------
power_system : PowerSystem
The power system that is under consideration
Returns
-------
A : array
LHS Constraint matrix
"""
buses = list(power_system.buses)
lines = [x for x in power_system.lines if x.connected]
N_D = len(buses)
N_L = len(lines)
A = np.zeros((N_D, N_D + N_L + N_D + 1))
for j, bus in enumerate(buses):
# Load shed coefficients
A[j, j] = 1 # mu_md
# Line load coefficients
for line in bus.connected_lines:
if line.connected:
index = lines.index(line)
if bus == line.fbus:
A[j, N_D + index] = -1
else:
A[j, N_D + index] = 1
# Generation coefficients
A[j, N_D + N_L + j] = 1 # lambda_md
# Slack parameter
A[j, -1] = 1
return A
def _get_generation_bounds(power_system: PowerSystem):
"""
Returns the generation units bounds
Parameters
----------
power_system : PowerSystem
The power system that is under consideration
Returns
-------
p_gen : list
List of the active power bounds for the generation units
q_gen : list
List of the reactive power bounds for the generation units
"""
p_gen = list() # Active bus generation
q_gen = list() # Reactive bus generation
for j, bus in enumerate(power_system.buses):
# Generation bounds
flag = False
for child_network in power_system.child_network_list:
if isinstance(child_network, Transmission):
if bus == child_network.get_trafo_bus():
p_gen.append(INF)
q_gen.append(INF)
flag = True
if flag is False:
p_gen.append(max(0, bus.pprod))
q_gen.append(max(0, bus.qprod))
return p_gen, q_gen
def _gather_bounds(
power_system: PowerSystem,
alpha: float,
):
"""
Returns the flow bounds between the components in the power system.
Parameters
----------
power_system : PowerSystem
The power system that is under consideration
alpha : float
Slack variable to cope with numerical noise
Returns
-------
p_bounds : list
The active power bounds for a component
q_bounds : list
The reactive power bounds for a component
"""
# Gather bounds
buses = list(power_system.buses)
lines = [x for x in power_system.lines if x.connected]
N_D = len(buses)
N_L = len(lines)
# Get loads
p_b = [max(0, bus.pload) for bus in buses] # Active bus load
q_b = [max(0, bus.qload) for bus in buses] # Reactive bus load
p_bounds = list()
q_bounds = list()
# Get generation bounds
p_gen, q_gen = _get_generation_bounds(power_system)
for n in range(N_D + N_L + N_D):
if n < N_D:
# Bus load
p_bounds.append((0, p_b[n]))
q_bounds.append((0, q_b[n]))
elif N_D <= n < N_D + N_L:
# Line flow
line = lines[n - N_D]
# Line flow in MW
max_available_p_flow = line.get_line_load()[0] * line.s_ref
max_available_q_flow = line.get_line_load()[1] * line.s_ref
PL_p_max = min(line.capacity, abs(max_available_p_flow))
PL_q_max = min(line.capacity, abs(max_available_q_flow))
p_bounds.append((-PL_p_max, PL_p_max))
q_bounds.append((-PL_q_max, PL_q_max))
else:
# Bus generation
p_bounds.append((0, p_gen[n - (N_D + N_L)]))
q_bounds.append((0, q_gen[n - (N_D + N_L)]))
# alpha bounds
p_bounds.append((-alpha, alpha))
q_bounds.append((-alpha, alpha))
return p_bounds, q_bounds
def _shed_active_loads(
c: list,
A: list,
p_b: list,
p_bounds: list,
buses: list,
lines: list,
alpha: float,
):
"""
Sheds active power load
Parameters
----------
c : list
Coefficients of the objective function
A : list
LHS constraint matrix
p_b : list
RHS constraint vector
p_bounds : list
Variable boundaries
buses : list
List containing buses
lines : list
List containing lines
alpha : float
Slack variable to cope with numerical noise
dt : Time
The current time step
Returns
-------
active_shedded_bus_loads: list
The active shedded bus loads of the current
time increment
"""
p_res = linprog(
c,
A_eq=A,
b_eq=p_b,
bounds=p_bounds,
)
if not p_res.success:
print("Active energy.shed was not successful, verify the results:")
print("buses: ", buses)
print("lines: ", lines)
print("A: ", A)
print("cost: ", c)
print("p_b: ", p_b)
print("p_bounds: ", p_bounds)
p_res = linprog(
c,
A_eq=A,
b_eq=p_b,
bounds=p_bounds,
options={
"disp": True,
},
)
print(p_res)
if p_res.fun > 0:
shedded_active_bus_loads = p_res.x
else:
shedded_active_bus_loads = None
return shedded_active_bus_loads
def _shed_active_energy(
buses: list,
shedded_active_bus_loads: list,
alpha: float,
dt: Time,
):
"""
Sheds active energy
Parameters
----------
buses : list
List containing buses
shedded_active_bus_loads : list
The shedded active bus loads of the current
time increment
alpha : float
Slack variable to cope with numerical noise
dt : Time
The current time step
Returns
-------
None
"""
for i, bus in enumerate(buses):
bus.add_to_energy_shed_stack(
shedded_active_bus_loads[i]
if shedded_active_bus_loads[i] > alpha
else 0,
0,
dt,
)
def _shed_reactive_loads(
c: list,
A: list,
q_b: list,
q_bounds: list,
buses: list,
lines: list,
alpha: float,
):
"""
Sheds reactive power load
Parameters
----------
c : list
Coefficients of the objective function
A : list
LHS constraint matrix
q_b : list
RHS constraint vector
q_bounds : list
Variable boundaries
buses : list
List containing buses
lines : list
List containing lines
alpha : float
Slack variable to cope with numerical noise
dt : Time
The current time step
Returns
-------
shedded_reactive_bus_loads : list
The shedded reactive bus loads of the current
time increment
"""
q_res = linprog(
c,
A_eq=A,
b_eq=q_b,
bounds=q_bounds,
)
if not q_res.success:
print("Reactive energy.shed was not successful, verify the results:")
print("buses: ", buses)
print("lines: ", lines)
print("A: ", A)
print("cost: ", c)
print("q_b: ", q_b)
print("q_bounds: ", q_bounds)
q_res = linprog(
c,
A_eq=A,
b_eq=q_b,
bounds=q_bounds,
options={
"disp": True,
},
)
print(q_res)
if q_res.fun > 0:
shedded_reactive_bus_loads = q_res.x
else:
shedded_reactive_bus_loads = None
return shedded_reactive_bus_loads
def _shed_reactive_energy(
buses: list,
shedded_reactive_bus_loads: list,
alpha: float,
dt: Time,
):
"""
Sheds reactive energy
Parameters
----------
buses : list
List containing buses
shedded_reactive_bus_loads : list
The shedded reactive bus loads of the current
time increment
alpha : float
Slack variable to cope with numerical noise
dt : Time
The current time step
Returns
-------
None
"""
for i, bus in enumerate(buses):
bus.add_to_energy_shed_stack(
0,
shedded_reactive_bus_loads[i]
if shedded_reactive_bus_loads[i] > alpha
else 0,
dt,
)