import sqlite3 as sqlite from model.distribution import Distribution from model.group import Group from model.member import Member from model.team import Team class IntegrityError(sqlite.IntegrityError): pass class Database: def __init__(self, path): self.connection = sqlite.connect(path) self.checkDb() self.connection.isolation_level = None # so we don't have to use self.connection.commit() self.connection.row_factory = sqlite.Row self.connection.execute("PRAGMA foreign_keys = ON") def checkDb(self): """ check if db is valid """ self.connection.execute("SELECT * FROM groups") def getGroups(self): """ :return: all groups in database """ curs = self.connection.execute("SELECT * FROM groups") res = [] for row in curs: res.append(Group(row['group_id'], row['name'])) curs.close() return res def getGroupMembers(self, group_id): """ :param group_id :return: members of group """ curs = self.connection.execute('''SELECT * FROM member WHERE group_id = ?''', (group_id,)) res = [] for row in curs: res.append(Member(row['member_id'], row['name'], row['group_id'])) curs.close() return res def createNewGroup(self): """ create new group (with some default name) :return: Group """ name = "Nová skupina" try: curs = self.connection.execute('''INSERT INTO groups (name) VALUES (?) ''', (name,)) except sqlite.IntegrityError as e: inserted = False i = 0 while inserted is False: i += 1 try: curs = self.connection.execute('''INSERT INTO groups (name) VALUES (?) ''', (name + " " + str(i),)) return Group(curs.lastrowid, name + " " + str(i)) except sqlite.IntegrityError as e: pass return Group(curs.lastrowid, name) def deleteGroup(self, group_id): """ delete group and all its members :param group_id: """ self.connection.execute('''DELETE FROM groups WHERE group_id = ?''', (group_id,)) def getGroupByName(self, group_name): """ :param group_name: :return: Group (instance) """ curs = self.connection.execute('''SELECT * FROM groups WHERE name LIKE ?''', (group_name,)) row = curs.fetchone() if row is None: return None return Group(row['group_id'], row['name']) def getGroupById(self, group_id): """ :param group_id: :return: Group (instance) """ curs = self.connection.execute('''SELECT * FROM groups WHERE group_id = ?''', (group_id,)) row = curs.fetchone() if row is None: return None return Group(row['group_id'], row['name']) def changeGroupName(self, group_id, new_name): """ set name of group to new_name :param group_id: :param new_name: :raise IntegrityError if the name is not unique """ try: self.connection.execute('''UPDATE groups SET name = ? WHERE group_id = ?''', (new_name, group_id,)) except sqlite.IntegrityError: raise IntegrityError() def createMember(self, group_id, name): """ create member of group :param group_id: :param name: :raise IntegrityError if the name is not unique in given group :return Member (instance) - the new created member """ try: curs = self.connection.execute( '''INSERT INTO member (name, group_id) VALUES (?, ?)''', (name, group_id,) ) except sqlite.IntegrityError: raise IntegrityError() return Member(curs.lastrowid, name, group_id) def getMemberById(self, member_id): """ get member by id :param member_id: :return: Member """ curs = self.connection.execute('''SELECT * FROM member WHERE member_id = ?''', (member_id,)) row = curs.fetchone() if row is None: return None return Member(member_id, row['name'], row['group_id']) def createDistribution(self, name, group_id): """ create new distribution :param name: :return: id of the newly created distribution """ curs = self.connection.execute('''INSERT INTO distribution (name, group_id) VALUES (?, ?)''', (name, group_id,)) return curs.lastrowid def createTeam(self, name, distribution_id, curs=None): """ create new team :param team_id: :param distribution_id: :param name: :param curs: cursor to operate on :return: id of the newly created team """ if curs is None: curs = self.connection.cursor() curs.execute('''INSERT INTO team (name, distribution_id) VALUES (?, ?)''', (name, distribution_id,)) return curs.lastrowid def addMembersToTeam(self, team_id, members, curs=None): """ add members to team :param team_id: :param members: iterable of Member :param curs: cursor to operate on """ if curs is None: curs = self.connection.cursor() for member in members: curs.execute('''INSERT INTO member_to_team (member_id, team_id) VALUES (?, ?)''', (member.id_, team_id,)) def getGroupName(self, group_id): """ :param group_id: :return: group name """ curs = self.connection.execute('''SELECT name FROM groups WHERE group_id = ?''', (group_id,)) row = curs.fetchone() return row['name'] def getHistoryCount(self, member_id): """ :param member_id: :return: list [(member, count how many times he was in the same team,), ...] """ curs = self.connection.execute( ''' SELECT m2.*, IFNULL(history.times_together, 0) times_together FROM member m1 JOIN member m2 ON m1.group_id = m2.group_id AND m1.member_id != m2.member_id LEFT JOIN (SELECT * FROM member_history_view) history ON m1.member_id = history.member_id AND m2.member_id = history.with_member_id WHERE m1.member_id = ? ''', (member_id,)) res = {} for row in curs: res[Member(row['member_id'], row['name'], row['group_id'])] = row['times_together'] return res def getDistributionsByGroupID(self, group_id): """ :param group_id: :return: list of all Distribution(s) of given group """ curs = self.connection.execute('''SELECT * FROM distribution WHERE group_id = ? ORDER BY creation_time DESC''', (group_id,)) res = [] for row in curs: teams = self.getTeamsByDistributionID(row['distribution_id']) res.append(Distribution(row['name'], teams, row['group_id'], row['distribution_id'])) return res def getTeamsByDistributionID(self, distribution_id): """ :param distribution_id: :return: list of Team(s) in given distribution """ curs = self.connection.execute('''SELECT * FROM team WHERE distribution_id = ?''', (distribution_id,)) res = [] for row in curs: members = self.getMembersByTeamId(row['team_id']) res.append(Team(row['team_id'], row['name'], members)) return res def getMembersByTeamId(self, team_id): """ :param team_id: :return: list of Member(s) in given team """ curs = self.connection.execute('''SELECT * FROM member_to_team JOIN member ON member_to_team.member_id = member.member_id WHERE team_id = ?''', (team_id,)) res = [] for row in curs: res.append(Member(row['member_id'], row['name'], row['group_id'])) return res def deleteMember(self, member_id): """ delete member from everywhere (distributions and stuff) :param member_id: """ self.connection.execute('''DELETE FROM member WHERE member_id = ?''', (member_id,)) def changeMemberName(self, member_id, new_name): """ set members name to new_name :param member_id: :param new_name: :raise IntegrityError if the name is not unique in member's group """ try: self.connection.execute('''UPDATE member SET name = ? WHERE member_id = ?''', (new_name, member_id,)) except sqlite.IntegrityError: raise IntegrityError() def deleteDistribution(self, distribution_id): """ delete distribution :param distribution_id: id of distribution that should be deleted """ self.connection.execute('''DELETE FROM distribution WHERE distribution_id = ?''', (distribution_id,)) def beginTransaction(self): """ starts transaction :return: curs with started transaction NOTE: whoever uses this has the responsibility of calling endTransaction providing the returned cursor (after finishing his operations) """ return self.connection.execute("begin") def endTransaction(self, curs): """ end transaction on given cursor :param curs: """ curs.execute("commit")