Merge pull request #40 from WebEngineering2/cron

Added cron field to database
This commit is contained in:
Florian Kaiser 2022-04-05 10:52:11 +02:00 committed by GitHub
commit 19d22e2f1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 203 additions and 63 deletions

View File

@ -1,12 +1,10 @@
import os import os
from apiflask import APIBlueprint, abort from apiflask import APIBlueprint, abort
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth from app.auth import auth
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401, get_user
from app.schema import TelegramIdSchema, UsersSchema from app.schema import TelegramIdSchema, UsersSchema
from app.models import User
telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api') telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -23,7 +21,8 @@ def add_keyword(data):
if not check_if_telegram_user_id_data_exists(data): if not check_if_telegram_user_id_data_exists(data):
abort(400, message="User ID missing") abort(400, message="User ID missing")
query_user = db.session.query(User).filter_by(email=email).first() query_user = get_user(email)
query_user.telegram_user_id = data['telegram_user_id'] query_user.telegram_user_id = data['telegram_user_id']
db.session.commit() db.session.commit()

View File

@ -1,15 +1,14 @@
import datetime import datetime
import os import os
from flask import current_app
import jwt import jwt
from apiflask import APIBlueprint, abort from apiflask import APIBlueprint, abort
from app.db import database as db
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401
from app.models import User
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema
from app.auth import auth from app.auth import auth
from app.db import database as db
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401, get_user
from app.models import User
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema, CronDataSchema
from flask import current_app
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api') users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -36,9 +35,9 @@ def users():
def user(): def user():
email = get_email_or_abort_401() email = get_email_or_abort_401()
res = db.session.query(User).filter_by(email=email).first().as_dict() query_user = get_user(email)
return make_response(res, 200, "Successfully received current user data") return make_response(query_user.as_dict(), 200, "Successfully received current user data")
@users_blueprint.route('/user/login', methods=['POST']) @users_blueprint.route('/user/login', methods=['POST'])
@ -55,10 +54,7 @@ def login(data):
email = data['email'] email = data['email']
password = data['password'] password = data['password']
query_user = db.session.query(User).filter_by(email=email).first() query_user = get_user(email)
if query_user is None: # email doesn't exist
abort(500, message="Unable to login")
if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect
abort(500, message="Unable to login") abort(500, message="Unable to login")
@ -98,7 +94,8 @@ def register(data):
email=email, email=email,
username=username, username=username,
password=hash_password(password), password=hash_password(password),
admin=False admin=False,
cron="0 8 * * *"
) )
db.session.add(new_user) db.session.add(new_user)
db.session.commit() db.session.commit()
@ -114,7 +111,7 @@ def register(data):
def update_user(data): def update_user(data):
email = get_email_or_abort_401() email = get_email_or_abort_401()
query_user = db.session.query(User).filter_by(email=email).first() query_user = get_user(email)
if check_if_password_data_exists(data): if check_if_password_data_exists(data):
query_user.password = hash_password(data['password']) query_user.password = hash_password(data['password'])
@ -144,10 +141,7 @@ def set_admin(data):
email = data['email'] email = data['email']
admin = data['admin'] admin = data['admin']
query_user = db.session.query(User).filter_by(email=email).first() query_user = get_user(email)
if query_user is None: # Username doesn't exist
abort(500, message="Unable to update user")
query_user.admin = admin query_user.admin = admin
db.session.commit() db.session.commit()
@ -155,6 +149,23 @@ def set_admin(data):
return make_response({}, 200, "Successfully updated users admin rights") return make_response({}, 200, "Successfully updated users admin rights")
@users_blueprint.route('/user/setCron', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=CronDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set update cron", description="Set update cron of specified user")
def set_cron(data):
email = get_email_or_abort_401()
if not check_if_cron_data_exists(data):
abort(400, "Cron data missing")
get_user(email).cron = data['cron']
db.session.commit()
return make_response({}, 200, "Successfully updated users cron")
@users_blueprint.route('/user', methods=['DELETE']) @users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200) @users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema) @users_blueprint.input(schema=DeleteUserSchema)
@ -216,3 +227,13 @@ def check_if_admin_data_exists(data):
return False return False
return True return True
def check_if_cron_data_exists(data):
if "cron" not in data:
return False
if data['cron'] == "" or data['cron'] is None:
return False
return True

