diff --git a/api/README.md b/api/README.md index e771473..e9daaa2 100644 --- a/api/README.md +++ b/api/README.md @@ -10,6 +10,16 @@ Aktienbot API 2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`) 4. Run api `python api/app.py` + +## Testing +1. Create virtual environment `python -m venv venv env/Scripts/activate` +2. Install requirements `pip install -r api/requirements.txt` +3. Set environment variables (see list below) + 1. Use `.env`-file in `api` directory like `.env.example` + 2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`) +4. Change directory: `cd api/` +5. Run tests: `python -m pytest -v --cov-report term-missing --cov=app` + ## Environment variables ``` # Flask secret key diff --git a/api/app.py b/api/app.py index 7225166..0424fca 100644 --- a/api/app.py +++ b/api/app.py @@ -1,72 +1,6 @@ -import os +from app import create_app -from apiflask import APIFlask - -from dotenv import load_dotenv -from flask_cors import CORS - -from api_blueprint_keyword import keyword_blueprint -from api_blueprint_portfolio import portfolio_blueprint -from api_blueprint_shares import shares_blueprint -from api_blueprint_transactions import transaction_blueprint -from api_blueprint_telegram import telegram_blueprint -from helper_functions import hash_password -from models import * -from api_blueprint_user import users_blueprint - - -def create_app(): - load_dotenv() - - # Create Flask app load app.config - application = APIFlask(__name__, openapi_blueprint_url_prefix='/api') - application.config.from_object("config.ConfigClass") - - CORS(application, resources={r"*": {"origins": "*"}}) - - application.app_context().push() - - db.init_app(application) - - # api blueprints - application.register_blueprint(keyword_blueprint) - application.register_blueprint(shares_blueprint) - application.register_blueprint(transaction_blueprint) - application.register_blueprint(portfolio_blueprint) - application.register_blueprint(users_blueprint) - application.register_blueprint(telegram_blueprint) - - @application.before_first_request - def init_database(): - db.create_all() - - if os.getenv("BOT_EMAIL") is not None and os.getenv("BOT_USERNAME") is not None and os.getenv("BOT_PASSWORD") is not None: - if db.session.query(User).filter_by(email=os.getenv("BOT_EMAIL")).first() is None: # Check if user already exist - bot = User( - email=os.getenv("BOT_EMAIL"), - username=os.getenv("BOT_USERNAME"), - password=hash_password(os.getenv("BOT_PASSWORD")), - admin=False - ) - db.session.add(bot) - db.session.commit() - - if os.getenv("ADMIN_EMAIL") is not None and os.getenv("ADMIN_USERNAME") is not None and os.getenv("ADMIN_PASSWORD") is not None: - if db.session.query(User).filter_by(email=os.getenv("ADMIN_EMAIL")).first() is None: # Check if user already exist - admin = User( - email=os.getenv("ADMIN_EMAIL"), - username=os.getenv("ADMIN_USERNAME"), - password=hash_password(os.getenv("ADMIN_PASSWORD")), - admin=True - ) - db.session.add(admin) - db.session.commit() - - return application - - -app = create_app() - -# Start development web server -if __name__ == '__main__': - app.run() +# Call the application factory function to construct a Flask application +# instance using the development configuration +application = create_app('config/flask.cfg') +application.run() diff --git a/api/app/__init__.py b/api/app/__init__.py new file mode 100644 index 0000000..c62e70c --- /dev/null +++ b/api/app/__init__.py @@ -0,0 +1,67 @@ +from flask import current_app +from apiflask import APIFlask + +from dotenv import load_dotenv +from flask_cors import CORS + +from app.blueprints.keyword import keyword_blueprint +from app.blueprints.portfolio import portfolio_blueprint +from app.blueprints.shares import shares_blueprint +from app.blueprints.transactions import transaction_blueprint +from app.blueprints.telegram import telegram_blueprint +from app.blueprints.user import users_blueprint +from app.helper_functions import hash_password +from app.models import * + + +def create_app(config_filename=None): + load_dotenv() + + # Create Flask app load app.config + application = APIFlask(__name__, openapi_blueprint_url_prefix='/api') + application.config.from_pyfile(config_filename) + + CORS(application, resources={r"*": {"origins": "*"}}) + + application.app_context().push() + + db.init_app(application) + + # api blueprints + application.register_blueprint(keyword_blueprint) + application.register_blueprint(shares_blueprint) + application.register_blueprint(transaction_blueprint) + application.register_blueprint(portfolio_blueprint) + application.register_blueprint(users_blueprint) + application.register_blueprint(telegram_blueprint) + + @application.before_first_request + def init_database(): + db.create_all() + + if current_app.config['BOT_EMAIL'] is not None and current_app.config['BOT_USERNAME'] is not None and current_app.config['BOT_PASSWORD'] is not None: + if db.session.query(User).filter_by(email=current_app.config['BOT_EMAIL']).first() is None: # Check if user already exist + bot = User( + email=current_app.config['BOT_EMAIL'], + username=current_app.config['BOT_USERNAME'], + password=hash_password(current_app.config['BOT_PASSWORD']), + admin=False + ) + db.session.add(bot) + db.session.commit() + + if current_app.config['ADMIN_EMAIL'] is not None and current_app.config['ADMIN_USERNAME'] is not None and current_app.config['ADMIN_PASSWORD'] is not None: + if db.session.query(User).filter_by(email=current_app.config['ADMIN_EMAIL']).first() is None: # Check if user already exist + admin = User( + email=current_app.config['ADMIN_EMAIL'], + username=current_app.config['ADMIN_USERNAME'], + password=hash_password(current_app.config['ADMIN_PASSWORD']), + admin=True + ) + db.session.add(admin) + db.session.commit() + + return application + + +app = create_app("config/flask.cfg") diff --git a/api/auth.py b/api/app/auth.py similarity index 73% rename from api/auth.py rename to api/app/auth.py index d5db292..513a889 100644 --- a/api/auth.py +++ b/api/app/auth.py @@ -1,4 +1,4 @@ -import os +from flask import current_app import jwt from apiflask import HTTPTokenAuth @@ -15,7 +15,7 @@ def verify_token(token): token = token.split(":")[0] try: - jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"]) + jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"]) return True except jwt.PyJWTError: return False diff --git a/api/app/blueprints/__init__.py b/api/app/blueprints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/api_blueprint_keyword.py b/api/app/blueprints/keyword.py similarity index 85% rename from api/api_blueprint_keyword.py rename to api/app/blueprints/keyword.py index a22abb3..cd00c53 100644 --- a/api/api_blueprint_keyword.py +++ b/api/app/blueprints/keyword.py @@ -2,11 +2,11 @@ import os from apiflask import APIBlueprint, abort -from db import db -from helper_functions import make_response, get_email_or_abort_401 -from auth import auth -from schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema -from models import Keyword +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401 +from app.auth import auth +from app.schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema +from app.models import Keyword keyword_blueprint = APIBlueprint('keyword', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -20,7 +20,8 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file def add_keyword(data): email = get_email_or_abort_401() - check_if_keyword_data_exists(data) + if not check_if_keyword_data_exists(data): + abort(400, message="Keyword missing") key = data['keyword'] @@ -47,14 +48,15 @@ def add_keyword(data): def remove_keyword(data): email = get_email_or_abort_401() - check_if_keyword_data_exists(data) + if not check_if_keyword_data_exists(data): + abort(400, message="Keyword missing") key = data['keyword'] check_keyword = db.session.query(Keyword).filter_by(keyword=key, email=email).first() if check_keyword is None: - return make_response({}, 500, "Keyword doesn't exist for this user") + return abort(500, "Keyword doesn't exist for this user") else: db.session.query(Keyword).filter_by(keyword=key, email=email).delete() db.session.commit() @@ -81,7 +83,9 @@ def get_keywords(): def check_if_keyword_data_exists(data): if "keyword" not in data: - abort(400, message="Keyword missing") + return False if data['keyword'] == "" or data['keyword'] is None: - abort(400, message="Keyword missing") + return False + + return True diff --git a/api/api_blueprint_portfolio.py b/api/app/blueprints/portfolio.py similarity index 85% rename from api/api_blueprint_portfolio.py rename to api/app/blueprints/portfolio.py index cf3c9b1..465ce47 100644 --- a/api/api_blueprint_portfolio.py +++ b/api/app/blueprints/portfolio.py @@ -2,10 +2,10 @@ import os from apiflask import APIBlueprint -from schema import PortfolioResponseSchema -from db import db -from helper_functions import make_response, get_email_or_abort_401 -from auth import auth +from app.schema import PortfolioResponseSchema +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401 +from app.auth import auth portfolio_blueprint = APIBlueprint('portfolio', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) diff --git a/api/api_blueprint_shares.py b/api/app/blueprints/shares.py similarity index 83% rename from api/api_blueprint_shares.py rename to api/app/blueprints/shares.py index 1c7c785..24656a1 100644 --- a/api/api_blueprint_shares.py +++ b/api/app/blueprints/shares.py @@ -2,11 +2,11 @@ import os from apiflask import APIBlueprint, abort -from auth import auth -from db import db -from helper_functions import make_response, get_email_or_abort_401 -from models import Share -from schema import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema +from app.auth import auth +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401 +from app.models import Share +from app.schema import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema shares_blueprint = APIBlueprint('share', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -20,7 +20,8 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file def add_symbol(data): email = get_email_or_abort_401() - check_if_symbol_data_exists(data) + if not check_if_symbol_data_exists(data): + abort(400, message="Symbol missing") symbol = data['symbol'] @@ -36,7 +37,7 @@ def add_symbol(data): return make_response(new_symbol.as_dict(), 200, "Successfully added symbol") else: - return make_response({}, 500, "Symbol already exist for this user") + abort(500, "Symbol already exist for this user") @shares_blueprint.route('/share', methods=['DELETE']) @@ -47,14 +48,15 @@ def add_symbol(data): def remove_symbol(data): email = get_email_or_abort_401() - check_if_symbol_data_exists(data) + if not check_if_symbol_data_exists(data): + abort(400, message="Symbol missing") symbol = data['symbol'] check_share = db.session.query(Share).filter_by(symbol=symbol, email=email).first() if check_share is None: - return make_response({}, 500, "Symbol doesn't exist for this user") + abort(500, "Symbol doesn't exist for this user") else: db.session.query(Share).filter_by(symbol=symbol, email=email).delete() db.session.commit() @@ -81,7 +83,9 @@ def get_symbol(): def check_if_symbol_data_exists(data): if "symbol" not in data: - abort(400, message="Symbol missing") + return False if data['symbol'] == "" or data['symbol'] is None: - abort(400, message="Symbol missing") + return False + + return True diff --git a/api/api_blueprint_telegram.py b/api/app/blueprints/telegram.py similarity index 73% rename from api/api_blueprint_telegram.py rename to api/app/blueprints/telegram.py index 1aa6f6e..5208d71 100644 --- a/api/api_blueprint_telegram.py +++ b/api/app/blueprints/telegram.py @@ -2,11 +2,11 @@ import os from apiflask import APIBlueprint, abort -from db import db -from helper_functions import make_response, get_email_or_abort_401 -from auth import auth -from schema import TelegramIdSchema, UsersSchema -from models import User +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401 +from app.auth import auth +from app.schema import TelegramIdSchema, UsersSchema +from app.models import User telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -20,15 +20,11 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file def add_keyword(data): email = get_email_or_abort_401() - check_if_telegram_user_id_data_exists(data) + if not check_if_telegram_user_id_data_exists(data): + abort(400, message="User ID missing") query_user = db.session.query(User).filter_by(email=email).first() - - if query_user is None: # Username doesn't exist - abort(500, message="Unable to login") - query_user.telegram_user_id = data['telegram_user_id'] - db.session.commit() return make_response(query_user.as_dict(), 200, "Successfully connected telegram user") @@ -36,7 +32,9 @@ def add_keyword(data): def check_if_telegram_user_id_data_exists(data): if "telegram_user_id" not in data: - abort(400, message="User ID missing") + return False if data['telegram_user_id'] == "" or data['telegram_user_id'] is None: - abort(400, message="User ID missing") + return False + + return True diff --git a/api/api_blueprint_transactions.py b/api/app/blueprints/transactions.py similarity index 67% rename from api/api_blueprint_transactions.py rename to api/app/blueprints/transactions.py index 054a032..33eae41 100644 --- a/api/api_blueprint_transactions.py +++ b/api/app/blueprints/transactions.py @@ -3,11 +3,11 @@ import datetime from apiflask import abort, APIBlueprint -from db import db -from helper_functions import make_response, get_email_or_abort_401 -from models import Transaction -from schema import TransactionSchema, TransactionResponseSchema -from auth import auth +from app.db import database as db +from app.helper_functions import make_response, get_email_or_abort_401 +from app.models import Transaction +from app.schema import TransactionSchema, TransactionResponseSchema +from app.auth import auth transaction_blueprint = APIBlueprint('transaction', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -21,7 +21,17 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file def add_transaction(data): email = get_email_or_abort_401() - check_if_transaction_data_exists(data) + if not check_if_symbol_data_exists(data): + abort(400, "Symbol missing") + + if not check_if_time_data_exists(data): + abort(400, "Time missing") + + if not check_if_count_data_exists(data): + abort(400, "Count missing") + + if not check_if_price_data_exists(data): + abort(400, "Price missing") new_transaction = Transaction( email=email, @@ -53,27 +63,41 @@ def get_transaction(): return make_response(return_transactions, 200, "Successfully loaded transactions") -def check_if_transaction_data_exists(data): +def check_if_symbol_data_exists(data): if "symbol" not in data: - abort(400, message="Symbol missing") + return False if data['symbol'] == "" or data['symbol'] is None: - abort(400, message="Symbol missing") + return False + return True + + +def check_if_time_data_exists(data): if "time" not in data: - abort(400, message="Time missing") + return False if data['time'] == "" or data['time'] is None: - abort(400, message="Time missing") + return False + return True + + +def check_if_count_data_exists(data): if "count" not in data: - abort(400, message="Count missing") + return False if data['count'] == "" or data['count'] is None: - abort(400, message="Count missing") + return False + return True + + +def check_if_price_data_exists(data): if "price" not in data: - abort(400, message="Price missing") + return False if data['price'] == "" or data['price'] is None: - abort(400, message="Price missing") + return False + + return True diff --git a/api/api_blueprint_user.py b/api/app/blueprints/user.py similarity index 75% rename from api/api_blueprint_user.py rename to api/app/blueprints/user.py index ad2ec2b..771bd60 100644 --- a/api/api_blueprint_user.py +++ b/api/app/blueprints/user.py @@ -1,14 +1,15 @@ import datetime import os +from flask import current_app import jwt from apiflask import APIBlueprint, abort -from db import db -from helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401 -from models import User -from schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema -from auth import auth +from app.db import database as db +from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401 +from app.models import User +from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema +from app.auth import auth users_blueprint = APIBlueprint('users', __name__, url_prefix='/api') __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) @@ -45,8 +46,11 @@ def user(): @users_blueprint.input(schema=LoginDataSchema) @users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error") def login(data): - check_if_email_data_exists(data) - check_if_password_data_exists(data) + if not check_if_password_data_exists(data): + abort(400, "Password missing") + + if not check_if_email_data_exists(data): + abort(400, "Email missing") email = data['email'] password = data['password'] @@ -59,10 +63,10 @@ def login(data): if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect abort(500, message="Unable to login") - if query_user.email == os.getenv("BOT_EMAIL"): - token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365)}, os.getenv('SECRET_KEY'), "HS256") + if query_user.email == current_app.config['BOT_EMAIL']: + token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365)}, current_app.config['SECRET_KEY'], "HS256") else: - token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)}, os.getenv('SECRET_KEY'), "HS256") + token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)}, current_app.config['SECRET_KEY'], "HS256") return make_response({"token": token}, 200, "Successfully logged in") @@ -72,9 +76,14 @@ def login(data): @users_blueprint.input(schema=RegisterDataSchema) @users_blueprint.doc(summary="Register", description="Registers user") def register(data): - check_if_email_data_exists(data) - check_if_username_data_exists(data) - check_if_password_data_exists(data) + if not check_if_email_data_exists(data): + abort(400, "Email missing") + + if not check_if_username_data_exists(data): + abort(400, "Username missing") + + if not check_if_password_data_exists(data): + abort(400, "Password missing") email = data['email'] username = data['username'] @@ -107,12 +116,10 @@ def update_user(data): query_user = db.session.query(User).filter_by(email=email).first() - if query_user is None: # Username doesn't exist - abort(500, message="Unable to login") - - if "password" in data and data['password'] is not None: + if check_if_password_data_exists(data): query_user.password = hash_password(data['password']) - if "username" in data and data['username'] is not None: + + if check_if_username_data_exists(data): query_user.username = data['username'] db.session.commit() @@ -128,8 +135,11 @@ def update_user(data): def set_admin(data): abort_if_no_admin() # Only admin users can do this - check_if_email_data_exists(data) - check_if_admin_data_exists(data) + if not check_if_email_data_exists(data): + abort(400, "Email missing") + + if not check_if_admin_data_exists(data): + abort(400, "Admin data missing") email = data['email'] admin = data['admin'] @@ -137,7 +147,7 @@ def set_admin(data): query_user = db.session.query(User).filter_by(email=email).first() if query_user is None: # Username doesn't exist - abort(500, message="Unable to login") + abort(500, message="Unable to update user") query_user.admin = admin db.session.commit() @@ -151,7 +161,8 @@ def set_admin(data): @users_blueprint.auth_required(auth) @users_blueprint.doc(summary="Delete user", description="Deletes user by username") def delete_user(data): - check_if_email_data_exists(data) + if not check_if_email_data_exists(data): + abort(400, "Email missing") email = data['email'] @@ -169,31 +180,39 @@ def delete_user(data): def check_if_email_data_exists(data): if "email" not in data: - abort(400, message="Email missing") + return False if data['email'] == "" or data['email'] is None: - abort(400, message="Email missing") + return False + + return True def check_if_password_data_exists(data): if "password" not in data: - abort(400, message="Password missing") + return False if data['password'] == "" or data['password'] is None: - abort(400, message="Password missing") + return False + + return True def check_if_username_data_exists(data): if "username" not in data: - abort(400, message="Username missing") + return False if data['username'] == "" or data['username'] is None: - abort(400, message="Username missing") + return False + + return True def check_if_admin_data_exists(data): if "admin" not in data: - abort(400, message="Admin state missing") + return False if data['admin'] == "" or data['admin'] is None: - abort(400, message="Admin state missing") + return False + + return True diff --git a/api/app/config/flask.cfg b/api/app/config/flask.cfg new file mode 100644 index 0000000..1a6347c --- /dev/null +++ b/api/app/config/flask.cfg @@ -0,0 +1,26 @@ +import os +from app.schema import BaseResponseSchema + +# Flask settings +SECRET_KEY = os.getenv('SECRET_KEY', 'secret string') + + +# Flask-SQLAlchemy settings +SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot') +SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning +SQLALCHEMY_ENGINE_OPTIONS = { + 'pool_size': 100, + 'pool_recycle': 240 # 4 minutes +} + +# openapi/Swagger config +BASE_RESPONSE_DATA_KEY = "data" +BASE_RESPONSE_SCHEMA = BaseResponseSchema + +BOT_EMAIL = os.getenv('BOT_EMAIL') +BOT_USERNAME = os.getenv('BOT_USERNAME') +BOT_PASSWORD = os.getenv('BOT_PASSWORD') + +ADMIN_EMAIL = os.getenv('ADMIN_EMAIL') +ADMIN_USERNAME = os.getenv('ADMIN_USERNAME') +ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD') diff --git a/api/app/config/flask_test.cfg b/api/app/config/flask_test.cfg new file mode 100644 index 0000000..2cc085b --- /dev/null +++ b/api/app/config/flask_test.cfg @@ -0,0 +1,26 @@ +import os +from app.schema import BaseResponseSchema + +# Flask settings +SECRET_KEY = os.getenv('SECRET_KEY', 'secret string') + + +# Flask-SQLAlchemy settings +SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot_test') +SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning +SQLALCHEMY_ENGINE_OPTIONS = { + 'pool_size': 100, + 'pool_recycle': 240 # 4 minutes +} + +# openapi/Swagger config +BASE_RESPONSE_DATA_KEY = "data" +BASE_RESPONSE_SCHEMA = BaseResponseSchema + +BOT_EMAIL = "bot1@example.com" +BOT_USERNAME = "bot1" +BOT_PASSWORD = "bot1" + +ADMIN_EMAIL = "admin1@example.com" +ADMIN_USERNAME = "admin1" +ADMIN_PASSWORD = "admin1" diff --git a/api/db.py b/api/app/db.py similarity index 63% rename from api/db.py rename to api/app/db.py index f0b13d6..890af1d 100644 --- a/api/db.py +++ b/api/app/db.py @@ -1,3 +1,3 @@ from flask_sqlalchemy import SQLAlchemy -db = SQLAlchemy() +database = SQLAlchemy() diff --git a/api/helper_functions.py b/api/app/helper_functions.py similarity index 72% rename from api/helper_functions.py rename to api/app/helper_functions.py index 4ebf10e..516d0e7 100644 --- a/api/helper_functions.py +++ b/api/app/helper_functions.py @@ -1,12 +1,12 @@ -import os +from flask import current_app import bcrypt import jwt from apiflask import abort from flask import request, jsonify -from db import db -from models import User +from app.db import database as db +from app.models import User def hash_password(password): @@ -17,27 +17,14 @@ def check_password(hashed_password, user_password): return bcrypt.checkpw(user_password, hashed_password) -def get_token(): - token = None - if 'Authorization' in request.headers: - token = request.headers['Authorization'].split(" ")[1] - - return token - - -def extract_token_data(token): - if token is not None: - try: - return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"]) - except jwt.PyJWTError: - return None - else: - return None - - def get_email_from_token_data(): if 'Authorization' in request.headers: - token = request.headers['Authorization'].split(" ")[1] + token = request.headers['Authorization'].split(" ") + + if len(token) < 2: + return None + else: + token = token[1] if token is not None: if ':' in token: # Maybe bot token, check if token valid and return username after ":" then @@ -45,7 +32,7 @@ def get_email_from_token_data(): token = token.split(":")[0] try: - if jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['email'] == os.getenv("BOT_EMAIL"): + if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']: res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first() if res is not None: @@ -59,7 +46,7 @@ def get_email_from_token_data(): else: # "Normal" token, extract username from token try: - return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['email'] + return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] except jwt.PyJWTError: return None diff --git a/api/models.py b/api/app/models.py similarity index 98% rename from api/models.py rename to api/app/models.py index d85e49c..6c06c7b 100644 --- a/api/models.py +++ b/api/app/models.py @@ -1,4 +1,4 @@ -from db import db +from app.db import database as db class User(db.Model): diff --git a/api/schema.py b/api/app/schema.py similarity index 100% rename from api/schema.py rename to api/app/schema.py diff --git a/api/config.py b/api/config.py deleted file mode 100644 index a12c60a..0000000 --- a/api/config.py +++ /dev/null @@ -1,55 +0,0 @@ -import os - -from dotenv import load_dotenv - -from schema import BaseResponseSchema - -load_dotenv() - - -class ConfigClass(object): - """ Flask application config """ - - # Flask settings - SECRET_KEY = os.getenv('SECRET_KEY') - - # Flask-SQLAlchemy settings - SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + \ - os.getenv('MYSQL_USER') + ":" + \ - os.getenv('MYSQL_PASSWORD') + "@" + \ - os.getenv('MYSQL_HOST') + ":" + \ - (os.getenv("MYSQL_PORT") or str(3306)) + "/" + \ - os.getenv('MYSQL_DATABASE') - SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning - SQLALCHEMY_ENGINE_OPTIONS = { - 'pool_size': 100, - 'pool_recycle': 240 # 4 minutes - } - - # openapi/Swagger config - SERVERS = [ - { - "name": "Production", - "url": "https://aktienbot.flokaiser.com" - }, - { - "name": "Local", - "url": "http://127.0.0.1:5000" - } - ] - INFO = { - 'description': 'Webengineering 2 | Telegram Aktienbot', - 'version': '0.0.1' - # 'termsOfService': 'http://example.com', - # 'contact': { - # 'name': 'API Support', - # 'url': 'http://www.example.com/support', - # 'email': 'support@example.com' - # }, - # 'license': { - # 'name': 'Apache 2.0', - # 'url': 'http://www.apache.org/licenses/LICENSE-2.0.html' - # } - } - BASE_RESPONSE_DATA_KEY = "data" - BASE_RESPONSE_SCHEMA = BaseResponseSchema diff --git a/api/requirements.txt b/api/requirements.txt index 57da142..a73b603 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -7,4 +7,7 @@ pymysql==1.0.2 pyjwt==2.3.0 apiflask==0.12.0 flask-cors==3.0.10 -bcrypt==3.2.0 \ No newline at end of file +bcrypt==3.2.0 +pytest~=7.1.1 +pytest-cov +marshmallow~=3.15.0 \ No newline at end of file diff --git a/api/tests/conftest.py b/api/tests/conftest.py new file mode 100644 index 0000000..e1f32fc --- /dev/null +++ b/api/tests/conftest.py @@ -0,0 +1,105 @@ +import pytest +from app import create_app, db +from app.models import User, Transaction, Keyword, Share + +from app.helper_functions import hash_password + + +@pytest.fixture(scope='module') +def new_user(): + user = User( + email="user@example.com", + username="user", + password=hash_password("password"), + admin=False + ) + return user + + +@pytest.fixture(scope='module') +def new_transaction(): + transaction = Transaction( + email="user@example.com", + symbol="DTEGY", + time="2022-03-29T10:00:00.000Z", + count=10, + price=9.99 + ) + return transaction + + +@pytest.fixture(scope='module') +def new_keyword(): + keyword = Keyword( + email="user@example.com", + keyword="Elon Musk", + ) + return keyword + + +@pytest.fixture(scope='module') +def new_share(): + share = Share( + email="user@example.com", + symbol="DTEGY", + ) + return share + + +@pytest.fixture(scope='module') +def test_client(): + flask_app = create_app('config/flask_test.cfg') + + # Create a test client using the Flask application configured for testing + with flask_app.test_client() as testing_client: + # Establish an application context + with flask_app.app_context(): + yield testing_client # this is where the testing happens! + + +@pytest.fixture(scope='function') +def init_database(test_client): + # Create the database and the database table + db.create_all() + + # Insert user data + user1 = User( + email="user1@example.com", + username="user1", + password=hash_password("password"), + telegram_user_id="12345678", + admin=False + ) + user2 = User( + email="user2@example.com", + username="user2", + password=hash_password("password"), + telegram_user_id="87654321", + admin=False + ) + admin = User( + email="admin1@example.com", + username="admin1", + password=hash_password("admin1"), + telegram_user_id="00000000", + admin=True + ) + bot = User( + email="bot1@example.com", + username="bot1", + password=hash_password("bot1"), + telegram_user_id="00000000", + admin=False + ) + db.session.add(user1) + db.session.add(user2) + db.session.add(admin) + db.session.add(bot) + + # Commit the changes for the users + db.session.commit() + + yield # this is where the testing happens! + + db.session.commit() + db.drop_all() diff --git a/api/tests/functional/__init__.py b/api/tests/functional/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/functional/helper_functions.py b/api/tests/functional/helper_functions.py new file mode 100644 index 0000000..9e68226 --- /dev/null +++ b/api/tests/functional/helper_functions.py @@ -0,0 +1,11 @@ +import json + + +def get_token(test_client, email, password): + response = test_client.post('/api/user/login', data=json.dumps(dict(email=email, password=password)), content_type='application/json') + + if "data" in json.loads(response.data): + if "token" in json.loads(response.data)["data"]: + return json.loads(response.data)["data"]["token"] + + return "" \ No newline at end of file diff --git a/api/tests/functional/test_keyword.py b/api/tests/functional/test_keyword.py new file mode 100644 index 0000000..3517c18 --- /dev/null +++ b/api/tests/functional/test_keyword.py @@ -0,0 +1,182 @@ +""" +This file (test_keyword.py) contains the functional tests for the `keyword` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_add_keyword_not_logged_in(test_client, init_database): + """ + Test POST '/api/keyword' + + User is not logged in + """ + response = test_client.post('/api/keyword') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_add_keyword_user1_logged_in(test_client, init_database): + """ + Test POST '/api/keyword' + + User1 is logged in + """ + response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully added keyword' in response.data + + +def test_add_keyword_user1_logged_in_but_keyword_exist(test_client, init_database): + """ + Test POST '/api/keyword' + + User1 is logged in + Add keyword two times + """ + test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 500 + assert b'Keyword already exist for this user' in response.data + + +def test_add_keyword_user1_logged_in_but_keyword_missing(test_client, init_database): + """ + Test POST '/api/keyword' + + User1 is logged in + Keyword is missing in post data + """ + response = test_client.post('/api/keyword', data=json.dumps(dict()), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_add_keyword_user1_logged_in_but_keyword_empty(test_client, init_database): + """ + Test POST '/api/keyword' + + User1 is logged in + Keyword is empty in post data + """ + response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_get_keyword_not_logged_in(test_client, init_database): + """ + Test GET '/api/keyword' + + User is not logged in + """ + response = test_client.get('/api/keywords') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_keyword_user1_logged_in_empty_response(test_client, init_database): + """ + Test GET '/api/keyword' + + User1 is logged in + Empty response + """ + response = test_client.get('/api/keywords', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'"data":[' in response.data + + +def test_get_keyword_user1_logged_in_response_data(test_client, init_database): + """ + Test GET '/api/keyword' + + User1 is logged in + Create some keywords for user1 + """ + test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.get('/api/keywords', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'"data":[' in response.data + + +def test_delete_keyword_not_logged_in(test_client, init_database): + """ + Test DELETE '/api/keyword' + + User is not logged in + """ + response = test_client.delete('/api/keyword') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_delete_keyword_user1_logged_in_but_empty_keyword(test_client, init_database): + """ + Test DELETE '/api/keyword' + + User1 is logged in + Keyword empty in in delete data + """ + response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_keyword_user1_logged_in_but_missing_keyword(test_client, init_database): + """ + Test DELETE '/api/keyword' + + User1 is logged in + Keyword missing in in delete data + """ + response = test_client.delete('/api/keyword', data=json.dumps(dict()), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_keyword_user1_logged_in_keyword_exists(test_client, init_database): + """ + Test DELETE '/api/keyword' + + User1 is logged in + Keyword exists + """ + test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully removed keyword' in response.data + + +def test_delete_keyword_user1_logged_in_keyword_not_exists(test_client, init_database): + """ + Test DELETE '/api/keyword' + + User1 is logged in + Keyword doesn't exists + """ + response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 500 + assert b'Keyword doesn\'t exist for this user' in response.data diff --git a/api/tests/functional/test_portfolio.py b/api/tests/functional/test_portfolio.py new file mode 100644 index 0000000..f240c93 --- /dev/null +++ b/api/tests/functional/test_portfolio.py @@ -0,0 +1,50 @@ +""" +This file (test_portfolio.py) contains the functional tests for the `portfolio` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_get_portfolio_not_logged_in_empty_response(test_client, init_database): + """ + Test GET '/api/portfolio' + + User is not logged in + """ + response = test_client.get('/api/portfolio') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_portfolio_user1_logged_in_empty_response(test_client, init_database): + """ + Test GET '/api/portfolio' + + User1 is logged in + Empty response + """ + response = test_client.get('/api/portfolio', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'"data":[]' in response.data + assert b'Successfully loaded symbols' in response.data + + +def test_get_portfolio_user1_logged_in_response_data(test_client, init_database): + """ + Test GET '/api/portfolio' + + User1 is logged in + Create transaction data + """ + test_client.post('/api/transaction', data=json.dumps(dict(count=5, price=9.99, symbol="DTEGY", time="2022-03-29T10:00:00.000Z")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + + response = test_client.get('/api/portfolio', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'"data":[]' not in response.data + assert b'"data":[' in response.data diff --git a/api/tests/functional/test_share.py b/api/tests/functional/test_share.py new file mode 100644 index 0000000..8df2ced --- /dev/null +++ b/api/tests/functional/test_share.py @@ -0,0 +1,182 @@ +""" +This file (test_share.py) contains the functional tests for the `share` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_add_share_not_logged_in(test_client, init_database): + """ + Test POST '/api/share' + + User is not logged in + """ + response = test_client.post('/api/share') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_add_share_user1_logged_in(test_client, init_database): + """ + Test POST '/api/share' + + User1 is logged in + """ + response = test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully added symbol' in response.data + + +def test_add_share_user1_logged_in_but_symbol_exist(test_client, init_database): + """ + Test POST '/api/share' + + User1 is logged in + Add symbol two times + """ + test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 500 + assert b'Symbol already exist for this user' in response.data + + +def test_add_share_user1_logged_in_but_symbol_missing(test_client, init_database): + """ + Test POST '/api/share' + + User1 is logged in + Symbol is missing in post data + """ + response = test_client.post('/api/share', data=json.dumps(dict()), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_add_share_user1_logged_in_but_symbol_empty(test_client, init_database): + """ + Test POST '/api/share' + + User1 is logged in + Symbol is empty in post data + """ + response = test_client.post('/api/share', data=json.dumps(dict(symbol="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_get_share_not_logged_in(test_client, init_database): + """ + Test GET '/api/share' + + User is not logged in + """ + response = test_client.get('/api/shares') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_share_user1_logged_in_empty_response(test_client, init_database): + """ + Test GET '/api/share' + + User1 is logged in + Empty response + """ + response = test_client.get('/api/shares', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'"data":[' in response.data + + +def test_get_share_user1_logged_in_response_data(test_client, init_database): + """ + Test GET '/api/share' + + User1 is logged in + Create some symbols for user1 + """ + test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.get('/api/shares', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'"data":[' in response.data + + +def test_delete_share_not_logged_in(test_client, init_database): + """ + Test DELETE '/api/share' + + User is not logged in + """ + response = test_client.delete('/api/share') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_delete_share_user1_logged_in_but_empty_symbol(test_client, init_database): + """ + Test DELETE '/api/share' + + User1 is logged in + Symbol empty in in delete data + """ + response = test_client.delete('/api/share', data=json.dumps(dict(symbol="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_share_user1_logged_in_but_missing_symbol(test_client, init_database): + """ + Test DELETE '/api/share' + + User1 is logged in + Symbol missing in in delete data + """ + response = test_client.delete('/api/share', data=json.dumps(dict()), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_share_user1_logged_in_symbol_exists(test_client, init_database): + """ + Test DELETE '/api/share' + + User1 is logged in + Symbol exists + """ + test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.delete('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully removed symbol' in response.data + + +def test_delete_share_user1_logged_in_symbol_not_exists(test_client, init_database): + """ + Test DELETE '/api/share' + + User1 is logged in + Symbol doesn't exists + """ + response = test_client.delete('/api/share', data=json.dumps(dict(symbol="DTEGY")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 500 + assert b'Symbol doesn\'t exist for this user' in response.data diff --git a/api/tests/functional/test_telegram.py b/api/tests/functional/test_telegram.py new file mode 100644 index 0000000..34acee7 --- /dev/null +++ b/api/tests/functional/test_telegram.py @@ -0,0 +1,57 @@ +""" +This file (test_telegram.py) contains the functional tests for the `telegram` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_add_telegram_not_logged_in(test_client, init_database): + """ + Test POST '/api/telegram' + + User is not logged in + """ + response = test_client.post('/api/telegram') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_add_telegram_user1_logged_in(test_client, init_database): + """ + Test POST '/api/telegram' + + User1 is logged in + """ + response = test_client.post('/api/telegram', data=json.dumps(dict(telegram_user_id="12345678")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully connected telegram user' in response.data + + +def test_add_telegram_user1_logged_in_user_data_missing(test_client, init_database): + """ + Test POST '/api/telegram' + + User1 is logged in + telegram_user_id is missing + """ + response = test_client.post('/api/telegram', data=json.dumps(dict()), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_add_telegram_user1_logged_in_user_data_empty(test_client, init_database): + """ + Test POST '/api/telegram' + + User1 is logged in + telegram_user_id is empty + """ + response = test_client.post('/api/telegram', data=json.dumps(dict(telegram_user_id="")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data diff --git a/api/tests/functional/test_transaction.py b/api/tests/functional/test_transaction.py new file mode 100644 index 0000000..cb7b9ad --- /dev/null +++ b/api/tests/functional/test_transaction.py @@ -0,0 +1,94 @@ +""" +This file (test_transaction.py) contains the functional tests for the `transaction` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_add_transaction_not_logged_in(test_client, init_database): + """ + Test POST '/api/transaction' + + User is not logged in + """ + response = test_client.get('/api/portfolio') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_add_transaction_user1_logged_in_missing_data(test_client, init_database): + """ + Test POST '/api/transaction' + + User1 is logged in + Data missing + """ + # symbol missing + response = test_client.post('/api/transaction', data=json.dumps(dict(time="2022-03-29T10:00:00.000Z", count=10, price=9.99)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + # time missing + response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", count=10, price=9.99)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + # count missing + response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", time="2022-03-29T10:00:00.000Z", price=9.99)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + # price missing + response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", time="2022-03-29T10:00:00.000Z", count=10)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_get_transaction_not_logged_in(test_client, init_database): + """ + Test GET '/api/transaction' + + User is not logged in + """ + response = test_client.get('/api/transactions') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_transaction_user1_logged_in_empty_response(test_client, init_database): + """ + Test GET '/api/transaction' + + User1 is logged in + """ + response = test_client.get('/api/transactions', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully loaded transactions' in response.data + + +def test_get_transaction_user1_logged_in_response_data(test_client, init_database): + """ + Test GET '/api/transaction' + + User1 is logged in + Create transaction + """ + test_client.post('/api/transaction', data=json.dumps(dict(count=5, price=9.99, symbol="DTEGY", time="2022-03-29T10:00:00.000Z")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + + response = test_client.get('/api/transactions', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully loaded transactions' in response.data diff --git a/api/tests/functional/test_user.py b/api/tests/functional/test_user.py new file mode 100644 index 0000000..8b57338 --- /dev/null +++ b/api/tests/functional/test_user.py @@ -0,0 +1,504 @@ +""" +This file (test_user.py) contains the functional tests for the `users` blueprint. +""" +import json +from tests.functional.helper_functions import get_token + + +def test_login_with_valid_data(test_client, init_database): + """ + Test POST '/api/user/login' + + Valid data + """ + response = test_client.post('/api/user/login', data=json.dumps(dict(email="user1@example.com", password="password")), content_type='application/json') + assert response.status_code == 200 + assert b'Successfully logged in' in response.data + + +def test_login_with_wrong_password(test_client, init_database): + """ + Test POST '/api/user/login' + + Wrong password + """ + response = test_client.post('/api/user/login', data=json.dumps(dict(email="user2@example.com", password="password2")), content_type='application/json') + assert response.status_code == 500 + assert b'Unable to login' in response.data + + +def test_login_user_not_exist(test_client, init_database): + """ + Test POST '/api/user/login' + + User doesn't exist + """ + response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json') + assert response.status_code == 500 + assert b'Unable to login' in response.data + + +def test_login_email_missing(test_client, init_database): + """ + Test POST '/api/user/login' + + Email missing + """ + response = test_client.post('/api/user/login', data=json.dumps(dict(password="password")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_login_password_missing(test_client, init_database): + """ + Test POST '/api/user/login' + + Password missing + """ + response = test_client.post('/api/user/login', data=json.dumps(dict(email="user1@example.com")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_register_valid_data(test_client, init_database): + """ + Test POST '/api/user/register' + + Valid data + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + assert response.status_code == 200 + assert b'Successfully registered user' in response.data + + +def test_register_user_exists_already(test_client, init_database): + """ + Test POST '/api/user/register' + + User exists already + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + assert response.status_code == 500 + assert b'Email already exist' in response.data + + +def test_register_email_missing(test_client, init_database): + """ + Test POST '/api/user/register' + + Email missing + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="user3")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_register_email_empty(test_client, init_database): + """ + Test POST '/api/user/register' + + Email empty + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="user3", email="")), content_type='application/json') + assert response.status_code == 400 + assert b'Not a valid email address' in response.data + + +def test_register_password_missing(test_client, init_database): + """ + Test POST '/api/user/register' + + Password missing + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", username="user3")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_register_password_empty(test_client, init_database): + """ + Test POST '/api/user/register' + + password empty + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", username="user3", password="")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_register_username_missing(test_client, init_database): + """ + Test POST '/api/user/register' + + Username missing + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", email="user3@example.com")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_register_username_empty(test_client, init_database): + """ + Test POST '/api/user/register' + + Username empty + """ + response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="", email="user3@example.com")), content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_user_not_logged_in(test_client, init_database): + """ + Test DELETE '/api/user' + + User is not logged in + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', data=json.dumps(dict(email="user3@example.com")), content_type='application/json') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_delete_user_same_user_logged_in(test_client, init_database): + """ + Test DELETE '/api/user' + + User3 is logged in + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', + data=json.dumps(dict(email="user3@example.com")), + content_type='application/json', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))}) + assert response.status_code == 200 + assert b'Successfully removed user' in response.data + + +def test_delete_user_different_user_logged_in_no_admin(test_client, init_database): + """ + Test DELETE '/api/user' + + Different user is logged in -> no admin + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', + data=json.dumps(dict(email="user3@example.com")), + content_type='application/json', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 401 + assert b'Only admin users can access this' in response.data + + +def test_delete_user_different_user_logged_in_admin(test_client, init_database): + """ + Test DELETE '/api/user' + + Different user is logged in -> admin + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', + data=json.dumps(dict(email="user3@example.com")), + content_type='application/json', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}) + assert response.status_code == 200 + assert b'Successfully removed user' in response.data + + +def test_delete_user_email_missing(test_client, init_database): + """ + Test DELETE '/api/user' + + Email missing + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', + data=json.dumps(dict()), + content_type='application/json', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))}) + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_delete_user_email_empty(test_client, init_database): + """ + Test DELETE '/api/user' + + Email empty + """ + test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json') + response = test_client.delete('/api/user', + data=json.dumps(dict(email="")), + content_type='application/json', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))}) + assert response.status_code == 400 + assert b'Not a valid email address' in response.data + + +def test_get_current_user_user_not_logged_in(test_client, init_database): + """ + Test GET '/api/user' + + User is not logged in + """ + response = test_client.get('/api/user') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_current_user_user1_logged_in(test_client, init_database): + """ + Test GET '/api/user' + + User1 is logged in + """ + response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'user1' in response.data + assert b'user2' not in response.data + + +def test_get_current_user_bot_logged_in_user_exists(test_client, init_database): + """ + Test GET '/api/user' + + Bot1 is logged in and requests user 12345678 + """ + response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":12345678")}) + assert response.status_code == 200 + assert b'user1' in response.data + assert b'bot' not in response.data + + +def test_get_current_user_bot_logged_in_user_not_exists(test_client, init_database): + """ + Test GET '/api/user' + + Bot1 is logged in and requests user 1234 (not existing) + """ + response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":1234")}) + assert response.status_code == 401 + assert b'Unable to login' in response.data + + +def test_get_current_user_user1_logged_in_but_no_bot(test_client, init_database): + """ + Test GET '/api/user' + + User1 is logged in and requests user 1234 (not existing) + Fails because user1 is not a bot + """ + response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password") + ":12345678")}) + assert response.status_code == 401 + assert b'Unable to login' in response.data + + +def test_get_current_user_invalid_token(test_client, init_database): + """ + Test GET '/api/user' + + Invalid Bearer token + """ + response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format("invalidtoken:12345678")}) + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_update_user_not_logged_in(test_client, init_database): + """ + Test PUT '/api/user' + + User is not logged in + """ + response = test_client.put('/api/user') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_update_user1_logged_in_password_username(test_client, init_database): + """ + Test PUT '/api/user' + + User1 is logged in + Change Username and Password + """ + test_client.put('/api/user', data=json.dumps(dict(username="user4", password="password")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + response = test_client.get('/api/user', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'user4' in response.data + + +def test_update_user1_logged_in_password(test_client, init_database): + """ + Test PUT '/api/user' + + User1 is logged in + Change Password + """ + response = test_client.put('/api/user', data=json.dumps(dict(password="password123")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'Successfully updated user' in response.data + + +def test_update_user1_logged_in_username(test_client, init_database): + """ + Test PUT '/api/user' + + User1 is logged in + Change Username + """ + response = test_client.put('/api/user', data=json.dumps(dict(username="user1")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}) + assert response.status_code == 200 + assert b'Successfully updated user' in response.data + + +def test_set_admin_user_not_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + User is not logged in + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)), content_type='application/json') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_set_admin_user1_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + User1 is logged in (no admin) + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + assert response.status_code == 401 + assert b'Only admin users can access this' in response.data + + +def test_set_admin_admin1_logged_in(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully updated users admin rights' in response.data + + +def test_set_admin_admin1_logged_in_user_not_exist(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + notexistinguser@example.com does not exist + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="notexistinguser@example.com", admin=True)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 500 + assert b'Unable to update user' in response.data + + +def test_set_admin_admin1_logged_in_email_missing(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + email missing + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(admin=True)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_set_admin_admin1_logged_in_email_empty(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + email missing + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="", admin=True)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'Not a valid email address.' in response.data + + +def test_set_admin_admin1_logged_in_admin_missing(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + admin data missing + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com")), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'missing' in response.data + + +def test_set_admin_admin1_logged_in_admin_empty(test_client, init_database): + """ + Test PUT '/api/user/setAdmin' + + Admin1 is logged in (admin) + admin data missing + """ + response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=None)), + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 400 + assert b'Field may not be null' in response.data + + +def test_get_users_not_logged_in(test_client, init_database): + """ + Test GET '/api/users' + + User is not logged in + """ + response = test_client.get('/api/users') + assert response.status_code == 401 + assert b'Unauthorized' in response.data + + +def test_get_users_user1_logged_in(test_client, init_database): + """ + Test GET '/api/users' + + User1 is logged in (not admin) + """ + response = test_client.get('/api/users', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))}, + content_type='application/json') + + assert response.status_code == 401 + assert b'Only admin users can access this' in response.data + + +def test_get_users_admin1_logged_in(test_client, init_database): + """ + Test GET '/api/users' + + Admin1 is logged in (admin) + """ + response = test_client.get('/api/users', + headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))}, + content_type='application/json') + assert response.status_code == 200 + assert b'Successfully received all users' in response.data diff --git a/api/tests/pytest.ini b/api/tests/pytest.ini new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/unit/__init__.py b/api/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/unit/test_auth.py b/api/tests/unit/test_auth.py new file mode 100644 index 0000000..cf35577 --- /dev/null +++ b/api/tests/unit/test_auth.py @@ -0,0 +1,11 @@ +""" +This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file. +""" +from app.auth import * + + +def test_verify_token(): + """ + Test verify_token function + """ + assert verify_token(None) is False diff --git a/api/tests/unit/test_helper_functions.py b/api/tests/unit/test_helper_functions.py new file mode 100644 index 0000000..5cf054b --- /dev/null +++ b/api/tests/unit/test_helper_functions.py @@ -0,0 +1,20 @@ +""" +This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file. +""" +from app.helper_functions import * + + +def test_hash_password(): + """ + Test hash_password function + """ + assert hash_password("password") != "password" + + +def test_check_password(): + """ + Test check_password function + """ + hashed = hash_password("password") + assert check_password(hashed, "password".encode("utf-8")) is True + assert check_password(hashed, "password1".encode("utf-8")) is False diff --git a/api/tests/unit/test_models.py b/api/tests/unit/test_models.py new file mode 100644 index 0000000..e8a336a --- /dev/null +++ b/api/tests/unit/test_models.py @@ -0,0 +1,116 @@ +""" +This file (test_models.py) contains the unit tests for the models.py file. +""" +from app.models import User, Transaction, Keyword, Share + +from app.helper_functions import hash_password + + +def test_new_user(): + """ + GIVEN a User model + WHEN a new User is created + THEN check the email, password, username, telegram_user_id and admin fields are defined correctly + """ + user = User( + email="user@example.com", + username="user", + password=hash_password("password"), + admin=False + ) + assert user.email == 'user@example.com' + assert user.password != 'password' + assert user.username == "user" + assert user.telegram_user_id is None + assert user.admin is False + assert user.as_dict() == {'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False} + + +def test_new_user_with_fixture(new_user): + """ + GIVEN a User model + WHEN a new User is created + THEN check the email and password_hashed fields are defined correctly + """ + assert new_user.email == 'user@example.com' + assert new_user.password != 'password' + + +def test_new_transaction(): + """ + GIVEN a Transaction model + WHEN a new Transaction is created + THEN check the email, symbol, time count and price fields are defined correctly + """ + transaction = Transaction( + email="user@example.com", + symbol="DTEGY", + time="2022-03-29T10:00:00.000Z", + count=10, + price=9.99 + ) + assert transaction.email == 'user@example.com' + assert transaction.symbol == "DTEGY" + assert transaction.count == 10 + assert transaction.price == 9.99 + assert transaction.as_dict() == {'t_id': None, 'email': 'user@example.com', 'symbol': 'DTEGY', 'time': '2022-03-29T10:00:00.000Z', 'count': 10, 'price': 9.99} + + +def test_new_transaction_with_fixture(new_transaction): + """ + GIVEN a User model + WHEN a new User is created + THEN check the email and password_hashed fields are defined correctly + """ + assert new_transaction.email == 'user@example.com' + assert new_transaction.symbol == 'DTEGY' + + +def test_new_keyword(): + """ + GIVEN a Transaction model + WHEN a new Transaction is created + THEN check the email, symbol, time count and price fields are defined correctly + """ + keyword = Keyword( + email="user@example.com", + keyword="Elon Musk" + ) + assert keyword.email == 'user@example.com' + assert keyword.keyword == "Elon Musk" + assert keyword.as_dict() == {'s_id': None, 'email': 'user@example.com', 'keyword': 'Elon Musk'} + + +def test_new_keyword_with_fixture(new_keyword): + """ + GIVEN a User model + WHEN a new User is created + THEN check the email and password_hashed fields are defined correctly + """ + assert new_keyword.email == 'user@example.com' + assert new_keyword.keyword == 'Elon Musk' + + +def test_new_share(): + """ + GIVEN a Transaction model + WHEN a new Transaction is created + THEN check the email, symbol, time count and price fields are defined correctly + """ + share = Share( + email="user@example.com", + symbol="DTEGY" + ) + assert share.email == 'user@example.com' + assert share.symbol == "DTEGY" + assert share.as_dict() == {'a_id': None, 'email': 'user@example.com', 'symbol': 'DTEGY'} + + +def test_new_share_with_fixture(new_share): + """ + GIVEN a User model + WHEN a new User is created + THEN check the email and password_hashed fields are defined correctly + """ + assert new_share.email == 'user@example.com' + assert new_share.symbol == 'DTEGY' diff --git a/api/tests/unit/test_transaction.py b/api/tests/unit/test_transaction.py new file mode 100644 index 0000000..66f51ae --- /dev/null +++ b/api/tests/unit/test_transaction.py @@ -0,0 +1,40 @@ +""" +This file (test_transaction.py) contains the unit tests for the blueprints/transaction.py file. +""" +from app.blueprints.transactions import * + + +def test_check_if_symbol_data_exists(): + """ + Test check_if_symbol_data_exists function + """ + assert check_if_symbol_data_exists(dict(symbol='DTEGY')) is True + assert check_if_symbol_data_exists(dict(symbol='')) is False + assert check_if_symbol_data_exists(dict()) is False + + +def test_check_if_time_data_exists(): + """ + Test check_if_time_data_exists function + """ + assert check_if_time_data_exists(dict(time='2022-03-29T10:00:00.000Z')) is True + assert check_if_time_data_exists(dict(time='')) is False + assert check_if_time_data_exists(dict()) is False + + +def test_check_if_count_data_exists(): + """ + Test check_if_count_data_exists function + """ + assert check_if_count_data_exists(dict(count=10)) is True + assert check_if_count_data_exists(dict(count=None)) is False + assert check_if_count_data_exists(dict()) is False + + +def test_check_if_price_data_exists(): + """ + Test check_if_price_data_exists function + """ + assert check_if_price_data_exists(dict(price=9.99)) is True + assert check_if_price_data_exists(dict(price=None)) is False + assert check_if_price_data_exists(dict()) is False diff --git a/api/tests/unit/test_user.py b/api/tests/unit/test_user.py new file mode 100644 index 0000000..91ad580 --- /dev/null +++ b/api/tests/unit/test_user.py @@ -0,0 +1,40 @@ +""" +This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file. +""" +from app.blueprints.user import * + + +def test_check_if_email_data_exists(): + """ + Test check_if_email_data_exists function + """ + assert check_if_email_data_exists(dict(email='user@example.com')) is True + assert check_if_email_data_exists(dict(email='')) is False + assert check_if_email_data_exists(dict()) is False + + +def test_check_if_password_data_exists(): + """ + Test check_if_password_data_exists function + """ + assert check_if_password_data_exists(dict(password='password')) is True + assert check_if_password_data_exists(dict(password='')) is False + assert check_if_password_data_exists(dict()) is False + + +def test_check_if_username_data_exists(): + """ + Test check_if_username_data_exists function + """ + assert check_if_username_data_exists(dict(username='user')) is True + assert check_if_username_data_exists(dict(username='')) is False + assert check_if_username_data_exists(dict()) is False + + +def test_check_if_admin_data_exists(): + """ + Test check_if_admin_data_exists function + """ + assert check_if_admin_data_exists(dict(admin=True)) is True + assert check_if_admin_data_exists(dict(admin=None)) is False + assert check_if_admin_data_exists(dict()) is False