2022-03-14 06:32:16 +00:00
|
|
|
import hashlib
|
2022-03-14 16:10:00 +00:00
|
|
|
import os
|
2022-03-14 06:32:16 +00:00
|
|
|
import uuid
|
|
|
|
|
2022-03-14 16:10:00 +00:00
|
|
|
import jwt
|
2022-03-17 08:26:25 +00:00
|
|
|
from apiflask import abort
|
|
|
|
from flask import request
|
2022-03-14 16:10:00 +00:00
|
|
|
|
2022-03-14 16:36:38 +00:00
|
|
|
from db import db
|
|
|
|
from models import User
|
2022-03-14 16:10:00 +00:00
|
|
|
|
2022-03-14 06:32:16 +00:00
|
|
|
|
|
|
|
def hash_password(password):
    """Return a salted SHA-256 hash of *password* as ``"<hex digest>:<salt>"``.

    A new random salt (the hex form of a UUID4) is generated on every call,
    so hashing the same password twice gives different strings. The salt is
    appended after a colon so the digest can be recomputed later.
    """
    salt = uuid.uuid4().hex

    # Feed salt first, then password: same bytes as hashing their concatenation.
    hasher = hashlib.sha256()
    hasher.update(salt.encode())
    hasher.update(password.encode())

    return ':'.join((hasher.hexdigest(), salt))
|
|
|
|
|
|
|
|
|
|
|
|
def check_password(hashed_password, user_password):
    """Check a plaintext password against a stored ``"<digest>:<salt>"`` string.

    Args:
        hashed_password: Stored value laid out as
            ``<sha256 hex digest>:<salt>``.
        user_password: Plaintext password to verify.

    Returns:
        True when the SHA-256 of ``salt + user_password`` equals the stored
        digest, otherwise False. A stored value with no ``:`` separator is
        treated as a mismatch instead of raising.
    """
    # Function-scope import so this block does not rely on the module's
    # import section being changed.
    import hmac

    stored_digest, separator, salt = hashed_password.partition(':')
    if not separator:
        # Malformed stored value: there is no salt to recompute the digest with.
        return False

    candidate = hashlib.sha256(salt.encode() + user_password.encode()).hexdigest()

    # Constant-time comparison avoids leaking how many leading characters
    # matched; comparing bytes also tolerates non-ASCII stored values.
    return hmac.compare_digest(stored_digest.encode(), candidate.encode())
|
|
|
|
|
|
|
|
|
|
|
|
def get_token():
    """Return the token carried by the request's ``Authorization`` header.

    The header value is split on whitespace and the second field is returned
    (for ``"Bearer abc"`` that is ``"abc"``). The scheme in the first field
    is not validated.

    Returns:
        The token string, or None when the header is absent, empty, or has
        no second field.
    """
    header_value = request.headers.get('Authorization')
    if not header_value:
        return None

    fields = header_value.split()
    if len(fields) < 2:
        # A header such as "Bearer" with nothing after it used to raise
        # IndexError; report "no token" instead.
        return None

    return fields[1]
|
|
|
|
|
|
|
|
|
|
|
|
def extract_token_data(token):
    """Decode a JWT and return its payload, or None if it cannot be trusted.

    The token is verified with the ``SECRET_KEY`` environment variable using
    the HS256 algorithm.

    Args:
        token: Encoded JWT string, or None.

    Returns:
        The decoded payload returned by ``jwt.decode``, or None when
        ``token`` is None or PyJWT rejects it.
    """
    if token is None:
        return None

    try:
        return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        # InvalidTokenError is the base class of PyJWT's validation errors
        # (DecodeError, ExpiredSignatureError, ...). Catching only DecodeError
        # let expired tokens propagate as an unhandled exception.
        return None
|
2022-03-14 16:10:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
def get_username_from_token_data(token_data):
    """Return the ``username`` entry of a decoded token payload.

    Args:
        token_data: Mapping of decoded token claims, or None.

    Returns:
        The value stored under ``'username'``, or None when ``token_data``
        is None or has no such key.
    """
    if token_data is None:
        return None

    # .get() so a payload without a username reads as "no user" instead of
    # raising KeyError.
    return token_data.get('username')
|
2022-03-14 16:10:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
def get_user_id_from_username(username):
    """Look up the ``user_id`` of the ``User`` row with the given username.

    Args:
        username: Username to search for, or None.

    Returns:
        The matching row's ``user_id``, or None when ``username`` is None or
        no row matches.
    """
    if username is None:
        return None

    user = db.session.query(User).filter_by(username=username).first()

    # .first() yields None when nothing matches; guard before reading the
    # attribute so an unknown username does not raise AttributeError.
    return user.user_id if user is not None else None
|
|
|
|
|
|
|
|
|
2022-03-17 08:26:25 +00:00
|
|
|
def get_username_or_abort_401():
    """Return the username from the request's JWT, aborting with 401 if none.

    Chains the token helpers: read the token from the request, decode it,
    then pull the username out of the decoded payload.
    """
    raw_token = get_token()
    claims = extract_token_data(raw_token)
    username = get_username_from_token_data(claims)

    # No token supplied, or the token was rejected -> respond with 401.
    if username is None:
        abort(401, message="Unable to login")

    return username
|
2022-03-17 10:05:28 +00:00
|
|
|
|
|
|
|
|
|
|
|
def abort_if_no_admin():
    """Abort the request with a 401 unless the current user is an admin."""
    if is_user_admin():
        return None

    abort(401, message="Only admin users can access this")
|
|
|
|
|
|
|
|
|
|
|
|
def is_user_admin():
    """Return the ``admin`` attribute of the user identified by the request.

    The username comes from ``get_username_or_abort_401``, so a request
    without a usable token is aborted with 401 before any lookup happens.

    Returns:
        The ``admin`` value of the matching ``User`` row, or False when no
        row exists for the token's username.
    """
    username = get_username_or_abort_401()

    user = db.session.query(User).filter_by(username=username).first()
    if user is None:
        # The token names a user that is not in the database; treat it as
        # non-admin instead of raising AttributeError on None.
        return False

    return user.admin
|