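import os

import bcrypt
import jwt
from apiflask import abort
from flask import jsonify, request

from db import db
from models import User

# Pinning the accepted algorithm list stops jwt.decode from honouring whatever
# "alg" value a client-supplied token header announces.
_JWT_ALGORITHMS = ["HS256"]


def _to_bytes(value):
    """Return *value* as UTF-8 bytes; bytes-like input is passed through."""
    return value.encode("utf-8") if isinstance(value, str) else value


def _authorization_token():
    """Return the credential part of the Authorization header, or None.

    The header is split on single spaces and the second field is returned,
    so the scheme word (e.g. "Bearer") is not checked. A missing header, a
    header without a second field, or an empty second field all yield None
    instead of raising IndexError (which would surface as an HTTP 500).
    """
    header = request.headers.get("Authorization")
    if header is None:
        return None
    fields = header.split(" ")
    if len(fields) < 2 or not fields[1]:
        return None
    return fields[1]


def _decode_token(token):
    """Verify *token* with SECRET_KEY and return its claims, or None.

    Fails closed: if SECRET_KEY is unset or empty, no token is accepted,
    because a signature checked against a missing/empty key proves nothing.

    NOTE(review): PyJWT validates ``exp`` only when the claim is present.
    Whether issued tokens carry an expiry depends on the issuing code, which
    is not part of this file -- confirm, or pass
    ``options={"require": ["exp"]}`` once that is verified.
    """
    secret = os.getenv("SECRET_KEY")
    if not token or not secret:
        return None
    try:
        return jwt.decode(token, secret, algorithms=_JWT_ALGORITHMS)
    except jwt.PyJWTError:
        return None


def hash_password(password):
    """Hash *password* (a str) with bcrypt and a fresh random salt.

    Returns the bcrypt hash as bytes. bcrypt only processes the first
    72 bytes of its input, so longer passwords gain no extra strength.
    """
    return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())


def check_password(hashed_password, user_password):
    """Return True if *user_password* matches the bcrypt *hashed_password*.

    Both arguments may be str or bytes. hash_password() encodes its input,
    but bcrypt.checkpw() rejects str, so str arguments are encoded here to
    keep the two helpers symmetric.
    """
    return bcrypt.checkpw(_to_bytes(user_password), _to_bytes(hashed_password))


def get_token():
    """Return the raw token from the Authorization header, or None."""
    return _authorization_token()


def extract_token_data(token):
    """Return the verified claims of *token*, or None if it is missing/invalid."""
    return _decode_token(token)


def get_email_from_token_data():
    """Resolve the caller's e-mail address from the Authorization header.

    Two token shapes are accepted:

    * ``<jwt>`` -- a normal user token; its ``email`` claim is returned.
    * ``<jwt>:<telegram_user_id>`` -- a bot token. The JWT must belong to the
      account named by the BOT_EMAIL environment variable; the e-mail of the
      user with that Telegram id is then returned. The bot credential can
      therefore act as any user that has a Telegram id on record, so it must
      be protected accordingly.

    Returns None whenever the header, the token, or the lookup is not valid.
    """
    token = _authorization_token()
    if token is None:
        return None

    if ":" not in token:
        claims = _decode_token(token)
        # .get(): a validly signed token without an "email" claim is treated
        # as unauthenticated rather than raising KeyError.
        return claims.get("email") if claims is not None else None

    # Bot form. A JWT itself never contains ":" (base64url segments joined
    # by "."), so the first field is the JWT and the second the Telegram id.
    fields = token.split(":")
    bot_jwt, telegram_user_id = fields[0], fields[1]
    if not telegram_user_id:
        # "<jwt>:" must not turn into a lookup for an empty Telegram id.
        return None

    claims = _decode_token(bot_jwt)
    bot_email = os.getenv("BOT_EMAIL")
    # Require BOT_EMAIL to be configured: comparing against an unset value
    # would let a token with a null/missing e-mail claim pass as the bot.
    if claims is None or not bot_email or claims.get("email") != bot_email:
        return None

    user = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
    if user is None:
        return None
    return user.as_dict()["email"]


def get_email_or_abort_401():
    """Return the authenticated caller's e-mail or abort the request with 401."""
    email = get_email_from_token_data()
    if email is None:
        # Token not provided or invalid.
        abort(401, message="Unable to login")
    return email


def abort_if_no_admin():
    """Abort the request with 401 unless the caller is an admin user."""
    if not is_user_admin():
        abort(401, message="Only admin users can access this")


def is_user_admin():
    """Return the ``admin`` flag of the authenticated caller.

    Aborts with 401 (via get_email_or_abort_401) when the caller is not
    authenticated. If the token is valid but the account no longer exists,
    the caller is treated as non-admin instead of raising AttributeError.
    """
    email = get_email_or_abort_401()
    user = db.session.query(User).filter_by(email=email).first()
    if user is None:
        return False
    return user.admin


def make_response(data, status=200, text=""):
    """Build the JSON envelope ``{"status", "text", "data"}``.

    *status* is only written into the JSON body; this function does not set
    the HTTP status code of the response.
    """
    return jsonify({"status": status, "text": text, "data": data})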