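"""User endpoints for the ``/api`` blueprint.

Listing users (admin only), reading the current user, login (issues a
short-lived HS256 JWT), registration, self-service update, admin-flag
changes and deletion.  Every handler returns a ``jsonify`` payload shaped
``{"status": ..., "text": ..., "data": ...}``.
"""
import datetime
import os

import jwt
from apiflask import APIBlueprint, abort
from flask import jsonify

from auth import auth
from db import db
from helper_functions import (
    abort_if_no_admin,
    check_password,
    get_username_or_abort_401,
    hash_password,
)
from models import User
from scheme import (
    AdminDataSchema,
    DeleteUserSchema,
    LoginDataSchema,
    TokenSchema,
    UsersSchema,
)

users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Lifetime of an access token issued by ``login``.
TOKEN_LIFETIME = datetime.timedelta(minutes=45)
# Signing algorithm for issued tokens.
TOKEN_ALGORITHM = "HS256"


def _find_user(username):
    """Return the ``User`` row whose username is *username*, or ``None``."""
    return db.session.query(User).filter_by(username=username).first()


def _signing_key():
    """Return the JWT signing secret from ``SECRET_KEY``; abort 500 if it is unset.

    Token issuance must fail closed: minting tokens with a missing/empty key
    would produce signatures that verify against an empty secret (or crash
    inside ``jwt.encode``), so a misconfigured deployment is rejected up front.
    """
    key = os.getenv('SECRET_KEY')
    if not key:
        abort(500, message="Server misconfiguration")
    return key


def _require_field(data, key, label):
    """Abort 400 with ``"<label> missing"`` unless *data* has a usable *key*.

    ``""`` and ``None`` count as missing; ``False`` and ``0`` do not, so a
    boolean flag such as ``admin=False`` validates correctly.
    """
    if key not in data or data[key] == "" or data[key] is None:
        abort(400, message=f"{label} missing")


@users_blueprint.route('/users', methods=['GET'])
@users_blueprint.output(UsersSchema(many=True), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users():
    """Return every user as a list of ``as_dict()`` records; admin only.

    Non-admin callers are rejected by ``abort_if_no_admin`` before any
    data is read.
    """
    abort_if_no_admin()
    res = [row.as_dict() for row in User.query.all()]
    return jsonify({"status": 200, "data": res})


@users_blueprint.route('/user', methods=['GET'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get current user", description="Returns current user")
def user():
    """Return the ``as_dict()`` record of the authenticated user.

    Aborts 401 when the token's subject no longer exists (e.g. the account
    was deleted after the token was issued) instead of crashing with an
    ``AttributeError`` on ``None.as_dict()``.
    """
    username = get_username_or_abort_401()
    query_user = _find_user(username)
    if query_user is None:
        abort(401, message="Unable to authenticate")
    return jsonify({"status": 200, "data": query_user.as_dict()})


@users_blueprint.route('/user/login', methods=['POST'])
@users_blueprint.output(TokenSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
    """Verify username/password and return a JWT valid for ``TOKEN_LIFETIME``.

    Args:
        data: request body with ``username`` and ``password``.

    Aborts 400 on missing fields and 401 (not 500 — it is a client failure,
    not a server error) when the credentials do not match.
    """
    check_if_user_data_exists(data)
    query_user = _find_user(data['username'])
    # One message for "unknown user" and "wrong password" so the body does not
    # reveal which usernames exist.  NOTE(review): check_password only runs when
    # the user exists, so response timing may still differ between the two
    # cases — confirm whether check_password can be run against a dummy hash
    # on the missing-user path to equalise it.
    if query_user is None or not check_password(query_user.password, data['password']):
        abort(401, message="Unable to login")
    # Timezone-aware "now": datetime.utcnow() is deprecated and PyJWT accepts
    # aware datetimes for the exp claim.
    now = datetime.datetime.now(datetime.timezone.utc)
    token = jwt.encode(
        {'username': query_user.username, 'exp': now + TOKEN_LIFETIME},
        _signing_key(),
        TOKEN_ALGORITHM,
    )
    return jsonify({"status": 200, "text": "Successfully logged in", "data": {"token": token}})


@users_blueprint.route('/user/register', methods=['POST'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
    """Create a non-admin user from ``username``/``password``.

    Aborts 400 on missing fields and 409 when the username is already taken.
    The password is stored only as ``hash_password(...)`` output.
    """
    check_if_user_data_exists(data)
    username = data['username']
    if _find_user(username) is not None:
        abort(409, message="Username already exist")
    new_user = User(
        username=username,
        password=hash_password(data['password']),
        admin=False,
    )
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"status": 200, "text": "Successfully registered user", "data": new_user.as_dict()})


@users_blueprint.route('/user', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
def update_user(data):
    """Replace the authenticated user's username and password.

    Both fields are required (enforced by ``check_if_user_data_exists``), so
    both are always written.  Aborts 401 if the token's subject no longer
    exists and 409 if the new username belongs to another account — the
    original code let that surface as an unhandled database error on commit.
    """
    username = get_username_or_abort_401()
    check_if_user_data_exists(data)
    new_username = data['username']
    new_password = data['password']
    query_user = _find_user(username)
    if query_user is None:
        abort(401, message="Unable to authenticate")
    if new_username != username and _find_user(new_username) is not None:
        abort(409, message="Username already exist")
    query_user.password = hash_password(new_password)
    query_user.username = new_username
    db.session.commit()
    # NOTE(review): the token presented for this call carries the old
    # username in its payload (see login); after a rename the client will
    # presumably need to log in again — confirm against auth's lookup.
    return jsonify({"status": 200, "text": "Successfully updated user", "data": {}})


@users_blueprint.route('/user/setAdmin', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=AdminDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
def set_admin(data):
    """Set the ``admin`` flag of the user named in ``data``; admin only.

    Aborts 404 when the target user does not exist (the original
    "Unable to login" / 500 response was a copy-paste of the login error).
    """
    abort_if_no_admin()  # Only admin users can do this
    check_if_admin_data_exists(data)
    query_user = _find_user(data['username'])
    if query_user is None:
        abort(404, message="User not found")
    query_user.admin = data['admin']
    db.session.commit()
    return jsonify({"status": 200, "text": "Successfully updated users admin rights", "data": {}})


@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
    """Delete the user named in ``data``.

    A caller may delete their own account; deleting any other account
    requires admin rights.  Aborts 404 when no row matched, instead of
    reporting success for a username that did not exist.
    """
    check_if_delete_data_exists(data)
    username = data['username']
    if username != get_username_or_abort_401():
        # Deleting a different user than my user -> only admin users
        abort_if_no_admin()
    deleted = db.session.query(User).filter_by(username=username).delete()
    if deleted == 0:
        abort(404, message="User not found")
    db.session.commit()
    return jsonify({"status": 200, "text": "Successfully removed user", "data": {}})


def check_if_user_data_exists(data):
    """Abort 400 unless *data* has a non-empty ``username`` and ``password``."""
    _require_field(data, 'username', "Username")
    _require_field(data, 'password', "Password")


def check_if_admin_data_exists(data):
    """Abort 400 unless *data* has a non-empty ``username`` and an ``admin`` value."""
    _require_field(data, 'username', "Username")
    _require_field(data, 'admin', "Admin state")


def check_if_delete_data_exists(data):
    """Abort 400 unless *data* has a non-empty ``username``."""
    _require_field(data, 'username', "Username")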