22_sessions/server_secure_jwt.py
"""Serve data from data model layer with JWT authentication."""
from flask import Flask, abort, make_response, redirect, request, url_for, jsonify
from flask_cors import CORS
from pathlib import Path
import jwt
from datetime import datetime, timedelta, timezone
import os
import models
import views
from util import AppException, HTTP_400_BAD_REQUEST, encrypt_password, get_secret
COOKIE_NAME = "webonomicon"
HEARTBEAT = {"message": "alive"}
JWT_SECRET = os.environ.get("JWT_SECRET")
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_DELTA = timedelta(hours=1)
def create_app():
"""Build application and configure routes."""
app = Flask("server", static_folder=Path("../static").absolute(), static_url_path="/static")
CORS(app)
secret = get_secret()
def create_token(staff_id):
payload = {
"exp": datetime.now(timezone.utc) + JWT_EXPIRATION_DELTA,
"iat": datetime.now(timezone.utc),
"staffId": staff_id
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload["staffId"]
except jwt.ExpiredSignatureError:
return None # Token has expired
except jwt.InvalidTokenError:
return None # Invalid token
@app.get("/")
def root():
try:
return views.all_staff(models.all_staff())
except AppException as exc:
abort(HTTP_400_BAD_REQUEST, f"Error serving all staff: {exc}")
@app.post("/login")
def login_handler():
"""Accept password and go back home."""
username = request.form["username"]
password = request.form["password"]
response = make_response(redirect(url_for("root")))
if (not username) or (not password):
response.set_cookie(COOKIE_NAME, "", expires=0)
return response
password = encrypt_password(secret, password)
staff_id = models.authenticate(username, password)
if staff_id is None:
response.set_cookie(COOKIE_NAME, "", expires=0)
return response
token = create_token(staff_id)
response.set_cookie(COOKIE_NAME, token, samesite="Lax")
return response
@app.get("/exp/<staff_id>")
def exp(staff_id):
staff_id = int(staff_id)
token = request.cookies.get(COOKIE_NAME)
if not token:
return jsonify({"error": "Not authenticated"}), 401
decoded_staff_id = decode_token(token)
if decoded_staff_id is None:
return jsonify({"error": "Invalid or expired token"}), 401
try:
if int(decoded_staff_id) == staff_id:
return views.experiments(models.experiments(staff_id), staff_id)
else:
return jsonify({"error": "Not authorized"}), 403
except AppException as exc:
abort(HTTP_400_BAD_REQUEST, f"Error serving experiments for {staff_id}: {exc}")
@app.get("/heartbeat")
def heartbeat():
try:
return views.heartbeat(HEARTBEAT)
except AppException as exc:
abort(HTTP_400_BAD_REQUEST, f"Error serving heartbeat: {exc}")
return app