TelegramAktienBot/api/helper_functions.py

97 lines
2.7 KiB
Python

import hashlib
import os
import uuid
import jwt
from apiflask import abort
from flask import request, jsonify
from db import db
from models import User
def hash_password(password):
salt = uuid.uuid4().hex
return hashlib.sha256(salt.encode() + password.encode()).hexdigest() + ':' + salt
def check_password(hashed_password, user_password):
password, salt = hashed_password.split(':')
return password == hashlib.sha256(salt.encode() + user_password.encode()).hexdigest()
def get_token():
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
return token
def extract_token_data(token):
if token is not None:
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
except jwt.exceptions.DecodeError:
return None
else:
return None
def get_username_from_token_data():
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
if token is not None:
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
username = token.split(":")[1]
token = token.split(":")[0]
try:
if jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['username'] == "bot":
return username
else:
return None
except jwt.exceptions.DecodeError:
return None
else: # "Normal" token, extract username from token
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['username']
except jwt.exceptions.DecodeError:
return None
return None
def get_user_id_from_username(username):
if username is not None:
return db.session.query(User).filter_by(username=username).first().user_id
else:
return None
def get_username_or_abort_401():
# get username from jwt token
username = get_username_from_token_data()
if username is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
return username
def abort_if_no_admin():
if not is_user_admin():
abort(401, message="Only admin users can access this")
def is_user_admin():
username = get_username_or_abort_401()
return db.session.query(User).filter_by(username=username).first().admin
def make_response(data, status=200, text=""):
return jsonify({"status": status, "text": text, "data": {"token": data}})