Page MenuHomec4science

simple_control.py
No OneTemporary

File Metadata

Created
Sun, May 12, 04:11

simple_control.py

__author__ = 'Olivier Van Cutsem'
from building_data_management.category_management.category_config import *
from ems_config import *
from ems_main import EnergyManagementSystem
## Various strategies
class EMS_simple(EnergyManagementSystem):
    """
    Simple EMS strategy: the instantaneous power imbalance (loads + generations)
    is split evenly between the storages that are still able to react to it.
    """

    def __init__(self, in_q, out_q, gui_q):
        """
        Create the EMS by handing the three queues over to the parent constructor.

        :param in_q: forwarded untouched to EnergyManagementSystem
            (presumably the queue of incoming BMS messages - confirm in the parent class)
        :param out_q: forwarded untouched to EnergyManagementSystem
        :param gui_q: forwarded untouched to EnergyManagementSystem
        """
        super(EMS_simple, self).__init__(in_q, out_q, gui_q)

    def update(self):
        """
        Compute one power command per usable storage entity.

        The load, storage and generation entity lists are read from
        ``self.building_data`` and passed to ``battery_control``; every
        (storage id, power) pair it yields is wrapped in a downstream
        command dictionary. The resulting list is printed, then returned.

        :return: list of command dictionaries (possibly empty)
        """
        # Entity lists are fetched in the same order as they are consumed below.
        load_entities = self.building_data.get_entity_list(EMS_CATEGORY_ENTITY_LOAD)
        storage_entities = self.building_data.get_entity_list(EMS_CATEGORY_ENTITY_STORAGE)
        generation_entities = self.building_data.get_entity_list(EMS_CATEGORY_ENTITY_GENERATION)

        ## 1) Battery commands
        power_setpoints = EMS_simple.battery_control(load_entities,
                                                     storage_entities,
                                                     generation_entities)

        commands_set = [
            {EMS_DOWNSTREAM_MSG_TYPE: EMS_DOWNSTREAM_ENERGY_ENTITY_COMMAND,
             EMS_DOWNSTREAM_ENERGY_ENTITY_COMMAND_TYPE: EMS_ENERGY_power,
             EMS_DOWNSTREAM_MSG_ENTITY_TYPE: EMS_CATEGORY_ENTITY_STORAGE,
             EMS_DOWNSTREAM_MSG_ENTITY_ID: storage_id,
             EMS_DOWNSTREAM_MSG_PAYLOAD: {EMS_ENERGY_power: setpoint}}
            for (storage_id, setpoint) in power_setpoints
        ]

        print("EMS core decides to send: {0}".format(commands_set))
        return commands_set

    @staticmethod
    def _total_current_power(_entities):
        """
        Add up ``current_power`` over a list of (id, entity) pairs.

        Entities whose ``current_power`` is None are ignored. The running
        total starts at 0.0 and is accumulated in list order.

        :param _entities: iterable of (entity id, entity object) pairs
        :return: the accumulated power
        """
        total = 0.0
        for entry in _entities:
            _, entity = entry
            if entity.current_power is None:
                continue
            total += entity.current_power
        return total

    @staticmethod
    def battery_control(_loads, _storages, _generations):
        """
        Share the current power imbalance equally between the usable storages.

        The imbalance is the sum of the current power of all loads and of all
        generations. Each storage kept after filtering is assigned the negated
        equal share of that imbalance.

        :param _loads: list of (id, load object) pairs
        :param _storages: list of (id, storage object) pairs
        :param _generations: list of (id, generation object) pairs
        :return: list of (storage id, power) pairs; empty when no storage is usable
        """
        # 1) store the current state
        power_load = EMS_simple._total_current_power(_loads)
        power_gen = EMS_simple._total_current_power(_generations)

        # NOTE(review): a plain sum only yields a net imbalance if generation
        # power is reported with the opposite sign of load power - confirm.
        imbalance_power = power_load + power_gen

        if imbalance_power > 0:
            # Need to withdraw power: an empty storage cannot help
            candidates = EMS_simple.remove_empty_storages(_storages)
        else:
            # Need to store power: a full storage cannot help
            candidates = EMS_simple.remove_full_storages(_storages)

        ## 2) store/pump the imbalance to the batteries
        if not candidates:
            return []

        imbalance_per_storage = imbalance_power / len(candidates)
        return [(s_id, -imbalance_per_storage) for (s_id, s_obj) in candidates]

    @staticmethod
    def _storages_without(_storage_list, _flag_name):
        """
        Keep the (id, storage) pairs whose storage attribute ``_flag_name`` is falsy.

        :param _storage_list: iterable of (id, storage object) pairs
        :param _flag_name: name of the boolean-like attribute to test
        :return: new list with the original pair objects, in their original order
        """
        kept = list()
        for entry in _storage_list:
            _, storage = entry
            if getattr(storage, _flag_name):
                continue
            kept.append(entry)
        return kept

    @staticmethod
    def remove_empty_storages(_storage_list):
        """
        Filter out the storages flagged as empty.

        :param _storage_list: iterable of (id, storage object) pairs
        :return: list of the pairs whose storage has a falsy ``is_empty``
        """
        return EMS_simple._storages_without(_storage_list, "is_empty")

    @staticmethod
    def remove_full_storages(_storage_list):
        """
        Filter out the storages flagged as full.

        :param _storage_list: iterable of (id, storage object) pairs
        :return: list of the pairs whose storage has a falsy ``is_full``
        """
        return EMS_simple._storages_without(_storage_list, "is_full")

Event Timeline