__author__ = "Florian Kaiser"
__copyright__ = "Copyright 2022, Project Aktienbot"
__credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin Pauer"]
__license__ = "GPL 3.0"
__version__ = "1.0.0"

import bcrypt
import jwt
from apiflask import abort
from app.db import database as db
from app.models import User
from flask import current_app, jsonify, request


def hash_password(password):
    """
    Hash a plain-text password so it can be stored in the database
    """
    return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())


def check_password(hashed_password, user_password):
    """
    Check whether the password matches the stored hash using bcrypt.checkpw
    """
    # bcrypt.checkpw only accepts bytes, so encode str arguments first
    if isinstance(user_password, str):
        user_password = user_password.encode("utf-8")
    if isinstance(hashed_password, str):
        hashed_password = hashed_password.encode("utf-8")
    return bcrypt.checkpw(user_password, hashed_password)


def get_email_from_token_data(token):
    """
    Extract the email from the token data

    Expects the Authorization header split on " ", e.g. ["Bearer", "<token>"]
    """
    # If no token is provided -> return None
    if token is None or len(token) < 2:
        return None

    # Drop the "Bearer" prefix and keep the token itself
    token = token[1]

    # Defensive check: the token has been observed to be None here
    if token is None:
        return None

    secret_key = current_app.config['SECRET_KEY']

    # The bot appends a user id to the bearer token, using ":" as separator, to select a specific user
    # If the token contains ":" -> it may be a bot token
    # If the token is valid -> return the selected user's email, not the bot email
    if ':' in token:
        telegram_user_id = token.split(":")[1]
        token = token.split(":")[0]

        try:
            # Only allow selecting users by telegram_user_id if the current user is the bot user
            if jwt.decode(token, secret_key, algorithms=["HS256"])['email'] != current_app.config['BOT_EMAIL']:
                return None
        except jwt.PyJWTError:
            return None

        res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()

        # Check if a user with this id exists
        if res is None:
            return None
        return res.as_dict()['email']

    # "Normal" token -> return the email from the token if the token is valid
    try:
        return jwt.decode(token, secret_key, algorithms=["HS256"])['email']
    except jwt.PyJWTError:
        return None


def get_token():
    """
    Extract the token from the Authorization header

    Returns the header value split on " ", or None if the header is missing
    """
    if 'Authorization' in request.headers:
        return request.headers['Authorization'].split(" ")
    return None


def get_email_or_abort_401():
    """
    Try to retrieve the email from the token data
    If no email can be determined -> abort 401
    """
    email = get_email_from_token_data(get_token())

    if email is None:
        # Token not provided or invalid -> return 401 code
        abort(401, message="Unable to login")

    return email


def abort_if_no_admin():
    """
    Check if the user is an admin
    If not -> abort 401
    """
    if not is_user_admin():
        abort(401, message="Only admin users can access this")


def is_user_admin():
    """
    Return the user's admin status
    """
    email = get_email_or_abort_401()

    # get_user aborts with 500 if the user no longer exists, instead of failing on None
    return get_user(email).admin


def make_response(data, status=200, text=""):
    """
    Generate the response object
    """
    return jsonify({"status": status, "text": text, "data": data})


def get_user(email):
    """
    Get the user from the database
    """
    query_user = db.session.query(User).filter_by(email=email).first()

    # Check if the user exists
    if query_user is None:
        abort(500, message="Can't find user")

    return query_user