Tests
- Improved directory structure - Added functional and unit tests
This commit is contained in:
78
api/app/helper_functions.py
Normal file
78
api/app/helper_functions.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from flask import current_app
|
||||
|
||||
import bcrypt
|
||||
import jwt
|
||||
from apiflask import abort
|
||||
from flask import request, jsonify
|
||||
|
||||
from app.db import database as db
|
||||
from app.models import User
|
||||
|
||||
|
||||
def hash_password(password):
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
||||
|
||||
|
||||
def check_password(hashed_password, user_password):
|
||||
return bcrypt.checkpw(user_password, hashed_password)
|
||||
|
||||
|
||||
def get_email_from_token_data():
|
||||
if 'Authorization' in request.headers:
|
||||
token = request.headers['Authorization'].split(" ")
|
||||
|
||||
if len(token) < 2:
|
||||
return None
|
||||
else:
|
||||
token = token[1]
|
||||
|
||||
if token is not None:
|
||||
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
|
||||
telegram_user_id = token.split(":")[1]
|
||||
token = token.split(":")[0]
|
||||
|
||||
try:
|
||||
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
|
||||
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
|
||||
|
||||
if res is not None:
|
||||
return res.as_dict()['email']
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
except jwt.PyJWTError:
|
||||
return None
|
||||
|
||||
else: # "Normal" token, extract username from token
|
||||
try:
|
||||
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
|
||||
except jwt.PyJWTError:
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_email_or_abort_401():
|
||||
# get username from jwt token
|
||||
email = get_email_from_token_data()
|
||||
|
||||
if email is None: # If token not provided or invalid -> return 401 code
|
||||
abort(401, message="Unable to login")
|
||||
|
||||
return email
|
||||
|
||||
|
||||
def abort_if_no_admin():
|
||||
if not is_user_admin():
|
||||
abort(401, message="Only admin users can access this")
|
||||
|
||||
|
||||
def is_user_admin():
|
||||
email = get_email_or_abort_401()
|
||||
|
||||
return db.session.query(User).filter_by(email=email).first().admin
|
||||
|
||||
|
||||
def make_response(data, status=200, text=""):
|
||||
return jsonify({"status": status, "text": text, "data": data})
|
Reference in New Issue
Block a user