110 lines
2.8 KiB
Python
110 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Sample SQLAlchemy application"""
|
|
|
|
from typing import Any, Final, Optional
|
|
from warnings import filterwarnings as filter_warnings
|
|
|
|
from flask import Flask, redirect, render_template_string, request, url_for
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
|
|
|
|
def create_app() -> Flask:
|
|
"""Create and configure a Flask app with SQLAlchemy"""
|
|
|
|
app: Flask = Flask(__name__)
|
|
|
|
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db"
|
|
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {"pool_pre_ping": True}
|
|
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
|
|
|
return app
|
|
|
|
|
|
app: Flask = create_app()
|
|
db: SQLAlchemy = SQLAlchemy(app)
|
|
|
|
|
|
class User(db.Model):
|
|
"""User model representing users table"""
|
|
|
|
id: int = db.Column(db.Integer, primary_key=True)
|
|
username: str = db.Column(db.String(80), unique=True, nullable=False)
|
|
email: str = db.Column(db.String(120), unique=True, nullable=False)
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<User {self.username}>"
|
|
|
|
|
|
with app.app_context():
|
|
db.create_all()
|
|
|
|
|
|
FORM_HTML: Final[
|
|
str
|
|
] = """
|
|
<h2>Create User</h2>
|
|
<form method="POST" action="/create_user">
|
|
Username: <input type="text" name="username" required><br>
|
|
Email: <input type="email" name="email" required><br>
|
|
<input type="submit" value="Create">
|
|
</form>
|
|
<hr>
|
|
<a href="/">View Users</a>
|
|
"""
|
|
|
|
|
|
@app.route("/")
|
|
def index() -> str:
|
|
"""List all users"""
|
|
|
|
users_list: str = "<br>".join(
|
|
f"{user.id}: {user.username} ({user.email})" for user in User.query.all()
|
|
)
|
|
|
|
return f'<h1>Users</h1>{users_list}<hr><a href="/create_user">Add new user</a>'
|
|
|
|
|
|
@app.route("/create_user", methods=["GET", "POST"])
|
|
def create_user() -> Any:
|
|
"""Display form and handle user creation"""
|
|
|
|
if request.method == "GET":
|
|
return render_template_string(FORM_HTML)
|
|
|
|
username: Optional[str] = request.form.get("username")
|
|
email: Optional[str] = request.form.get("email")
|
|
|
|
if not username or not email:
|
|
return "Username and email are required.", 400
|
|
|
|
existing_user: Optional[User] = User.query.filter(
|
|
(User.username == username) | (User.email == email)
|
|
).first()
|
|
|
|
if existing_user is not None:
|
|
return (
|
|
"Username or email already exists. Please choose another.",
|
|
400,
|
|
)
|
|
|
|
new_user: User = User(username=username, email=email)
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
def main() -> int:
|
|
"""Entry / main function"""
|
|
|
|
app.run("127.0.0.1", 8080, True)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
assert main.__annotations__.get("return") is int, "main() should return an integer"
|
|
|
|
filter_warnings("error", category=Warning)
|
|
raise SystemExit(main())
|