2022-03-14 17:10:00 +01:00
|
|
|
import os
|
2022-03-14 07:32:16 +01:00
|
|
|
|
2022-03-27 20:42:11 +02:00
|
|
|
import bcrypt
|
2022-03-14 17:10:00 +01:00
|
|
|
import jwt
|
2022-03-17 09:26:25 +01:00
|
|
|
from apiflask import abort
|
2022-03-22 11:21:39 +01:00
|
|
|
from flask import request, jsonify
|
2022-03-14 17:10:00 +01:00
|
|
|
|
2022-03-14 17:36:38 +01:00
|
|
|
from db import db
|
|
|
|
from models import User
|
2022-03-14 17:10:00 +01:00
|
|
|
|
2022-03-14 07:32:16 +01:00
|
|
|
|
|
|
|
def hash_password(password):
    """Hash a plain-text password with bcrypt and a freshly generated salt.

    The password is UTF-8 encoded before being handed to ``bcrypt.hashpw``;
    the value returned is whatever ``bcrypt.hashpw`` produces.
    """
    encoded_password = password.encode("utf-8")
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(encoded_password, salt)
|
2022-03-14 07:32:16 +01:00
|
|
|
|
|
|
|
|
|
|
|
def check_password(hashed_password, user_password):
    """Check a candidate password against a stored bcrypt hash.

    ``bcrypt.checkpw`` only accepts bytes, while ``hash_password`` in this
    module takes a ``str``. To keep the two helpers symmetrical, either
    argument may be given as ``str`` (it is UTF-8 encoded here) or as bytes
    (passed through unchanged).

    Args:
        hashed_password: The stored bcrypt hash, as ``bytes`` or ``str``.
        user_password: The password to verify, as ``bytes`` or ``str``.

    Returns:
        The result of ``bcrypt.checkpw`` for the two values.
    """
    if isinstance(user_password, str):
        user_password = user_password.encode("utf-8")
    if isinstance(hashed_password, str):
        hashed_password = hashed_password.encode("utf-8")
    return bcrypt.checkpw(user_password, hashed_password)
|
2022-03-14 17:10:00 +01:00
|
|
|
|
|
|
|
|
|
|
|
def get_token():
    """Return the credential part of the request's ``Authorization`` header.

    The header value is split on single spaces and the second piece is
    returned (for ``"Bearer abc"`` that is ``"abc"``).

    Returns:
        The second space-separated piece of the header, or ``None`` when the
        header is absent or contains no space.
    """
    header = request.headers.get('Authorization')
    if header is None:
        return None

    parts = header.split(" ")
    # A header without a space has no second piece; treat it as "no token"
    # rather than letting the index lookup raise IndexError.
    if len(parts) < 2:
        return None
    return parts[1]
|
|
|
|
|
|
|
|
|
|
|
|
def extract_token_data(token):
    """Decode a JWT and return its payload.

    The token is decoded with the ``SECRET_KEY`` environment variable using
    HS256.

    Returns:
        The decoded payload, or ``None`` when ``token`` is ``None`` or
        decoding raises ``jwt.PyJWTError``.
    """
    if token is None:
        return None

    secret = os.getenv('SECRET_KEY')
    try:
        claims = jwt.decode(token, secret, algorithms=["HS256"])
    except jwt.PyJWTError:
        return None
    return claims
|
2022-03-14 17:10:00 +01:00
|
|
|
|
|
|
|
|
2022-03-27 17:23:33 +02:00
|
|
|
def _email_from_jwt(token):
    """Return the ``email`` claim of *token*, or None if it is invalid or has no such claim."""
    try:
        payload = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
    except jwt.PyJWTError:
        return None
    return payload.get('email')


def get_email_from_token_data():
    """Resolve the e-mail address the current request is authenticated as.

    The credential is the second space-separated piece of the
    ``Authorization`` header. Two forms are handled:

    * ``<jwt>`` -- a regular token; its ``email`` claim is returned.
    * ``<jwt>:<telegram_user_id>`` -- a bot token; the JWT's ``email`` claim
      must equal the ``BOT_EMAIL`` environment variable, and the e-mail of
      the ``User`` row with that ``telegram_user_id`` is returned.

    Returns:
        The e-mail address, or ``None`` when the header is missing or
        malformed, the JWT cannot be decoded or lacks an ``email`` claim,
        the bot e-mail does not match, or no matching user exists.
    """
    header = request.headers.get('Authorization')
    if header is None:
        return None

    parts = header.split(" ")
    if len(parts) < 2:
        # No credential after the scheme; previously this raised IndexError.
        return None
    token = parts[1]

    if ':' not in token:
        # "Normal" token, extract the e-mail straight from the token.
        return _email_from_jwt(token)

    # Maybe a bot token: the JWT comes before the first ":" and the telegram
    # user id directly after it.
    pieces = token.split(":")
    bot_jwt = pieces[0]
    telegram_user_id = pieces[1]

    claimed_email = _email_from_jwt(bot_jwt)
    # Reject a missing claim explicitly so that it can never compare equal
    # to an unset BOT_EMAIL (None == None).
    if claimed_email is None or claimed_email != os.getenv("BOT_EMAIL"):
        return None

    user = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
    if user is None:
        return None
    return user.as_dict()['email']
|
2022-03-14 17:10:00 +01:00
|
|
|
|
|
|
|
|
2022-03-27 17:23:33 +02:00
|
|
|
def get_email_or_abort_401():
    """Return the e-mail resolved from the request's token, or abort with 401.

    Calls ``get_email_from_token_data()``; when that yields ``None`` (token
    not provided or invalid) the request is aborted with status 401 and the
    message "Unable to login".
    """
    resolved_email = get_email_from_token_data()
    if resolved_email is not None:
        return resolved_email

    abort(401, message="Unable to login")
    # Only reached if abort() returns instead of raising.
    return resolved_email
|
2022-03-17 11:05:28 +01:00
|
|
|
|
|
|
|
|
|
|
|
def abort_if_no_admin():
    """Abort the request with status 401 unless ``is_user_admin()`` is truthy."""
    if is_user_admin():
        return
    abort(401, message="Only admin users can access this")
|
|
|
|
|
|
|
|
|
|
|
|
def is_user_admin():
    """Report whether the authenticated user has the ``admin`` flag set.

    The e-mail comes from ``get_email_or_abort_401()``, so a request without
    a usable token is aborted there before any lookup happens.

    Returns:
        The ``admin`` attribute of the ``User`` row with that e-mail, or
        ``False`` when no such row exists.
    """
    email = get_email_or_abort_401()

    user = db.session.query(User).filter_by(email=email).first()
    if user is None:
        # A token can carry an e-mail with no matching row; treat that as
        # "not an admin" instead of failing with AttributeError on None.
        return False
    return user.admin
|
2022-03-22 11:21:39 +01:00
|
|
|
|
|
|
|
|
|
|
|
def make_response(data, status=200, text=""):
    """Build the JSON response envelope used by the API.

    The envelope has the keys ``status``, ``text`` and ``data`` and is
    serialised with ``flask.jsonify``.
    """
    envelope = {"status": status, "text": text, "data": data}
    return jsonify(envelope)
|