Source code for relsad.network.components.MicrogridController

from enum import Enum

import matplotlib.lines as mlines
import numpy as np

from relsad.network.containers import SectionState
from relsad.Time import Time, TimeUnit
from relsad.topology.ICT.dfs import is_connected
from relsad.utils import convert_yearly_fail_rate, random_choice, unique

from .Component import Component
from .Controller import Controller, ControllerState
from .ICTNode import ICTNode


class MicrogridMode(Enum):
    """
    Operating modes of a microgrid relative to its parent power network

    Attributes
    ----------
    SURVIVAL : int
        Survival mode. The microgrid disconnects from the parent power
        network for the entire failure duration
    FULL_SUPPORT : int
        Full support mode. The microgrid disconnects from the parent power
        network only during sectioning time. When connected to the parent
        power network, the microgrid provides all the support it has
        available.
    LIMITED_SUPPORT : int
        Limited support mode. The microgrid disconnects from the parent
        power network only during sectioning time. When connected to the
        parent power network, the microgrid provides only surplus support.
    """

    SURVIVAL = 1
    FULL_SUPPORT = 2
    LIMITED_SUPPORT = 3
class MicrogridController(Component, Controller):
    """
    Common class for microgrid controller

    ...

    Attributes
    ----------
    name : string
        Name of the microgrid controller
    ict_node : ICTNode
        The ICT node connected to the controller
    fail_rate : float
        The failure rate of the microgrid controller
    state : ControllerState
        The state of the microgrid controller
    sectioning_time : Time
        The sectioning time in the microgrid
    parent_sectioning_time : Time
        The sectioning time of the parent controller
    check_components : bool
        Flag raised by the control loops when the circuitbreaker is open and
        the sectioning time has run out; while raised, the next control loop
        pass inspects the sensors/lines and then lowers the flag again
    manual_sectioning_time : Time
        The sectioning time of a passive microgrid without controller
        (or controller is failed). Initialized to None and expected to be
        assigned from outside this class before it is used
    power_network : PowerNetwork
        The power network the controller belongs to
    ict_network : ICTNetwork
        The ICT network the controller belongs to
    sensors : list
        List of sensors associated with the microgrid
    failed_sections : list
        List of failed sections in the microgrid
    history : dict
        Dictionary attribute that stores the historic variables
    monte_carlo_history : dict
        Dictionary created empty by the constructor; it is not written to
        by any method of this class

    Methods
    ----------
    check_circuitbreaker(curr_time, dt)
        Checks if the circuitbreaker in the distribution system is open.
        If sectioning time is finished, disconnect failed sections and
        close the circuitbreaker.
    check_circuitbreaker_manually(curr_time, dt)
        Checks if the circuitbreaker in the distribution system is open
        manually. If sectioning time is finished, disconnect failed
        sections and close the circuitbreaker.
    disconnect_failed_sections()
        Disconnects the failed sections in the microgrid
    check_sensors(curr_time, dt)
        Loops through the sections connected to the controller determining
        which sensors have failed. Performs actions according to the sensor
        status in the respective section. If a section was disconnected and
        no longer includes any failed sensor, it is connected. If a section
        was connected and now includes a failed sensor, it is disconnected.
        The total sectioning time is summed from each section.
    run_control_loop(curr_time, dt)
        System control check, determines if components have failed and
        performs the required action
    check_lines_manually(curr_time)
        Loops manually through the sections connected to the controller
        determining which lines have failed. Performs actions according to
        the line status in the respective section. If a section was
        disconnected and no longer includes any failed lines, it is
        connected. If a section was connected and now includes a failed
        line, it is disconnected and the sectioning time is set to the
        manual sectioning time.
    run_manual_control_loop(curr_time, dt)
        Manual system control check, determines if components have failed
        and performs the required action.
    set_sectioning_time(sectioning_time)
        Sets the sectioning time of the microgrid based on the max value of
        the microgrid sectioning time and the sectioning time set by the
        controller
    set_parent_sectioning_time(sectioning_time)
        Sets the sectioning time of the parent power network
    update_fail_status(dt)
        Updates the fail status
    update_history(prev_time, curr_time, save_flag)
        Updates the history variables
    get_history(attribute)
        Returns the history variables of an attribute
    add_random_instance(random_gen)
        Adds global random seed
    print_status()
        Prints the status
    reset_status(save_flag)
        Resets and sets the status of the class parameters
    initialize_history()
        Initializes the history variables
    """

    def __init__(
        self,
        name: str,
        power_network,
        fail_rate: float = 0,
        state: ControllerState = ControllerState.OK,
    ):
        self.name = name
        self.ict_node = None
        self.fail_rate = fail_rate
        self.state = state
        self.sectioning_time = Time(0)
        self.parent_sectioning_time = Time(0)
        self.check_components = False
        # NOTE(review): stays None until assigned from outside this class;
        # check_sensors/check_lines_manually do arithmetic with it.
        self.manual_sectioning_time = None
        self.power_network = power_network
        self.ict_network = None
        self.sensors = []
        self.failed_sections = []
        ## History
        self.history = {}
        self.monte_carlo_history = {}

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"MicrogridController(name={self.name})"

    def __eq__(self, other):
        # Objects without a name can never be equal to a controller
        if not hasattr(other, "name"):
            return False
        return self.name == other.name and isinstance(
            other, MicrogridController
        )

    def __hash__(self):
        return hash(self.name)

    def _reclosing_allowed(self) -> bool:
        """
        Tells if the circuitbreaker on the connected line is open and the
        controller is allowed to start reconnecting, i.e. the microgrid is
        not held islanded by survival mode and the sectioning time has
        run out

        Parameters
        ----------
        None

        Returns
        ----------
        allowed : bool
            True if failed sections may be disconnected and the
            circuitbreaker considered for closing

        """
        if not self.power_network.connected_line.circuitbreaker.is_open:
            return False
        # In survival mode the microgrid stays disconnected for as long as
        # the parent distribution network reports a failed line
        if (
            self.power_network.mode == MicrogridMode.SURVIVAL
            and self.power_network.distribution_network.failed_line is True
        ):
            return False
        return self.sectioning_time <= Time(0)

    def check_circuitbreaker(self, curr_time: Time, dt: Time):
        """
        Checks if the circuitbreaker in the distribution system is open.
        If sectioning time is finished, disconnect failed sections and
        close the circuitbreaker.

        Parameters
        ----------
        curr_time : Time
            The current time
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        if not self._reclosing_allowed():
            return
        self.disconnect_failed_sections()
        connected_line = self.power_network.connected_line
        # The breaker is only closed if the connected line itself is healthy
        # and does not belong to any section that is still marked as failed
        if not connected_line.failed and connected_line not in [
            line for section in self.failed_sections for line in section.lines
        ]:
            # Sectioning time finished
            connected_line.circuitbreaker.close()
            connected_line.section.connect(dt, self)
            # NOTE(review): nesting reconstructed from flattened source;
            # the list is cleared only once the breaker has been closed.
            self.failed_sections = []

    def check_circuitbreaker_manually(self, curr_time: Time, dt: Time):
        """
        Checks if the circuitbreaker in the distribution system is open
        manually. If sectioning time is finished, disconnect failed
        sections and close the circuitbreaker.

        Parameters
        ----------
        curr_time : Time
            The current time
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        if not self._reclosing_allowed():
            return
        self.disconnect_failed_sections()
        connected_line = self.power_network.connected_line
        if not connected_line.failed:
            # Sectioning time finished
            connected_line.circuitbreaker.close()
            connected_line.section.connect_manually()
            # NOTE(review): nesting reconstructed from flattened source;
            # the list is cleared only once the breaker has been closed.
            self.failed_sections = []

    def disconnect_failed_sections(self):
        """
        Disconnects the failed sections in the microgrid

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        for section in self.failed_sections:
            section.disconnect()

    def _count_failed_lines_from_sensors(self, section, dt: Time) -> int:
        """
        Polls every sensor found on the switches of a section and counts how
        many of them report a failed line. The time spent is added to the
        sectioning time of the controller.

        A sensor is read directly when the controller has no ICT node, or
        when both controller and sensor have ICT nodes that are connected in
        the ICT network. Otherwise the line status is taken from the line
        itself and the manual sectioning time is added once for the section.

        Parameters
        ----------
        section : Section
            The section whose sensors are polled
        dt : Time
            The current time step

        Returns
        ----------
        num_fails : int
            Number of sensors reporting a failed line

        """
        sensors = unique(
            [
                switch.line.sensor
                for switch in section.switches
                if switch.line.sensor is not None
            ]
        )
        num_fails = 0
        need_manual_attention = False
        for sensor in sensors:
            # is_connected is only evaluated when both ICT nodes exist
            sensor_reachable = self.ict_node is None or (
                sensor.ict_node is not None
                and is_connected(
                    node_1=self.ict_node,
                    node_2=sensor.ict_node,
                    network=self.ict_network,
                )
            )
            if sensor_reachable:
                repair_time, line_fail_status = sensor.get_line_fail_status(
                    dt
                )
                self.sectioning_time += repair_time
            else:
                # No ICT node on the sensor, or the ICT nodes are not
                # connected to each other
                need_manual_attention = True
                line_fail_status = sensor.line.failed
            if line_fail_status:
                num_fails += 1
        if need_manual_attention:
            self.sectioning_time += self.manual_sectioning_time
        return num_fails

    def check_sensors(self, curr_time: Time, dt: Time):
        """
        Loops through the sections connected to the controller determining
        which sensors have failed. Performs actions according to the sensor
        status in the respective section. If a section was disconnected and
        no longer includes any failed sensor, it is connected. If a section
        was connected and now includes a failed sensor, it is disconnected.
        The total sectioning time is summed from each section.

        Parameters
        ----------
        curr_time : Time
            The current time
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        # Both lists are built up front, so a section reconnected in the
        # first loop is not examined again in the second loop
        connected_sections = [
            x
            for x in self.power_network.sections
            if x.state == SectionState.CONNECTED
        ]
        disconnected_sections = [
            x
            for x in self.power_network.sections
            if x.state == SectionState.DISCONNECTED
        ]

        # Loop disconnected sections
        for section in disconnected_sections:
            num_fails = self._count_failed_lines_from_sensors(section, dt)
            if num_fails == 0:
                section.connect(dt, self)
                if section in self.failed_sections:
                    self.failed_sections.remove(section)

        # Loop connected sections
        for section in connected_sections:
            num_fails = self._count_failed_lines_from_sensors(section, dt)
            if num_fails > 0:
                section.state = SectionState.DISCONNECTED
                self.sectioning_time += section.get_disconnect_time(dt, self)
                self.failed_sections.append(section)
                self.failed_sections = unique(self.failed_sections)

    def _advance_sectioning_time(self, dt: Time):
        """
        Advances the sectioning time by one time step and raises the
        check_components flag when the circuitbreaker is open and the
        sectioning time has run out

        Parameters
        ----------
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        remaining_time = (
            self.sectioning_time - dt
            if self.sectioning_time > Time(0)
            else Time(0)
        )
        # The parent sectioning time acts as a lower bound
        self.sectioning_time = max(
            remaining_time,
            self.parent_sectioning_time,
        )
        if (
            self.power_network.connected_line.circuitbreaker.is_open
            and self.sectioning_time <= Time(0)
        ):
            self.check_components = True

    def run_control_loop(self, curr_time: Time, dt: Time):
        """
        System control check, determines if components have failed and
        performs the required action

        Parameters
        ----------
        curr_time : Time
            The current time
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        self._advance_sectioning_time(dt)
        if self.check_components:
            self.check_sensors(curr_time, dt)
            self.check_components = False
        self.check_circuitbreaker(curr_time, dt)

    def check_lines_manually(self, curr_time: Time):
        """
        Loops manually through the sections connected to the controller
        determining which lines have failed. Performs actions according to
        the line status in the respective section. If a section was
        disconnected and no longer includes any failed lines, it is
        connected. If a section was connected and now includes a failed
        line, it is disconnected, the sectioning time is set to the manual
        sectioning time and that time is added to the remaining outage time
        of every line in the section.

        Parameters
        ----------
        curr_time : Time
            The current time

        Returns
        ----------
        None

        """
        connected_sections = [
            x
            for x in self.power_network.sections
            if x.state == SectionState.CONNECTED
        ]
        disconnected_sections = [
            x
            for x in self.power_network.sections
            if x.state == SectionState.DISCONNECTED
        ]

        # Loop disconnected sections
        for section in disconnected_sections:
            if not any(x.failed for x in section.lines):
                section.connect_manually()
                if section in self.failed_sections:
                    self.failed_sections.remove(section)

        # Loop connected sections
        for section in connected_sections:
            if any(x.failed for x in section.lines):
                section.state = SectionState.DISCONNECTED
                self.failed_sections.append(section)
                self.failed_sections = unique(self.failed_sections)
                # The manual sectioning time replaces (is not added to)
                # the current sectioning time
                self.sectioning_time = self.manual_sectioning_time
                for line in section.lines:
                    line.remaining_outage_time += self.sectioning_time

    def run_manual_control_loop(self, curr_time: Time, dt: Time):
        """
        Manual system control check, determines if components have failed
        and performs the required action.

        Parameters
        ----------
        curr_time : Time
            The current time
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        self._advance_sectioning_time(dt)
        if self.check_components:
            self.check_lines_manually(curr_time)
            self.check_components = False
        self.check_circuitbreaker_manually(curr_time, dt)

    def set_sectioning_time(self, sectioning_time: Time):
        """
        Sets the sectioning time of the microgrid based on the max value of
        the microgrid sectioning time and the sectioning time set by the
        controller

        Parameters
        ----------
        sectioning_time : Time
            The sectioning time in the microgrid

        Returns
        ----------
        None

        """
        self.sectioning_time = max(
            self.sectioning_time,
            sectioning_time,
        )

    def set_parent_sectioning_time(self, sectioning_time: Time):
        """
        Sets the sectioning time of the parent power network

        Parameters
        ----------
        sectioning_time : Time
            The sectioning time of the parent power network

        Returns
        ----------
        None

        """
        self.parent_sectioning_time = sectioning_time

    def update_fail_status(self, dt: Time):
        """
        Updates the fail status (currently a no-op for this controller)

        Parameters
        ----------
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        pass

    def update_history(
        self, prev_time: Time, curr_time: Time, save_flag: bool
    ):
        """
        Updates the history variables

        Parameters
        ----------
        prev_time : Time
            The previous time
        curr_time : Time
            Current time
        save_flag : bool
            Indicates if saving is on or off

        Returns
        ----------
        None

        """
        if save_flag:
            time = curr_time.get_unit_quantity(curr_time.unit)
            # setdefault keeps this working even if initialize_history has
            # not been called yet (history starts out as an empty dict)
            self.history.setdefault("sectioning_time", {})[
                time
            ] = self.sectioning_time.get_unit_quantity(curr_time.unit)

    def get_history(self, attribute: str):
        """
        Returns the history variables of an attribute

        Parameters
        ----------
        attribute : str
            Controller attribute

        Returns
        ----------
        history[attribute] : dict
            Returns the history variables of an attribute

        """
        return self.history[attribute]

    def add_random_instance(self, random_gen):
        """
        Adds global random seed (currently a no-op for this controller)

        Parameters
        ----------
        random_gen : int
            Random number generator

        Returns
        ----------
        None

        """
        pass

    def print_status(self):
        """
        Prints the status (currently a no-op for this controller)

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """

    def reset_status(self, save_flag: bool):
        """
        Resets the sectioning time and, if saving is on, re-initializes the
        history variables

        Parameters
        ----------
        save_flag : bool
            Indicates if saving is on or off

        Returns
        ----------
        None

        """
        self.sectioning_time = Time(0)
        if save_flag:
            self.initialize_history()

    def initialize_history(self):
        """
        Initializes the history variables

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        self.history["sectioning_time"] = {}