from enum import Enum
import matplotlib.lines as mlines
import numpy as np
from relsad.Time import Time, TimeUnit
from relsad.utils import convert_yearly_fail_rate, random_choice
from .Component import Component
from .ICTNode import ICTNode
from .Line import Line
class SensorState(Enum):
    """
    Line sensor state

    Attributes
    ----------
    OK : int
        The sensor is up and running
    FAILED : int
        The sensor has failed
    REPAIR : int
        The sensor is being repaired
    """

    OK = 1
    FAILED = 2
    REPAIR = 3
class Sensor(Component):
    """
    Class defining line sensors

    Attributes
    ----------
    name : string
        Name of the sensor
    line : Line
        The line the sensor is connected to
    coordinate : list
        The coordinate of the sensor, computed as the midpoint of the
        coordinates of ``line.fbus`` and ``line.tbus``
    ict_node : ICTNode
        The ICT node connected to the sensor
    fail_rate_per_year : float
        The failure rate per year for the sensor
    p_fail_repair_new_signal : float
        The probability that sending a new signal fails to bring the
        sensor back to OK (used as the FAILED probability in ``repair``)
    p_fail_repair_reboot : float
        The probability that rebooting the sensor fails to bring it back
        to OK (used as the FAILED probability in ``repair``)
    new_signal_time : Time
        The time it takes to send a new signal
    reboot_time : Time
        The time it takes to reboot the sensor
    manual_repair_time : Time
        The time it takes to manually repair the sensor
    state : SensorState
        Which state the sensor is in
    remaining_repair_time : Time
        The remaining repair time of the sensor
    history : dict
        Dictionary attribute that stores the historic variables
    monte_carlo_history : dict
        Dictionary attribute that stores the historic variables from the
        Monte Carlo simulation

    Methods
    -------
    fail()
        Sets the sensor state to FAILED
    not_fail()
        Sets the sensor state to OK
    draw_fail_status(dt)
        Draws the state of the sensor for a given time step
    draw_status(prob)
        Sets the state of the sensor based on the probability of the
        state being FAILED
    repair(dt)
        Returns the repair time and fail status of the sensor
    get_line_fail_status(dt)
        Returns the repair time and fail status of the line
    get_section()
        Returns the line section
    update_fail_status(dt)
        Updates the fail status of the sensor
        If the state of the sensor is REPAIR, the remaining repair time
        is reduced by the time step
        If the state of the sensor is OK, the state of the sensor is drawn
    update_history(prev_time, curr_time, save_flag)
        Updates the history variables
    get_history(attribute)
        Returns the history variables of an attribute
    add_random_instance(random_gen)
        Adds global random instance
    print_status()
        Prints the status
    reset_status(save_flag)
        Resets the status of the sensor
    initialize_history()
        Initializes the history variables
    """

    ## Visual attributes
    color = "rosybrown"
    marker = "s"
    size = 2**2
    handle = mlines.Line2D(
        [],
        [],
        marker=marker,
        markeredgewidth=1,
        markersize=size,
        color=color,
        linestyle="None",
    )

    ## Random instance
    ps_random: np.random.Generator = None

    def __init__(
        self,
        name: str,
        line: Line,
        ict_node: ICTNode = None,
        fail_rate_per_year: float = 0.023,
        p_fail_repair_new_signal: float = 1 - 0.95,
        p_fail_repair_reboot: float = 1 - 0.9,
        new_signal_time: Time = Time(2, TimeUnit.SECOND),
        reboot_time: Time = Time(5, TimeUnit.MINUTE),
        manual_repair_time: Time = Time(2, TimeUnit.HOUR),
        state: SensorState = SensorState.OK,
    ):
        """
        Creates a sensor and attaches it to a line

        The sensor registers itself as ``line.sensor`` and, when an ICT
        node is given, assigns its own coordinate list to the node.

        Raises
        ----------
        Exception
            If the line is None, if the line already has a parent
            network, if the failure rate is negative, or if one of the
            repair failure probabilities is outside [0, 1]
        """
        # Verify input
        if line is None:
            raise Exception("Sensor must be connected to a Line")
        if line.parent_network is not None:
            raise Exception(
                "Sensor must be created before the line is connected to a network"
            )
        if fail_rate_per_year < 0:
            # Zero is accepted by the check, so the message says non-negative
            raise Exception("The failure rate per year must be non-negative")
        if p_fail_repair_new_signal < 0 or p_fail_repair_new_signal > 1:
            raise Exception("p_fail_repair_new_signal must be between 0 and 1")
        if p_fail_repair_reboot < 0 or p_fail_repair_reboot > 1:
            raise Exception("p_fail_repair_reboot must be between 0 and 1")

        self.name = name
        self.line = line
        line.sensor = self

        # Define coordinate as the midpoint of the line's two buses
        x1, y1 = line.fbus.coordinate
        x2, y2 = line.tbus.coordinate
        x = (x1 + x2) / 2
        y = (y1 + y2) / 2
        self.coordinate = [x, y]

        self.ict_node = ict_node
        if ict_node is not None:
            # The node receives the very same list object as the sensor
            ict_node.coordinate = self.coordinate

        self.fail_rate_per_year = fail_rate_per_year
        self.p_fail_repair_new_signal = p_fail_repair_new_signal
        self.p_fail_repair_reboot = p_fail_repair_reboot
        self.new_signal_time = new_signal_time
        self.reboot_time = reboot_time
        self.remaining_repair_time = Time(0)
        self.manual_repair_time = manual_repair_time
        self.state = state

        ## History
        self.history = {}
        self.monte_carlo_history = {}
        self.initialize_history()

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"Sensor(name={self.name})"

    def __eq__(self, other):
        # Equal only to other Sensor instances carrying the same name
        if not hasattr(other, "name"):
            return False
        return self.name == other.name and isinstance(other, Sensor)

    def __hash__(self):
        # Hash on the name so that it agrees with __eq__
        return hash(self.name)

    def fail(self):
        """
        Sets the sensor state to FAILED

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        self.state = SensorState.FAILED

    def not_fail(self):
        """
        Sets the sensor state to OK

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        self.state = SensorState.OK

    def draw_fail_status(self, dt: Time):
        """
        Draws the state of the sensor for a given time step

        The yearly failure rate is converted with
        ``convert_yearly_fail_rate`` for the time step and the result is
        passed to ``draw_status`` as the probability of FAILED.

        Parameters
        ----------
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        p_fail = convert_yearly_fail_rate(self.fail_rate_per_year, dt)
        self.draw_status(p_fail)

    def draw_status(self, prob):
        """
        Sets the state of the sensor based on the probability of the
        state being FAILED

        Parameters
        ----------
        prob : float
            The probability that the sensor state is FAILED

        Returns
        ----------
        None

        """
        if random_choice(self.ps_random, prob):
            self.fail()
        else:
            self.not_fail()

    def repair(self, dt: Time):
        """
        Returns the repair time and fail status of the sensor

        The repair is attempted in stages: first a new signal is sent,
        then the sensor is rebooted, and if both attempts leave the
        sensor FAILED it is put in the REPAIR state with the manual
        repair time as remaining repair time.

        Parameters
        ----------
        dt : Time
            The current time step (not used by this method)

        Returns
        ----------
        repair_time : Time
            The time spent on the attempted repair stages (the manual
            repair time is not included)
        fail_status : bool
            The fail status reported for the line, taken from
            ``line.failed`` when the sensor recovers and True when the
            sensor needs manual repair

        """
        # Stage 1: try to send a new signal
        repair_time = self.new_signal_time
        self.draw_status(self.p_fail_repair_new_signal)
        if self.state == SensorState.OK:
            return repair_time, self.line.failed

        # Stage 2: try a reboot. A new Time is built instead of using
        # `+=`, so the configured new_signal_time object is never the
        # target of an in-place operation.
        repair_time = repair_time + self.reboot_time
        self.draw_status(self.p_fail_repair_reboot)
        if self.state == SensorState.OK:
            return repair_time, self.line.failed

        # Stage 3: both attempts failed, manual repair is required
        self.remaining_repair_time = self.manual_repair_time
        self.state = SensorState.REPAIR
        return repair_time, True

    def get_line_fail_status(self, dt: Time):
        """
        Returns the repair time and fail status of the line

        Parameters
        ----------
        dt : Time
            The current time step

        Returns
        ----------
        repair_time : Time
            Time(0) when the sensor is OK or under repair, otherwise the
            repair time returned by ``repair``
        fail_status : bool
            True when the sensor is under repair, ``line.failed`` when
            the sensor is OK, otherwise the fail status returned by
            ``repair``

        """
        if self.state == SensorState.REPAIR:
            # The line is reported as failed while the sensor is under repair
            return Time(0), True
        if self.state == SensorState.OK:
            return Time(0), self.line.failed
        # Remaining state is FAILED: attempt to repair the sensor
        return self.repair(dt)

    def get_section(self):
        """
        Returns the line section

        Parameters
        ----------
        None

        Returns
        ----------
        line_section : Section
            The section attribute of the line

        """
        return self.line.section

    def update_fail_status(self, dt: Time):
        """
        Updates the fail status of the sensor
        If the state of the sensor is REPAIR, the remaining repair time
        is reduced by the time step and the sensor is set to OK once no
        repair time remains
        If the state of the sensor is OK, the state of the sensor is drawn
        If the state of the sensor is FAILED, nothing is changed

        Parameters
        ----------
        dt : Time
            The current time step

        Returns
        ----------
        None

        """
        if self.state == SensorState.REPAIR:
            # remaining_repair_time may be the same object as
            # manual_repair_time (assigned in repair), so a new Time is
            # built instead of using `-=`.
            self.remaining_repair_time = self.remaining_repair_time - dt
            if self.remaining_repair_time <= Time(0):
                self.not_fail()
        elif self.state == SensorState.OK:
            self.draw_fail_status(dt)

    def update_history(self, prev_time, curr_time, save_flag: bool):
        """
        Updates the history variables

        Parameters
        ----------
        prev_time : Time
            The previous time (not used by this method)
        curr_time : Time
            The current time
        save_flag : bool
            Indicates if saving is on or off

        Returns
        ----------
        None

        """
        if save_flag:
            # Entries are keyed by the current time in its own unit
            time = curr_time.get_unit_quantity(curr_time.unit)
            self.history["remaining_repair_time"][
                time
            ] = self.remaining_repair_time.get_unit_quantity(curr_time.unit)
            self.history["state"][time] = self.state.value

    def get_history(self, attribute: str):
        """
        Returns the history variables of an attribute

        Parameters
        ----------
        attribute : str
            System attribute

        Returns
        ----------
        history[attribute] : dict
            Returns the history variables of an attribute

        """
        return self.history[attribute]

    def add_random_instance(self, random_gen):
        """
        Adds global random instance

        Parameters
        ----------
        random_gen : np.random.default_rng()
            Random number generator

        Returns
        ----------
        None

        """
        self.ps_random = random_gen

    def print_status(self):
        """
        Prints the status (currently does nothing)

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        pass

    def reset_status(self, save_flag: bool):
        """
        Resets the status of the sensor

        The state is set to OK and the remaining repair time to zero.
        The history is re-initialized only when saving is on.

        Parameters
        ----------
        save_flag : bool
            Indicates if saving is on or off

        Returns
        ----------
        None

        """
        self.state = SensorState.OK
        self.remaining_repair_time = Time(0)
        if save_flag:
            self.initialize_history()

    def initialize_history(self):
        """
        Initializes the history variables

        Parameters
        ----------
        None

        Returns
        ----------
        None

        """
        self.history["remaining_repair_time"] = {}
        self.history["state"] = {}