Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91050310
bms_restful_client.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Nov 7, 08:19
Size
6 KB
Mime Type
text/x-python
Expires
Sat, Nov 9, 08:19 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
22186286
Attached To
R3852 EMS for Smart-Building
bms_restful_client.py
View Options
__author__
=
'Olivier Van Cutsem'
from
ems_config
import
*
from
bmsinterface.bms_interface_config
import
*
import
bmsinterface.auxiliary.requests.api
as
requests
from
building_data_management.category_management.category_config
import
*
import
sys
def get_bms_info(resource_type, resource_id):
    """Query openBMS for one resource and return the decoded JSON reply.

    The request is a GET on http://OPENBMS_IP:OPENBMS_PORT/API/<type>/<id>.

    :param resource_type: the type of required resource (unit info, room loads, etc)
    :param resource_id: the ID corresponding to the desired resource
    :return: the JSON-decoded body of the reply when openBMS answers 200
    :raises SystemExit: with a non-zero status, after printing a diagnostic,
        for every outcome other than a 200 reply
    """
    geturl = 'http://{0}:{1}/API/{2}/{3}'.format(
        OPENBMS_IP, OPENBMS_PORT, resource_type, resource_id)
    print("Sending BMS request: GET {0}".format(geturl))

    try:
        r = requests.get(geturl)
    except IOError:
        # An unreachable server makes requests raise instead of returning a
        # response, so without this the "failed to connect" message below
        # could never be shown for an actual connection failure.
        # NOTE(review): upstream requests derives RequestException from
        # IOError; assumed to hold for the vendored copy - TODO confirm.
        r = None

    # always check the return status code!
    if r is not None and r.status_code == 200:
        return r.json()

    if r is not None and r.status_code == 404 and r.content:
        # Server replied, but the requested resource was not found
        print('Failed to find in openBMS the {0}: {1}'.format(resource_type, resource_id))
    else:
        # No usable reply at all: openBMS is most likely not working
        print('Failed to connect to {0}:{1}, is openBMS running?'.format(OPENBMS_IP, OPENBMS_PORT))

    # Exit with a failure status: a bare sys.exit() would report success (0).
    sys.exit(1)
### UNIT INFO ###
def get_rooms_in_unit(unit_id, ceiling_height=3):
    """
    Fetch the BMS for the rooms info of a specified unit

    :param unit_id: the unit whose rooms info should be known
    :param ceiling_height: factor applied to a room surface to obtain its
        volume; defaults to 3, the value that used to be hard-coded
    :return: a dict keyed by room 'pk', each value holding the room name,
        surface and volume under the BMS_KEY_ROOM_* keys
    """

    ### Make the BMS request
    rooms = get_bms_info('unit_rooms', unit_id)

    ### Process the request for answer formatting
    ret = {}
    for room in rooms:
        room_id = room['pk']
        fields = room['fields']
        surface = fields['surface']

        ret[room_id] = {
            BMS_KEY_ROOM_NAME: fields['name'],
            BMS_KEY_ROOM_SURFACE: surface,
            # openBMS provides no volume here: derive it from the surface
            BMS_KEY_ROOM_VOLUME: surface * ceiling_height,
        }

    return ret
def get_interface_geometry(unit_id):
    """
    Ask the BMS for the room-interface description of a unit.

    Not implemented yet: the request is sent with a placeholder resource
    type and its answer is ignored.

    :param unit_id: the unit whose room interfaces info should be known
    :return: an empty dict for now
    """

    # The request is still issued even though its answer is unused
    get_bms_info('TODO', unit_id)

    # TODO ! get the interface info
    return {}
def get_loads_data(unit_id):
    """
    Ask the BMS for the loads of a unit.

    Not implemented yet: the request is sent with a placeholder resource
    type, its answer is discarded and nothing is returned.

    :param unit_id: the unit whose loads should be known
    """

    get_bms_info('TODO', unit_id)
def get_storage_data(unit_id):
    """
    Placeholder for the storage data of a unit: not implemented yet.

    :param unit_id: currently unused
    :return: None
    """
    return None
def get_generation_data(unit_id):
    """
    Placeholder for the generation data of a unit: not implemented yet.

    :param unit_id: currently unused
    :return: None
    """
    return None
### ROOM INFO ###
def get_ambient_sensors_in_room(room_id):
    """
    Fetch the BMS for the comfort (ambient) sensors of a specified room

    Only sensors whose converted measure type is temperature, luminosity,
    presence or humidity are kept; one extra BMS request per kept sensor
    retrieves its last value.

    :param room_id: the room whose comfort sensors should be known
    :return: a dict keyed by sensor 'pk', each value holding the measure
        type, the comfort constraints and the last value under the
        BMS_KEY_COMFORT_* keys
    """

    ### Make the BMS request
    sensors = get_bms_info('room_sensors', room_id)

    ### Process the request for answer formatting
    sensor_of_interest = (EMS_COMFORT_temperature,
                          EMS_COMFORT_luminosity,
                          EMS_COMFORT_presence,
                          EMS_COMFORT_humidity)

    ret = {}
    for sensor in sensors:
        data = sensor['fields']
        sensor_id = sensor['pk']

        # Translate the BMS measure name into its EMS counterpart
        type_meas = BMS_KEY_EMS_SENSORS_CONVERSION[data['measure']]
        if type_meas not in sensor_of_interest:  # check only the comfort sensors
            continue

        # Read the constraints only for sensors that are kept, so that a
        # skipped sensor lacking them cannot abort the whole listing.
        comf_constraints = data['additional_parameters']['constraints']

        ret[sensor_id] = {
            BMS_KEY_COMFORT_TYPE: type_meas,
            BMS_KEY_COMFORT_CONSTRAINTS: comf_constraints,
            BMS_KEY_COMFORT_VALUE: get_sensor_last_value(sensor_id),
        }

    return ret
