Rewrite api from Flask to APIFlask

This commit is contained in:
Administrator 2022-03-17 09:26:25 +01:00
parent 72716c3c82
commit 290672cee4
11 changed files with 322 additions and 115 deletions

View File

@ -1,24 +1,29 @@
import os
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from auth import auth
from scheme import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
from models import Keyword
keyword_blueprint = Blueprint('keyword', __name__, url_prefix='/api')
keyword_blueprint = APIBlueprint('keyword', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@keyword_blueprint.route('/keyword', methods=['POST'])
def add_keyword():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@keyword_blueprint.output(KeywordResponseSchema(many=True), 200)
@keyword_blueprint.input(schema=KeywordSchema)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Add new keyword", description="Adds new keyword for current user")
def add_keyword(data):
username = get_username_or_abort_401()
request_data = request.get_json()
key = request_data['keyword']
check_if_keyword_data_exists(data)
key = data['keyword']
check_keyword = db.session.query(Keyword).filter_by(keyword=key, user_id=get_user_id_from_username(username)).first()
if check_keyword is None:
@ -32,31 +37,33 @@ def add_keyword():
return jsonify({"status": 200, "text": "Successfully added keyword", "data": new_keyword.as_dict()})
else:
return jsonify({"status": 500, "text": "Keyword already exist for this user"})
abort(500, message="Keyword already exist for this user")
@keyword_blueprint.route('/keyword', methods=['DELETE'])
def remove_keyword():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@keyword_blueprint.output(DeleteSuccessfulSchema, 200)
@keyword_blueprint.input(schema=KeywordSchema)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Removes existing keyword", description="Removes existing keyword for current user")
def remove_keyword(data):
username = get_username_or_abort_401()
request_data = request.get_json()
key = request_data['keyword']
check_if_keyword_data_exists(data)
key = data['keyword']
db.session.query(Keyword).filter_by(keyword=key, user_id=get_user_id_from_username(username)).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully removed keyword"})
return jsonify({"status": 200, "text": "Successfully removed keyword", "data": {}})
@keyword_blueprint.route('/keywords', methods=['GET'])
@keyword_blueprint.output(KeywordResponseSchema(many=True), 200)
@keyword_blueprint.auth_required(auth)
@keyword_blueprint.doc(summary="Returns all keywords", description="Returns all keywords for current user")
def get_keywords():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_keywords = []
keywords = db.session.query(Keyword).filter_by(user_id=get_user_id_from_username(username)).all()
@ -66,3 +73,11 @@ def get_keywords():
return_keywords.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded keywords", "data": return_keywords})
def check_if_keyword_data_exists(data):
if "keyword" not in data:
abort(400, message="Keyword missing")
if data['keyword'] == "" or data['keyword'] is None:
abort(400, message="Keyword missing")

View File

@ -1,21 +1,23 @@
import os
from flask import Blueprint, jsonify
from apiflask import APIBlueprint
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Transaction
from auth import auth
portfolio_blueprint = Blueprint('portfolio', __name__, url_prefix='/api')
portfolio_blueprint = APIBlueprint('portfolio', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@portfolio_blueprint.route('/portfolio', methods=['GET'])
@portfolio_blueprint.output(200)
@portfolio_blueprint.auth_required(auth)
@portfolio_blueprint.doc(summary="Returns portfolio", description="Returns all shares of current user")
def get_portfolio():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_portfolio = {}
transactions = db.session.query(Transaction).filter_by(user_id=get_user_id_from_username(username)).all()

View File

@ -1,24 +1,29 @@
import os
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from auth import auth
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Share
from scheme import SymbolSchema, SymbolResponseSchema, DeleteSuccessfulSchema
shares_blueprint = Blueprint('share', __name__, url_prefix='/api')
shares_blueprint = APIBlueprint('share', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@shares_blueprint.route('/share', methods=['POST'])
def add_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@shares_blueprint.output(SymbolResponseSchema(many=True), 200)
@shares_blueprint.input(schema=SymbolSchema)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Add new symbol", description="Adds new symbol for current user")
def add_symbol(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
check_if_symbol_data_exists(data)
symbol = data['symbol']
check_share = db.session.query(Share).filter_by(symbol=symbol, user_id=get_user_id_from_username(username)).first()
if check_share is None:
@ -36,27 +41,29 @@ def add_symbol():
@shares_blueprint.route('/share', methods=['DELETE'])
def remove_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@shares_blueprint.output(DeleteSuccessfulSchema, 200)
@shares_blueprint.input(schema=SymbolSchema)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Removes existing symbol", description="Removes existing symbol for current user")
def remove_symbol(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
check_if_symbol_data_exists(data)
symbol = data['symbol']
db.session.query(Share).filter_by(symbol=symbol, user_id=get_user_id_from_username(username)).delete()
db.session.commit()
return jsonify({"status": 200, "text": "Successfully removed symbol"})
return jsonify({"status": 200, "text": "Successfully removed symbol", "data": {}})
@shares_blueprint.route('/shares', methods=['GET'])
@shares_blueprint.output(SymbolResponseSchema(many=True), 200)
@shares_blueprint.auth_required(auth)
@shares_blueprint.doc(summary="Returns all symbols", description="Returns all symbols for current user")
def get_symbol():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_symbols = []
symbols = db.session.query(Share).filter_by(user_id=get_user_id_from_username(username)).all()
@ -66,3 +73,11 @@ def get_symbol():
return_symbols.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded symbols", "data": return_symbols})
def check_if_symbol_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")

View File

@ -1,48 +1,48 @@
import os
import datetime
from flask import Blueprint, jsonify, request
from apiflask import abort, APIBlueprint
from flask import jsonify
from db import db
from helper_functions import get_username_from_token_data, extract_token_data, get_token, get_user_id_from_username, return_401
from helper_functions import get_user_id_from_username, get_username_or_abort_401
from models import Transaction
from scheme import TransactionSchema
from auth import auth
transaction_blueprint = Blueprint('transaction', __name__, url_prefix='/api')
transaction_blueprint = APIBlueprint('transaction', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@transaction_blueprint.route('/transaction', methods=['POST'])
def add_transaction():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
@transaction_blueprint.output((), 200)
@transaction_blueprint.input(schema=TransactionSchema)
@transaction_blueprint.auth_required(auth)
@transaction_blueprint.doc(summary="Adds new transaction", description="Adds new transaction for current user")
def add_transaction(data):
username = get_username_or_abort_401()
request_data = request.get_json()
symbol = request_data['symbol']
time = datetime.datetime.strptime(request_data['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
count = request_data['count']
price = request_data['price']
check_if_transaction_data_exists(data)
new_transcation = Transaction(
new_transaction = Transaction(
user_id=get_user_id_from_username(username),
symbol=symbol,
time=time,
count=count,
price=price
symbol=data['symbol'],
time=datetime.datetime.strptime(data['time'], '%Y-%m-%dT%H:%M:%S.%fZ'),
count=data['count'],
price=data['price']
)
db.session.add(new_transcation)
db.session.add(new_transaction)
db.session.commit()
return jsonify({"status": 200, "text": "Successfully added transaction", "data": new_transcation.as_dict()})
return jsonify({"status": 200, "text": "Successfully added transaction", "data": new_transaction.as_dict()})
@transaction_blueprint.route('/transactions', methods=['GET'])
@transaction_blueprint.output(TransactionSchema(), 200)
@transaction_blueprint.auth_required(auth)
@transaction_blueprint.doc(summary="Returns all transactions", description="Returns all transactions for current user")
def get_transaction():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
return return_401()
username = get_username_or_abort_401()
return_transactions = []
transactions = db.session.query(Transaction).filter_by(user_id=get_user_id_from_username(username)).all()
@ -52,3 +52,29 @@ def get_transaction():
return_transactions.append(row.as_dict())
return jsonify({"status": 200, "text": "Successfully loaded transactions", "data": return_transactions})
def check_if_transaction_data_exists(data):
if "symbol" not in data:
abort(400, message="Symbol missing")
if data['symbol'] == "" or data['symbol'] is None:
abort(400, message="Symbol missing")
if "time" not in data:
abort(400, message="Time missing")
if data['time'] == "" or data['time'] is None:
abort(400, message="Time missing")
if "count" not in data:
abort(400, message="Count missing")
if data['count'] == "" or data['count'] is None:
abort(400, message="Count missing")
if "price" not in data:
abort(400, message="Price missing")
if data['price'] == "" or data['price'] is None:
abort(400, message="Price missing")

View File

@ -2,17 +2,23 @@ import datetime
import os
import jwt
from flask import Blueprint, jsonify, request
from apiflask import APIBlueprint, abort
from flask import jsonify
from db import db
from helper_functions import check_password, hash_password, get_token, extract_token_data
from helper_functions import check_password, hash_password
from models import User
from scheme import UsersSchema, Token, LoginData
from auth import auth
users_blueprint = Blueprint('users', __name__, url_prefix='/api')
users_blueprint = APIBlueprint('users', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
@users_blueprint.route('/users', methods=['GET'])
@users_blueprint.output(UsersSchema(many=True), 200)
@users_blueprint.auth_required(auth)
@users_blueprint.doc(summary="Get all users", description="Returns all existing users as array")
def users():
res = []
for i in User.query.all():
@ -22,43 +28,62 @@ def users():
@users_blueprint.route('/login', methods=['POST'])
def login():
request_data = request.get_json()
username = request_data['username']
password = request_data['password']
@users_blueprint.output(Token(), 200)
@users_blueprint.input(schema=LoginData)
@users_blueprint.doc(summary="Login", description="Returns jwt token if username and password match, otherwise returns error")
def login(data):
check_if_user_data_exists(data)
username = data['username']
password = data['password']
user = db.session.query(User).filter_by(username=username).first()
if check_password(user.password, password):
token = jwt.encode({'username': user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256")
return jsonify({"status": 200, "text": "Successfully logged in", "data": token})
else:
return jsonify({"status": 500, "text": "Unable to login"})
if user is None: # Username doesn't exist
abort(500, message="Unable to login")
if not check_password(user.password, password): # Password incorrect
abort(500, message="Unable to login")
@users_blueprint.route('/logout', methods=['GET'])
def logout():
# TODO
return jsonify({"status": 200, "text": "Successfully logged out"})
token = jwt.encode({'username': user.username, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=45)}, os.getenv('SECRET_KEY'), "HS256")
return jsonify({"status": 200, "text": "Successfully logged in", "data": {"token": token}})
@users_blueprint.route('/register', methods=['POST'])
def register():
request_data = request.get_json()
username = request_data['username']
password = request_data['password']
@users_blueprint.output(UsersSchema(), 200)
@users_blueprint.input(schema=LoginData)
@users_blueprint.doc(summary="Register", description="Registers user")
def register(data):
check_if_user_data_exists(data)
username = data['username']
password = data['password']
user = db.session.query(User).filter_by(username=username).first()
if user is None:
# Username doesn't exist yet
user = User(
username=username,
password=hash_password(password),
admin=False
)
db.session.add(user)
db.session.commit()
return jsonify({"status": 200, "text": "Successfully registered user", "data": user.as_dict()})
else:
return jsonify({"status": 500, "text": "Username already exist"})
if user is not None: # Username already exist
abort(500, message="Username already exist")
user = User(
username=username,
password=hash_password(password),
admin=False
)
db.session.add(user)
db.session.commit()
return jsonify({"status": 200, "text": "Successfully registered user", "data": user.as_dict()})
def check_if_user_data_exists(data):
if "username" not in data:
abort(400, message="Username missing")
if data['username'] == "" or data['username'] is None:
abort(400, message="Username missing")
if "password" not in data:
abort(400, message="Password missing")
if data['password'] == "" or data['password'] is None:
abort(400, message="Password missing")

View File

@ -1,4 +1,9 @@
from flask import Flask
# TODO
# Change password, username
# Endpoints for news, shares
from apiflask import APIFlask
from dotenv import load_dotenv
from models import *
@ -14,7 +19,7 @@ def create_app():
load_dotenv()
# Create Flask app load app.config
application = Flask(__name__)
application = APIFlask(__name__)
application.config.from_object("config.ConfigClass")
application.app_context().push()

18
webservice/auth.py Normal file
View File

@ -0,0 +1,18 @@
import os
import jwt
from apiflask import HTTPTokenAuth
auth = HTTPTokenAuth()
@auth.verify_token
def verify_token(token):
if token is None:
return False
try:
jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
return True
except jwt.exceptions.DecodeError:
return False

View File

@ -2,6 +2,8 @@ import os
from dotenv import load_dotenv
from scheme import BaseResponseSchema
load_dotenv()
@ -19,3 +21,32 @@ class ConfigClass(object):
(os.getenv("MYSQL_PORT") or str(3306)) + "/" + \
os.getenv('MYSQL_DATABASE')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
# openapi/Swagger config
SPEC_FORMAT = 'yaml'
SERVERS = [
{
"name": "Production",
"url": "https://aktienbot.flokaiser.com"
},
{
"name": "Local",
"url": "http://127.0.0.1:5000"
}
]
INFO = {
'description': 'Webengineering 2 | Telegram Aktienbot',
'version': '0.0.1'
# 'termsOfService': 'http://example.com',
# 'contact': {
# 'name': 'API Support',
# 'url': 'http://www.example.com/support',
# 'email': 'support@example.com'
# },
# 'license': {
# 'name': 'Apache 2.0',
# 'url': 'http://www.apache.org/licenses/LICENSE-2.0.html'
# }
}
BASE_RESPONSE_DATA_KEY = "data"
BASE_RESPONSE_SCHEMA = BaseResponseSchema

View File

@ -3,7 +3,8 @@ import os
import uuid
import jwt
from flask import request, jsonify
from apiflask import abort
from flask import request
from db import db
from models import User
@ -31,7 +32,7 @@ def extract_token_data(token):
if token is not None:
try:
return jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=["HS256"])
except:
except jwt.exceptions.DecodeError:
return None
else:
return None
@ -51,5 +52,10 @@ def get_user_id_from_username(username):
return None
def return_401():
return jsonify({"status": 401, "text": "Authorization token not provided or not valid"})
def get_username_or_abort_401():
# get username from jwt token
username = get_username_from_token_data(extract_token_data(get_token()))
if username is None: # If token not provided or invalid -> return 401 code
abort(401, message="Unable to login")
return username

View File

@ -6,3 +6,5 @@ Flask_SQLAlchemy==2.5.1
python-dotenv==0.19.2
pymysql==1.0.2
pyjwt==2.0.0
apiflask==0.12.0
flask-swagger-ui==3.36.0

62
webservice/scheme.py Normal file
View File

@ -0,0 +1,62 @@
from apiflask import Schema
from apiflask.fields import Integer, String, Boolean, Field, Float
class BaseResponseSchema(Schema):
text = String()
status = Integer()
data = Field()
class UsersSchema(Schema):
admin = Boolean()
password = String()
telegram_name = String()
user_id = Integer()
username = String()
class Token(Schema):
token = String()
class LoginData(Schema):
username = String()
password = String()
class KeywordSchema(Schema):
keyword = String()
class KeywordResponseSchema(Schema):
keyword = String()
s_id = Integer()
user_id = Integer()
class SymbolSchema(Schema):
symbol = String()
class SymbolResponseSchema(Schema):
symbol = String()
s_id = Integer()
user_id = Integer()
class PortfolioShareResponseSchema(Schema):
count = Integer()
last_transaction = String()
class TransactionSchema(Schema):
user_id = Integer()
symbol = String()
time = String()
count = Integer()
price = Float()
class DeleteSuccessfulSchema(Schema):
pass