View File

@ -1,12 +1,10 @@
from flask import current_app
import bcrypt import bcrypt
import jwt import jwt
from apiflask import abort from apiflask import abort
from flask import request, jsonify
from app.db import database as db from app.db import database as db
from app.models import User from app.models import User
from flask import current_app
from flask import request, jsonify
def hash_password(password): def hash_password(password):
@ -17,11 +15,8 @@ def check_password(hashed_password, user_password):
return bcrypt.checkpw(user_password, hashed_password) return bcrypt.checkpw(user_password, hashed_password)
def get_email_from_token_data(): def get_email_from_token_data(token):
if 'Authorization' in request.headers: if token is None or len(token) < 2:
token = request.headers['Authorization'].split(" ")
if len(token) < 2:
return None return None
else: else:
token = token[1] token = token[1]
@ -50,12 +45,17 @@ def get_email_from_token_data():
except jwt.PyJWTError: except jwt.PyJWTError:
return None return None
def get_token():
if 'Authorization' in request.headers:
return request.headers['Authorization'].split(" ")
else:
return None return None
def get_email_or_abort_401(): def get_email_or_abort_401():
# get username from jwt token # get username from jwt token
email = get_email_from_token_data() email = get_email_from_token_data(get_token())
if email is None: # If token not provided or invalid -> return 401 code if email is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login") abort(401, message="Unable to login")
@ -76,3 +76,12 @@ def is_user_admin():
def make_response(data, status=200, text=""): def make_response(data, status=200, text=""):
return jsonify({"status": status, "text": text, "data": data}) return jsonify({"status": status, "text": text, "data": data})
def get_user(email):
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Can't find user")
return query_user

View File

@ -7,14 +7,16 @@ class User(db.Model):
password = db.Column('password', db.BINARY(60), nullable=False) password = db.Column('password', db.BINARY(60), nullable=False)
username = db.Column('username', db.String(255), nullable=False, server_default='') username = db.Column('username', db.String(255), nullable=False, server_default='')
telegram_user_id = db.Column('telegram_user_id', db.String(255), nullable=True, server_default='') telegram_user_id = db.Column('telegram_user_id', db.String(255), nullable=True, server_default='')
admin = db.Column('admin', db.Boolean(), server_default='0') admin = db.Column('admin', db.Boolean(), server_default='0') # 0 = False, 1 = True
cron = db.Column('cron', db.String(20), server_default='0 8 * * *', nullable=False)
def as_dict(self): def as_dict(self):
return { return {
"email": self.email, "email": self.email,
"username": self.username, "username": self.username,
"telegram_user_id": self.telegram_user_id, "telegram_user_id": self.telegram_user_id,
"admin": self.admin "admin": self.admin,
"cron": self.cron
} }

View File

@ -23,6 +23,10 @@ class AdminDataSchema(Schema):
admin = Boolean() admin = Boolean()
class CronDataSchema(Schema):
cron = String()
class TokenSchema(Schema): class TokenSchema(Schema):
token = String() token = String()

View File

