gemspace/cgi-example/db.py

160 lines
4.1 KiB
Python

import sqlite3
from sqlite3 import Error
import os
import urllib.parse
dir_name = os.path.dirname(__file__)
db_name = r"db/cgi-example.sqlite3"
db_file = os.path.join(dir_name, db_name)
def create_connection():
""" create a database connection to the SQLite database
specified by db_file
:return: Connection object or None
"""
conn = None
try:
conn = sqlite3.connect(db_file)
except Error as e:
print(e)
return conn
def add_user(conn):
"""
Create a new user record
:param conn: database connection object
:return: user id
"""
sql = ''' INSERT INTO users DEFAULT VALUES '''
cur = conn.cursor()
try:
cur.execute(sql)
conn.commit()
except:
return 'error in insert'
return cur.lastrowid
def add_hash(conn, tls_client_hash, user_id):
"""
Create a new record for this cert
:param conn: database connection object
:param tls_client_hash:
:param user_id:
:return: certificate id
"""
sql = ''' INSERT OR REPLACE INTO certs(hash,user_id) VALUES(?,?) '''
cur = conn.cursor()
cur.execute(sql, (tls_client_hash, user_id))
conn.commit()
return cur.lastrowid
def get_user(conn, tls_client_hash):
"""
Get user id matching tls_client_hash if it exists
:param conn: database connection object
:param tls_client_hash:
:return: user id or None
"""
cur = conn.cursor()
cur.execute("SELECT user_id FROM certs WHERE hash=?", (tls_client_hash,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_id
def get_user_by_keycode(conn, key_code):
"""
Get user id of tls_client_hash if it exists
:param conn: database connection object
:param key_code: authorization key code
:return: user id or None
"""
cur = conn.cursor()
cur.execute("SELECT id FROM users WHERE add_key_code=?", (key_code,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_id
def check_hash(tls_client_hash):
"""
Check for existing user with hash or add a new one
:param conn:
:param tls_client_hash:
:return: user id
"""
conn = create_connection()
with conn:
user_id = get_user(conn, tls_client_hash)
if (user_id is None):
user_id = add_user(conn)
add_hash(conn, tls_client_hash, user_id)
return user_id
def get_name(user_id):
"""
Get user name if it exists
:param user_id:
:return: user name
"""
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT name FROM users WHERE id=?", (user_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_name
def set_name(user_id, user_name):
"""
Update or set name on user
:param user_id:
:param user_name:
"""
conn = create_connection()
with conn:
sql = ''' UPDATE users SET name=(?) WHERE id=? '''
cur = conn.cursor()
cur.execute(sql, (urllib.parse.unquote(user_name), user_id))
conn.commit()
return cur.lastrowid
def set_add_key_code(user_id, key_code):
"""
Update or set key code on user
:param user_id:
:param key_code:
"""
conn = create_connection()
with conn:
""" There should probably be a check here to ensure no user
has this key_code already
"""
sql = ''' UPDATE users SET add_key_code=(?) WHERE id=? '''
cur = conn.cursor()
cur.execute(sql, (key_code, user_id))
conn.commit()
return cur.lastrowid
def add_cert_to_user(key_code, tls_client_hash):
"""
Check for existing user with hash or add a new one
:param conn:
:param key_code:
:param tls_client_hash:
:return: true if added, false if not found
"""
conn = create_connection()
with conn:
user_id = get_user_by_keycode(conn, key_code)
if (user_id is None):
return False
add_hash(conn, tls_client_hash, user_id)
return True
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai