TelegramAktienBot/api/app/helper_functions.py
2022-04-12 09:50:24 +02:00

94 lines
2.7 KiB
Python

__author__ = "Florian Kaiser"
__copyright__ = "Copyright 2022, Project Aktienbot"
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
__license__ = "GPL 3.0"
__version__ = "1.0.0"
import bcrypt
import jwt
from apiflask import abort
from app.db import database as db
from app.models import User
from flask import current_app
from flask import request, jsonify
def hash_password(password):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
def check_password(hashed_password, user_password):
return bcrypt.checkpw(user_password, hashed_password)
def get_email_from_token_data(token):
if token is None or len(token) < 2:
return None
else:
token = token[1]
if token is not None:
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
telegram_user_id = token.split(":")[1]
token = token.split(":")[0]
try:
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
if res is not None:
return res.as_dict()['email']
else:
return None
else:
return None
except jwt.PyJWTError:
return None
else: # "Normal" token, extract username from token
try:
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
except jwt.PyJWTError:
return None
def get_token():
if 'Authorization' in request.headers:
return request.headers['Authorization'].split(" ")
else:
return None
def get_email_or_abort_401():
# get username from jwt token
email = get_email_from_token_data(get_token())
if email is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
return email
def abort_if_no_admin():
if not is_user_admin():
abort(401, message="Only admin users can access this")
def is_user_admin():
email = get_email_or_abort_401()
return db.session.query(User).filter_by(email=email).first().admin
def make_response(data, status=200, text=""):
return jsonify({"status": status, "text": text, "data": data})
def get_user(email):
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Can't find user")
return query_user