2022-04-12 07:50:24 +00:00
|
|
|
__author__ = "Florian Kaiser"
|
|
|
|
__copyright__ = "Copyright 2022, Project Aktienbot"
|
|
|
|
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
|
|
|
|
__license__ = "GPL 3.0"
|
|
|
|
__version__ = "1.0.0"
|
|
|
|
|
2022-03-27 18:42:11 +00:00
|
|
|
import bcrypt
|
2022-03-14 16:10:00 +00:00
|
|
|
import jwt
|
2022-03-17 08:26:25 +00:00
|
|
|
from apiflask import abort
|
2022-03-30 08:46:54 +00:00
|
|
|
from app.db import database as db
|
|
|
|
from app.models import User
|
2022-04-05 08:51:09 +00:00
|
|
|
from flask import current_app
|
|
|
|
from flask import request, jsonify
|
2022-03-14 16:10:00 +00:00
|
|
|
|
2022-03-14 06:32:16 +00:00
|
|
|
|
|
|
|
def hash_password(password):
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Hash plain password to save it in the database
|
|
|
|
"""
|
2022-03-27 18:42:11 +00:00
|
|
|
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
2022-03-14 06:32:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
def check_password(hashed_password, user_password):
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Check if the password is correct using the bcrypt checkpw function
|
|
|
|
"""
|
2022-03-27 19:21:20 +00:00
|
|
|
return bcrypt.checkpw(user_password, hashed_password)
|
2022-03-14 16:10:00 +00:00
|
|
|
|
|
|
|
|
2022-04-05 08:51:09 +00:00
|
|
|
def get_email_from_token_data(token):
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Extract email from token data
|
|
|
|
"""
|
|
|
|
|
|
|
|
# If token is not provided-> return None
|
2022-04-05 08:51:09 +00:00
|
|
|
if token is None or len(token) < 2:
|
|
|
|
return None
|
|
|
|
else:
|
2022-04-12 09:36:23 +00:00
|
|
|
# Token contains "Bearer " -> remove it
|
2022-04-05 08:51:09 +00:00
|
|
|
token = token[1]
|
|
|
|
|
2022-04-12 09:36:23 +00:00
|
|
|
# Again: Check if token is not None
|
|
|
|
# Don't know why, but sometimes the token is None
|
2022-04-05 08:51:09 +00:00
|
|
|
if token is not None:
|
2022-04-12 09:36:23 +00:00
|
|
|
|
|
|
|
# We decided to append the user id to the bearer token using ":" as separator to select an specific user
|
|
|
|
# If the token contains ":" -> It may be a bot token
|
|
|
|
# If token valid -> return user email, not bot email
|
|
|
|
if ':' in token:
|
2022-04-05 08:51:09 +00:00
|
|
|
telegram_user_id = token.split(":")[1]
|
|
|
|
token = token.split(":")[0]
|
|
|
|
|
|
|
|
try:
|
2022-04-12 09:36:23 +00:00
|
|
|
# Only allow selecting users with telegram_user_id if current user is the bot user
|
2022-04-05 08:51:09 +00:00
|
|
|
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
|
|
|
|
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
|
|
|
|
|
2022-04-12 09:36:23 +00:00
|
|
|
# Check if user id exists
|
2022-04-05 08:51:09 +00:00
|
|
|
if res is not None:
|
|
|
|
return res.as_dict()['email']
|
2022-03-22 10:20:04 +00:00
|
|
|
else:
|
|
|
|
return None
|
2022-04-05 08:51:09 +00:00
|
|
|
else:
|
2022-03-22 10:20:04 +00:00
|
|
|
return None
|
2022-04-05 08:51:09 +00:00
|
|
|
except jwt.PyJWTError:
|
|
|
|
return None
|
2022-03-22 10:20:04 +00:00
|
|
|
|
2022-04-05 08:51:09 +00:00
|
|
|
else: # "Normal" token, extract username from token
|
|
|
|
try:
|
2022-04-12 09:36:23 +00:00
|
|
|
# Return email from token if token is valid
|
2022-04-05 08:51:09 +00:00
|
|
|
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
|
|
|
|
except jwt.PyJWTError:
|
|
|
|
return None
|
2022-03-22 10:20:04 +00:00
|
|
|
|
2022-04-05 08:51:09 +00:00
|
|
|
|
|
|
|
def get_token():
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Extract token from Authorization header
|
|
|
|
"""
|
|
|
|
|
|
|
|
# Check if Authorization header is provided
|
2022-04-05 08:51:09 +00:00
|
|
|
if 'Authorization' in request.headers:
|
|
|
|
return request.headers['Authorization'].split(" ")
|
|
|
|
else:
|
|
|
|
return None
|
2022-03-14 16:10:00 +00:00
|
|
|
|
|
|
|
|
2022-03-27 15:23:33 +00:00
|
|
|
def get_email_or_abort_401():
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Try to receive email from token data
|
|
|
|
If email is not provided -> abort 401
|
|
|
|
"""
|
2022-04-05 08:51:09 +00:00
|
|
|
email = get_email_from_token_data(get_token())
|
2022-03-17 10:05:28 +00:00
|
|
|
|
2022-03-27 15:23:33 +00:00
|
|
|
if email is None: # If token not provided or invalid -> return 401 code
|
2022-03-17 08:26:25 +00:00
|
|
|
abort(401, message="Unable to login")
|
|
|
|
|
2022-03-27 15:23:33 +00:00
|
|
|
return email
|
2022-03-17 10:05:28 +00:00
|
|
|
|
|
|
|
|
|
|
|
def abort_if_no_admin():
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Check if user is admin
|
|
|
|
If not -> abort 401
|
|
|
|
"""
|
2022-03-17 10:05:28 +00:00
|
|
|
if not is_user_admin():
|
|
|
|
abort(401, message="Only admin users can access this")
|
|
|
|
|
|
|
|
|
|
|
|
def is_user_admin():
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Return users admin status
|
|
|
|
"""
|
2022-03-27 15:23:33 +00:00
|
|
|
email = get_email_or_abort_401()
|
2022-03-17 10:05:28 +00:00
|
|
|
|
2022-03-27 15:23:33 +00:00
|
|
|
return db.session.query(User).filter_by(email=email).first().admin
|
2022-03-22 10:21:39 +00:00
|
|
|
|
|
|
|
|
|
|
|
def make_response(data, status=200, text=""):
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Generate response object
|
|
|
|
"""
|
2022-03-27 15:23:33 +00:00
|
|
|
return jsonify({"status": status, "text": text, "data": data})
|
2022-04-05 08:51:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
def get_user(email):
|
2022-04-12 09:36:23 +00:00
|
|
|
"""
|
|
|
|
Get user from database
|
|
|
|
"""
|
2022-04-05 08:51:09 +00:00
|
|
|
query_user = db.session.query(User).filter_by(email=email).first()
|
|
|
|
|
2022-04-12 09:36:23 +00:00
|
|
|
# Check if user exists
|
|
|
|
if query_user is None:
|
2022-04-05 08:51:09 +00:00
|
|
|
abort(500, message="Can't find user")
|
|
|
|
|
|
|
|
return query_user
|