TelegramAktienBot/api/app/helper_functions.py
2022-04-12 11:36:23 +02:00

137 lines
3.8 KiB
Python

__author__ = "Florian Kaiser"
__copyright__ = "Copyright 2022, Project Aktienbot"
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
__license__ = "GPL 3.0"
__version__ = "1.0.0"
import bcrypt
import jwt
from apiflask import abort
from app.db import database as db
from app.models import User
from flask import current_app
from flask import request, jsonify
def hash_password(password):
"""
Hash plain password to save it in the database
"""
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
def check_password(hashed_password, user_password):
"""
Check if the password is correct using the bcrypt checkpw function
"""
return bcrypt.checkpw(user_password, hashed_password)
def get_email_from_token_data(token):
"""
Extract email from token data
"""
# If token is not provided-> return None
if token is None or len(token) < 2:
return None
else:
# Token contains "Bearer " -> remove it
token = token[1]
# Again: Check if token is not None
# Don't know why, but sometimes the token is None
if token is not None:
# We decided to append the user id to the bearer token using ":" as separator to select an specific user
# If the token contains ":" -> It may be a bot token
# If token valid -> return user email, not bot email
if ':' in token:
telegram_user_id = token.split(":")[1]
token = token.split(":")[0]
try:
# Only allow selecting users with telegram_user_id if current user is the bot user
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
# Check if user id exists
if res is not None:
return res.as_dict()['email']
else:
return None
else:
return None
except jwt.PyJWTError:
return None
else: # "Normal" token, extract username from token
try:
# Return email from token if token is valid
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
except jwt.PyJWTError:
return None
def get_token():
"""
Extract token from Authorization header
"""
# Check if Authorization header is provided
if 'Authorization' in request.headers:
return request.headers['Authorization'].split(" ")
else:
return None
def get_email_or_abort_401():
"""
Try to receive email from token data
If email is not provided -> abort 401
"""
email = get_email_from_token_data(get_token())
if email is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
return email
def abort_if_no_admin():
"""
Check if user is admin
If not -> abort 401
"""
if not is_user_admin():
abort(401, message="Only admin users can access this")
def is_user_admin():
"""
Return users admin status
"""
email = get_email_or_abort_401()
return db.session.query(User).filter_by(email=email).first().admin
def make_response(data, status=200, text=""):
"""
Generate response object
"""
return jsonify({"status": status, "text": text, "data": data})
def get_user(email):
"""
Get user from database
"""
query_user = db.session.query(User).filter_by(email=email).first()
# Check if user exists
if query_user is None:
abort(500, message="Can't find user")
return query_user