Added cron field to database

This commit is contained in:
Administrator 2022-04-05 10:51:09 +02:00
parent e65933ac52
commit 835beed17d
10 changed files with 203 additions and 63 deletions

View File

@ -1,12 +1,10 @@
import os
from apiflask import APIBlueprint, abort
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401, get_user
from app.schema import TelegramIdSchema, UsersSchema
from app.models import User
telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -23,7 +21,8 @@ def add_keyword(data):
if not check_if_telegram_user_id_data_exists(data):
abort(400, message="User ID missing")
query_user = db.session.query(User).filter_by(email=email).first()
query_user = get_user(email)
query_user.telegram_user_id = data['telegram_user_id']
db.session.commit()

View File

@ -1,15 +1,14 @@
import datetime
import os
from flask import current_app
import jwt
from apiflask import APIBlueprint, abort
from app.db import database as db
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401
from app.models import User
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema
from app.auth import auth
from app.db import database as db
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401, get_user
from app.models import User
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema, CronDataSchema
from flask import current_app
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -36,9 +35,9 @@ def users():
def user():
email = get_email_or_abort_401()
res = db.session.query(User).filter_by(email=email).first().as_dict()
query_user = get_user(email)
return make_response(res, 200, "Successfully received current user data")
return make_response(query_user.as_dict(), 200, "Successfully received current user data")
@users_blueprint.route('/user/login', methods=['POST'])
@ -55,10 +54,7 @@ def login(data):
email = data['email']
password = data['password']
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # email doesn't exist
abort(500, message="Unable to login")
query_user = get_user(email)
if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect
abort(500, message="Unable to login")
@ -98,7 +94,8 @@ def register(data):
email=email,
username=username,
password=hash_password(password),
admin=False
admin=False,
cron="0 8 * * *"
)
db.session.add(new_user)
db.session.commit()
@ -114,7 +111,7 @@ def register(data):
def update_user(data):
email = get_email_or_abort_401()
query_user = db.session.query(User).filter_by(email=email).first()
query_user = get_user(email)
if check_if_password_data_exists(data):
query_user.password = hash_password(data['password'])
@ -144,10 +141,7 @@ def set_admin(data):
email = data['email']
admin = data['admin']
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to update user")
query_user = get_user(email)
query_user.admin = admin
db.session.commit()
@ -155,6 +149,23 @@ def set_admin(data):
return make_response({}, 200, "Successfully updated users admin rights")
@users_blueprint.route('/user/setCron', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=CronDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set update cron", description="Set update cron of specified user")
def set_cron(data):
email = get_email_or_abort_401()
if not check_if_cron_data_exists(data):
abort(400, "Cron data missing")
get_user(email).cron = data['cron']
db.session.commit()
return make_response({}, 200, "Successfully updated users cron")
@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@ -216,3 +227,13 @@ def check_if_admin_data_exists(data):
return False
return True
def check_if_cron_data_exists(data):
if "cron" not in data:
return False
if data['cron'] == "" or data['cron'] is None:
return False
return True

View File

@ -1,12 +1,10 @@
from flask import current_app
import bcrypt
import jwt
from apiflask import abort
from flask import request, jsonify
from app.db import database as db
from app.models import User
from flask import current_app
from flask import request, jsonify
def hash_password(password):
@ -17,45 +15,47 @@ def check_password(hashed_password, user_password):
return bcrypt.checkpw(user_password, hashed_password)
def get_email_from_token_data():
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")
def get_email_from_token_data(token):
if token is None or len(token) < 2:
return None
else:
token = token[1]
if len(token) < 2:
return None
else:
token = token[1]
if token is not None:
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
telegram_user_id = token.split(":")[1]
token = token.split(":")[0]
if token is not None:
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
telegram_user_id = token.split(":")[1]
token = token.split(":")[0]
try:
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
try:
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
if res is not None:
return res.as_dict()['email']
else:
return None
if res is not None:
return res.as_dict()['email']
else:
return None
except jwt.PyJWTError:
else:
return None
except jwt.PyJWTError:
return None
else: # "Normal" token, extract username from token
try:
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
except jwt.PyJWTError:
return None
else: # "Normal" token, extract username from token
try:
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
except jwt.PyJWTError:
return None
return None
def get_token():
if 'Authorization' in request.headers:
return request.headers['Authorization'].split(" ")
else:
return None
def get_email_or_abort_401():
# get username from jwt token
email = get_email_from_token_data()
email = get_email_from_token_data(get_token())
if email is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
@ -76,3 +76,12 @@ def is_user_admin():
def make_response(data, status=200, text=""):
return jsonify({"status": status, "text": text, "data": data})
def get_user(email):
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Can't find user")
return query_user

View File

@ -7,14 +7,16 @@ class User(db.Model):
password = db.Column('password', db.BINARY(60), nullable=False)
username = db.Column('username', db.String(255), nullable=False, server_default='')
telegram_user_id = db.Column('telegram_user_id', db.String(255), nullable=True, server_default='')
admin = db.Column('admin', db.Boolean(), server_default='0')
admin = db.Column('admin', db.Boolean(), server_default='0') # 0 = False, 1 = True
cron = db.Column('cron', db.String(20), server_default='0 8 * * *', nullable=False)
def as_dict(self):
return {
"email": self.email,
"username": self.username,
"telegram_user_id": self.telegram_user_id,
"admin": self.admin
"admin": self.admin,
"cron": self.cron
}

View File

@ -23,6 +23,10 @@ class AdminDataSchema(Schema):
admin = Boolean()
class CronDataSchema(Schema):
cron = String()
class TokenSchema(Schema):
token = String()

View File

@ -2,6 +2,7 @@
This file (test_user.py) contains the functional tests for the `users` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
@ -35,7 +36,7 @@ def test_login_user_not_exist(test_client, init_database):
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json')
assert response.status_code == 500
assert b'Unable to login' in response.data
assert b'Can\'t find user' in response.data
def test_login_email_missing(test_client, init_database):
@ -407,7 +408,7 @@ def test_set_admin_admin1_logged_in_user_not_exist(test_client, init_database):
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 500
assert b'Unable to update user' in response.data
assert b'Can\'t find user' in response.data
def test_set_admin_admin1_logged_in_email_missing(test_client, init_database):
@ -466,6 +467,94 @@ def test_set_admin_admin1_logged_in_admin_empty(test_client, init_database):
assert b'Field may not be null' in response.data
def test_set_cron_user_not_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User is not logged in
"""
response = test_client.put('/api/user/setCron')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_set_cron_user1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully updated users cron' in response.data
def test_set_empty_cron_user1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in
Interval is empty
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
def test_set_cron_bot_logged_in_user_exists(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Bot1 is logged in and requests user 12345678
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":12345678")},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully updated users cron' in response.data
def test_set_cron_bot_logged_in_user_not_exists(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Bot1 is logged in and requests user 1234 (not existing)
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":1234")},
content_type='application/json')
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_set_cron_user1_logged_in_but_no_bot(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in and requests user 1234 (not existing)
Fails because user1 is not a bot
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password") + ":12345678")},
content_type='application/json')
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_set_cron_invalid_token(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Invalid Bearer token
"""
response = test_client.put('/api/user/setCron', headers={"Authorization": "Bearer {}".format("invalidtoken:12345678")})
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_users_not_logged_in(test_client, init_database):
"""
Test GET '/api/users'

View File

@ -18,3 +18,10 @@ def test_check_password():
hashed = hash_password("password")
assert check_password(hashed, "password".encode("utf-8")) is True
assert check_password(hashed, "password1".encode("utf-8")) is False
def test_get_email_from_token():
"""
Test get_email_from_token function
"""
assert get_email_from_token_data(None) is None

View File

@ -1,9 +1,8 @@
"""
This file (test_models.py) contains the unit tests for the models.py file.
"""
from app.models import User, Transaction, Keyword, Share
from app.helper_functions import hash_password
from app.models import User, Transaction, Keyword, Share
def test_new_user():
@ -16,14 +15,15 @@ def test_new_user():
email="user@example.com",
username="user",
password=hash_password("password"),
admin=False
admin=False,
cron="0 8 * * *",
)
assert user.email == 'user@example.com'
assert user.password != 'password'
assert user.username == "user"
assert user.telegram_user_id is None
assert user.admin is False
assert user.as_dict() == {'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False}
assert user.as_dict() == {'cron': '0 8 * * *', 'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False}
def test_new_user_with_fixture(new_user):

View File

@ -38,3 +38,12 @@ def test_check_if_admin_data_exists():
assert check_if_admin_data_exists(dict(admin=True)) is True
assert check_if_admin_data_exists(dict(admin=None)) is False
assert check_if_admin_data_exists(dict()) is False
def test_check_if_cron_data_exists():
"""
Test check_if_cron_data_exists function
"""
assert check_if_cron_data_exists(dict(cron="* * * * *")) is True
assert check_if_cron_data_exists(dict(cron="")) is False
assert check_if_cron_data_exists(dict()) is False