@ -2,6 +2,7 @@
This file (test_user.py) contains the functional tests for the `users` blueprint. This file (test_user.py) contains the functional tests for the `users` blueprint.
""" """
import json import json
from tests.functional.helper_functions import get_token from tests.functional.helper_functions import get_token
@ -35,7 +36,7 @@ def test_login_user_not_exist(test_client, init_database):
""" """
response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json') response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json')
assert response.status_code == 500 assert response.status_code == 500
assert b'Unable to login' in response.data assert b'Can\'t find user' in response.data
def test_login_email_missing(test_client, init_database): def test_login_email_missing(test_client, init_database):
@ -407,7 +408,7 @@ def test_set_admin_admin1_logged_in_user_not_exist(test_client, init_database):
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json') content_type='application/json')
assert response.status_code == 500 assert response.status_code == 500
assert b'Unable to update user' in response.data assert b'Can\'t find user' in response.data
def test_set_admin_admin1_logged_in_email_missing(test_client, init_database): def test_set_admin_admin1_logged_in_email_missing(test_client, init_database):
@ -466,6 +467,94 @@ def test_set_admin_admin1_logged_in_admin_empty(test_client, init_database):
assert b'Field may not be null' in response.data assert b'Field may not be null' in response.data
def test_set_cron_user_not_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User is not logged in
"""
response = test_client.put('/api/user/setCron')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_set_cron_user1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully updated users cron' in response.data
def test_set_empty_cron_user1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in
Interval is empty
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
def test_set_cron_bot_logged_in_user_exists(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Bot1 is logged in and requests user 12345678
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":12345678")},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully updated users cron' in response.data
def test_set_cron_bot_logged_in_user_not_exists(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Bot1 is logged in and requests user 1234 (not existing)
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":1234")},
content_type='application/json')
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_set_cron_user1_logged_in_but_no_bot(test_client, init_database):
"""
Test PUT '/api/user/setCron'
User1 is logged in and requests user 1234 (not existing)
Fails because user1 is not a bot
"""
response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password") + ":12345678")},
content_type='application/json')
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_set_cron_invalid_token(test_client, init_database):
"""
Test PUT '/api/user/setCron'
Invalid Bearer token
"""
response = test_client.put('/api/user/setCron', headers={"Authorization": "Bearer {}".format("invalidtoken:12345678")})
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_users_not_logged_in(test_client, init_database): def test_get_users_not_logged_in(test_client, init_database):
""" """
Test GET '/api/users' Test GET '/api/users'

View File

@ -18,3 +18,10 @@ def test_check_password():
hashed = hash_password("password") hashed = hash_password("password")
assert check_password(hashed, "password".encode("utf-8")) is True assert check_password(hashed, "password".encode("utf-8")) is True
assert check_password(hashed, "password1".encode("utf-8")) is False assert check_password(hashed, "password1".encode("utf-8")) is False
def test_get_email_from_token():
"""
Test get_email_from_token function
"""
assert get_email_from_token_data(None) is None

View File

@ -1,9 +1,8 @@
""" """
This file (test_models.py) contains the unit tests for the models.py file. This file (test_models.py) contains the unit tests for the models.py file.
""" """
from app.models import User, Transaction, Keyword, Share
from app.helper_functions import hash_password from app.helper_functions import hash_password
from app.models import User, Transaction, Keyword, Share
def test_new_user(): def test_new_user():
@ -16,14 +15,15 @@ def test_new_user():
email="user@example.com", email="user@example.com",
username="user", username="user",
password=hash_password("password"), password=hash_password("password"),
admin=False admin=False,
cron="0 8 * * *",
) )
assert user.email == 'user@example.com' assert user.email == 'user@example.com'
assert user.password != 'password' assert user.password != 'password'
assert user.username == "user" assert user.username == "user"
assert user.telegram_user_id is None assert user.telegram_user_id is None
assert user.admin is False assert user.admin is False
assert user.as_dict() == {'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False} assert user.as_dict() == {'cron': '0 8 * * *', 'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False}
def test_new_user_with_fixture(new_user): def test_new_user_with_fixture(new_user):

View File

@ -38,3 +38,12 @@ def test_check_if_admin_data_exists():
assert check_if_admin_data_exists(dict(admin=True)) is True assert check_if_admin_data_exists(dict(admin=True)) is True
assert check_if_admin_data_exists(dict(admin=None)) is False assert check_if_admin_data_exists(dict(admin=None)) is False
assert check_if_admin_data_exists(dict()) is False assert check_if_admin_data_exists(dict()) is False
def test_check_if_cron_data_exists():
"""
Test check_if_cron_data_exists function
"""
assert check_if_cron_data_exists(dict(cron="* * * * *")) is True
assert check_if_cron_data_exists(dict(cron="")) is False
assert check_if_cron_data_exists(dict()) is False