262 lines
7.7 KiB
Python
262 lines
7.7 KiB
Python
import functools
|
|
import hashlib
|
|
import re
|
|
import secrets
|
|
import time
|
|
|
|
from better_profanity import profanity
|
|
from flask import (Blueprint, abort, flash, g, redirect, render_template,
|
|
request, session, url_for)
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from .db import get_db
|
|
from .utility import get_notifications, send_email, validate_email, email_enabled
|
|
|
|
bp = Blueprint("auth", __name__, url_prefix="/auth")
|
|
|
|
|
|
def login_required(admin=False):
|
|
def decorator(f):
|
|
@functools.wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for("auth.login", next=request.url))
|
|
elif admin and not g.user["admin"]:
|
|
abort(403)
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
return decorator
|
|
|
|
|
|
@bp.before_app_request
|
|
def load_logged_in_user():
|
|
"""If a user id is stored in the session, load the user object from
|
|
the database into g.user"""
|
|
user_id = session.get("user_id")
|
|
|
|
if user_id is None:
|
|
g.user = None
|
|
else:
|
|
db = get_db()
|
|
g.user = db.execute(
|
|
"SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
|
|
|
|
# Check if user is an admin
|
|
admin = db.execute(
|
|
"SELECT 1 FROM admins WHERE user_id = ?", (user_id,)
|
|
).fetchone()
|
|
if admin is None:
|
|
g.user["admin"] = False
|
|
else:
|
|
g.user["admin"] = True
|
|
|
|
# Flash any messages left for the user
|
|
notifications = get_notifications(user_id)
|
|
for notification in notifications:
|
|
flash(notification["notification"], notification["category"])
|
|
|
|
|
|
@bp.route("/register", methods=["GET", "POST"])
|
|
def register():
|
|
if request.method == "GET":
|
|
return render_template("auth/register.j2")
|
|
|
|
username = request.form["username"]
|
|
password = request.form["password"]
|
|
email = request.form.get("email") or None
|
|
repeat_password = request.form["repeatPassword"]
|
|
next = request.form["next"]
|
|
error = None
|
|
|
|
if not username or not password or not repeat_password:
|
|
error = "Missing one or more fields"
|
|
elif re.search(r"\s", username):
|
|
error = "Usernames cannot have whitespace"
|
|
elif profanity.contains_profanity(username):
|
|
error = "Profanity detected in username"
|
|
elif len(password) < 8:
|
|
error = "Password is not long enough"
|
|
elif password != repeat_password:
|
|
error = "Passwords don't match"
|
|
elif email and not validate_email(email):
|
|
error = "Invalid email"
|
|
|
|
if error is None:
|
|
db = get_db()
|
|
try:
|
|
# Register the user
|
|
user_id = db.execute(
|
|
"INSERT INTO users(username, hash, email) VALUES(?, ?, ?)",
|
|
(username, generate_password_hash(password), email),
|
|
).lastrowid
|
|
|
|
if email:
|
|
send_email(
|
|
email,
|
|
"Welcome to Smarter!",
|
|
f"""Hello, {username}!
|
|
|
|
Welcome to Smarter: The self-hostable web application for playing trivia games using Flask and the Open Trivia Database API.
|
|
|
|
You may use this email to reset your password in the future - no future emails will be sent :)
|
|
|
|
- Team Smarter""",
|
|
)
|
|
except db.IntegrityError:
|
|
error = "Username or email already taken"
|
|
except RuntimeError:
|
|
error = "E-Mail is disabled"
|
|
else:
|
|
# Remember user
|
|
db.commit()
|
|
session.permanent = False
|
|
session["user_id"] = user_id
|
|
flash("You have successfully registered")
|
|
return redirect(next if next else url_for("index"))
|
|
|
|
flash(error)
|
|
|
|
# Keep the next argument between requests
|
|
if next:
|
|
return redirect(url_for("auth.register", next=next))
|
|
|
|
return render_template("auth/register.j2")
|
|
|
|
|
|
@bp.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if request.method == "GET":
|
|
return render_template("auth/login.j2")
|
|
|
|
next = request.form["next"]
|
|
username = request.form["username"].strip()
|
|
password = request.form["password"]
|
|
error = None
|
|
|
|
db = get_db()
|
|
user_data = db.execute(
|
|
"SELECT id, hash FROM users WHERE username = ?", (username,)
|
|
).fetchone()
|
|
|
|
if user_data is None:
|
|
error = "Wrong username"
|
|
elif not check_password_hash(user_data["hash"], password):
|
|
error = "The password you entered is incorrect"
|
|
|
|
if error is not None:
|
|
flash(error)
|
|
|
|
# Keep the next argument between requests
|
|
if next:
|
|
return redirect(url_for("auth.login", next=next))
|
|
|
|
return render_template("auth/login.j2")
|
|
|
|
# Remember user
|
|
if request.form.get("remember_password"):
|
|
session.permanent = True
|
|
else:
|
|
session.permanent = False
|
|
session["user_id"] = user_data["id"]
|
|
|
|
flash("You have successfully logged-in")
|
|
|
|
return redirect(next if next else url_for("index"))
|
|
|
|
|
|
@bp.route("/logout")
|
|
def logout():
|
|
session.clear()
|
|
flash("You are now logged out")
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
@bp.route("/pwreset", methods=["GET", "POST"])
|
|
def pwreset():
|
|
if not email_enabled():
|
|
abort(404)
|
|
|
|
if request.method == "GET":
|
|
return render_template("auth/pwreset.j2")
|
|
|
|
db = get_db()
|
|
|
|
token = request.form.get("token")
|
|
newpw = request.form.get("password")
|
|
newpw_confirm = request.form.get("repeatPassword")
|
|
|
|
# Actual password reset
|
|
|
|
if token and "reset_user_id" in session:
|
|
token = token.strip()
|
|
error = None
|
|
|
|
if not newpw:
|
|
error = "No password supplied"
|
|
elif newpw != newpw_confirm:
|
|
error = "Passwords do not match"
|
|
elif len(newpw) < 8:
|
|
error = "Password does not meet security requirements"
|
|
|
|
if error is not None:
|
|
flash(error)
|
|
return redirect(url_for("auth.pwreset"))
|
|
|
|
if (time.time() - session["reset_ts"]) < 600 and hashlib.sha256(
|
|
token.encode()
|
|
).digest() == session.get("reset_token_hash"):
|
|
db.execute(
|
|
"UPDATE users SET hash = ? WHERE id = ?",
|
|
(generate_password_hash(newpw), session["reset_user_id"]),
|
|
)
|
|
db.commit()
|
|
else:
|
|
session["incorrect_attempts"] += 1
|
|
if session["incorrect_attempts"] > 2:
|
|
flash("Too many incorrect attempts, generate a new token")
|
|
session.clear()
|
|
return redirect(url_for("auth.pwreset"))
|
|
|
|
flash("Invalid token")
|
|
return render_template("auth/pwreset-set.j2")
|
|
|
|
session.clear()
|
|
flash("Password has been reset")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
# Password reset request
|
|
|
|
email = request.form["email"].strip()
|
|
user_data = db.execute(
|
|
"SELECT username, id FROM users WHERE email = ?", (email,)
|
|
).fetchone()
|
|
|
|
reset_token = secrets.token_hex(3)
|
|
|
|
if user_data is not None:
|
|
send_email(
|
|
email,
|
|
"Password reset confirmation for Smarter",
|
|
f"""Hello, {user_data["username"]}!
|
|
|
|
You (or someone else) has requested a password reset for your account. Please use the following token to reset your password:
|
|
|
|
{reset_token}
|
|
|
|
This token will expire in 10 minutes.
|
|
|
|
- Team Smarter""",
|
|
)
|
|
|
|
session["reset_token_hash"] = hashlib.sha256(
|
|
reset_token.encode()).digest()
|
|
session["reset_ts"] = time.time()
|
|
session["reset_user_id"] = user_data["id"]
|
|
session["incorrect_attempts"] = 0
|
|
else:
|
|
flash("No user matched this query")
|
|
return redirect(url_for("auth.pwreset"))
|
|
|
|
return render_template("auth/pwreset-set.j2")
|