Merge pull request #6 from WebEngineering2/api

Rewrite api from Flask to APIFlask
This commit is contained in:
Florian Kaiser 2022-03-17 14:28:56 +01:00 committed by GitHub
commit 6360453a60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 466 additions and 118 deletions

View File

@ -1,24 +1,29 @@
import os
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from auth import auth
from scheme import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
from models import Keyword
keyword_blueprint = Blueprint('keyword', __name__, url_prefix='/api')
keyword_blueprint = APIBlueprint('keyword', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@keyword_blueprint.route('/keyword', methods=['POST'])
def add_keyword():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@keyword_blueprint.output(KeywordResponseSchema(many=True), 200)
@keyword_blueprint.input(schema=KeywordSchema)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Add new keyword", description="Adds new keyword for current user")
def add_keyword(data):
username = get_username_or_abort_401()
request_data = request.get_json()
key = request_data['keyword']
check_if_keyword_data_exists(data)
key = data['keyword']
check_keyword = db.session.query(Keyword).filter_by(keyword=key, user_id=get_user_id_from_username(username)).first()
if check_keyword is None:
@ -32,31 +37,33 @@ def add_keyword():
return jsonify({"status": 200, "text": "Successfully added keyword", "data": new_keyword.as_dict()})
else:
return jsonify({"status": 500, "text": "Keyword already exist for this user"})
abort(500, message="Keyword already exist for this user")
@keyword_blueprint.route('/keyword', methods=['DELETE'])
def remove_keyword():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@keyword_blueprint.output(DeleteSuccessfulSchema, 200)
@keyword_blueprint.input(schema=KeywordSchema)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Removes existing keyword", description="Removes existing keyword for current user")
def remove_keyword(data):
username = get_username_or_abort_401()
request_data = request.get_json()
key = request_data['keyword']
check_if_keyword_data_exists(data)
key = data['keyword']
db.session.query(Keyword).filter_by(keyword=key, user_id=get_user_id_from_username(username)).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully removed keyword"})
return jsonify({"status": 200, "text": "Successfully removed keyword", "data": {}})
@keyword_blueprint.route('/keywords', methods=['GET'])
@keyword_blueprint.output(KeywordResponseSchema(many=True), 200)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Returns all keywords", description="Returns all keywords for current user")
def get_keywords():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_keywords = []
keywords = db.session.query(Keyword).filter_by(user_id=get_user_id_from_username(username)).all()
@ -66,3 +73,11 @@ def get_keywords():
return_keywords.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded keywords", "data": return_keywords})
def check_if_keyword_data_exists(data):
if "keyword" not in data:
abort(400, message="Keyword missing")
if data['keyword'] == "" or data['keyword'] is None:
abort(400, message="Keyword missing")

View File

@ -1,21 +1,23 @@
import os
from flask import Blueprint, jsonify
from apiflask import APIBlueprint
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Transaction
from auth import auth
portfolio_blueprint = Blueprint('portfolio', __name__, url_prefix='/api')
portfolio_blueprint = APIBlueprint('portfolio', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@portfolio_blueprint.route('/portfolio', methods=['GET'])
@portfolio_blueprint.output(200)
@portfolio_blueprint.auth_required(auth)
@portfolio_blueprint.doc(summary="Returns portfolio", description="Returns all shares of current user")
def get_portfolio():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_portfolio = {}
transactions = db.session.query(Transaction).filter_by(user_id=get_user_id_from_username(username)).all()

View File

@ -1,24 +1,29 @@
import os
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from auth import auth
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Share
from scheme import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema
shares_blueprint = Blueprint('share', __name__, url_prefix='/api')
shares_blueprint = APIBlueprint('share', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@shares_blueprint.route('/share', methods=['POST'])
def add_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@shares_blueprint.output(SymbolResponseSchema(many=True), 200)
@shares_blueprint.input(schema=SymbolSchema)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Add new symbol", description="Adds new symbol for current user")
def add_symbol(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
check_if_symbol_data_exists(data)
symbol = data['symbol']
check_share = db.session.query(Share).filter_by(symbol=symbol, user_id=get_user_id_from_username(username)).first()
if check_share is None:
@ -36,27 +41,29 @@ def add_symbol():
@shares_blueprint.route('/share', methods=['DELETE'])
def remove_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@shares_blueprint.output(DeleteSuccessfulSchema, 200)
@shares_blueprint.input(schema=SymbolSchema)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Removes existing symbol", description="Removes existing symbol for current user")
def remove_symbol(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
check_if_symbol_data_exists(data)
symbol = data['symbol']
db.session.query(Share).filter_by(symbol=symbol, user_id=get_user_id_from_username(username)).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully removed symbol"})
return jsonify({"status": 200, "text": "Successfully removed symbol", "data": {}})
@shares_blueprint.route('/shares', methods=['GET'])
@shares_blueprint.output(SymbolResponseSchema(many=True), 200)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Returns all symbols", description="Returns all symbols for current user")
def get_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_symbols = []
symbols = db.session.query(Share).filter_by(user_id=get_user_id_from_username(username)).all()
@ -66,3 +73,11 @@ def get_symbol():
return_symbols.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded symbols", "data": return_symbols})
def check_if_symbol_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")

View File

@ -1,48 +1,48 @@
import os
import datetime
from flask import Blueprint, jsonify, request
from apiflask import abort, APIBlueprint
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Transaction
from scheme import TransactionSchema
from auth import auth
transaction_blueprint = Blueprint('transaction', __name__, url_prefix='/api')
transaction_blueprint = APIBlueprint('transaction', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@transaction_blueprint.route('/transaction', methods=['POST'])
def add_transaction():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@transaction_blueprint.output((), 200)
@transaction_blueprint.input(schema=TransactionSchema)
@transaction_blueprint.auth_required(auth)
@transaction_blueprint.doc(summary="Adds new transaction", description="Adds new transaction for current user")
def add_transaction(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
time = datetime.datetime.strptime(request_data['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
count = request_data['count']
price = request_data['price']
check_if_transaction_data_exists(data)
new_transcation = Transaction(
new_transaction = Transaction(
user_id=get_user_id_from_username(username),
symbol=symbol,
time=time,
count=count,
price=price
symbol=data['symbol'],
time=datetime.datetime.strptime(data['time'], '%Y-%m-%dT%H:%M:%S.%fZ'),
count=data['count'],
price=data['price']
)
db.session.add(new_transcation)
db.session.add(new_transaction)
db.session.commit()
return jsonify({"status": 200, "text": "Successfully added transaction", "data": new_transcation.as_dict()})
return jsonify({"status": 200, "text": "Successfully added transaction", "data": new_transaction.as_dict()})
@transaction_blueprint.route('/transactions', methods=['GET'])
@transaction_blueprint.output(TransactionSchema(), 200)
@transaction_blueprint.auth_required(auth)
@transaction_blueprint.doc(summary="Returns all transactions", description="Returns all transactions for current user")
def get_transaction():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_transactions = []
transactions = db.session.query(Transaction).filter_by(user_id=get_user_id_from_username(username)).all()
@ -52,3 +52,29 @@ def get_transaction():
return_transactions.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded transactions", "data": return_transactions})
def check_if_transaction_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")
if "time" not in data:
abort(400, message="Time missing")
if data['time'] == "" or data['time'] is None:
abort(400, message="Time missing")
if "count" not in data:
abort(400, message="Count missing")
if data['count'] == "" or data['count'] is None:
abort(400, message="Count missing")
if "price" not in data:
abort(400, message="Price missing")
if data['price'] == "" or data['price'] is None:
abort(400, message="Price missing")

View File

@ -2,18 +2,26 @@ import datetime
import os
import jwt
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from db import db
from helper_functions import check_password, hash_password, get_token, extract_token_data
from helper_functions import check_password, hash_password, get_username_or_abort_401, abort_if_no_admin
from models import User
from scheme import UsersSchema, TokenSchema, LoginDataSchema, AdminDataSchema, DeleteUserSchema
from auth import auth
users_blueprint = Blueprint('users', __name__, url_prefix='/api')
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@users_blueprint.route('/users', methods=['GET'])
@users_blueprint.output(UsersSchema(many=True), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users():
abort_if_no_admin()
res = []
for i in User.query.all():
res.append(i.as_dict())
@ -21,44 +29,171 @@ def users():
return jsonify({"status": 200, "data": res})
@users_blueprint.route('/login', methods=['POST'])
def login():
request_data = request.get_json()
username = request_data['username']
password = request_data['password']
@users_blueprint.route('/user', methods=['GET'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get current user", description="Returns current user")
def user():
username = get_username_or_abort_401()
user = db.session.query(User).filter_by(username=username).first()
if check_password(user.password, password):
token = jwt.encode({'username': user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256")
res = db.session.query(User).filter_by(username=username).first().as_dict()
return jsonify({"status": 200, "text": "Successfully logged in", "data": token})
else:
return jsonify({"status": 500, "text": "Unable to login"})
return jsonify({"status": 200, "data": res})
@users_blueprint.route('/logout', methods=['GET'])
def logout():
# TODO
return jsonify({"status": 200, "text": "Successfully logged out"})
@users_blueprint.route('/user/login', methods=['POST'])
@users_blueprint.output(TokenSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
check_if_user_data_exists(data)
username = data['username']
password = data['password']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
if not check_password(query_user.password, password): # Password incorrect
abort(500, message="Unable to login")
token = jwt.encode({'username': query_user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256")
return jsonify({"status": 200, "text": "Successfully logged in", "data": {"token": token}})
@users_blueprint.route('/register', methods=['POST'])
def register():
request_data = request.get_json()
username = request_data['username']
password = request_data['password']
@users_blueprint.route('/user/register', methods=['POST'])
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
check_if_user_data_exists(data)
user = db.session.query(User).filter_by(username=username).first()
if user is None:
# Username doesn't exist yet
user = User(
username=username,
password=hash_password(password),
admin=False
)
db.session.add(user)
username = data['username']
password = data['password']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is not None: # Username already exist
abort(500, message="Username already exist")
new_user = User(
username=username,
password=hash_password(password),
admin=False
)
db.session.add(new_user)
db.session.commit()
return jsonify({"status": 200, "text": "Successfully registered user", "data": new_user.as_dict()})
@users_blueprint.route('/user', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=LoginDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Update user", description="Changes password and/or username of current user")
def update_user(data):
username = get_username_or_abort_401()
check_if_user_data_exists(data)
new_username = data['username']
new_password = data['password']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
if new_password is not None:
query_user.password = hash_password(new_password)
if new_username is not None:
query_user.username = new_username
db.session.commit()
return jsonify({"status": 200, "text": "Successfully updated user", "data": {}})
@users_blueprint.route('/user/setAdmin', methods=['PUT'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=AdminDataSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Set user admin state", description="Set admin state of specified user")
def set_admin(data):
abort_if_no_admin() # Only admin users can do this
check_if_admin_data_exists(data)
username = data['username']
admin = data['admin']
query_user = db.session.query(User).filter_by(username=username).first()
if query_user is None: # Username doesn't exist
abort(500, message="Unable to login")
query_user.admin = admin
db.session.commit()
return jsonify({"status": 200, "text": "Successfully updated users admin rights", "data": {}})
@users_blueprint.route('/user', methods=['DELETE'])
@users_blueprint.output({}, 200)
@users_blueprint.input(schema=DeleteUserSchema)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Delete user", description="Deletes user by username")
def delete_user(data):
check_if_delete_data_exists(data)
username = data['username']
if username == get_username_or_abort_401(): # Username is same as current user
db.session.query(User).filter_by(username=username).delete()
db.session.commit()
else: # Delete different user than my user -> only admin users
abort_if_no_admin()
db.session.query(User).filter_by(username=username).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully registered user", "data": user.as_dict()})
else:
return jsonify({"status": 500, "text": "Username already exist"})
return jsonify({"status": 200, "text": "Successfully removed user", "data": {}})
def check_if_user_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")
if "password" not in data:
abort(400, message="Password missing")
if data['password'] == "" or data['password'] is None:
abort(400, message="Password missing")
def check_if_admin_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")
if "admin" not in data:
abort(400, message="Admin state missing")
if data['admin'] == "" or data['admin'] is None:
abort(400, message="Admin state missing")
def check_if_delete_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")

View File

@ -1,5 +1,7 @@
from flask import Flask
from apiflask import APIFlask
from dotenv import load_dotenv
from flask_cors import CORS
from models import *
from blueprint_interface import interface_blueprint
@ -14,9 +16,11 @@ def create_app():
load_dotenv()
# Create Flask app load app.config
application = Flask(__name__)
application = APIFlask(__name__)
application.config.from_object("config.ConfigClass")
CORS(application)
application.app_context().push()
db.init_app(application)

18
webservice/auth.py Normal file
View File

@ -0,0 +1,18 @@
import os
import jwt
from apiflask import HTTPTokenAuth
auth = HTTPTokenAuth()
@auth.verify_token
def verify_token(token):
if token is None:
return False
try:
jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
return True
except jwt.exceptions.DecodeError:
return False

View File

@ -2,6 +2,8 @@ import os
from dotenv import load_dotenv
from scheme import BaseResponseSchema
load_dotenv()
@ -19,3 +21,32 @@ class ConfigClass(object):
(os.getenv("MYSQL_PORT") or str(3306)) + "/" + \
os.getenv('MYSQL_DATABASE')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
# openapi/Swagger config
SPEC_FORMAT = 'yaml'
SERVERS = [
{
"name": "Production",
"url": "https://aktienbot.flokaiser.com"
},
{
"name": "Local",
"url": "http://127.0.0.1:5000"
}
]
INFO = {
'description': 'Webengineering 2 | Telegram Aktienbot',
'version': '0.0.1'
# 'termsOfService': 'http://example.com',
# 'contact': {
# 'name': 'API Support',
# 'url': 'http://www.example.com/support',
# 'email': 'support@example.com'
# },
# 'license': {
# 'name': 'Apache 2.0',
# 'url': 'http://www.apache.org/licenses/LICENSE-2.0.html'
# }
}
BASE_RESPONSE_DATA_KEY = "data"
BASE_RESPONSE_SCHEMA = BaseResponseSchema

View File

@ -3,7 +3,8 @@ import os
import uuid
import jwt
from flask import request, jsonify
from apiflask import abort
from flask import request
from db import db
from models import User
@ -31,7 +32,7 @@ def extract_token_data(token):
if token is not None:
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
except:
except jwt.exceptions.DecodeError:
return None
else:
return None
@ -51,5 +52,22 @@ def get_user_id_from_username(username):
return None
def return_401():
return jsonify({"status": 401, "text": "Authorization token not provided or not valid"})
def get_username_or_abort_401():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
return username
def abort_if_no_admin():
if not is_user_admin():
abort(401, message="Only admin users can access this")
def is_user_admin():
username = get_username_or_abort_401()
return db.session.query(User).filter_by(username=username).first().admin

View File

@ -6,3 +6,7 @@ Flask_SQLAlchemy==2.5.1
python-dotenv==0.19.2
pymysql==1.0.2
pyjwt==2.0.0
apiflask==0.12.0
flask-swagger-ui==3.36.0
flask-cors==3.0.10

80
webservice/scheme.py Normal file
View File

@ -0,0 +1,80 @@
from apiflask import Schema
from apiflask.fields import Integer, String, Boolean, Field, Float
class BaseResponseSchema(Schema):
text = String()
status = Integer()
data = Field()
class UsersSchema(Schema):
admin = Boolean()
password = String()
telegram_name = String()
user_id = Integer()
username = String()
class AdminDataSchema(Schema):
username = String()
admin = Boolean()
class TokenSchema(Schema):
token = String()
class LoginDataSchema(Schema):
username = String()
password = String()
class DeleteUserSchema(Schema):
username = String()
class ChangePasswordSchema(Schema):
old_password = String()
new_password = String()
class ChangeUsernameSchema(Schema):
new_username = String()
class KeywordSchema(Schema):
keyword = String()
class KeywordResponseSchema(Schema):
keyword = String()
s_id = Integer()
user_id = Integer()
class SymbolSchema(Schema):
symbol = String()
class SymbolResponseSchema(Schema):
symbol = String()
s_id = Integer()
user_id = Integer()
class PortfolioShareResponseSchema(Schema):
count = Integer()
last_transaction = String()
class TransactionSchema(Schema):
user_id = Integer()
symbol = String()
time = String()
count = Integer()
price = Float()
class DeleteSuccessfulSchema(Schema):
pass