98_fasthtml/models.py
"""Get data from database."""
import os
from pypika import Query, Table
import sqlite3
from util import ModelException, dict_factory
ENV_VAR = "DATA"
def connect():
"""Connect to database."""
path = os.getenv(ENV_VAR)
if not path:
raise ModelException(f"Environment variable {ENV_VAR} not set")
connection = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES)
connection.row_factory = dict_factory
return connection
def all_staff():
"""Get all staff."""
staff = Table("staff")
query = Query.from_(staff).select("staff_id", "personal", "family")
try:
connection = connect()
cursor = connection.execute(str(query))
return cursor.fetchall()
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))
def experiments(staff_id):
"""Get all experiments performed by staff member."""
performed = Table("performed")
experiment = Table("experiment")
query = Query.from_(experiment)\
.inner_join(performed)\
.on(experiment.sample_id == performed.sample_id)\
.where(performed.staff_id == staff_id)\
.select(experiment.star)
try:
connection = connect()
cursor = connection.execute(str(query))
return cursor.fetchall()
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))
def authenticate(username, password):
"""Try to authenticate, returning staff ID or None."""
staff = Table("staff")
query = Query.from_(staff)\
.select(staff.staff_id)\
.where((staff.username == username) & (staff.password == password))
try:
connection = connect()
cursor = connection.execute(str(query))
result = cursor.fetchall()
return result[0]["staff_id"] if (len(result) == 1) else None
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))
def create_session(cookie, staff_id):
"""Create a new session."""
sessions = Table("sessions")
query = Query.into(sessions).columns("cookie", "staff_id").insert(cookie, staff_id)
try:
connection = connect()
connection.execute(str(query))
connection.commit()
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))
def get_session(cookie):
"""Get session by cookie."""
sessions = Table("sessions")
query = Query.from_(sessions).select("staff_id").where(sessions.cookie == cookie)
try:
connection = connect()
cursor = connection.execute(str(query))
result = cursor.fetchone()
return result["staff_id"] if result else None
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))
def delete_session(cookie):
"""Delete a session."""
sessions = Table("sessions")
query = Query.from_(sessions).delete().where(sessions.cookie == cookie)
try:
connection = connect()
connection.execute(str(query))
connection.commit()
except sqlite3.DatabaseError as exc:
raise ModelException(str(exc))