def get_energy_sensors_in_room(room_id):
    """
    Fetch the BMS for the energy-related sensors of a specified room

    Only sensors whose converted measure type is state-of-charge or power
    are kept. Each kept sensor is reported with its last value and with the
    entity it is attached to: the first non-None field among 'load',
    'storage' and 'generator', or None/None when all three are None.

    :param room_id: the room whose energy sensors should be known
    :return: a dict keyed by sensor 'pk', each value using the
        BMS_KEY_ENERGY_* keys
    """

    bms_sensors = get_bms_info('room_sensors', room_id)

    ### Process the request for answer formatting
    wanted_types = [EMS_ENERGY_soc, EMS_ENERGY_power]

    # Checked in this order; the first non-None field wins
    entity_fields = (('load', EMS_CATEGORY_ENTITY_LOAD),
                     ('storage', EMS_CATEGORY_ENTITY_STORAGE),
                     ('generator', EMS_CATEGORY_ENTITY_GENERATION))

    result = {}
    for entry in bms_sensors:
        fields = entry['fields']
        sensor_pk = entry['pk']

        measure_type = BMS_KEY_EMS_SENSORS_CONVERSION[fields['measure']]
        if measure_type in wanted_types:  # keep only the energy-related sensors
            last_value = get_sensor_last_value(sensor_pk)

            related_type, related_id = None, None
            for field_name, category in entity_fields:
                linked = fields[field_name]
                if linked is not None:
                    related_type, related_id = category, linked
                    break

            result[sensor_pk] = {
                BMS_KEY_ENERGY_MEASURE_TYPE: measure_type,
                BMS_KEY_ENERGY_MEASURE_VALUE: last_value,
                BMS_KEY_ENERGY_ENTITY_RELATED_TYPE: related_type,
                BMS_KEY_ENERGY_ENTITY_RELATED_ID: related_id,
            }

    return result
def get_actuators_in_room(room_id):
    """
    Fetch the BMS for the actuators of a specified room

    Each actuator is reported with its action type, its current status
    (one extra BMS request per actuator) and the entity it is attached to:
    the first non-None field among 'load', 'storage' and 'generator', or
    None/None when all three are None.

    :param room_id: the room whose actuators should be known
    :return: a dict keyed by actuator 'pk'
    """

    bms_actuators = get_bms_info('room_actuators', room_id)

    # Checked in this order; the first non-None field wins
    entity_fields = (('load', EMS_CATEGORY_ENTITY_LOAD),
                     ('storage', EMS_CATEGORY_ENTITY_STORAGE),
                     ('generator', EMS_CATEGORY_ENTITY_GENERATION))

    ### Process the request for answer formatting
    result = {}
    for entry in bms_actuators:
        fields = entry['fields']
        actuator_pk = entry['pk']

        status = get_actuator_status(actuator_pk)

        related_type, related_id = None, None
        for field_name, category in entity_fields:
            linked = fields[field_name]
            if linked is not None:
                related_type, related_id = category, linked
                break

        result[actuator_pk] = {
            BMS_KEY_ENERGY_ENTITY_RELATED_TYPE: related_type,
            BMS_KEY_ENERGY_ENTITY_RELATED_ID: related_id,
            BMS_KEY_ACTUATOR_ACTION_TYPE: fields['action'],
            BMS_KEY_ACTUATOR_STATUS: status,
        }

    return result
### SENSOR & ACTUATOR INFO ###
def get_sensor_last_value(sensor_id):
    """
    Fetch the BMS for the last value recorded by a sensor

    :param sensor_id: the sensor whose last value should be known
    :return: the second element of the first entry found in the BMS reply
    :raises IndexError: when the reply contains no entry
    """

    ret_bms = get_bms_info('last/sensor', sensor_id)

    ### Process the request for answer formatting
    # Only the first entry of the reply is used; presumably the reply holds
    # exactly one - TODO confirm. Going through list() keeps this working on
    # Python 3, where dict views cannot be indexed.
    readings = list(ret_bms.values())
    time_value = readings[0]  # the answer is [time, value]

    return time_value[1]
def get_actuator_status(actuator_id):
    """
    Fetch the BMS for the current status of an actuator

    :param actuator_id: the actuator whose status should be known
    :return: the 'Status' entry of the BMS reply, or None when the reply
        has no such entry
    """

    reply = get_bms_info('status/actuator', actuator_id)

    ### Process the request for answer formatting
    return reply['Status'] if 'Status' in reply else None
Event Timeline
Log In to Comment