This commit is contained in:
rald 2023-08-04 22:14:10 +08:00
parent ac330d53b5
commit ec9cd8b249
17 changed files with 814 additions and 11 deletions

19
README.md Normal file
View File

@ -0,0 +1,19 @@
# CGI Example Project
The goal of this project is to give a working example of a basic Gemini CGI application. User identities are driven by TLS Client Certificates. This project will serve as a reference for CGI authors and be explained in detail in an instructional video.
## Features
* (DONE) New TLS certificates automatically create a new user account
* (DONE) Users can set data on their own account (a name field for example)
* (DONE) Users can add additional certificates to the same account by using a special code
## Setup
Create database with:
``` sqlite command to create database
sqlite3 db/cgi-example.sqlite3 < create_schema.sql
```
Then give write permission to the both the database and the containing directory (`db`) to the user that your Gemini server runs as. It is important that the containing folder write permissions are in place or you will see `50` errors returned when trying to write to the database.

26
add-cert.cgi Executable file
View File

@ -0,0 +1,26 @@
#!/usr/bin/env python3
# modules are all in the parent directory
import sys
sys.path.append('..')
# import helpers from modules
from helpers import get_client_cert, get_query_string
from db import add_cert_to_user
tls_client_hash = get_client_cert(False)
key_code = get_query_string("What is your auth code?")
success = add_cert_to_user(key_code, tls_client_hash)
print("# Add Cert Page")
print()
if success:
print("Your new key has been added to this account. Congratulations!")
else:
print("Either your auth code is incorrect or something else has gone wrong.")
print()
print("=> login.cgi Back to main page")
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

View File

@ -1,9 +1,9 @@
#!/usr/bin/env python3
import random
from helpers import get_query_string, get_client_cert,show_header_ok
class Trie:
@ -21,14 +21,16 @@ class Trie:
curr.next[k]=Trie(j)
curr=curr.next[k]
curr.mark=True
f=open('dice.txt','r')
f=open('cgi-bin/dice.txt','r')
dice=f.read().split('\n')
f.close()
dice=list(filter(None,dice))
random.shuffle(dice)
b=[]
for j in range(4):
@ -37,16 +39,12 @@ for j in range(4):
die=dice[j*4+i]
b[j].append(die[random.randrange(0,len(die))].upper())
for j in range(4):
for i in range(4):
print(b[j][i],end=' ')
print()
def r(s):
return s and len(s)>=3 and len(s)<=16
f=open('enable.txt','r')
f=open('cgi-bin/enable.txt','r')
words=f.read().split('\n')
f.close()
words=list(filter(r,words))
@ -88,6 +86,18 @@ for j in range(4):
dfs(i,j,0,trie)
found.sort(key=len,reverse=True)
print(found)
show_header_ok()
print("```")
for j in range(4):
for i in range(4):
print(b[j][i],end=' ')
print()
print()
print("```")
# for z in found: print(z,end=' ')
print()
print("=> guess.cgi Guess a word")

BIN
boggle.sqlite3 Normal file

Binary file not shown.

24
change-name.cgi Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# modules are all in the parent directory
import sys
# sys.path.append('..')
# import helpers from modules
from helpers import get_query_string, get_client_cert
from db import check_hash, set_name
TLS_CLIENT_HASH = get_client_cert(False) # don't default to response 20
user_name = get_query_string("What is your name?")
user_id = check_hash(TLS_CLIENT_HASH)
set_name(user_id, user_name)
print("# Name Change Page")
print()
print("Your name has been changed.")
print()
print("=> login.cgi Back to main page")
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

15
check.cgi Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
import os
import random
from helpers import get_query_string, get_client_cert,show_header_ok,show_query_string_required
QUERY_STRING=os.getenv("QUERY_STRING")
if not QUERY_STRING:
print("30 guess.cgi",end='\r\n')
show_header_ok()
print(QUERY_STRING)

