TelegramAktienBot/api/api_blueprint_user.py
2022-03-27 17:23:33 +02:00

197 lines
6.4 KiB
Python

import datetime
import os
import jwt
from apiflask import APIBlueprint, abort
from db import db
from helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401
from models import User
from schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema
from auth import auth
# Blueprint that collects the user endpoints defined in this module; it is
# created with url_prefix='/api', so every route below is served under /api.
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
# Absolute, symlink-resolved path of the directory that contains this file.
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@users_blueprint.route('/users', methods=['GET'])
@users_blueprint.output(UsersSchema(many=True), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users():
    """Return every stored user as a list of dictionaries.

    ``abort_if_no_admin`` is called before anything else, so the user table
    is only queried when that guard lets the request through.

    Returns:
        The result of ``make_response`` carrying one ``as_dict()`` entry per
        ``User`` row, status 200.
    """
    abort_if_no_admin()

    all_users = [entry.as_dict() for entry in User.query.all()]
    return make_response(all_users, 200, "Successfully received all users")
@users_blueprint.route('/user', methods=['GET'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get current user", description="Returns current user")
def user():
    """Return the account that belongs to the calling user.

    The email returned by ``get_email_or_abort_401`` is used to look up the
    matching ``User`` row.

    Returns:
        The result of ``make_response`` carrying the user's ``as_dict()``
        data, status 200.

    Aborts:
        500 "Unable to login" when no row exists for that email.
    """
    email = get_email_or_abort_401()
    current_user = db.session.query(User).filter_by(email=email).first()

    if current_user is None:
        # first() returns None when no row matches (for example if the account
        # was removed in the meantime). Calling as_dict() on it would raise an
        # unhandled AttributeError, so fail explicitly with the same status and
        # message update_user uses for a missing account.
        # NOTE(review): 401/404 would describe this case better than 500 -
        # kept at 500 so clients see the same status as before; confirm.
        abort(500, message="Unable to login")

    return make_response(current_user.as_dict(), 200, "Successfully received current user data")
@users_blueprint.route('/user/login', methods=['POST'])
@users_blueprint.output(TokenSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
    """Authenticate a user by email and password and issue a JWT.

    Args:
        data: Request payload; must contain non-empty ``email`` and
            ``password`` entries.

    Returns:
        The result of ``make_response`` carrying ``{"token": <jwt>}``,
        status 200. The token holds the user's email and an ``exp`` claim
        45 minutes in the future, signed with HS256 using the ``SECRET_KEY``
        environment variable.

    Aborts:
        400 when ``email`` or ``password`` is missing or empty.
        500 "Unable to login" when the email is unknown or the password does
        not match.
    """
    check_if_email_data_exists(data)
    check_if_password_data_exists(data)

    query_user = db.session.query(User).filter_by(email=data['email']).first()

    # Unknown email and wrong password deliberately produce the same response.
    # The None check comes first so check_password never sees a missing user.
    if query_user is None or not check_password(query_user.password, data['password']):
        abort(500, message="Unable to login")

    # Timezone-aware "now" instead of the deprecated, naive utcnow(); the
    # resulting expiry instant is the same.
    expires_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=45)
    token = jwt.encode({'email': query_user.email, 'exp': expires_at}, os.getenv('SECRET_KEY'), "HS256")

    return make_response({"token": token}, 200, "Successfully logged in")
@users_blueprint.route('/user/register', methods=['POST'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=RegisterDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
    """Create a new, non-admin user account.

    Args:
        data: Request payload; must contain non-empty ``email``, ``username``
            and ``password`` entries.

    Returns:
        The result of ``make_response`` carrying the new user's ``as_dict()``
        data, status 200.

    Aborts:
        400 when one of the required fields is missing or empty.
        500 "Email already exist" when a user with that email is already
        stored.
    """
    check_if_email_data_exists(data)
    check_if_username_data_exists(data)
    check_if_password_data_exists(data)

    email = data['email']
    existing_user = db.session.query(User).filter_by(email=email).first()
    if existing_user is not None:  # the email is already taken
        abort(500, message="Email already exist")

    # Only the hashed password is stored; new accounts never start as admin.
    new_user = User(
        email=email,
        username=data['username'],
        password=hash_password(data['password']),
        admin=False,
    )
    db.session.add(new_user)
    db.session.commit()

    return make_response(new_user.as_dict(), 200, "Successfully registered user")
@users_blueprint.route('/user', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=UpdateUserDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
def update_user(data):
    """Change the password and/or username of the calling user.

    Args:
        data: Request payload; ``password`` and ``username`` are both
            optional. An entry that is absent or ``None`` leaves the
            corresponding attribute untouched.

    Returns:
        The result of ``make_response`` with an empty payload, status 200.

    Aborts:
        500 "Unable to login" when no user exists for the caller's email.
    """
    email = get_email_or_abort_401()
    query_user = db.session.query(User).filter_by(email=email).first()
    if query_user is None:  # no account for this email
        abort(500, message="Unable to login")

    # NOTE(review): empty strings are accepted here although register rejects
    # them - confirm whether the schema already prevents that.
    new_password = data.get('password')
    if new_password is not None:
        query_user.password = hash_password(new_password)

    new_username = data.get('username')
    if new_username is not None:
        query_user.username = new_username

    db.session.commit()
    return make_response({}, 200, "Successfully updated user")
@users_blueprint.route('/user/setAdmin', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=AdminDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
def set_admin(data):
    """Set the admin flag of the user identified by email.

    Args:
        data: Request payload; must contain a non-empty ``email`` and an
            ``admin`` value.

    Returns:
        The result of ``make_response`` with an empty payload, status 200.

    Aborts:
        400 when ``email`` or ``admin`` is missing.
        500 "Unable to login" when no user exists for the given email.
    """
    # Only admin users may change admin rights; checked before any validation.
    abort_if_no_admin()
    check_if_email_data_exists(data)
    check_if_admin_data_exists(data)

    target_user = db.session.query(User).filter_by(email=data['email']).first()
    if target_user is None:  # no account for this email
        abort(500, message="Unable to login")

    target_user.admin = data['admin']
    db.session.commit()
    return make_response({}, 200, "Successfully updated users admin rights")
@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
    """Delete the user account with the given email.

    Callers may always delete their own account; deleting any other account
    additionally goes through ``abort_if_no_admin``.

    Args:
        data: Request payload; must contain a non-empty ``email``.

    Returns:
        The result of ``make_response`` with an empty payload, status 200.

    Aborts:
        400 when ``email`` is missing or empty.
    """
    check_if_email_data_exists(data)
    target_email = data['email']

    # A different account than the caller's own -> only admin users may do it.
    if target_email != get_email_or_abort_401():
        abort_if_no_admin()

    db.session.query(User).filter_by(email=target_email).delete()
    db.session.commit()
    return make_response({}, 200, "Successfully removed user")
def check_if_email_data_exists(data):
    """Abort with 400 "Email missing" unless ``data`` holds an email.

    An ``email`` entry that is absent, ``None`` or the empty string counts
    as missing.
    """
    email = data.get('email')
    if email is None or email == "":
        abort(400, message="Email missing")
def check_if_password_data_exists(data):
    """Abort with 400 "Password missing" unless ``data`` holds a password.

    A ``password`` entry that is absent, ``None`` or the empty string counts
    as missing.
    """
    password = data.get('password')
    has_password = password is not None and password != ""
    if not has_password:
        abort(400, message="Password missing")
def check_if_username_data_exists(data):
    """Abort with 400 "Username missing" unless ``data`` holds a username.

    A ``username`` entry that is absent, ``None`` or the empty string counts
    as missing.
    """
    if "username" not in data or data['username'] is None or data['username'] == "":
        abort(400, message="Username missing")
def check_if_admin_data_exists(data):
    """Abort with 400 "Admin state missing" unless ``data`` holds an admin state.

    An ``admin`` entry that is absent, ``None`` or the empty string counts
    as missing.
    """
    admin_state = data.get('admin')
    # Compare explicitly instead of using truthiness: False is a valid value
    # and must pass this check.
    if admin_state is None or admin_state == "":
        abort(400, message="Admin state missing")