Added api endpoints:

- delete user
- get current user
- update user password and username
- set admin rights
- refactoring
This commit is contained in:
Administrator 2022-03-17 11:05:28 +01:00
parent 290672cee4
commit 317a585f52
4 changed files with 163 additions and 19 deletions

View File

@ -6,9 +6,9 @@ from apiflask import APIBlueprint, abort
from flask import jsonify from flask import jsonify
from db import db from db import db
from helper_functions import check_password, hash_password from helper_functions import check_password, hash_password, get_username_or_abort_401, abort_if_no_admin
from models import User from models import User
from scheme import UsersSchema, Token, LoginData from scheme import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema
from auth import auth from auth import auth
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api') users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
@ -20,6 +20,8 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file
@users_blueprint.auth_required(auth) @users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array") @users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users(): def users():
abort_if_no_admin()
res = [] res = []
for i in User.query.all(): for i in User.query.all():
res.append(i.as_dict()) res.append(i.as_dict())
@ -27,9 +29,21 @@ def users():
return jsonify({"status": 200, "data": res}) return jsonify({"status": 200, "data": res})
@users_blueprint.route('/login', methods=['POST']) @users_blueprint.route('/user', methods=['GET'])
@users_blueprint.output(Token(), 200) @users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=LoginData) @users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get current user", description="Returns current user")
def user():
username = get_username_or_abort_401()
res = db.session.query(User).filter_by(username=username).first().as_dict()
return jsonify({"status": 200, "data": res})
@users_blueprint.route('/user/login', methods=['POST'])
@users_blueprint.output(TokenSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error") @users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data): def login(data):
check_if_user_data_exists(data) check_if_user_data_exists(data)
@ -37,21 +51,21 @@ def login(data):
username = data['username'] username = data['username']
password = data['password'] password = data['password']
user = db.session.query(User).filter_by(username=username).first() query_user = db.session.query(User).filter_by(username=username).first()
if user is None: # Username doesn't exist if query_user is None: # Username doesn't exist
abort(500, message="Unable to login") abort(500, message="Unable to login")
if not check_password(user.password, password): # Password incorrect if not check_password(query_user.password, password): # Password incorrect
abort(500, message="Unable to login") abort(500, message="Unable to login")
token = jwt.encode({'username': user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256") token = jwt.encode({'username': query_user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256")
return jsonify({"status": 200, "text": "Successfully logged in", "data": {"token": token}}) return jsonify({"status": 200, "text": "Successfully logged in", "data": {"token": token}})
@users_blueprint.route('/register', methods=['POST']) @users_blueprint.route('/user/register', methods=['POST'])
@users_blueprint.output(UsersSchema(), 200) @users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=LoginData) @users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user") @users_blueprint.doc(summary="Register", description="Registers user")
def register(data): def register(data):
check_if_user_data_exists(data) check_if_user_data_exists(data)
@ -59,20 +73,94 @@ def register(data):
username = data['username'] username = data['username']
password = data['password'] password = data['password']
user = db.session.query(User).filter_by(username=username).first() query_user = db.session.query(User).filter_by(username=username).first()
if user is not None: # Username already exist if query_user is not None: # Username already exist
abort(500, message="Username already exist") abort(500, message="Username already exist")
user = User( new_user = User(
username=username, username=username,
password=hash_password(password), password=hash_password(password),
admin=False admin=False
) )
db.session.add(user) db.session.add(new_user)
db.session.commit() db.session.commit()
return jsonify({"status": 200, "text": "Successfully registered user", "data": user.as_dict()}) return jsonify({"status": 200, "text": "Successfully registered user", "data": new_user.as_dict()})
@users_blueprint.route('/user', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
def update_user(data):
username = get_username_or_abort_401()
check_if_user_data_exists(data)
new_username = data['username']
new_password = data['password']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
if new_password is not None:
query_user.password = hash_password(new_password)
if new_username is not None:
query_user.username = new_username
db.session.commit()
return jsonify({"status": 200, "text": "Successfully updated user", "data": {}})
@users_blueprint.route('/user/setAdmin', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=AdminDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
def set_admin(data):
abort_if_no_admin() # Only admin users can do this
check_if_admin_data_exists(data)
username = data['username']
admin = data['admin']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
query_user.admin = admin
db.session.commit()
return jsonify({"status": 200, "text": "Successfully updated users admin rights", "data": {}})
@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
check_if_delete_data_exists(data)
username = data['username']
if username == get_username_or_abort_401(): # Username is same as current user
db.session.query(User).filter_by(username=username).delete()
db.session.commit()
else: # Delete different user than my user -> only admin users
abort_if_no_admin()
db.session.query(User).filter_by(username=username).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully removed user", "data": {}})
def check_if_user_data_exists(data): def check_if_user_data_exists(data):
@ -87,3 +175,25 @@ def check_if_user_data_exists(data):
if data['password'] == "" or data['password'] is None: if data['password'] == "" or data['password'] is None:
abort(400, message="Password missing") abort(400, message="Password missing")
def check_if_admin_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")
if "admin" not in data:
abort(400, message="Admin state missing")
if data['admin'] == "" or data['admin'] is None:
abort(400, message="Admin state missing")
def check_if_delete_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")

View File

@ -1,5 +1,9 @@
# TODO # TODO
# Change password, username # Roles -> Admin Non-Admin
# Delete all users Delete only me
# Set Admin -
# Show all users -
#
# Endpoints for news, shares # Endpoints for news, shares
from apiflask import APIFlask from apiflask import APIFlask

View File

@ -55,7 +55,19 @@ def get_user_id_from_username(username):
def get_username_or_abort_401(): def get_username_or_abort_401():
# get username from jwt token # get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token())) username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code if username is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login") abort(401, message="Unable to login")
return username return username
def abort_if_no_admin():
if not is_user_admin():
abort(401, message="Only admin users can access this")
def is_user_admin():
username = get_username_or_abort_401()
return db.session.query(User).filter_by(username=username).first().admin

View File

@ -16,15 +16,33 @@ class UsersSchema(Schema):
username = String() username = String()
class Token(Schema): class AdminDataSchema(Schema):
username = String()
admin = Boolean()
class TokenSchema(Schema):
token = String() token = String()
class LoginData(Schema): class LoginDataSchema(Schema):
username = String() username = String()
password = String() password = String()
class DeleteUserSchema(Schema):
username = String()
class ChangePasswordSchema(Schema):
old_password = String()
new_password = String()
class ChangeUsernameSchema(Schema):
new_username = String()
class KeywordSchema(Schema): class KeywordSchema(Schema):
keyword = String() keyword = String()