diff --git a/api/app/blueprints/telegram.py b/api/app/blueprints/telegram.py index 5208d71..c96e923 100644 --- a/api/app/blueprints/telegram.py +++ b/api/app/blueprints/telegram.py @@ -1,12 +1,10 @@ import os from apiflask import APIBlueprint, abort - -from app.db import database as db -from app.helper_functions import make_response, get_email_or_abort_401 from app.auth import auth +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401, get_user from app.schema import TelegramIdSchema, UsersSchema -from app.models import User telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -23,7 +21,8 @@ def add_keyword(data): if not check_if_telegram_user_id_data_exists(data): abort(400, message="User ID missing") - query_user = db.session.query(User).filter_by(email=email).first() + query_user = get_user(email) + query_user.telegram_user_id = data['telegram_user_id'] db.session.commit() diff --git a/api/app/blueprints/user.py b/api/app/blueprints/user.py index 771bd60..a570883 100644 --- a/api/app/blueprints/user.py +++ b/api/app/blueprints/user.py @@ -1,15 +1,14 @@ import datetime import os -from flask import current_app import jwt from apiflask import APIBlueprint, abort - -from app.db import database as db -from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401 -from app.models import User -from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema from app.auth import auth +from app.db import database as db +from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401, get_user +from app.models import User +from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema, CronDataSchema +from flask import current_app users_blueprint = APIBlueprint('users', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -36,9 +35,9 @@ def users(): def user(): email = get_email_or_abort_401() - res = db.session.query(User).filter_by(email=email).first().as_dict() + query_user = get_user(email) - return make_response(res, 200, "Successfully received current user data") + return make_response(query_user.as_dict(), 200, "Successfully received current user data") @users_blueprint.route('/user/login', methods=['POST']) @@ -55,10 +54,7 @@ def login(data): email = data['email'] password = data['password'] - query_user = db.session.query(User).filter_by(email=email).first() - - if query_user is None: # email doesn't exist - abort(500, message="Unable to login") + query_user = get_user(email) if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect abort(500, message="Unable to login") @@ -98,7 +94,8 @@ def register(data): email=email, username=username, password=hash_password(password), - admin=False + admin=False, + cron="0 8 * * *" ) db.session.add(new_user) db.session.commit() @@ -114,7 +111,7 @@ def register(data): def update_user(data): email = get_email_or_abort_401() - query_user = db.session.query(User).filter_by(email=email).first() + query_user = get_user(email) if check_if_password_data_exists(data): query_user.password = hash_password(data['password']) @@ -144,10 +141,7 @@ def set_admin(data): email = data['email'] admin = data['admin'] - query_user = db.session.query(User).filter_by(email=email).first() - - if query_user is None: # Username doesn't exist - abort(500, message="Unable to update user") + query_user = get_user(email) query_user.admin = admin db.session.commit() @@ -155,6 +149,23 @@ def set_admin(data): return make_response({}, 200, "Successfully updated users admin rights") +@users_blueprint.route('/user/setCron', methods=['PUT']) +@users_blueprint.output({}, 200) +@users_blueprint.input(schema=CronDataSchema) +@users_blueprint.auth_required(auth) +@users_blueprint.doc(summary="Set update cron", description="Set update cron of specified user") +def set_cron(data): + email = get_email_or_abort_401() + + if not check_if_cron_data_exists(data): + abort(400, "Cron data missing") + + get_user(email).cron = data['cron'] + db.session.commit() + + return make_response({}, 200, "Successfully updated users cron") + + @users_blueprint.route('/user', methods=['DELETE']) @users_blueprint.output({}, 200) @users_blueprint.input(schema=DeleteUserSchema) @@ -216,3 +227,13 @@ def check_if_admin_data_exists(data): return False return True + + +def check_if_cron_data_exists(data): + if "cron" not in data: + return False + + if data['cron'] == "" or data['cron'] is None: + return False + + return True diff --git a/api/app/helper_functions.py b/api/app/helper_functions.py index 516d0e7..ee90298 100644 --- a/api/app/helper_functions.py +++ b/api/app/helper_functions.py @@ -1,12 +1,10 @@ -from flask import current_app - import bcrypt import jwt from apiflask import abort -from flask import request, jsonify - from app.db import database as db from app.models import User +from flask import current_app +from flask import request, jsonify def hash_password(password): @@ -17,45 +15,47 @@ def check_password(hashed_password, user_password): return bcrypt.checkpw(user_password, hashed_password) -def get_email_from_token_data(): - if 'Authorization' in request.headers: - token = request.headers['Authorization'].split(" ") +def get_email_from_token_data(token): + if token is None or len(token) < 2: + return None + else: + token = token[1] - if len(token) < 2: - return None - else: - token = token[1] + if token is not None: + if ':' in token: # Maybe bot token, check if token valid and return username after ":" then + telegram_user_id = token.split(":")[1] + token = token.split(":")[0] - if token is not None: - if ':' in token: # Maybe bot token, check if token valid and return username after ":" then - telegram_user_id = token.split(":")[1] - token = token.split(":")[0] + try: + if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']: + res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first() - try: - if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']: - res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first() - - if res is not None: - return res.as_dict()['email'] - else: - return None + if res is not None: + return res.as_dict()['email'] else: return None - except jwt.PyJWTError: + else: return None + except jwt.PyJWTError: + return None - else: # "Normal" token, extract username from token - try: - return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] - except jwt.PyJWTError: - return None + else: # "Normal" token, extract username from token + try: + return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] + except jwt.PyJWTError: + return None - return None + +def get_token(): + if 'Authorization' in request.headers: + return request.headers['Authorization'].split(" ") + else: + return None def get_email_or_abort_401(): # get username from jwt token - email = get_email_from_token_data() + email = get_email_from_token_data(get_token()) if email is None: # If token not provided or invalid -> return 401 code abort(401, message="Unable to login") @@ -76,3 +76,12 @@ def is_user_admin(): def make_response(data, status=200, text=""): return jsonify({"status": status, "text": text, "data": data}) + + +def get_user(email): + query_user = db.session.query(User).filter_by(email=email).first() + + if query_user is None: # Username doesn't exist + abort(500, message="Can't find user") + + return query_user diff --git a/api/app/models.py b/api/app/models.py index 6c06c7b..14b42ac 100644 --- a/api/app/models.py +++ b/api/app/models.py @@ -7,14 +7,16 @@ class User(db.Model): password = db.Column('password', db.BINARY(60), nullable=False) username = db.Column('username', db.String(255), nullable=False, server_default='') telegram_user_id = db.Column('telegram_user_id', db.String(255), nullable=True, server_default='') - admin = db.Column('admin', db.Boolean(), server_default='0') + admin = db.Column('admin', db.Boolean(), server_default='0') # 0 = False, 1 = True + cron = db.Column('cron', db.String(20), server_default='0 8 * * *', nullable=False) def as_dict(self): return { "email": self.email, "username": self.username, "telegram_user_id": self.telegram_user_id, - "admin": self.admin + "admin": self.admin, + "cron": self.cron } diff --git a/api/app/schema.py b/api/app/schema.py index f425825..834ada3 100644 --- a/api/app/schema.py +++ b/api/app/schema.py @@ -23,6 +23,10 @@ class AdminDataSchema(Schema): admin = Boolean() +class CronDataSchema(Schema): + cron = String() + + class TokenSchema(Schema): token = String() diff --git a/api/tests/functional/helper_functions.py b/api/tests/functional/helper_functions.py index 9e68226..2b88442 100644 --- a/api/tests/functional/helper_functions.py +++ b/api/tests/functional/helper_functions.py @@ -8,4 +8,4 @@ def get_token(test_client, email, password): if "token" in json.loads(response.data)["data"]: return json.loads(response.data)["data"]["token"] - return "" \ No newline at end of file + return "" diff --git a/api/tests/functional/test_user.py b/api/tests/functional/test_user.py index 8b57338..80fdaec 100644 --- a/api/tests/functional/test_user.py +++ b/api/tests/functional/test_user.py @@ -2,6 +2,7 @@ This file (test_user.py) contains the functional tests for the `users` blueprint. """ import json + from tests.functional.helper_functions import get_token @@ -35,7 +36,7 @@ def test_login_user_not_exist(test_client, init_database): """ response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json') assert response.status_code == 500 - assert b'Unable to login' in response.data + assert b'Can\'t find user' in response.data def test_login_email_missing(test_client, init_database): @@ -407,7 +408,7 @@ def test_set_admin_admin1_logged_in_user_not_exist(test_client, init_database): headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, content_type='application/json') assert response.status_code == 500 - assert b'Unable to update user' in response.data + assert b'Can\'t find user' in response.data def test_set_admin_admin1_logged_in_email_missing(test_client, init_database): @@ -466,6 +467,94 @@ def test_set_admin_admin1_logged_in_admin_empty(test_client, init_database): assert b'Field may not be null' in response.data +def test_set_cron_user_not_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + User is not logged in + """ + response = test_client.put('/api/user/setCron') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_set_cron_user1_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + User1 is logged in + """ + response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully updated users cron' in response.data + + +def test_set_empty_cron_user1_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + User1 is logged in + Interval is empty + """ + response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + + +def test_set_cron_bot_logged_in_user_exists(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + Bot1 is logged in and requests user 12345678 + """ + response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":12345678")}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully updated users cron' in response.data + + +def test_set_cron_bot_logged_in_user_not_exists(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + Bot1 is logged in and requests user 1234 (not existing) + """ + response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":1234")}, + content_type='application/json') + assert response.status_code == 401 + assert b'Unable to login' in response.data + + +def test_set_cron_user1_logged_in_but_no_bot(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + User1 is logged in and requests user 1234 (not existing) + Fails because user1 is not a bot + """ + response = test_client.put('/api/user/setCron', data=json.dumps(dict(cron="* * * * *")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password") + ":12345678")}, + content_type='application/json') + assert response.status_code == 401 + assert b'Unable to login' in response.data + + +def test_set_cron_invalid_token(test_client, init_database): + """ + Test PUT '/api/user/setCron' + + Invalid Bearer token + """ + response = test_client.put('/api/user/setCron', headers={"Authorization": "Bearer {}".format("invalidtoken:12345678")}) + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + def test_get_users_not_logged_in(test_client, init_database): """ Test GET '/api/users' diff --git a/api/tests/unit/test_helper_functions.py b/api/tests/unit/test_helper_functions.py index 5cf054b..ef84fc4 100644 --- a/api/tests/unit/test_helper_functions.py +++ b/api/tests/unit/test_helper_functions.py @@ -18,3 +18,10 @@ def test_check_password(): hashed = hash_password("password") assert check_password(hashed, "password".encode("utf-8")) is True assert check_password(hashed, "password1".encode("utf-8")) is False + + +def test_get_email_from_token(): + """ + Test get_email_from_token function + """ + assert get_email_from_token_data(None) is None diff --git a/api/tests/unit/test_models.py b/api/tests/unit/test_models.py index e8a336a..690b58a 100644 --- a/api/tests/unit/test_models.py +++ b/api/tests/unit/test_models.py @@ -1,9 +1,8 @@ """ This file (test_models.py) contains the unit tests for the models.py file. """ -from app.models import User, Transaction, Keyword, Share - from app.helper_functions import hash_password +from app.models import User, Transaction, Keyword, Share def test_new_user(): @@ -16,14 +15,15 @@ def test_new_user(): email="user@example.com", username="user", password=hash_password("password"), - admin=False + admin=False, + cron="0 8 * * *", ) assert user.email == 'user@example.com' assert user.password != 'password' assert user.username == "user" assert user.telegram_user_id is None assert user.admin is False - assert user.as_dict() == {'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False} + assert user.as_dict() == {'cron': '0 8 * * *', 'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False} def test_new_user_with_fixture(new_user): diff --git a/api/tests/unit/test_user.py b/api/tests/unit/test_user.py index 91ad580..eb5706d 100644 --- a/api/tests/unit/test_user.py +++ b/api/tests/unit/test_user.py @@ -38,3 +38,12 @@ def test_check_if_admin_data_exists(): assert check_if_admin_data_exists(dict(admin=True)) is True assert check_if_admin_data_exists(dict(admin=None)) is False assert check_if_admin_data_exists(dict()) is False + + +def test_check_if_cron_data_exists(): + """ + Test check_if_cron_data_exists function + """ + assert check_if_cron_data_exists(dict(cron="* * * * *")) is True + assert check_if_cron_data_exists(dict(cron="")) is False + assert check_if_cron_data_exists(dict()) is False