from urllib.parse import urljoin, urlparse from flask import Blueprint, flash, g, render_template, request, url_for from flask_login import current_user, login_user, logout_user from werkzeug.utils import redirect from .. import ldap_service, login_manager, messages from ..database import db from ..forms.login import LoginForm from ..ldap_authentication.authenticator import NAME, SURNAME, TITLE from ..models.administration import AuthenticatedUser bp = Blueprint("auth", __name__) @bp.before_request def get_current_user(): g.user = current_user @login_manager.user_loader def load_user(user_id): return AuthenticatedUser.query.get(user_id) @bp.route("/login", methods=("GET", "POST")) @bp.route("/", methods=("GET", "POST")) def login(): next_pg = request.args.get("next") # Redirect to correct home page (according to role) if request.method == "GET" and current_user.is_authenticated: return redirect(to_next_page_or_home(next_pg)) # Perform login form = LoginForm(request.form) if request.method == "POST" and form.validate(): username, password = ( request.form.get("username").lower(), request.form.get("password"), ) attributes = ldap_service.authenticate(username, password) if attributes is None: flash(messages.LOGIN_UNSUCCESSFUL_ERROR) return render_template("pages/login.html", form=form) username = attributes["name"] user = AuthenticatedUser.query.filter_by(username=username).first() if not user: user = AuthenticatedUser( username=username, firstname=attributes.get(NAME, username.upper()), surname=attributes.get(SURNAME, ""), role=normalize_role(attributes.get(TITLE)), ) db.session.add(user) db.session.commit() login_user(user) return redirect(to_next_page_or_home(next_pg)) if form.errors: flash(messages.LOGIN_UNSUCCESSFUL_ERROR) # User not authenticated tries to 'GET' or 'POST' invalid form return render_template("pages/login.html", form=form) @bp.route("/logout") def logout(): if not current_user.is_anonymous: db.session.delete(AuthenticatedUser.query.get(current_user.username)) db.session.commit() logout_user() return redirect(url_for("auth.login")) ################################################################## # U T I L I T I E S ################################################################## def normalize_role(ldap_role): return "student" if ldap_role in {"Student", "PGT", "Casual"} else "staff" def to_next_page_or_home(next_pg): if next_pg and is_safe_url(request.host_url, next_pg): return next_pg else: return url_for(f"{current_user.role}.projects") def is_safe_url(request_host_url, target): ref_url = urlparse(request_host_url) test_url = urlparse(urljoin(request_host_url, target)) return test_url.scheme in ("http", "https") and ref_url.netloc == test_url.netloc