Added comments
This commit is contained in:
@@ -14,28 +14,48 @@ from flask import request, jsonify
|
||||
|
||||
|
||||
def hash_password(password):
|
||||
"""
|
||||
Hash plain password to save it in the database
|
||||
"""
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
||||
|
||||
|
||||
def check_password(hashed_password, user_password):
|
||||
"""
|
||||
Check if the password is correct using the bcrypt checkpw function
|
||||
"""
|
||||
return bcrypt.checkpw(user_password, hashed_password)
|
||||
|
||||
|
||||
def get_email_from_token_data(token):
|
||||
"""
|
||||
Extract email from token data
|
||||
"""
|
||||
|
||||
# If token is not provided-> return None
|
||||
if token is None or len(token) < 2:
|
||||
return None
|
||||
else:
|
||||
# Token contains "Bearer " -> remove it
|
||||
token = token[1]
|
||||
|
||||
# Again: Check if token is not None
|
||||
# Don't know why, but sometimes the token is None
|
||||
if token is not None:
|
||||
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
|
||||
|
||||
# We decided to append the user id to the bearer token using ":" as separator to select an specific user
|
||||
# If the token contains ":" -> It may be a bot token
|
||||
# If token valid -> return user email, not bot email
|
||||
if ':' in token:
|
||||
telegram_user_id = token.split(":")[1]
|
||||
token = token.split(":")[0]
|
||||
|
||||
try:
|
||||
# Only allow selecting users with telegram_user_id if current user is the bot user
|
||||
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
|
||||
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
|
||||
|
||||
# Check if user id exists
|
||||
if res is not None:
|
||||
return res.as_dict()['email']
|
||||
else:
|
||||
@@ -47,12 +67,18 @@ def get_email_from_token_data(token):
|
||||
|
||||
else: # "Normal" token, extract username from token
|
||||
try:
|
||||
# Return email from token if token is valid
|
||||
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
|
||||
except jwt.PyJWTError:
|
||||
return None
|
||||
|
||||
|
||||
def get_token():
|
||||
"""
|
||||
Extract token from Authorization header
|
||||
"""
|
||||
|
||||
# Check if Authorization header is provided
|
||||
if 'Authorization' in request.headers:
|
||||
return request.headers['Authorization'].split(" ")
|
||||
else:
|
||||
@@ -60,7 +86,10 @@ def get_token():
|
||||
|
||||
|
||||
def get_email_or_abort_401():
|
||||
# get username from jwt token
|
||||
"""
|
||||
Try to receive email from token data
|
||||
If email is not provided -> abort 401
|
||||
"""
|
||||
email = get_email_from_token_data(get_token())
|
||||
|
||||
if email is None: # If token not provided or invalid -> return 401 code
|
||||
@@ -70,24 +99,38 @@ def get_email_or_abort_401():
|
||||
|
||||
|
||||
def abort_if_no_admin():
|
||||
"""
|
||||
Check if user is admin
|
||||
If not -> abort 401
|
||||
"""
|
||||
if not is_user_admin():
|
||||
abort(401, message="Only admin users can access this")
|
||||
|
||||
|
||||
def is_user_admin():
|
||||
"""
|
||||
Return users admin status
|
||||
"""
|
||||
email = get_email_or_abort_401()
|
||||
|
||||
return db.session.query(User).filter_by(email=email).first().admin
|
||||
|
||||
|
||||
def make_response(data, status=200, text=""):
|
||||
"""
|
||||
Generate response object
|
||||
"""
|
||||
return jsonify({"status": status, "text": text, "data": data})
|
||||
|
||||
|
||||
def get_user(email):
|
||||
"""
|
||||
Get user from database
|
||||
"""
|
||||
query_user = db.session.query(User).filter_by(email=email).first()
|
||||
|
||||
if query_user is None: # Username doesn't exist
|
||||
# Check if user exists
|
||||
if query_user is None:
|
||||
abort(500, message="Can't find user")
|
||||
|
||||
return query_user
|
||||
|
Reference in New Issue
Block a user