import bcrypt import jwt from apiflask import abort from app.db import database as db from app.models import User from flask import current_app from flask import request, jsonify def hash_password(password): return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()) def check_password(hashed_password, user_password): return bcrypt.checkpw(user_password, hashed_password) def get_email_from_token_data(token): if token is None or len(token) < 2: return None else: token = token[1] if token is not None: if ':' in token: # Maybe bot token, check if token valid and return username after ":" then telegram_user_id = token.split(":")[1] token = token.split(":")[0] try: if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']: res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first() if res is not None: return res.as_dict()['email'] else: return None else: return None except jwt.PyJWTError: return None else: # "Normal" token, extract username from token try: return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] except jwt.PyJWTError: return None def get_token(): if 'Authorization' in request.headers: return request.headers['Authorization'].split(" ") else: return None def get_email_or_abort_401(): # get username from jwt token email = get_email_from_token_data(get_token()) if email is None: # If token not provided or invalid -> return 401 code abort(401, message="Unable to login") return email def abort_if_no_admin(): if not is_user_admin(): abort(401, message="Only admin users can access this") def is_user_admin(): email = get_email_or_abort_401() return db.session.query(User).filter_by(email=email).first().admin def make_response(data, status=200, text=""): return jsonify({"status": status, "text": text, "data": data}) def get_user(email): query_user = db.session.query(User).filter_by(email=email).first() if query_user is None: # Username doesn't exist abort(500, message="Can't find user") return query_user