import sqlite3 from sqlite3 import Error import os import urllib.parse dir_name = os.path.dirname(__file__) db_name = r"cgi-example.sqlite3" db_file = os.path.join(dir_name, db_name) def create_connection(): """ create a database connection to the SQLite database specified by db_file :return: Connection object or None """ conn = None try: conn = sqlite3.connect(db_file) except Error as e: print(e) return conn def add_user(conn): """ Create a new user record :param conn: database connection object :return: user id """ sql = ''' INSERT INTO users DEFAULT VALUES ''' cur = conn.cursor() try: cur.execute(sql) conn.commit() except: return 'error in insert' return cur.lastrowid def add_hash(conn, tls_client_hash, user_id): """ Create a new record for this cert :param conn: database connection object :param tls_client_hash: :param user_id: :return: certificate id """ sql = ''' INSERT OR REPLACE INTO certs(hash,user_id) VALUES(?,?) ''' cur = conn.cursor() cur.execute(sql, (tls_client_hash, user_id)) conn.commit() return cur.lastrowid def get_user(conn, tls_client_hash): """ Get user id matching tls_client_hash if it exists :param conn: database connection object :param tls_client_hash: :return: user id or None """ cur = conn.cursor() cur.execute("SELECT user_id FROM certs WHERE hash=?", (tls_client_hash,)) row = cur.fetchone() if row is None: return None else: return row[0] # user_id def get_user_by_keycode(conn, key_code): """ Get user id of tls_client_hash if it exists :param conn: database connection object :param key_code: authorization key code :return: user id or None """ cur = conn.cursor() cur.execute("SELECT id FROM users WHERE add_key_code=?", (key_code,)) row = cur.fetchone() if row is None: return None else: return row[0] # user_id def check_hash(tls_client_hash): """ Check for existing user with hash or add a new one :param conn: :param tls_client_hash: :return: user id """ conn = create_connection() with conn: user_id = get_user(conn, tls_client_hash) if (user_id is None): user_id = add_user(conn) add_hash(conn, tls_client_hash, user_id) return user_id def get_name(user_id): """ Get user name if it exists :param user_id: :return: user name """ conn = create_connection() with conn: cur = conn.cursor() cur.execute("SELECT name FROM users WHERE id=?", (user_id,)) row = cur.fetchone() if row is None: return None else: return row[0] # user_name def set_name(user_id, user_name): """ Update or set name on user :param user_id: :param user_name: """ conn = create_connection() with conn: sql = ''' UPDATE users SET name=(?) WHERE id=? ''' cur = conn.cursor() cur.execute(sql, (urllib.parse.unquote(user_name), user_id)) conn.commit() return cur.lastrowid def set_add_key_code(user_id, key_code): """ Update or set key code on user :param user_id: :param key_code: """ conn = create_connection() with conn: """ There should probably be a check here to ensure no user has this key_code already """ sql = ''' UPDATE users SET add_key_code=(?) WHERE id=? ''' cur = conn.cursor() cur.execute(sql, (key_code, user_id)) conn.commit() return cur.lastrowid def add_cert_to_user(key_code, tls_client_hash): """ Check for existing user with hash or add a new one :param conn: :param key_code: :param tls_client_hash: :return: true if added, false if not found """ conn = create_connection() with conn: user_id = get_user_by_keycode(conn, key_code) if (user_id is None): return False add_hash(conn, tls_client_hash, user_id) return True #vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai