259 lines
8.1 KiB
Python
259 lines
8.1 KiB
Python
__author__ = "Florian Kaiser"
|
|
__copyright__ = "Copyright 2022, Project Aktienbot"
|
|
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
|
|
__license__ = "GPL 3.0"
|
|
__version__ = "1.0.0"
|
|
|
|
import datetime
|
|
import os
|
|
|
|
import jwt
|
|
from apiflask import APIBlueprint, abort
|
|
from app.auth import auth
|
|
from app.db import database as db
|
|
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401, get_user
|
|
from app.models import User
|
|
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema, CronDataSchema
|
|
from flask import current_app
|
|
|
|
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
|
|
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
|
|
|
|
|
|
@users_blueprint.route('/users', methods=['GET'])
|
|
@users_blueprint.output(UsersSchema(many=True), 200)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
|
|
def users():
|
|
abort_if_no_admin()
|
|
|
|
res = []
|
|
|
|
# Query all users and convert them to dicts
|
|
for i in User.query.all():
|
|
res.append(i.as_dict())
|
|
|
|
return make_response(res, 200, "Successfully received all users")
|
|
|
|
|
|
@users_blueprint.route('/user', methods=['GET'])
|
|
@users_blueprint.output(UsersSchema(), 200)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Get current user", description="Returns current user")
|
|
def user():
|
|
email = get_email_or_abort_401()
|
|
|
|
# Query current user
|
|
query_user = get_user(email)
|
|
|
|
return make_response(query_user.as_dict(), 200, "Successfully received current user data")
|
|
|
|
|
|
@users_blueprint.route('/user/login', methods=['POST'])
|
|
@users_blueprint.output(TokenSchema(), 200)
|
|
@users_blueprint.input(schema=LoginDataSchema)
|
|
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
|
|
def login(data):
|
|
# Check if required data is available
|
|
if not check_if_password_data_exists(data):
|
|
abort(400, "Password missing")
|
|
|
|
if not check_if_email_data_exists(data):
|
|
abort(400, "Email missing")
|
|
|
|
# Query current user
|
|
query_user = get_user(data['email'])
|
|
|
|
# Check if password matches
|
|
if not check_password(query_user.password, data['password'].encode("utf-8")): # Password incorrect
|
|
abort(500, message="Unable to login")
|
|
|
|
# Check if user is bot
|
|
if query_user.email == current_app.config['BOT_EMAIL']:
|
|
# Set bot token valid for 1 year
|
|
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365)}, current_app.config['SECRET_KEY'], "HS256")
|
|
else:
|
|
# Set token valid for 1 day
|
|
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)}, current_app.config['SECRET_KEY'], "HS256")
|
|
|
|
return make_response({"token": token}, 200, "Successfully logged in")
|
|
|
|
|
|
@users_blueprint.route('/user/register', methods=['POST'])
|
|
@users_blueprint.output(UsersSchema(), 200)
|
|
@users_blueprint.input(schema=RegisterDataSchema)
|
|
@users_blueprint.doc(summary="Register", description="Registers user")
|
|
def register(data):
|
|
# Check if required data is available
|
|
if not check_if_email_data_exists(data):
|
|
abort(400, "Email missing")
|
|
|
|
if not check_if_username_data_exists(data):
|
|
abort(400, "Username missing")
|
|
|
|
if not check_if_password_data_exists(data):
|
|
abort(400, "Password missing")
|
|
|
|
# Check if user already exists
|
|
query_user = db.session.query(User).filter_by(email=data['email']).first()
|
|
if query_user is not None:
|
|
abort(500, message="Email already exist")
|
|
|
|
# Add user to database
|
|
new_user = User(
|
|
email=data['email'],
|
|
username=data['username'],
|
|
password=hash_password(data['password']),
|
|
admin=False,
|
|
cron="0 8 * * *"
|
|
)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
return make_response(new_user.as_dict(), 200, "Successfully registered user")
|
|
|
|
|
|
@users_blueprint.route('/user', methods=['PUT'])
|
|
@users_blueprint.output({}, 200)
|
|
@users_blueprint.input(schema=UpdateUserDataSchema)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
|
|
def update_user(data):
|
|
email = get_email_or_abort_401()
|
|
|
|
# Query current user
|
|
query_user = get_user(email)
|
|
|
|
# Check if password data is available -> if, change password
|
|
if check_if_password_data_exists(data):
|
|
query_user.password = hash_password(data['password'])
|
|
|
|
# Check if username data is available -> if, change username
|
|
if check_if_username_data_exists(data):
|
|
query_user.username = data['username']
|
|
|
|
db.session.commit()
|
|
|
|
return make_response({}, 200, "Successfully updated user")
|
|
|
|
|
|
@users_blueprint.route('/user/setAdmin', methods=['PUT'])
|
|
@users_blueprint.output({}, 200)
|
|
@users_blueprint.input(schema=AdminDataSchema)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
|
|
def set_admin(data):
|
|
abort_if_no_admin() # Only admin users can do this
|
|
|
|
# Check if required data is available
|
|
if not check_if_email_data_exists(data):
|
|
abort(400, "Email missing")
|
|
|
|
if not check_if_admin_data_exists(data):
|
|
abort(400, "Admin data missing")
|
|
|
|
# Get user by email
|
|
query_user = get_user(data['email'])
|
|
|
|
# Update user admin state
|
|
query_user.admin = data['admin']
|
|
db.session.commit()
|
|
|
|
return make_response({}, 200, "Successfully updated users admin rights")
|
|
|
|
|
|
@users_blueprint.route('/user/setCron', methods=['PUT'])
|
|
@users_blueprint.output({}, 200)
|
|
@users_blueprint.input(schema=CronDataSchema)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Set update cron", description="Set update cron of specified user")
|
|
def set_cron(data):
|
|
email = get_email_or_abort_401()
|
|
|
|
# Check if required data is available
|
|
if not check_if_cron_data_exists(data):
|
|
abort(400, "Cron data missing")
|
|
|
|
# Update user cron
|
|
get_user(email).cron = data['cron']
|
|
db.session.commit()
|
|
|
|
return make_response({}, 200, "Successfully updated users cron")
|
|
|
|
|
|
@users_blueprint.route('/user', methods=['DELETE'])
|
|
@users_blueprint.output({}, 200)
|
|
@users_blueprint.input(schema=DeleteUserSchema)
|
|
@users_blueprint.auth_required(auth)
|
|
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
|
|
def delete_user(data):
|
|
# Check if required data is available
|
|
if not check_if_email_data_exists(data):
|
|
abort(400, "Email missing")
|
|
|
|
# Check if email to delete is current user
|
|
# -> if, delete user
|
|
# -> if not, check if user is admin
|
|
# -> if, delete user
|
|
# -> else, abort
|
|
if data['email'] == get_email_or_abort_401(): # Username is same as current user
|
|
db.session.query(User).filter_by(email=data['email']).delete()
|
|
db.session.commit()
|
|
else:
|
|
abort_if_no_admin()
|
|
|
|
db.session.query(User).filter_by(email=data['email']).delete()
|
|
db.session.commit()
|
|
|
|
return make_response({}, 200, "Successfully removed user")
|
|
|
|
|
|
def check_if_email_data_exists(data):
|
|
if "email" not in data:
|
|
return False
|
|
|
|
if data['email'] == "" or data['email'] is None:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def check_if_password_data_exists(data):
|
|
if "password" not in data:
|
|
return False
|
|
|
|
if data['password'] == "" or data['password'] is None:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def check_if_username_data_exists(data):
|
|
if "username" not in data:
|
|
return False
|
|
|
|
if data['username'] == "" or data['username'] is None:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def check_if_admin_data_exists(data):
|
|
if "admin" not in data:
|
|
return False
|
|
|
|
if data['admin'] == "" or data['admin'] is None:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def check_if_cron_data_exists(data):
|
|
if "cron" not in data:
|
|
return False
|
|
|
|
if data['cron'] == "" or data['cron'] is None:
|
|
return False
|
|
|
|
return True
|