Merge pull request #32 from WebEngineering2/api_tests

Added unit and functional tests
This commit is contained in:
Florian Kaiser 2022-03-30 11:14:04 +02:00 committed by GitHub
commit 983c01f67d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1697 additions and 238 deletions

View File

@ -10,6 +10,16 @@ Aktienbot API
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
4. Run api `python api/app.py`
## Testing
1. Create virtual environment `python -m venv venv env/Scripts/activate`
2. Install requirements `pip install -r api/requirements.txt`
3. Set environment variables (see list below)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
4. Change directory: `cd api/`
5. Run tests: `python -m pytest -v --cov-report term-missing --cov=app`
## Environment variables
```
# Flask secret key

View File

@ -1,72 +1,6 @@
import os
from app import create_app
from apiflask import APIFlask
from dotenv import load_dotenv
from flask_cors import CORS
from api_blueprint_keyword import keyword_blueprint
from api_blueprint_portfolio import portfolio_blueprint
from api_blueprint_shares import shares_blueprint
from api_blueprint_transactions import transaction_blueprint
from api_blueprint_telegram import telegram_blueprint
from helper_functions import hash_password
from models import *
from api_blueprint_user import users_blueprint
def create_app():
load_dotenv()
# Create Flask app load app.config
application = APIFlask(__name__, openapi_blueprint_url_prefix='/api')
application.config.from_object("config.ConfigClass")
CORS(application, resources={r"*": {"origins": "*"}})
application.app_context().push()
db.init_app(application)
# api blueprints
application.register_blueprint(keyword_blueprint)
application.register_blueprint(shares_blueprint)
application.register_blueprint(transaction_blueprint)
application.register_blueprint(portfolio_blueprint)
application.register_blueprint(users_blueprint)
application.register_blueprint(telegram_blueprint)
@application.before_first_request
def init_database():
db.create_all()
if os.getenv("BOT_EMAIL") is not None and os.getenv("BOT_USERNAME") is not None and os.getenv("BOT_PASSWORD") is not None:
if db.session.query(User).filter_by(email=os.getenv("BOT_EMAIL")).first() is None: # Check if user already exist
bot = User(
email=os.getenv("BOT_EMAIL"),
username=os.getenv("BOT_USERNAME"),
password=hash_password(os.getenv("BOT_PASSWORD")),
admin=False
)
db.session.add(bot)
db.session.commit()
if os.getenv("ADMIN_EMAIL") is not None and os.getenv("ADMIN_USERNAME") is not None and os.getenv("ADMIN_PASSWORD") is not None:
if db.session.query(User).filter_by(email=os.getenv("ADMIN_EMAIL")).first() is None: # Check if user already exist
admin = User(
email=os.getenv("ADMIN_EMAIL"),
username=os.getenv("ADMIN_USERNAME"),
password=hash_password(os.getenv("ADMIN_PASSWORD")),
admin=True
)
db.session.add(admin)
db.session.commit()
return application
app = create_app()
# Start development web server
if __name__ == '__main__':
app.run()
# Call the application factory function to construct a Flask application
# instance using the development configuration
application = create_app('config/flask.cfg')
application.run()

67
api/app/__init__.py Normal file
View File

@ -0,0 +1,67 @@
from flask import current_app
from apiflask import APIFlask
from dotenv import load_dotenv
from flask_cors import CORS
from app.blueprints.keyword import keyword_blueprint
from app.blueprints.portfolio import portfolio_blueprint
from app.blueprints.shares import shares_blueprint
from app.blueprints.transactions import transaction_blueprint
from app.blueprints.telegram import telegram_blueprint
from app.blueprints.user import users_blueprint
from app.helper_functions import hash_password
from app.models import *
def create_app(config_filename=None):
load_dotenv()
# Create Flask app load app.config
application = APIFlask(__name__, openapi_blueprint_url_prefix='/api')
application.config.from_pyfile(config_filename)
CORS(application, resources={r"*": {"origins": "*"}})
application.app_context().push()
db.init_app(application)
# api blueprints
application.register_blueprint(keyword_blueprint)
application.register_blueprint(shares_blueprint)
application.register_blueprint(transaction_blueprint)
application.register_blueprint(portfolio_blueprint)
application.register_blueprint(users_blueprint)
application.register_blueprint(telegram_blueprint)
@application.before_first_request
def init_database():
db.create_all()
if current_app.config['BOT_EMAIL'] is not None and current_app.config['BOT_USERNAME'] is not None and current_app.config['BOT_PASSWORD'] is not None:
if db.session.query(User).filter_by(email=current_app.config['BOT_EMAIL']).first() is None: # Check if user already exist
bot = User(
email=current_app.config['BOT_EMAIL'],
username=current_app.config['BOT_USERNAME'],
password=hash_password(current_app.config['BOT_PASSWORD']),
admin=False
)
db.session.add(bot)
db.session.commit()
if current_app.config['ADMIN_EMAIL'] is not None and current_app.config['ADMIN_USERNAME'] is not None and current_app.config['ADMIN_PASSWORD'] is not None:
if db.session.query(User).filter_by(email=current_app.config['ADMIN_EMAIL']).first() is None: # Check if user already exist
admin = User(
email=current_app.config['ADMIN_EMAIL'],
username=current_app.config['ADMIN_USERNAME'],
password=hash_password(current_app.config['ADMIN_PASSWORD']),
admin=True
)
db.session.add(admin)
db.session.commit()
return application
app = create_app("config/flask.cfg")

View File

@ -1,4 +1,4 @@
import os
from flask import current_app
import jwt
from apiflask import HTTPTokenAuth
@ -15,7 +15,7 @@ def verify_token(token):
token = token.split(":")[0]
try:
jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
return True
except jwt.PyJWTError:
return False

View File

View File

@ -2,11 +2,11 @@ import os
from apiflask import APIBlueprint, abort
from db import db
from helper_functions import make_response, get_email_or_abort_401
from auth import auth
from schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
from models import Keyword
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth
from app.schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
from app.models import Keyword
keyword_blueprint = APIBlueprint('keyword', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -20,7 +20,8 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file
def add_keyword(data):
email = get_email_or_abort_401()
check_if_keyword_data_exists(data)
if not check_if_keyword_data_exists(data):
abort(400, message="Keyword missing")
key = data['keyword']
@ -47,14 +48,15 @@ def add_keyword(data):
def remove_keyword(data):
email = get_email_or_abort_401()
check_if_keyword_data_exists(data)
if not check_if_keyword_data_exists(data):
abort(400, message="Keyword missing")
key = data['keyword']
check_keyword = db.session.query(Keyword).filter_by(keyword=key, email=email).first()
if check_keyword is None:
return make_response({}, 500, "Keyword doesn't exist for this user")
return abort(500, "Keyword doesn't exist for this user")
else:
db.session.query(Keyword).filter_by(keyword=key, email=email).delete()
db.session.commit()
@ -81,7 +83,9 @@ def get_keywords():
def check_if_keyword_data_exists(data):
if "keyword" not in data:
abort(400, message="Keyword missing")
return False
if data['keyword'] == "" or data['keyword'] is None:
abort(400, message="Keyword missing")
return False
return True

View File

@ -2,10 +2,10 @@ import os
from apiflask import APIBlueprint
from schema import PortfolioResponseSchema
from db import db
from helper_functions import make_response, get_email_or_abort_401
from auth import auth
from app.schema import PortfolioResponseSchema
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth
portfolio_blueprint = APIBlueprint('portfolio', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

View File

@ -2,11 +2,11 @@ import os
from apiflask import APIBlueprint, abort
from auth import auth
from db import db
from helper_functions import make_response, get_email_or_abort_401
from models import Share
from schema import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema
from app.auth import auth
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.models import Share
from app.schema import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema
shares_blueprint = APIBlueprint('share', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -20,7 +20,8 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file
def add_symbol(data):
email = get_email_or_abort_401()
check_if_symbol_data_exists(data)
if not check_if_symbol_data_exists(data):
abort(400, message="Symbol missing")
symbol = data['symbol']
@ -36,7 +37,7 @@ def add_symbol(data):
return make_response(new_symbol.as_dict(), 200, "Successfully added symbol")
else:
return make_response({}, 500, "Symbol already exist for this user")
abort(500, "Symbol already exist for this user")
@shares_blueprint.route('/share', methods=['DELETE'])
@ -47,14 +48,15 @@ def add_symbol(data):
def remove_symbol(data):
email = get_email_or_abort_401()
check_if_symbol_data_exists(data)
if not check_if_symbol_data_exists(data):
abort(400, message="Symbol missing")
symbol = data['symbol']
check_share = db.session.query(Share).filter_by(symbol=symbol, email=email).first()
if check_share is None:
return make_response({}, 500, "Symbol doesn't exist for this user")
abort(500, "Symbol doesn't exist for this user")
else:
db.session.query(Share).filter_by(symbol=symbol, email=email).delete()
db.session.commit()
@ -81,7 +83,9 @@ def get_symbol():
def check_if_symbol_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
return False
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")
return False
return True

View File

@ -2,11 +2,11 @@ import os
from apiflask import APIBlueprint, abort
from db import db
from helper_functions import make_response, get_email_or_abort_401
from auth import auth
from schema import TelegramIdSchema, UsersSchema
from models import User
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth
from app.schema import TelegramIdSchema, UsersSchema
from app.models import User
telegram_blueprint = APIBlueprint('telegram', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -20,15 +20,11 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file
def add_keyword(data):
email = get_email_or_abort_401()
check_if_telegram_user_id_data_exists(data)
if not check_if_telegram_user_id_data_exists(data):
abort(400, message="User ID missing")
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
query_user.telegram_user_id = data['telegram_user_id']
db.session.commit()
return make_response(query_user.as_dict(), 200, "Successfully connected telegram user")
@ -36,7 +32,9 @@ def add_keyword(data):
def check_if_telegram_user_id_data_exists(data):
if "telegram_user_id" not in data:
abort(400, message="User ID missing")
return False
if data['telegram_user_id'] == "" or data['telegram_user_id'] is None:
abort(400, message="User ID missing")
return False
return True

View File

@ -3,11 +3,11 @@ import datetime
from apiflask import abort, APIBlueprint
from db import db
from helper_functions import make_response, get_email_or_abort_401
from models import Transaction
from schema import TransactionSchema, TransactionResponseSchema
from auth import auth
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.models import Transaction
from app.schema import TransactionSchema, TransactionResponseSchema
from app.auth import auth
transaction_blueprint = APIBlueprint('transaction', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -21,7 +21,17 @@ __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file
def add_transaction(data):
email = get_email_or_abort_401()
check_if_transaction_data_exists(data)
if not check_if_symbol_data_exists(data):
abort(400, "Symbol missing")
if not check_if_time_data_exists(data):
abort(400, "Time missing")
if not check_if_count_data_exists(data):
abort(400, "Count missing")
if not check_if_price_data_exists(data):
abort(400, "Price missing")
new_transaction = Transaction(
email=email,
@ -53,27 +63,41 @@ def get_transaction():
return make_response(return_transactions, 200, "Successfully loaded transactions")
def check_if_transaction_data_exists(data):
def check_if_symbol_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
return False
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")
return False
return True
def check_if_time_data_exists(data):
if "time" not in data:
abort(400, message="Time missing")
return False
if data['time'] == "" or data['time'] is None:
abort(400, message="Time missing")
return False
return True
def check_if_count_data_exists(data):
if "count" not in data:
abort(400, message="Count missing")
return False
if data['count'] == "" or data['count'] is None:
abort(400, message="Count missing")
return False
return True
def check_if_price_data_exists(data):
if "price" not in data:
abort(400, message="Price missing")
return False
if data['price'] == "" or data['price'] is None:
abort(400, message="Price missing")
return False
return True

View File

@ -1,14 +1,15 @@
import datetime
import os
from flask import current_app
import jwt
from apiflask import APIBlueprint, abort
from db import db
from helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401
from models import User
from schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema
from auth import auth
from app.db import database as db
from app.helper_functions import check_password, hash_password, abort_if_no_admin, make_response, get_email_or_abort_401
from app.models import User
from app.schema import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema, RegisterDataSchema, UpdateUserDataSchema
from app.auth import auth
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@ -45,8 +46,11 @@ def user():
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
check_if_email_data_exists(data)
check_if_password_data_exists(data)
if not check_if_password_data_exists(data):
abort(400, "Password missing")
if not check_if_email_data_exists(data):
abort(400, "Email missing")
email = data['email']
password = data['password']
@ -59,10 +63,10 @@ def login(data):
if not check_password(query_user.password, password.encode("utf-8")): # Password incorrect
abort(500, message="Unable to login")
if query_user.email == os.getenv("BOT_EMAIL"):
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365)}, os.getenv('SECRET_KEY'), "HS256")
if query_user.email == current_app.config['BOT_EMAIL']:
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365)}, current_app.config['SECRET_KEY'], "HS256")
else:
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)}, os.getenv('SECRET_KEY'), "HS256")
token = jwt.encode({'email': query_user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)}, current_app.config['SECRET_KEY'], "HS256")
return make_response({"token": token}, 200, "Successfully logged in")
@ -72,9 +76,14 @@ def login(data):
@users_blueprint.input(schema=RegisterDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
check_if_email_data_exists(data)
check_if_username_data_exists(data)
check_if_password_data_exists(data)
if not check_if_email_data_exists(data):
abort(400, "Email missing")
if not check_if_username_data_exists(data):
abort(400, "Username missing")
if not check_if_password_data_exists(data):
abort(400, "Password missing")
email = data['email']
username = data['username']
@ -107,12 +116,10 @@ def update_user(data):
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
if "password" in data and data['password'] is not None:
if check_if_password_data_exists(data):
query_user.password = hash_password(data['password'])
if "username" in data and data['username'] is not None:
if check_if_username_data_exists(data):
query_user.username = data['username']
db.session.commit()
@ -128,8 +135,11 @@ def update_user(data):
def set_admin(data):
abort_if_no_admin() # Only admin users can do this
check_if_email_data_exists(data)
check_if_admin_data_exists(data)
if not check_if_email_data_exists(data):
abort(400, "Email missing")
if not check_if_admin_data_exists(data):
abort(400, "Admin data missing")
email = data['email']
admin = data['admin']
@ -137,7 +147,7 @@ def set_admin(data):
query_user = db.session.query(User).filter_by(email=email).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
abort(500, message="Unable to update user")
query_user.admin = admin
db.session.commit()
@ -151,7 +161,8 @@ def set_admin(data):
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
check_if_email_data_exists(data)
if not check_if_email_data_exists(data):
abort(400, "Email missing")
email = data['email']
@ -169,31 +180,39 @@ def delete_user(data):
def check_if_email_data_exists(data):
if "email" not in data:
abort(400, message="Email missing")
return False
if data['email'] == "" or data['email'] is None:
abort(400, message="Email missing")
return False
return True
def check_if_password_data_exists(data):
if "password" not in data:
abort(400, message="Password missing")
return False
if data['password'] == "" or data['password'] is None:
abort(400, message="Password missing")
return False
return True
def check_if_username_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
return False
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")
return False
return True
def check_if_admin_data_exists(data):
if "admin" not in data:
abort(400, message="Admin state missing")
return False
if data['admin'] == "" or data['admin'] is None:
abort(400, message="Admin state missing")
return False
return True

26
api/app/config/flask.cfg Normal file
View File

@ -0,0 +1,26 @@
import os
from app.schema import BaseResponseSchema
# Flask settings
SECRET_KEY = os.getenv('SECRET_KEY', 'secret string')
# Flask-SQLAlchemy settings
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 100,
'pool_recycle': 240 # 4 minutes
}
# openapi/Swagger config
BASE_RESPONSE_DATA_KEY = "data"
BASE_RESPONSE_SCHEMA = BaseResponseSchema
BOT_EMAIL = os.getenv('BOT_EMAIL')
BOT_USERNAME = os.getenv('BOT_USERNAME')
BOT_PASSWORD = os.getenv('BOT_PASSWORD')
ADMIN_EMAIL = os.getenv('ADMIN_EMAIL')
ADMIN_USERNAME = os.getenv('ADMIN_USERNAME')
ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD')

View File

@ -0,0 +1,26 @@
import os
from app.schema import BaseResponseSchema
# Flask settings
SECRET_KEY = os.getenv('SECRET_KEY', 'secret string')
# Flask-SQLAlchemy settings
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot_test')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 100,
'pool_recycle': 240 # 4 minutes
}
# openapi/Swagger config
BASE_RESPONSE_DATA_KEY = "data"
BASE_RESPONSE_SCHEMA = BaseResponseSchema
BOT_EMAIL = "bot1@example.com"
BOT_USERNAME = "bot1"
BOT_PASSWORD = "bot1"
ADMIN_EMAIL = "admin1@example.com"
ADMIN_USERNAME = "admin1"
ADMIN_PASSWORD = "admin1"

View File

@ -1,3 +1,3 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
database = SQLAlchemy()

View File

@ -1,12 +1,12 @@
import os
from flask import current_app
import bcrypt
import jwt
from apiflask import abort
from flask import request, jsonify
from db import db
from models import User
from app.db import database as db
from app.models import User
def hash_password(password):
@ -17,27 +17,14 @@ def check_password(hashed_password, user_password):
return bcrypt.checkpw(user_password, hashed_password)
def get_token():
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
return token
def extract_token_data(token):
if token is not None:
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
except jwt.PyJWTError:
return None
else:
return None
def get_email_from_token_data():
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
token = request.headers['Authorization'].split(" ")
if len(token) < 2:
return None
else:
token = token[1]
if token is not None:
if ':' in token: # Maybe bot token, check if token valid and return username after ":" then
@ -45,7 +32,7 @@ def get_email_from_token_data():
token = token.split(":")[0]
try:
if jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['email'] == os.getenv("BOT_EMAIL"):
if jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email'] == current_app.config['BOT_EMAIL']:
res = db.session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
if res is not None:
@ -59,7 +46,7 @@ def get_email_from_token_data():
else: # "Normal" token, extract username from token
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])['email']
return jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])['email']
except jwt.PyJWTError:
return None

View File

@ -1,4 +1,4 @@
from db import db
from app.db import database as db
class User(db.Model):

View File

@ -1,55 +0,0 @@
import os
from dotenv import load_dotenv
from schema import BaseResponseSchema
load_dotenv()
class ConfigClass(object):
""" Flask application config """
# Flask settings
SECRET_KEY = os.getenv('SECRET_KEY')
# Flask-SQLAlchemy settings
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + \
os.getenv('MYSQL_USER') + ":" + \
os.getenv('MYSQL_PASSWORD') + "@" + \
os.getenv('MYSQL_HOST') + ":" + \
(os.getenv("MYSQL_PORT") or str(3306)) + "/" + \
os.getenv('MYSQL_DATABASE')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 100,
'pool_recycle': 240 # 4 minutes
}
# openapi/Swagger config
SERVERS = [
{
"name": "Production",
"url": "https://aktienbot.flokaiser.com"
},
{
"name": "Local",
"url": "http://127.0.0.1:5000"
}
]
INFO = {
'description': 'Webengineering 2 | Telegram Aktienbot',
'version': '0.0.1'
# 'termsOfService': 'http://example.com',
# 'contact': {
# 'name': 'API Support',
# 'url': 'http://www.example.com/support',
# 'email': 'support@example.com'
# },
# 'license': {
# 'name': 'Apache 2.0',
# 'url': 'http://www.apache.org/licenses/LICENSE-2.0.html'
# }
}
BASE_RESPONSE_DATA_KEY = "data"
BASE_RESPONSE_SCHEMA = BaseResponseSchema

View File

@ -7,4 +7,7 @@ pymysql==1.0.2
pyjwt==2.3.0
apiflask==0.12.0
flask-cors==3.0.10
bcrypt==3.2.0
bcrypt==3.2.0
pytest~=7.1.1
pytest-cov
marshmallow~=3.15.0

105
api/tests/conftest.py Normal file
View File

@ -0,0 +1,105 @@
import pytest
from app import create_app, db
from app.models import User, Transaction, Keyword, Share
from app.helper_functions import hash_password
@pytest.fixture(scope='module')
def new_user():
user = User(
email="user@example.com",
username="user",
password=hash_password("password"),
admin=False
)
return user
@pytest.fixture(scope='module')
def new_transaction():
transaction = Transaction(
email="user@example.com",
symbol="DTEGY",
time="2022-03-29T10:00:00.000Z",
count=10,
price=9.99
)
return transaction
@pytest.fixture(scope='module')
def new_keyword():
keyword = Keyword(
email="user@example.com",
keyword="Elon Musk",
)
return keyword
@pytest.fixture(scope='module')
def new_share():
share = Share(
email="user@example.com",
symbol="DTEGY",
)
return share
@pytest.fixture(scope='module')
def test_client():
flask_app = create_app('config/flask_test.cfg')
# Create a test client using the Flask application configured for testing
with flask_app.test_client() as testing_client:
# Establish an application context
with flask_app.app_context():
yield testing_client # this is where the testing happens!
@pytest.fixture(scope='function')
def init_database(test_client):
# Create the database and the database table
db.create_all()
# Insert user data
user1 = User(
email="user1@example.com",
username="user1",
password=hash_password("password"),
telegram_user_id="12345678",
admin=False
)
user2 = User(
email="user2@example.com",
username="user2",
password=hash_password("password"),
telegram_user_id="87654321",
admin=False
)
admin = User(
email="admin1@example.com",
username="admin1",
password=hash_password("admin1"),
telegram_user_id="00000000",
admin=True
)
bot = User(
email="bot1@example.com",
username="bot1",
password=hash_password("bot1"),
telegram_user_id="00000000",
admin=False
)
db.session.add(user1)
db.session.add(user2)
db.session.add(admin)
db.session.add(bot)
# Commit the changes for the users
db.session.commit()
yield # this is where the testing happens!
db.session.commit()
db.drop_all()

View File

View File

@ -0,0 +1,11 @@
import json
def get_token(test_client, email, password):
response = test_client.post('/api/user/login', data=json.dumps(dict(email=email, password=password)), content_type='application/json')
if "data" in json.loads(response.data):
if "token" in json.loads(response.data)["data"]:
return json.loads(response.data)["data"]["token"]
return ""

View File

@ -0,0 +1,182 @@
"""
This file (test_keyword.py) contains the functional tests for the `keyword` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_add_keyword_not_logged_in(test_client, init_database):
"""
Test POST '/api/keyword'
User is not logged in
"""
response = test_client.post('/api/keyword')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_add_keyword_user1_logged_in(test_client, init_database):
"""
Test POST '/api/keyword'
User1 is logged in
"""
response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully added keyword' in response.data
def test_add_keyword_user1_logged_in_but_keyword_exist(test_client, init_database):
"""
Test POST '/api/keyword'
User1 is logged in
Add keyword two times
"""
test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 500
assert b'Keyword already exist for this user' in response.data
def test_add_keyword_user1_logged_in_but_keyword_missing(test_client, init_database):
"""
Test POST '/api/keyword'
User1 is logged in
Keyword is missing in post data
"""
response = test_client.post('/api/keyword', data=json.dumps(dict()),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_add_keyword_user1_logged_in_but_keyword_empty(test_client, init_database):
"""
Test POST '/api/keyword'
User1 is logged in
Keyword is empty in post data
"""
response = test_client.post('/api/keyword', data=json.dumps(dict(keyword="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_get_keyword_not_logged_in(test_client, init_database):
"""
Test GET '/api/keyword'
User is not logged in
"""
response = test_client.get('/api/keywords')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_keyword_user1_logged_in_empty_response(test_client, init_database):
"""
Test GET '/api/keyword'
User1 is logged in
Empty response
"""
response = test_client.get('/api/keywords', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'"data":[' in response.data
def test_get_keyword_user1_logged_in_response_data(test_client, init_database):
"""
Test GET '/api/keyword'
User1 is logged in
Create some keywords for user1
"""
test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.get('/api/keywords', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'"data":[' in response.data
def test_delete_keyword_not_logged_in(test_client, init_database):
"""
Test DELETE '/api/keyword'
User is not logged in
"""
response = test_client.delete('/api/keyword')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_delete_keyword_user1_logged_in_but_empty_keyword(test_client, init_database):
"""
Test DELETE '/api/keyword'
User1 is logged in
Keyword empty in in delete data
"""
response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_keyword_user1_logged_in_but_missing_keyword(test_client, init_database):
"""
Test DELETE '/api/keyword'
User1 is logged in
Keyword missing in in delete data
"""
response = test_client.delete('/api/keyword', data=json.dumps(dict()),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_keyword_user1_logged_in_keyword_exists(test_client, init_database):
"""
Test DELETE '/api/keyword'
User1 is logged in
Keyword exists
"""
test_client.post('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully removed keyword' in response.data
def test_delete_keyword_user1_logged_in_keyword_not_exists(test_client, init_database):
"""
Test DELETE '/api/keyword'
User1 is logged in
Keyword doesn't exists
"""
response = test_client.delete('/api/keyword', data=json.dumps(dict(keyword="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 500
assert b'Keyword doesn\'t exist for this user' in response.data

View File

@ -0,0 +1,50 @@
"""
This file (test_portfolio.py) contains the functional tests for the `portfolio` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_get_portfolio_not_logged_in_empty_response(test_client, init_database):
"""
Test GET '/api/portfolio'
User is not logged in
"""
response = test_client.get('/api/portfolio')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_portfolio_user1_logged_in_empty_response(test_client, init_database):
"""
Test GET '/api/portfolio'
User1 is logged in
Empty response
"""
response = test_client.get('/api/portfolio',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'"data":[]' in response.data
assert b'Successfully loaded symbols' in response.data
def test_get_portfolio_user1_logged_in_response_data(test_client, init_database):
"""
Test GET '/api/portfolio'
User1 is logged in
Create transaction data
"""
test_client.post('/api/transaction', data=json.dumps(dict(count=5, price=9.99, symbol="DTEGY", time="2022-03-29T10:00:00.000Z")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.get('/api/portfolio',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'"data":[]' not in response.data
assert b'"data":[' in response.data

View File

@ -0,0 +1,182 @@
"""
This file (test_share.py) contains the functional tests for the `share` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_add_share_not_logged_in(test_client, init_database):
"""
Test POST '/api/share'
User is not logged in
"""
response = test_client.post('/api/share')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_add_share_user1_logged_in(test_client, init_database):
"""
Test POST '/api/share'
User1 is logged in
"""
response = test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully added symbol' in response.data
def test_add_share_user1_logged_in_but_symbol_exist(test_client, init_database):
"""
Test POST '/api/share'
User1 is logged in
Add symbol two times
"""
test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 500
assert b'Symbol already exist for this user' in response.data
def test_add_share_user1_logged_in_but_symbol_missing(test_client, init_database):
"""
Test POST '/api/share'
User1 is logged in
Symbol is missing in post data
"""
response = test_client.post('/api/share', data=json.dumps(dict()),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_add_share_user1_logged_in_but_symbol_empty(test_client, init_database):
"""
Test POST '/api/share'
User1 is logged in
Symbol is empty in post data
"""
response = test_client.post('/api/share', data=json.dumps(dict(symbol="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_get_share_not_logged_in(test_client, init_database):
"""
Test GET '/api/share'
User is not logged in
"""
response = test_client.get('/api/shares')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_share_user1_logged_in_empty_response(test_client, init_database):
"""
Test GET '/api/share'
User1 is logged in
Empty response
"""
response = test_client.get('/api/shares', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'"data":[' in response.data
def test_get_share_user1_logged_in_response_data(test_client, init_database):
"""
Test GET '/api/share'
User1 is logged in
Create some symbols for user1
"""
test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.get('/api/shares', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'"data":[' in response.data
def test_delete_share_not_logged_in(test_client, init_database):
"""
Test DELETE '/api/share'
User is not logged in
"""
response = test_client.delete('/api/share')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_delete_share_user1_logged_in_but_empty_symbol(test_client, init_database):
"""
Test DELETE '/api/share'
User1 is logged in
Symbol empty in in delete data
"""
response = test_client.delete('/api/share', data=json.dumps(dict(symbol="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_share_user1_logged_in_but_missing_symbol(test_client, init_database):
"""
Test DELETE '/api/share'
User1 is logged in
Symbol missing in in delete data
"""
response = test_client.delete('/api/share', data=json.dumps(dict()),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_share_user1_logged_in_symbol_exists(test_client, init_database):
"""
Test DELETE '/api/share'
User1 is logged in
Symbol exists
"""
test_client.post('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.delete('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully removed symbol' in response.data
def test_delete_share_user1_logged_in_symbol_not_exists(test_client, init_database):
"""
Test DELETE '/api/share'
User1 is logged in
Symbol doesn't exists
"""
response = test_client.delete('/api/share', data=json.dumps(dict(symbol="DTEGY")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 500
assert b'Symbol doesn\'t exist for this user' in response.data

View File

@ -0,0 +1,57 @@
"""
This file (test_telegram.py) contains the functional tests for the `telegram` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_add_telegram_not_logged_in(test_client, init_database):
"""
Test POST '/api/telegram'
User is not logged in
"""
response = test_client.post('/api/telegram')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_add_telegram_user1_logged_in(test_client, init_database):
"""
Test POST '/api/telegram'
User1 is logged in
"""
response = test_client.post('/api/telegram', data=json.dumps(dict(telegram_user_id="12345678")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully connected telegram user' in response.data
def test_add_telegram_user1_logged_in_user_data_missing(test_client, init_database):
"""
Test POST '/api/telegram'
User1 is logged in
telegram_user_id is missing
"""
response = test_client.post('/api/telegram', data=json.dumps(dict()),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_add_telegram_user1_logged_in_user_data_empty(test_client, init_database):
"""
Test POST '/api/telegram'
User1 is logged in
telegram_user_id is empty
"""
response = test_client.post('/api/telegram', data=json.dumps(dict(telegram_user_id="")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data

View File

@ -0,0 +1,94 @@
"""
This file (test_transaction.py) contains the functional tests for the `transaction` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_add_transaction_not_logged_in(test_client, init_database):
"""
Test POST '/api/transaction'
User is not logged in
"""
response = test_client.get('/api/portfolio')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_add_transaction_user1_logged_in_missing_data(test_client, init_database):
"""
Test POST '/api/transaction'
User1 is logged in
Data missing
"""
# symbol missing
response = test_client.post('/api/transaction', data=json.dumps(dict(time="2022-03-29T10:00:00.000Z", count=10, price=9.99)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
# time missing
response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", count=10, price=9.99)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
# count missing
response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", time="2022-03-29T10:00:00.000Z", price=9.99)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
# price missing
response = test_client.post('/api/transaction', data=json.dumps(dict(symbol="DTEGY", time="2022-03-29T10:00:00.000Z", count=10)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_get_transaction_not_logged_in(test_client, init_database):
"""
Test GET '/api/transaction'
User is not logged in
"""
response = test_client.get('/api/transactions')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_transaction_user1_logged_in_empty_response(test_client, init_database):
"""
Test GET '/api/transaction'
User1 is logged in
"""
response = test_client.get('/api/transactions',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully loaded transactions' in response.data
def test_get_transaction_user1_logged_in_response_data(test_client, init_database):
"""
Test GET '/api/transaction'
User1 is logged in
Create transaction
"""
test_client.post('/api/transaction', data=json.dumps(dict(count=5, price=9.99, symbol="DTEGY", time="2022-03-29T10:00:00.000Z")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.get('/api/transactions',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully loaded transactions' in response.data

View File

@ -0,0 +1,504 @@
"""
This file (test_user.py) contains the functional tests for the `users` blueprint.
"""
import json
from tests.functional.helper_functions import get_token
def test_login_with_valid_data(test_client, init_database):
"""
Test POST '/api/user/login'
Valid data
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(email="user1@example.com", password="password")), content_type='application/json')
assert response.status_code == 200
assert b'Successfully logged in' in response.data
def test_login_with_wrong_password(test_client, init_database):
"""
Test POST '/api/user/login'
Wrong password
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(email="user2@example.com", password="password2")), content_type='application/json')
assert response.status_code == 500
assert b'Unable to login' in response.data
def test_login_user_not_exist(test_client, init_database):
"""
Test POST '/api/user/login'
User doesn't exist
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(email="notexistinguser@example.com", password="password")), content_type='application/json')
assert response.status_code == 500
assert b'Unable to login' in response.data
def test_login_email_missing(test_client, init_database):
"""
Test POST '/api/user/login'
Email missing
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(password="password")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_login_password_missing(test_client, init_database):
"""
Test POST '/api/user/login'
Password missing
"""
response = test_client.post('/api/user/login', data=json.dumps(dict(email="user1@example.com")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_register_valid_data(test_client, init_database):
"""
Test POST '/api/user/register'
Valid data
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
assert response.status_code == 200
assert b'Successfully registered user' in response.data
def test_register_user_exists_already(test_client, init_database):
"""
Test POST '/api/user/register'
User exists already
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
assert response.status_code == 500
assert b'Email already exist' in response.data
def test_register_email_missing(test_client, init_database):
"""
Test POST '/api/user/register'
Email missing
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="user3")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_register_email_empty(test_client, init_database):
"""
Test POST '/api/user/register'
Email empty
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="user3", email="")), content_type='application/json')
assert response.status_code == 400
assert b'Not a valid email address' in response.data
def test_register_password_missing(test_client, init_database):
"""
Test POST '/api/user/register'
Password missing
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", username="user3")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_register_password_empty(test_client, init_database):
"""
Test POST '/api/user/register'
password empty
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", username="user3", password="")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_register_username_missing(test_client, init_database):
"""
Test POST '/api/user/register'
Username missing
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", email="user3@example.com")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_register_username_empty(test_client, init_database):
"""
Test POST '/api/user/register'
Username empty
"""
response = test_client.post('/api/user/register', data=json.dumps(dict(password="password", username="", email="user3@example.com")), content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_user_not_logged_in(test_client, init_database):
"""
Test DELETE '/api/user'
User is not logged in
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user', data=json.dumps(dict(email="user3@example.com")), content_type='application/json')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_delete_user_same_user_logged_in(test_client, init_database):
"""
Test DELETE '/api/user'
User3 is logged in
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user',
data=json.dumps(dict(email="user3@example.com")),
content_type='application/json',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))})
assert response.status_code == 200
assert b'Successfully removed user' in response.data
def test_delete_user_different_user_logged_in_no_admin(test_client, init_database):
"""
Test DELETE '/api/user'
Different user is logged in -> no admin
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user',
data=json.dumps(dict(email="user3@example.com")),
content_type='application/json',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 401
assert b'Only admin users can access this' in response.data
def test_delete_user_different_user_logged_in_admin(test_client, init_database):
"""
Test DELETE '/api/user'
Different user is logged in -> admin
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user',
data=json.dumps(dict(email="user3@example.com")),
content_type='application/json',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))})
assert response.status_code == 200
assert b'Successfully removed user' in response.data
def test_delete_user_email_missing(test_client, init_database):
"""
Test DELETE '/api/user'
Email missing
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user',
data=json.dumps(dict()),
content_type='application/json',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))})
assert response.status_code == 400
assert b'missing' in response.data
def test_delete_user_email_empty(test_client, init_database):
"""
Test DELETE '/api/user'
Email empty
"""
test_client.post('/api/user/register', data=json.dumps(dict(email="user3@example.com", password="password", username="user3")), content_type='application/json')
response = test_client.delete('/api/user',
data=json.dumps(dict(email="")),
content_type='application/json',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user3@example.com", "password"))})
assert response.status_code == 400
assert b'Not a valid email address' in response.data
def test_get_current_user_user_not_logged_in(test_client, init_database):
"""
Test GET '/api/user'
User is not logged in
"""
response = test_client.get('/api/user')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_current_user_user1_logged_in(test_client, init_database):
"""
Test GET '/api/user'
User1 is logged in
"""
response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'user1' in response.data
assert b'user2' not in response.data
def test_get_current_user_bot_logged_in_user_exists(test_client, init_database):
"""
Test GET '/api/user'
Bot1 is logged in and requests user 12345678
"""
response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":12345678")})
assert response.status_code == 200
assert b'user1' in response.data
assert b'bot' not in response.data
def test_get_current_user_bot_logged_in_user_not_exists(test_client, init_database):
"""
Test GET '/api/user'
Bot1 is logged in and requests user 1234 (not existing)
"""
response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "bot1@example.com", "bot1") + ":1234")})
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_get_current_user_user1_logged_in_but_no_bot(test_client, init_database):
"""
Test GET '/api/user'
User1 is logged in and requests user 1234 (not existing)
Fails because user1 is not a bot
"""
response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password") + ":12345678")})
assert response.status_code == 401
assert b'Unable to login' in response.data
def test_get_current_user_invalid_token(test_client, init_database):
"""
Test GET '/api/user'
Invalid Bearer token
"""
response = test_client.get('/api/user', headers={"Authorization": "Bearer {}".format("invalidtoken:12345678")})
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_update_user_not_logged_in(test_client, init_database):
"""
Test PUT '/api/user'
User is not logged in
"""
response = test_client.put('/api/user')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_update_user1_logged_in_password_username(test_client, init_database):
"""
Test PUT '/api/user'
User1 is logged in
Change Username and Password
"""
test_client.put('/api/user', data=json.dumps(dict(username="user4", password="password")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
response = test_client.get('/api/user',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 200
assert b'user4' in response.data
def test_update_user1_logged_in_password(test_client, init_database):
"""
Test PUT '/api/user'
User1 is logged in
Change Password
"""
response = test_client.put('/api/user', data=json.dumps(dict(password="password123")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'Successfully updated user' in response.data
def test_update_user1_logged_in_username(test_client, init_database):
"""
Test PUT '/api/user'
User1 is logged in
Change Username
"""
response = test_client.put('/api/user', data=json.dumps(dict(username="user1")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))})
assert response.status_code == 200
assert b'Successfully updated user' in response.data
def test_set_admin_user_not_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
User is not logged in
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)), content_type='application/json')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_set_admin_user1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
User1 is logged in (no admin)
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 401
assert b'Only admin users can access this' in response.data
def test_set_admin_admin1_logged_in(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=True)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully updated users admin rights' in response.data
def test_set_admin_admin1_logged_in_user_not_exist(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
notexistinguser@example.com does not exist
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="notexistinguser@example.com", admin=True)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 500
assert b'Unable to update user' in response.data
def test_set_admin_admin1_logged_in_email_missing(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
email missing
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(admin=True)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_set_admin_admin1_logged_in_email_empty(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
email missing
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="", admin=True)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 400
assert b'Not a valid email address.' in response.data
def test_set_admin_admin1_logged_in_admin_missing(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
admin data missing
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com")),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 400
assert b'missing' in response.data
def test_set_admin_admin1_logged_in_admin_empty(test_client, init_database):
"""
Test PUT '/api/user/setAdmin'
Admin1 is logged in (admin)
admin data missing
"""
response = test_client.put('/api/user/setAdmin', data=json.dumps(dict(email="user1@example.com", admin=None)),
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 400
assert b'Field may not be null' in response.data
def test_get_users_not_logged_in(test_client, init_database):
"""
Test GET '/api/users'
User is not logged in
"""
response = test_client.get('/api/users')
assert response.status_code == 401
assert b'Unauthorized' in response.data
def test_get_users_user1_logged_in(test_client, init_database):
"""
Test GET '/api/users'
User1 is logged in (not admin)
"""
response = test_client.get('/api/users',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "user1@example.com", "password"))},
content_type='application/json')
assert response.status_code == 401
assert b'Only admin users can access this' in response.data
def test_get_users_admin1_logged_in(test_client, init_database):
"""
Test GET '/api/users'
Admin1 is logged in (admin)
"""
response = test_client.get('/api/users',
headers={"Authorization": "Bearer {}".format(get_token(test_client, "admin1@example.com", "admin1"))},
content_type='application/json')
assert response.status_code == 200
assert b'Successfully received all users' in response.data

0
api/tests/pytest.ini Normal file
View File

View File

View File

@ -0,0 +1,11 @@
"""
This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file.
"""
from app.auth import *
def test_verify_token():
"""
Test verify_token function
"""
assert verify_token(None) is False

View File

@ -0,0 +1,20 @@
"""
This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file.
"""
from app.helper_functions import *
def test_hash_password():
"""
Test hash_password function
"""
assert hash_password("password") != "password"
def test_check_password():
"""
Test check_password function
"""
hashed = hash_password("password")
assert check_password(hashed, "password".encode("utf-8")) is True
assert check_password(hashed, "password1".encode("utf-8")) is False

View File

@ -0,0 +1,116 @@
"""
This file (test_models.py) contains the unit tests for the models.py file.
"""
from app.models import User, Transaction, Keyword, Share
from app.helper_functions import hash_password
def test_new_user():
"""
GIVEN a User model
WHEN a new User is created
THEN check the email, password, username, telegram_user_id and admin fields are defined correctly
"""
user = User(
email="user@example.com",
username="user",
password=hash_password("password"),
admin=False
)
assert user.email == 'user@example.com'
assert user.password != 'password'
assert user.username == "user"
assert user.telegram_user_id is None
assert user.admin is False
assert user.as_dict() == {'email': 'user@example.com', 'username': 'user', 'telegram_user_id': None, 'admin': False}
def test_new_user_with_fixture(new_user):
"""
GIVEN a User model
WHEN a new User is created
THEN check the email and password_hashed fields are defined correctly
"""
assert new_user.email == 'user@example.com'
assert new_user.password != 'password'
def test_new_transaction():
"""
GIVEN a Transaction model
WHEN a new Transaction is created
THEN check the email, symbol, time count and price fields are defined correctly
"""
transaction = Transaction(
email="user@example.com",
symbol="DTEGY",
time="2022-03-29T10:00:00.000Z",
count=10,
price=9.99
)
assert transaction.email == 'user@example.com'
assert transaction.symbol == "DTEGY"
assert transaction.count == 10
assert transaction.price == 9.99
assert transaction.as_dict() == {'t_id': None, 'email': 'user@example.com', 'symbol': 'DTEGY', 'time': '2022-03-29T10:00:00.000Z', 'count': 10, 'price': 9.99}
def test_new_transaction_with_fixture(new_transaction):
"""
GIVEN a User model
WHEN a new User is created
THEN check the email and password_hashed fields are defined correctly
"""
assert new_transaction.email == 'user@example.com'
assert new_transaction.symbol == 'DTEGY'
def test_new_keyword():
"""
GIVEN a Transaction model
WHEN a new Transaction is created
THEN check the email, symbol, time count and price fields are defined correctly
"""
keyword = Keyword(
email="user@example.com",
keyword="Elon Musk"
)
assert keyword.email == 'user@example.com'
assert keyword.keyword == "Elon Musk"
assert keyword.as_dict() == {'s_id': None, 'email': 'user@example.com', 'keyword': 'Elon Musk'}
def test_new_keyword_with_fixture(new_keyword):
"""
GIVEN a User model
WHEN a new User is created
THEN check the email and password_hashed fields are defined correctly
"""
assert new_keyword.email == 'user@example.com'
assert new_keyword.keyword == 'Elon Musk'
def test_new_share():
"""
GIVEN a Transaction model
WHEN a new Transaction is created
THEN check the email, symbol, time count and price fields are defined correctly
"""
share = Share(
email="user@example.com",
symbol="DTEGY"
)
assert share.email == 'user@example.com'
assert share.symbol == "DTEGY"
assert share.as_dict() == {'a_id': None, 'email': 'user@example.com', 'symbol': 'DTEGY'}
def test_new_share_with_fixture(new_share):
"""
GIVEN a User model
WHEN a new User is created
THEN check the email and password_hashed fields are defined correctly
"""
assert new_share.email == 'user@example.com'
assert new_share.symbol == 'DTEGY'

View File

@ -0,0 +1,40 @@
"""
This file (test_transaction.py) contains the unit tests for the blueprints/transaction.py file.
"""
from app.blueprints.transactions import *
def test_check_if_symbol_data_exists():
"""
Test check_if_symbol_data_exists function
"""
assert check_if_symbol_data_exists(dict(symbol='DTEGY')) is True
assert check_if_symbol_data_exists(dict(symbol='')) is False
assert check_if_symbol_data_exists(dict()) is False
def test_check_if_time_data_exists():
"""
Test check_if_time_data_exists function
"""
assert check_if_time_data_exists(dict(time='2022-03-29T10:00:00.000Z')) is True
assert check_if_time_data_exists(dict(time='')) is False
assert check_if_time_data_exists(dict()) is False
def test_check_if_count_data_exists():
"""
Test check_if_count_data_exists function
"""
assert check_if_count_data_exists(dict(count=10)) is True
assert check_if_count_data_exists(dict(count=None)) is False
assert check_if_count_data_exists(dict()) is False
def test_check_if_price_data_exists():
"""
Test check_if_price_data_exists function
"""
assert check_if_price_data_exists(dict(price=9.99)) is True
assert check_if_price_data_exists(dict(price=None)) is False
assert check_if_price_data_exists(dict()) is False

View File

@ -0,0 +1,40 @@
"""
This file (test_helper_functions.py) contains the unit tests for the helper_functions.py file.
"""
from app.blueprints.user import *
def test_check_if_email_data_exists():
"""
Test check_if_email_data_exists function
"""
assert check_if_email_data_exists(dict(email='user@example.com')) is True
assert check_if_email_data_exists(dict(email='')) is False
assert check_if_email_data_exists(dict()) is False
def test_check_if_password_data_exists():
"""
Test check_if_password_data_exists function
"""
assert check_if_password_data_exists(dict(password='password')) is True
assert check_if_password_data_exists(dict(password='')) is False
assert check_if_password_data_exists(dict()) is False
def test_check_if_username_data_exists():
"""
Test check_if_username_data_exists function
"""
assert check_if_username_data_exists(dict(username='user')) is True
assert check_if_username_data_exists(dict(username='')) is False
assert check_if_username_data_exists(dict()) is False
def test_check_if_admin_data_exists():
"""
Test check_if_admin_data_exists function
"""
assert check_if_admin_data_exists(dict(admin=True)) is True
assert check_if_admin_data_exists(dict(admin=None)) is False
assert check_if_admin_data_exists(dict()) is False