Commit 76d790e6 authored by Bc. Petr Elexa's avatar Bc. Petr Elexa

fix support to switch learn mode by card

parent 20599dfb
......@@ -295,7 +295,8 @@ class acs_can_proto(object):
return self.msg_auth_fail(src, user_id)
elif fc == self.FC_LEARN_USER:
if self.cb_learn_user is not None:
ok = self.cb_learn_user(src, int.from_bytes(msg_data[:4], "little", signed=True))
user_id = int.from_bytes(msg_data[:4], "little", signed=True)
ok = self.cb_learn_user(src, user_id)
if ok is None:
return self.NO_MESSAGE
elif ok:
......@@ -65,23 +65,26 @@ class acs_database(object):
group = self.__rclient_user.get(user_id)
return group
# Return True if user was added.
# add user to an existing group
def add_user(self, user_id:int, group:str=__ALL_GRP, expire_secs:int=0) -> bool:
if self.__rclient_group.exists(group) == 0:
return False
if expire_secs > 0:
status = self.__rclient_user.set(user_id, group, ex=expire_secs)
self.__rclient_user.set(user_id, group, ex=expire_secs)
status = self.__rclient_user.set(user_id, group)
return status
self.__rclient_user.set(user_id, group)
return True
# Return True if user was removed.
def remove_user(self, user_id) -> bool:
status = self.__rclient_user.delete(user_id)
return True if (status == 1) else False
return True if (status > 0) else False
# Return True if group was removed.
def remove_group(self, group) -> bool:
status = self.__rclient_group.delete(group)
return True if (status == 1) else False
def remove_group(self, group, only_empty) -> bool:
if not only_empty or (only_empty and (self.__rclient_group.scard(group) == 0)):
status = self.__rclient_group.delete(group)
return True if (status > 0) else False
# Return portition of users (depending on the cursor value).
def get_users(self, cursor:int=0):
......@@ -98,10 +101,8 @@ class acs_database(object):
# Return created group's name, None if failed.
def create_group_for_door(self, door_addr:int) -> str:
group = "{}{}".format(self.__NONAME_GRP_PREFIX, door_addr)
if self.add_doors_to_group(group, door_addr):
return group
return None
self.add_doors_to_group(group, door_addr)
return group
# Return the number of doors that were added,
# not including all the doors already present.
......@@ -143,8 +144,14 @@ class acs_database(object):
return self.USER_AUTH_FAIL
# Log that user accessed a door/door.
def log_user_access(self, user_id, door_addr):"User \"{}\" accessed \"{}\"".format(user_id, door_addr))
def log_user_access(self, user_id, door_addr, allowed):
group = self.get_user_group(user_id)
if group is None:
group = ""
if allowed:"User \"{}\" from \"{}\" access to \"{}\" allowed".format(user_id, group, door_addr))
else:"User \"{}\" from \"{}\" access to \"{}\" denied".format(user_id, group, door_addr))
# Mode is one of DOOR_MODE_...
def set_door_mode(self, door_addr, mode):
......@@ -59,7 +59,7 @@ class acs_server(object):
def change_door_mode(self, reader_addr, mode):
if self.debug:
logging.debug("change_door_mode: reader={}, mode={}".format(reader_addr, mode))
self.db.set_door_mode(reader_addr, self.db.DOOR_MODE_ENABLED)
self.db.set_door_mode(reader_addr, mode)
if mode == self.db.DOOR_MODE_LEARN:
can_id, dlc, data = self.proto.msg_reader_learn_mode(reader_addr)
self.proto.can_sock.send(can_id, dlc, data)
......@@ -75,10 +75,10 @@ class acs_server(object):
self.proto.can_sock.send(can_id, dlc, data)
# add a new user to database
def add_new_user(self, user_id, reader_addr, extend_group):
def add_new_user(self, reader_addr, user_id, extend_group):
if self.debug:
logging.debug("add_new_user: reader={}, user={}".format(reader_addr, user_id))
# do not add existing users
# check if existing user
group = self.db.get_user_group(user_id)
if group is not None:
if extend_group:
......@@ -89,16 +89,14 @@ class acs_server(object):
# create special group
group = self.db.create_group_for_door(reader_addr)
if group is not None:
if self.db.add_user(user_id, group):
return True
# remove group is adding failed
return False
if self.db.add_user(user_id, group):
return True
logging.error("Created group does not exist")
return False
# callback to learn user request
def _learn_user(self, user_id, reader_addr):
def _learn_user(self, reader_addr, user_id):
if self.debug:
logging.debug("learn_user: reader={}, user={}".format(reader_addr, user_id))
mode = self.db.get_door_mode(reader_addr)
......@@ -110,11 +108,14 @@ class acs_server(object):
self.change_door_mode(reader_addr, self.db.DOOR_MODE_ENABLED)
return None
elif user_auth_type == self.db.USER_NOT_EXIST:
return self.add_new_user(user_id, reader_addr, False)
return self.add_new_user(reader_addr, user_id, False)
elif user_auth_type == self.db.USER_AUTH_OK:
return self.add_new_user(user_id, reader_addr, True)
return self.add_new_user(reader_addr, user_id, True)
return False
# reset reader mode because it is inconsistent
can_id, dlc, data = self.proto.msg_reader_normal_mode(reader_addr)
self.proto.can_sock.send(can_id, dlc, data)
return None
# callback to authorization request
# return True if authorized to open door False otherwise
......@@ -127,12 +128,13 @@ class acs_server(object):
if mode == self.db.DOOR_MODE_ENABLED:
user_auth_type = self.db.user_authorization(user_id, reader_addr)
if user_auth_type == self.db.USER_AUTH_OK:
self.db.log_user_access(user_id, reader_addr)
self.db.log_user_access(user_id, reader_addr, True)
return True
elif user_auth_type == self.db.USER_AUTH_LEARN:
self.change_door_mode(reader_addr, self.db.DOOR_MODE_LEARN)
return None
self.db.log_user_access(user_id, reader_addr, False)
return False
return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment