160 lines
4.1 KiB
Python
160 lines
4.1 KiB
Python
import sqlite3
|
|
from sqlite3 import Error
|
|
import os
|
|
import urllib.parse
|
|
|
|
dir_name = os.path.dirname(__file__)
|
|
db_name = r"db/cgi-example.sqlite3"
|
|
db_file = os.path.join(dir_name, db_name)
|
|
|
|
def create_connection():
|
|
""" create a database connection to the SQLite database
|
|
specified by db_file
|
|
:return: Connection object or None
|
|
"""
|
|
conn = None
|
|
try:
|
|
conn = sqlite3.connect(db_file)
|
|
except Error as e:
|
|
print(e)
|
|
return conn
|
|
|
|
def add_user(conn):
|
|
"""
|
|
Create a new user record
|
|
:param conn: database connection object
|
|
:return: user id
|
|
"""
|
|
sql = ''' INSERT INTO users DEFAULT VALUES '''
|
|
cur = conn.cursor()
|
|
try:
|
|
cur.execute(sql)
|
|
conn.commit()
|
|
except:
|
|
return 'error in insert'
|
|
return cur.lastrowid
|
|
|
|
def add_hash(conn, tls_client_hash, user_id):
|
|
"""
|
|
Create a new record for this cert
|
|
:param conn: database connection object
|
|
:param tls_client_hash:
|
|
:param user_id:
|
|
:return: certificate id
|
|
"""
|
|
sql = ''' INSERT OR REPLACE INTO certs(hash,user_id) VALUES(?,?) '''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (tls_client_hash, user_id))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def get_user(conn, tls_client_hash):
|
|
"""
|
|
Get user id matching tls_client_hash if it exists
|
|
:param conn: database connection object
|
|
:param tls_client_hash:
|
|
:return: user id or None
|
|
"""
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT user_id FROM certs WHERE hash=?", (tls_client_hash,))
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
return None
|
|
else:
|
|
return row[0] # user_id
|
|
|
|
def get_user_by_keycode(conn, key_code):
|
|
"""
|
|
Get user id of tls_client_hash if it exists
|
|
:param conn: database connection object
|
|
:param key_code: authorization key code
|
|
:return: user id or None
|
|
"""
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id FROM users WHERE add_key_code=?", (key_code,))
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
return None
|
|
else:
|
|
return row[0] # user_id
|
|
|
|
def check_hash(tls_client_hash):
|
|
"""
|
|
Check for existing user with hash or add a new one
|
|
:param conn:
|
|
:param tls_client_hash:
|
|
:return: user id
|
|
"""
|
|
conn = create_connection()
|
|
with conn:
|
|
user_id = get_user(conn, tls_client_hash)
|
|
if (user_id is None):
|
|
user_id = add_user(conn)
|
|
add_hash(conn, tls_client_hash, user_id)
|
|
return user_id
|
|
|
|
def get_name(user_id):
|
|
"""
|
|
Get user name if it exists
|
|
:param user_id:
|
|
:return: user name
|
|
"""
|
|
conn = create_connection()
|
|
with conn:
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT name FROM users WHERE id=?", (user_id,))
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
return None
|
|
else:
|
|
return row[0] # user_name
|
|
|
|
def set_name(user_id, user_name):
|
|
"""
|
|
Update or set name on user
|
|
:param user_id:
|
|
:param user_name:
|
|
"""
|
|
conn = create_connection()
|
|
with conn:
|
|
sql = ''' UPDATE users SET name=(?) WHERE id=? '''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (urllib.parse.unquote(user_name), user_id))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def set_add_key_code(user_id, key_code):
|
|
"""
|
|
Update or set key code on user
|
|
:param user_id:
|
|
:param key_code:
|
|
"""
|
|
conn = create_connection()
|
|
with conn:
|
|
""" There should probably be a check here to ensure no user
|
|
has this key_code already
|
|
"""
|
|
sql = ''' UPDATE users SET add_key_code=(?) WHERE id=? '''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (key_code, user_id))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def add_cert_to_user(key_code, tls_client_hash):
|
|
"""
|
|
Check for existing user with hash or add a new one
|
|
:param conn:
|
|
:param key_code:
|
|
:param tls_client_hash:
|
|
:return: true if added, false if not found
|
|
"""
|
|
conn = create_connection()
|
|
with conn:
|
|
user_id = get_user_by_keycode(conn, key_code)
|
|
if (user_id is None):
|
|
return False
|
|
add_hash(conn, tls_client_hash, user_id)
|
|
return True
|
|
|
|
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai
|