18
choose.cgi Executable file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
import random
from helpers import get_client_cert,show_redirect
from db import *
from urllib import parse
TLS_CLIENT_HASH = get_client_cert(False)
user_id = check_hash(TLS_CLIENT_HASH)
user_name = get_name(user_id) or "[Not Set]"
url = os.getenv("GEMINI_URL")
game_id=parse.parse_qs(parse.urlparse(url).query)['game_id'][0]
set_current_game(user_id,game_id)
show_redirect("game.cgi")

34
create_schema.sql Normal file
View File

@ -0,0 +1,34 @@
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "certs" (
"hash" TEXT NOT NULL UNIQUE,
"user_id" INTEGER,
PRIMARY KEY("hash"),
FOREIGN KEY("user_id") REFERENCES "users"("id")
);
CREATE TABLE IF NOT EXISTS "users" (
"id" INTEGER NOT NULL UNIQUE,
"name" TEXT,
"add_key_code" TEXT,
"game_id" INTEGER,
"score" INTEGER DEFAULT 0,
PRIMARY KEY("id" AUTOINCREMENT),
FOREIGN KEY("game_id") REFERENCES "games"("id")
);
CREATE TABLE IF NOT EXISTS "games" (
"id" INTEGER NOT NULL UNIQUE,
"board" TEXT,
"words" TEXT,
"graph" TEXT,
"stats" TEXT,
"stime" DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY("id" AUTOINCREMENT)
);
CREATE TABLE IF NOT EXISTS "plays" (
"game_id" INTEGER,
"user_id" INTEGER,
"words" TEXT,
PRIMARY KEY("game_id","user_id"),
FOREIGN KEY("game_id") REFERENCES "games"("id"),
FOREIGN KEY("user_id") REFERENCES "users"("id")
);
COMMIT;

279
db.py Normal file
View File

