"""User-related API routes registered on ``users_blueprint`` (prefix ``/api``)."""

__author__ = "Florian Kaiser"
__copyright__ = "Copyright 2022, Project Aktienbot"
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
__license__ = "GPL 3.0"
__version__ = "1.0.0"

import datetime
import os

import jwt
from apiflask import APIBlueprint, abort
from app.auth import auth
from app.db import database as db
from app.helper_functions import (
    check_password,
    hash_password,
    abort_if_no_admin,
    make_response,
    get_email_or_abort_401,
    get_user,
)
from app.models import User
from app.schema import (
    UsersSchema,
    TokenSchema,
    LoginDataSchema,
    AdminDataSchema,
    DeleteUserSchema,
    RegisterDataSchema,
    UpdateUserDataSchema,
    CronDataSchema,
)
from flask import current_app

users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Token lifetimes: the bot account gets a long-lived token, every other
# account has to log in again after one day.
_BOT_TOKEN_LIFETIME = datetime.timedelta(days=365)
_USER_TOKEN_LIFETIME = datetime.timedelta(days=1)


@users_blueprint.route('/users', methods=['GET'])
@users_blueprint.output(UsersSchema(many=True), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users():
    """Return every stored user as a list of dicts.

    ``abort_if_no_admin`` runs first; presumably it aborts the request when
    the caller is not an admin -- confirm in ``app.helper_functions``.
    """
    abort_if_no_admin()

    # Query all users and convert them to dicts
    all_users = [entry.as_dict() for entry in User.query.all()]

    return make_response(all_users, 200, "Successfully received all users")


@users_blueprint.route('/user', methods=['GET'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get current user", description="Returns current user")
def user():
    """Return the user that belongs to the email of the current request."""
    email = get_email_or_abort_401()

    # Query current user
    query_user = get_user(email)

    return make_response(query_user.as_dict(), 200, "Successfully received current user data")


@users_blueprint.route('/user/login', methods=['POST'])
@users_blueprint.output(TokenSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
    """Check the submitted credentials and answer with a signed JWT.

    :param data: parsed request body; must contain non-empty ``email`` and
        ``password`` entries, otherwise the request is aborted with 400.
    :return: response built by ``make_response`` carrying ``{"token": ...}``.

    The token holds the user's email and an ``exp`` claim and is signed with
    the app's ``SECRET_KEY`` using HS256. The account whose email equals
    ``BOT_EMAIL`` gets a token valid for 365 days, all others for 1 day.
    """
    # Check if required data is available
    if not check_if_password_data_exists(data):
        abort(400, "Password missing")
    if not check_if_email_data_exists(data):
        abort(400, "Email missing")

    # Query current user
    # NOTE(review): presumably get_user aborts when no user matches -- confirm.
    query_user = get_user(data['email'])

    # Check if password matches
    if not check_password(query_user.password, data['password'].encode("utf-8")):
        # Password incorrect
        # NOTE(review): 401 would be the conventional status here; 500 is kept
        # so existing clients see the same response as before.
        abort(500, message="Unable to login")

    # Only the lifetime differs between bot and regular users, so pick it
    # first and build the token in a single place.
    if query_user.email == current_app.config['BOT_EMAIL']:
        lifetime = _BOT_TOKEN_LIFETIME
    else:
        lifetime = _USER_TOKEN_LIFETIME

    # Timezone-aware "now" replaces the deprecated datetime.utcnow(); PyJWT
    # converts either form to the same UTC timestamp for the exp claim.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + lifetime
    token = jwt.encode(
        {'email': query_user.email, 'exp': expires_at},
        current_app.config['SECRET_KEY'],
        "HS256",
    )

    return make_response({"token": token}, 200, "Successfully logged in")


@users_blueprint.route('/user/register', methods=['POST'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=RegisterDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
    """Create a new non-admin user from the submitted registration data.

    :param data: parsed request body; must contain non-empty ``email``,
        ``username`` and ``password`` entries, otherwise the request is
        aborted with 400.
    :return: response built by ``make_response`` carrying the new user as dict.

    The password is stored as returned by ``hash_password`` and the cron
    expression defaults to ``"0 8 * * *"``.
    """
    # Check if required data is available
    if not check_if_email_data_exists(data):
        abort(400, "Email missing")
    if not check_if_username_data_exists(data):
        abort(400, "Username missing")
    if not check_if_password_data_exists(data):
        abort(400, "Password missing")

    # Check if user already exists
    query_user = db.session.query(User).filter_by(email=data['email']).first()
    if query_user is not None:
        # NOTE(review): 409 would be more conventional; 500 kept for
        # compatibility with existing clients.
        abort(500, message="Email already exist")

    # Add user to database
    new_user = User(
        email=data['email'],
        username=data['username'],
        password=hash_password(data['password']),
        admin=False,
        cron="0 8 * * *",
    )
    db.session.add(new_user)
    db.session.commit()

    return make_response(new_user.as_dict(), 200, "Successfully registered user")


@users_blueprint.route('/user', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=UpdateUserDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
def update_user(data):
    """Update password and/or username of the current user.

    :param data: parsed request body; ``password`` and ``username`` are each
        applied only when present and non-empty.
    :return: empty-payload response built by ``make_response``.
    """
    email = get_email_or_abort_401()

    # Query current user
    query_user = get_user(email)

    # Check if password data is available -> if, change password
    if check_if_password_data_exists(data):
        query_user.password = hash_password(data['password'])

    # Check if username data is available -> if, change username
    if check_if_username_data_exists(data):
        query_user.username = data['username']

    db.session.commit()

    return make_response({}, 200, "Successfully updated user")


@users_blueprint.route('/user/setAdmin', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=AdminDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
def set_admin(data):
    """Set the ``admin`` flag of the user identified by ``data['email']``.

    :param data: parsed request body; must contain non-empty ``email`` and
        ``admin`` entries, otherwise the request is aborted with 400.
    :return: empty-payload response built by ``make_response``.
    """
    abort_if_no_admin()  # Only admin users can do this

    # Check if required data is available
    if not check_if_email_data_exists(data):
        abort(400, "Email missing")
    if not check_if_admin_data_exists(data):
        abort(400, "Admin data missing")

    # Get user by email
    query_user = get_user(data['email'])

    # Update user admin state
    query_user.admin = data['admin']
    db.session.commit()

    return make_response({}, 200, "Successfully updated users admin rights")


@users_blueprint.route('/user/setCron', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=CronDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set update cron", description="Set update cron of specified user")
def set_cron(data):
    """Store a new cron expression for the current user.

    NOTE(review): the OpenAPI description says "specified user", but the code
    always updates the user resolved from the current request.

    :param data: parsed request body; must contain a non-empty ``cron`` entry,
        otherwise the request is aborted with 400.
    :return: empty-payload response built by ``make_response``.
    """
    email = get_email_or_abort_401()

    # Check if required data is available
    if not check_if_cron_data_exists(data):
        abort(400, "Cron data missing")

    # Update user cron
    get_user(email).cron = data['cron']
    db.session.commit()

    return make_response({}, 200, "Successfully updated users cron")
@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
    """Delete the user identified by ``data['email']``.

    NOTE(review): the OpenAPI description says "by username", but the lookup
    below filters on the email column.

    A user may always delete their own account. Deleting a different account
    first goes through ``abort_if_no_admin`` (presumably aborts for
    non-admins -- confirm in ``app.helper_functions``).

    :param data: parsed request body; must contain a non-empty ``email``
        entry, otherwise the request is aborted with 400.
    :return: empty-payload response built by ``make_response``.
    """
    # Check if required data is available
    if not check_if_email_data_exists(data):
        abort(400, "Email missing")

    # Deleting someone else's account is only allowed for admins; deleting
    # the own account needs no further check.
    if data['email'] != get_email_or_abort_401():
        abort_if_no_admin()

    # The delete itself is the same in both cases, so it lives in one place.
    db.session.query(User).filter_by(email=data['email']).delete()
    db.session.commit()

    return make_response({}, 200, "Successfully removed user")


def _has_value(data, key):
    """Return True when ``key`` is in ``data`` with a value that is neither "" nor None.

    Deliberately not a truthiness test: ``False`` and ``0`` count as present,
    which matters for the boolean ``admin`` field.
    """
    if key not in data:
        return False
    value = data[key]
    return not (value == "" or value is None)


def check_if_email_data_exists(data):
    """Return True when ``data`` holds a non-empty, non-None ``email`` entry."""
    return _has_value(data, "email")


def check_if_password_data_exists(data):
    """Return True when ``data`` holds a non-empty, non-None ``password`` entry."""
    return _has_value(data, "password")


def check_if_username_data_exists(data):
    """Return True when ``data`` holds a non-empty, non-None ``username`` entry."""
    return _has_value(data, "username")


def check_if_admin_data_exists(data):
    """Return True when ``data`` holds an ``admin`` entry that is neither "" nor None."""
    return _has_value(data, "admin")


def check_if_cron_data_exists(data):
    """Return True when ``data`` holds a non-empty, non-None ``cron`` entry."""
    return _has_value(data, "cron")