Commit f885dc8c authored by Bc. Petr Elexa's avatar Bc. Petr Elexa

server: unify reader/door/panel naming

parent ba66686b
......@@ -11,14 +11,15 @@ class acs_database(object):
Containts users that belong to groups.
If user is part of a group it means that he is authorized to use doors in the group.
User ID (key)
User ID database
- value is group name (can be part of only one group)
- the user id must be an unique number (raw number of RFID card or chip)
Group (key)
- value is set of of door/reader addresses
- the group must be an unique name
- is a string (group name) and must not start with "__"
- key must be an unique number (raw number of RFID card or chip)
Group database
- value is set of of door/reader addresses that member of the group can use
- key is a string (unique group name) and must not start with "__"
Door database
- value is list containing mode and status
- key must be door address (unique)
"""
__ALL_GRP = b"__all" # Special group representing all doors.
......@@ -48,8 +49,8 @@ class acs_database(object):
socket_timeout=10, socket_keepalive=True, retry_on_timeout=True)
# create basic groups (if not present)
self.add_panels_to_group(self.__ALL_GRP, self.__RESERVED_ADDR)
self.add_panels_to_group(self.__EMPTY_GRP, self.__RESERVED_ADDR)
self.add_doors_to_group(self.__ALL_GRP, self.__RESERVED_ADDR)
self.add_doors_to_group(self.__EMPTY_GRP, self.__RESERVED_ADDR)
# Return user's group name or None if user does not exist.
def get_user_group(self, user_id:int) -> str:
......@@ -87,71 +88,71 @@ class acs_database(object):
return (cursor, users)
# Return created group's name, None if failed.
def create_group_for_panel(self, reader_addr:int) -> str:
group = "{}{}".format(self.__NONAME_GRP_PREFIX, reader_addr)
if self.add_panels_to_group(group, reader_addr):
def create_group_for_door(self, door_addr:int) -> str:
group = "{}{}".format(self.__NONAME_GRP_PREFIX, door_addr)
if self.add_doors_to_group(group, door_addr):
return group
else:
return None
# Return the number of readers that were added,
# not including all the readers already present.
def add_panels_to_group(self, group:str, *readers) -> int:
for reader_addr in readers:
if self.__rclient_door.llen(reader_addr) == 0:
self.__rclient_door.rpush(reader_addr, self.DOOR_MODE_ENABLED, self.DOOR_STATUS_CLOSED)
return self.__rclient_group.sadd(group, *readers)
# Return the number of readers that were removed from the set,
# not including non existing readers.
def remove_panels_from_group(self, group:str, *readers) -> int:
for reader_addr in readers:
self.__rclient_door.delete(reader_addr)
return self.__rclient_group.srem(group, readers)
# Return all readers in a group. Note that this can be a demanding operation.
def get_panels_in_group(self, group:str):
# Return the number of doors that were added,
# not including all the doors already present.
def add_doors_to_group(self, group:str, *doors) -> int:
for door_addr in doors:
if self.__rclient_door.llen(door_addr) == 0:
self.__rclient_door.rpush(door_addr, self.DOOR_MODE_ENABLED, self.DOOR_STATUS_CLOSED)
return self.__rclient_group.sadd(group, *doors)
# Return the number of doors that were removed from the set,
# not including non existing doors.
def remove_doors_from_group(self, group:str, *doors) -> int:
for door_addr in doors:
self.__rclient_door.delete(door_addr)
return self.__rclient_group.srem(group, doors)
# Return all doors in a group. Note that this can be a demanding operation.
def get_doors_in_group(self, group:str):
return self.__rclient_group.smembers(group)
# Return true if door is in database
def is_door_registered(self, reader_addr:int) -> bool:
self.__rclient_door.llen(reader_addr) == 2
def is_door_registered(self, door_addr:int) -> bool:
self.__rclient_door.llen(door_addr) == 2
# Return True if user is authorized for given reader (door) number.
def is_user_authorized(self, user_id:int, reader_addr:int) -> bool:
# Return True if user is authorized for given door (door) number.
def is_user_authorized(self, user_id:int, door_addr:int) -> bool:
group = self.get_user_group(user_id)
if group is None or user_id == self.__RESERVED_ADDR or group == self.__EMPTY_GRP:
return False
elif group == self.__ALL_GRP:
return True
else:
return self.__rclient_group.sismember(group, reader_addr)
return self.__rclient_group.sismember(group, door_addr)
# Log that user accessed a door/reader.
def log_user_access(self, user_id, reader_addr):
logging.info("User \"{}\" accessed \"{}\"".format(user_id, reader_addr))
# Log that user accessed a door/door.
def log_user_access(self, user_id, door_addr):
logging.info("User \"{}\" accessed \"{}\"".format(user_id, door_addr))
# Mode is one of DOOR_MODE_...
def set_door_mode(self, reader_addr, mode):
self.__rclient_door.lset(reader_addr, self.__DOOR_MODE_IDX, mode)
def set_door_mode(self, door_addr, mode):
self.__rclient_door.lset(door_addr, self.__DOOR_MODE_IDX, mode)
# Return one of DOOR_MODE_...
def get_door_mode(self, reader_addr):
door_mode = self.__rclient_door.lindex(reader_addr, self.__DOOR_MODE_IDX)
def get_door_mode(self, door_addr):
door_mode = self.__rclient_door.lindex(door_addr, self.__DOOR_MODE_IDX)
if door_mode is None:
logging.warning("Door {} does not exist! Check DB consistency.".format(reader_addr))
logging.warning("Door {} does not exist! Check DB consistency.".format(door_addr))
return door_mode
# Door open/close status is tracked.
def set_door_is_open(self, reader_addr, is_open:bool):
def set_door_is_open(self, door_addr, is_open:bool):
status = self.DOOR_STATUS_CLOSED
if is_open:
status = self.DOOR_STATUS_OPEN
self.__rclient_door.lset(reader_addr, self.__DOOR_STATUS_IDX, status)
self.__rclient_door.lset(door_addr, self.__DOOR_STATUS_IDX, status)
# Door open/close status is tracked.
def is_door_open(self, reader_addr) -> bool:
if self.__rclient_door.lindex(reader_addr, self.__DOOR_STATUS_IDX) == self.DOOR_STATUS_OPEN:
def is_door_open(self, door_addr) -> bool:
if self.__rclient_door.lindex(door_addr, self.__DOOR_STATUS_IDX) == self.DOOR_STATUS_OPEN:
return True
else:
return False
......@@ -69,7 +69,7 @@ class acs_server(object):
if self.db.get_user_group(user_id) is not None:
return
# create special group
group = self.db.create_group_for_panel(reader_addr)
group = self.db.create_group_for_door(reader_addr)
if group is not None:
if self.db.add_user(user_id, group):
return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment