Smarter/smarter/auth.py
Arija A. a3a6316054
.html -> .j2 for Jinja2 correctness
Signed-off-by: Arija A. <ari@ari.lt>
2025-05-27 23:34:42 +03:00

262 lines
7.7 KiB
Python

import functools
import hashlib
import re
import secrets
import time
from better_profanity import profanity
from flask import (Blueprint, abort, flash, g, redirect, render_template,
request, session, url_for)
from werkzeug.security import check_password_hash, generate_password_hash
from .db import get_db
from .utility import get_notifications, send_email, validate_email, email_enabled
bp = Blueprint("auth", __name__, url_prefix="/auth")
def login_required(admin=False):
def decorator(f):
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for("auth.login", next=request.url))
elif admin and not g.user["admin"]:
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
@bp.before_app_request
def load_logged_in_user():
"""If a user id is stored in the session, load the user object from
the database into g.user"""
user_id = session.get("user_id")
if user_id is None:
g.user = None
else:
db = get_db()
g.user = db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
# Check if user is an admin
admin = db.execute(
"SELECT 1 FROM admins WHERE user_id = ?", (user_id,)
).fetchone()
if admin is None:
g.user["admin"] = False
else:
g.user["admin"] = True
# Flash any messages left for the user
notifications = get_notifications(user_id)
for notification in notifications:
flash(notification["notification"], notification["category"])
@bp.route("/register", methods=["GET", "POST"])
def register():
if request.method == "GET":
return render_template("auth/register.j2")
username = request.form["username"]
password = request.form["password"]
email = request.form.get("email") or None
repeat_password = request.form["repeatPassword"]
next = request.form["next"]
error = None
if not username or not password or not repeat_password:
error = "Missing one or more fields"
elif re.search(r"\s", username):
error = "Usernames cannot have whitespace"
elif profanity.contains_profanity(username):
error = "Profanity detected in username"
elif len(password) < 8:
error = "Password is not long enough"
elif password != repeat_password:
error = "Passwords don't match"
elif email and not validate_email(email):
error = "Invalid email"
if error is None:
db = get_db()
try:
# Register the user
user_id = db.execute(
"INSERT INTO users(username, hash, email) VALUES(?, ?, ?)",
(username, generate_password_hash(password), email),
).lastrowid
if email:
send_email(
email,
"Welcome to Smarter!",
f"""Hello, {username}!
Welcome to Smarter: The self-hostable web application for playing trivia games using Flask and the Open Trivia Database API.
You may use this email to reset your password in the future - no future emails will be sent :)
- Team Smarter""",
)
except db.IntegrityError:
error = "Username or email already taken"
except RuntimeError:
error = "E-Mail is disabled"
else:
# Remember user
db.commit()
session.permanent = False
session["user_id"] = user_id
flash("You have successfully registered")
return redirect(next if next else url_for("index"))
flash(error)
# Keep the next argument between requests
if next:
return redirect(url_for("auth.register", next=next))
return render_template("auth/register.j2")
@bp.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
return render_template("auth/login.j2")
next = request.form["next"]
username = request.form["username"].strip()
password = request.form["password"]
error = None
db = get_db()
user_data = db.execute(
"SELECT id, hash FROM users WHERE username = ?", (username,)
).fetchone()
if user_data is None:
error = "Wrong username"
elif not check_password_hash(user_data["hash"], password):
error = "The password you entered is incorrect"
if error is not None:
flash(error)
# Keep the next argument between requests
if next:
return redirect(url_for("auth.login", next=next))
return render_template("auth/login.j2")
# Remember user
if request.form.get("remember_password"):
session.permanent = True
else:
session.permanent = False
session["user_id"] = user_data["id"]
flash("You have successfully logged-in")
return redirect(next if next else url_for("index"))
@bp.route("/logout")
def logout():
session.clear()
flash("You are now logged out")
return redirect(url_for("index"))
@bp.route("/pwreset", methods=["GET", "POST"])
def pwreset():
if not email_enabled():
abort(404)
if request.method == "GET":
return render_template("auth/pwreset.j2")
db = get_db()
token = request.form.get("token")
newpw = request.form.get("password")
newpw_confirm = request.form.get("repeatPassword")
# Actual password reset
if token and "reset_user_id" in session:
token = token.strip()
error = None
if not newpw:
error = "No password supplied"
elif newpw != newpw_confirm:
error = "Passwords do not match"
elif len(newpw) < 8:
error = "Password does not meet security requirements"
if error is not None:
flash(error)
return redirect(url_for("auth.pwreset"))
if (time.time() - session["reset_ts"]) < 600 and hashlib.sha256(
token.encode()
).digest() == session.get("reset_token_hash"):
db.execute(
"UPDATE users SET hash = ? WHERE id = ?",
(generate_password_hash(newpw), session["reset_user_id"]),
)
db.commit()
else:
session["incorrect_attempts"] += 1
if session["incorrect_attempts"] > 2:
flash("Too many incorrect attempts, generate a new token")
session.clear()
return redirect(url_for("auth.pwreset"))
flash("Invalid token")
return render_template("auth/pwreset-set.j2")
session.clear()
flash("Password has been reset")
return redirect(url_for("auth.login"))
# Password reset request
email = request.form["email"].strip()
user_data = db.execute(
"SELECT username, id FROM users WHERE email = ?", (email,)
).fetchone()
reset_token = secrets.token_hex(3)
if user_data is not None:
send_email(
email,
"Password reset confirmation for Smarter",
f"""Hello, {user_data["username"]}!
You (or someone else) has requested a password reset for your account. Please use the following token to reset your password:
{reset_token}
This token will expire in 10 minutes.
- Team Smarter""",
)
session["reset_token_hash"] = hashlib.sha256(
reset_token.encode()).digest()
session["reset_ts"] = time.time()
session["reset_user_id"] = user_data["id"]
session["incorrect_attempts"] = 0
else:
flash("No user matched this query")
return redirect(url_for("auth.pwreset"))
return render_template("auth/pwreset-set.j2")