@ -0,0 +1,279 @@
import sqlite3
from sqlite3 import Error
import os
import urllib.parse
dir_name = os.path.dirname(__file__)
db_name = r"boggle.sqlite3"
db_file = os.path.join(dir_name, db_name)
def create_connection():
""" create a database connection to the SQLite database
specified by db_file
:return: Connection object or None
"""
conn = None
try:
conn = sqlite3.connect(db_file)
except Error as e:
print(e)
return conn
def add_user(conn):
"""
Create a new user record
:param conn: database connection object
:return: user id
"""
sql = ''' INSERT INTO users DEFAULT VALUES '''
cur = conn.cursor()
try:
cur.execute(sql)
conn.commit()
except:
return 'error in insert'
return cur.lastrowid
def add_hash(conn, tls_client_hash, user_id):
"""
Create a new record for this cert
:param conn: database connection object
:param tls_client_hash:
:param user_id:
:return: certificate id
"""
sql = ''' INSERT OR REPLACE INTO certs(hash,user_id) VALUES(?,?) '''
cur = conn.cursor()
cur.execute(sql, (tls_client_hash, user_id))
conn.commit()
return cur.lastrowid
def get_user(conn, tls_client_hash):
"""
Get user id matching tls_client_hash if it exists
:param conn: database connection object
:param tls_client_hash:
:return: user id or None
"""
cur = conn.cursor()
cur.execute("SELECT user_id FROM certs WHERE hash=?", (tls_client_hash,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_id
def get_user_by_keycode(conn, key_code):
"""
Get user id of tls_client_hash if it exists
:param conn: database connection object
:param key_code: authorization key code
:return: user id or None
"""
cur = conn.cursor()
cur.execute("SELECT id FROM users WHERE add_key_code=?", (key_code,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_id
def check_hash(tls_client_hash):
"""
Check for existing user with hash or add a new one
:param conn:
:param tls_client_hash:
:return: user id
"""
conn = create_connection()
with conn:
user_id = get_user(conn, tls_client_hash)
if (user_id is None):
user_id = add_user(conn)
add_hash(conn, tls_client_hash, user_id)
return user_id
def get_name(user_id):
"""
Get user name if it exists
:param user_id:
:return: user name
"""
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT name FROM users WHERE id=?", (user_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # user_name
def set_name(user_id, user_name):
"""
Update or set name on user
:param user_id:
:param user_name:
"""
conn = create_connection()
with conn:
sql = ''' UPDATE users SET name=(?) WHERE id=? '''
cur = conn.cursor()
cur.execute(sql, (urllib.parse.unquote(user_name), user_id))
conn.commit()
return cur.lastrowid
def set_add_key_code(user_id, key_code):
"""
Update or set key code on user
:param user_id:
:param key_code:
"""
conn = create_connection()
with conn:
""" There should probably be a check here to ensure no user
has this key_code already
"""
sql = ''' UPDATE users SET add_key_code=(?) WHERE id=? '''
cur = conn.cursor()
cur.execute(sql, (key_code, user_id))
conn.commit()
return cur.lastrowid
def add_cert_to_user(key_code, tls_client_hash):
"""
Check for existing user with hash or add a new one
:param conn:
:param key_code:
:param tls_client_hash:
:return: true if added, false if not found
"""
conn = create_connection()
with conn:
user_id = get_user_by_keycode(conn, key_code)
if (user_id is None):
return False
add_hash(conn, tls_client_hash, user_id)
return True
def create_new_game(conn,board,words,graph):
"""
Create a new game record
:param conn: database connection object
:param board: game board
:param words: found words
:param graph: word guessed list
:return: game id
"""
sql = ''' INSERT INTO games(board,words,graph,stats) VALUES(?,?,?,?) '''
cur = conn.cursor()
cur.execute(sql,
(board,words,graph,"PLAY")
)
conn.commit()
return cur.lastrowid
def set_current_game(user_id,game_id):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("UPDATE users SET game_id=? WHERE id=?", (game_id,user_id))
def get_game_id(user_id):
"""
Get game id if it exists
:param user_id:.
:return: game id
"""
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT game_id FROM users WHERE id=?", (user_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # game_id
def get_play_words(game_id,user_id):
"""
Get game id if it exists
:param user_id:.
:return: game id
"""
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT words FROM plays WHERE game_id=? and user_id=?", (game_id,user_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # game_id
def add_plays(game_id,user_id,words):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("INSERT OR REPLACE INTO plays(game_id,user_id,words) values(?,?,?)", (game_id,user_id,words))
def set_game_graph(game_id,graph):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("UPDATE games SET graph=? WHERE id=?", (graph,game_id))
def get_game_graph(game_id):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT graph FROM games WHERE id=?", (game_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # game_id
def set_game_stats(game_id,stats):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("UPDATE games SET stats=? WHERE id=?", (stats,game_id))
def get_game_stats(game_id):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT stats FROM games WHERE id=?", (game_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # game_id
def add_score(user_id,points):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("UPDATE users SET score=score+? WHERE id=?", (points,user_id))
def get_score(user_id):
conn = create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT score FROM users WHERE id=?", (user_id,))
row = cur.fetchone()
if row is None:
return None
else:
return row[0] # game_id
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

112
game.cgi Executable file
View File

@ -0,0 +1,112 @@
#!/usr/bin/env python3
import random
from helpers import get_client_cert
from db import *
from urllib import parse
TLS_CLIENT_HASH = get_client_cert()
user_id = check_hash(TLS_CLIENT_HASH)
user_name = get_name(user_id) or "[Not Set]"
game_id = get_game_id(user_id) or "[Not Set]"
word=os.getenv("QUERY_STRING").upper()
conn=create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT board,words,graph FROM games WHERE id=?",(game_id,))
row = cur.fetchone()
if row:
board=row[0]
words=row[1]
graph=row[2]
a=board.split(',')
w=words.split(',')
g=graph.split(',')
b=[]
for j in range(4):
b.append([])
for i in range(4):
b[j].append(a[j*4+i])
print("```")
for j in range(4):
for i in range(4):
print(b[j][i],end=" ")
print()
print("```")
if word:
i=-1
for j in range(len(w)):
if w[j]==word:
i=j
break
if i!=-1:
if g[i]=='0':
g[i]='1'
add_plays(game_id,user_id,words)
set_game_graph(game_id,','.join(g))
l=len(word)
if l==3 or l==4: p=1
elif l==5: p=2
elif l==6: p=3
elif l==7: p=5
elif l>=8: p=11
add_score(user_id,p)
score=get_score(user_id)
print("you guessed",word,"plus",p,"points")
s=1
for t in g:
if t=='0':
s=0
break
if s==1:
set_game_stats(game_id,"STOP")
print("game is finished")
else:
print(word,"is already guessed")
else:
print(word,"not found")
print()
print("game:",game_id)
print("username:",user_name)
score=get_score(user_id)
print("your score is",score)
g=get_game_graph(game_id).split(',')
lg=len(g)
c=0
for t in g:
if t=='1':
c+=1
print("words guessed",c,"left",lg-c,"total",lg)
print()
stats=get_game_stats(game_id)
if stats=='PLAY':
print("=> guess.cgi guess a word")
elif stats=='STOP':
print("game is finished")
print("=> lobby.cgi lobby")
print("=> new_game.cgi new game")
print("=> login.cgi login")

13
guess.cgi Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
import os
import random
from helpers import get_query_string, get_client_cert,show_header_ok,show_query_string_required
from urllib import parse
word=os.getenv("QUERY_STRING")
if not word:
word = show_query_string_required("Guess a word?")
print("30 game.cgi?"+word,end='\r\n')

40
helpers.py Normal file
View File

@ -0,0 +1,40 @@
import os
import string
import random
def get_client_cert(ok_if_found = True):
TLS_CLIENT_HASH = os.getenv('TLS_CLIENT_HASH')
if (TLS_CLIENT_HASH is None):
show_header_cert_required()
elif ok_if_found:
show_header_ok()
return TLS_CLIENT_HASH
def get_query_string(msg, ok_if_found = True):
QUERY_STRING = os.getenv('QUERY_STRING')
if(not QUERY_STRING):
show_query_string_required(msg)
elif ok_if_found:
show_header_ok()
return QUERY_STRING
def show_header_ok():
print("20 text/gemini; charset=utf-8", end = "\r\n")
def show_header_cert_required():
print("60 text/gemini; charset=utf-8", end = "\r\n")
quit()
def show_query_string_required(msg):
print("10 " + msg, end = "\r\n")
quit()
def show_redirect(url):
print("30 " + url, end = "\r\n")
quit()
def id_generator(size=12, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

16
index.cgi Executable file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
from helpers import show_header_ok
show_header_ok()
with open('cgi-bin/README.md') as f:
contents = f.read()
print(contents)
print("## Project Source:")
print("=> https://tildegit.org/tomasino/gemspace/src/branch/master/cgi-example CGI Example Source on tildegit.org")
print("")
print("## Get Started")
print("=> login.cgi Login with your client certificate to begin")
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

28
lobby.cgi Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
import random
from helpers import get_client_cert
from db import *
TLS_CLIENT_HASH = get_client_cert()
user_id = check_hash(TLS_CLIENT_HASH)
user_name = get_name(user_id) or "[Not Set]"
print("username:",user_name)
print()
print("=> login.cgi login")
print("=> new_game.cgi new game")
print()
conn=create_connection()
with conn:
cur = conn.cursor()
cur.execute("SELECT * FROM games ORDER BY stime DESC")
rows = cur.fetchall()
for row in rows:
print("=> choose.cgi?game_id="
+str(row[0])+" Game "
+str(row[0]),end="\r\n")

36
login.cgi Executable file
View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
# modules are all in the parent directory
import sys
# sys.path.append('..')
# import helpers from modules
from helpers import get_client_cert
from db import *
TLS_CLIENT_HASH = get_client_cert()
user_id = check_hash(TLS_CLIENT_HASH)
user_name = get_name(user_id) or "[Not Set]"
print("# Welcome: User Logged In")
print()
print("## Your USER ID:")
print(user_id)
print()
print("## Your USER NAME:")
print(user_name)
print("=> change-name.cgi Change User Name")
print()
print("## TLS Certs associated with account")
print("=> update-auth-code.cgi Generate an auth code to add another cert to your account")
print("=> add-cert.cgi Add another cert to your account (with auth code)")
print()
print("=> new_game.cgi new game")
print("=> lobby.cgi lobby")
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai

105
new_game.cgi Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
import random
from helpers import get_query_string, get_client_cert,show_header_ok,show_redirect
from db import check_hash, set_name, create_connection, create_new_game
class Trie:
def __init__(self,letter):
self.letter=letter
self.mark=False
self.next=[None]*26
@staticmethod
def add(root,word):
curr=root
for i in word:
j=i.upper()
k=ord(j)-65
if curr.next[k]==None:
curr.next[k]=Trie(j)
curr=curr.next[k]
curr.mark=True
f=open('cgi-bin/dice.txt','r')
dice=f.read().split('\n')
f.close()
dice=list(filter(None,dice))
random.shuffle(dice)
b=[]
for j in range(4):
b.append([])
for i in range(4):
die=dice[j*4+i]
b[j].append(die[random.randrange(0,len(die))].upper())
def r(s):
return s and len(s)>=3 and len(s)<=16
f=open('cgi-bin/enable.txt','r')
words=f.read().split('\n')
f.close()
words=list(filter(r,words))
trie=Trie(None)
for word in words:
Trie.add(trie,word.upper())
g=[]
for j in range(4):
g.append([])
for i in range(4):
g[j].append(False)
found=[]
l=['' for i in range(16)]
def dfs(x,y,d,trie):
if x<0 or x>=4 or y<0 or y>=4:
return
if g[y][x]:
return
k=ord(b[y][x].upper())-65
t=trie.next[k]
if not t:
return
l[d]=b[y][x].upper()
if t.mark:
w=''.join(l[0:d+1])
if w not in found:
found.append(w)
g[y][x]=True
for j in [-1,0,1]:
for i in [-1,0,1]:
if i!=0 or j!=0:
dfs(x+i,y+j,d+1,t)
g[y][x]=False
for j in range(4):
for i in range(4):
dfs(i,j,0,trie)
found.sort(key=len,reverse=True)
board=[]
for j in range(4):
for i in range(4):
board.append(b[j][i])
bstr=','.join(board)
fstr=','.join(found)
gstr=','.join(['0']*len(found))
conn=create_connection()
game_id=create_new_game(conn,bstr,fstr,gstr)
show_redirect("choose.cgi?game_id="+str(game_id))

28
update-auth-code.cgi Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# modules are all in the parent directory
import sys
# sys.path.append('..')
# import helpers from modules
from helpers import get_client_cert, id_generator
from db import check_hash, set_add_key_code
TLS_CLIENT_HASH = get_client_cert()
user_id = check_hash(TLS_CLIENT_HASH)
key_code = id_generator()
set_add_key_code(user_id, key_code)
print("# Add Cert Page")
print()
print("Your authorization code to add new client certificates has been updated. Visit the 'Add Cert' page from another device or with another key and enter your authorization code to connect the accounts.")
print()
print("## AUTH CODE")
print(key_code)
print("=> add-cert.cgi?" + key_code + " Link to Add Cert page with Auth Code")
print()
print("=> login.cgi Back to main page")
#vim:fenc=utf-8:ts=4:sw=4:sta:noet:sts=4:fdm=marker:ai