from urllib.parse import urlparse, urljoin import ldap from flask import request, url_for, g, Blueprint, flash, render_template from flask_login import current_user, logout_user, login_user from werkzeug.utils import redirect from .. import messages, login_manager, ldap_service from ..database import db from ..forms import LoginForm from ..ldap_authentication.authenticator import NAME, SURNAME from ..models import AuthenticatedUser bp = Blueprint("auth", __name__) @bp.before_request def get_current_user(): g.user = current_user @login_manager.user_loader def load_user(user_id): return AuthenticatedUser.query.get(user_id) @bp.route("/login", methods=("GET", "POST")) @bp.route("/", methods=("GET", "POST")) def login(): next_pg = request.args.get("next") # Redirect to correct home page (according to role) if request.method == "GET" and current_user.is_authenticated: return redirect(to_next_page_or_home(next_pg)) # Perform login form = LoginForm(request.form) if request.method == "POST" and form.validate(): username, password = ( request.form.get("username").lower(), request.form.get("password"), ) attributes = ldap_service.authenticate(username, password) if attributes is None: flash(messages.LOGIN_UNSUCCESSFUL_ERROR) return render_template("pages/login.html", form=form) user = AuthenticatedUser.query.filter_by(username=username).first() if not user: user = AuthenticatedUser( username=username, firstname=attributes.get(NAME, username.upper()), surname=attributes.get(SURNAME, ""), ) db.session.add(user) db.session.commit() login_user(user) return redirect(to_next_page_or_home(next_pg)) if form.errors: flash(messages.LOGIN_UNSUCCESSFUL_ERROR) # User not authenticated tries to 'GET' or 'POST' invalid form return render_template("pages/login.html", form=form) @bp.route("/logout") def logout(): if not current_user.is_anonymous: db.session.delete(AuthenticatedUser.query.get(current_user.username)) db.session.commit() logout_user() return redirect(url_for("auth.login")) ################################################################## # U T I L I T I E S ################################################################## def to_next_page_or_home(next_pg): if next_pg and is_safe_url(request.host_url, next_pg): return next_pg else: return url_for("staff.projects") def is_safe_url(request_host_url, target): ref_url = urlparse(request_host_url) test_url = urlparse(urljoin(request_host_url, target)) return test_url.scheme in ("http", "https") and ref_url.netloc == test_url.netloc