374 lines
14 KiB
Python
374 lines
14 KiB
Python
"""
Script for communicating with the web service to read and write data in the database.
"""
|
|
__author__ = "Florian Kellermann, Linus Eickhoff"
|
|
__date__ = "26.04.2022"
|
|
__version__ = "1.0.1"
|
|
__license__ = "None"
|
|
|
|
#side-dependencies: none
|
|
#Work in Progress
|
|
|
|
import sys
|
|
import os
|
|
import requests as r
|
|
from croniter import croniter # used for checking cron formatting
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv() # load environment variables via python-dotenv at import time (the __main__ test block below reads BOT_EMAIL and BOT_PASSWORD with os.getenv)
|
|
|
|
# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
|
|
|
|
class API_Handler:
    """class for interacting with the api webservice

    Logs in once with the given credentials and stores the returned token.
    Every later request sends that token as a bearer token; for per-user
    endpoints the token is suffixed with ":<user_id>" (bot_token:user_id),
    where user_id is the id of the user whose data is read or changed.

    Attributes:
        db_adress (string): base adress (URL) of the webservice
        token (string): auth token for api, None if the last login failed

    Methods:
        reauthorize(email, password): log in again and store the new token
        get_user(user_id): gets the infos of the user
        get_all_users(): gets all users
        get_user_keywords(user_id): gets the keywords of the user
        set_keyword(user_id, keyword): sets the keyword of the user
        delete_keyword(user_id, keyword): deletes the keyword of the user
        get_user_shares(user_id): gets the isins of the shares of the user
        set_share(user_id, isin, comment): sets the share of the user
        delete_share(user_id, isin): deletes the share of the user
        get_user_transactions(user_id): gets the transactions of the user
        set_transaction(user_id, comment, isin, count, price, time): sets the transaction of the user
        get_user_portfolio(user_id): gets the portfolio of the user
        set_cron_interval(user_id, cron_interval): sets the cron interval of the user
        set_admin(email, is_admin): sets the admin status of the user with the given email
    """

    # Sentinel meaning "authenticate as the bot itself, without a user id".
    # A dedicated object is used instead of None so that an accidental
    # user_id=None still produces "token:None" (as before) rather than
    # silently sending the bare bot token.
    _NO_USER = object()

    def __init__(self, db_adress, email, password):
        """initializes the API_Handler class and logs in to the webservice

        If the login does not return status 200 an error line is printed and
        self.token is set to None; no exception is raised for a rejected login.

        Args:
            db_adress (string): adress of the database
            email (string): email of the user
            password (string): password of the user
        """
        self.db_adress = db_adress

        # credentials for admin account that has all permissions to get and set data (in this case bot account)
        status_code = self._login(email, password)
        if status_code != 200:
            print("Error: " + str(status_code) + " invalid credentials")

    def _login(self, email, password):
        """Post the credentials to /user/login and store the token in self.token.

        Args:
            email (string): email of the user
            password (string): password of the user

        Returns:
            int: status code of the login request (self.token is None unless it is 200)
        """
        payload = {'email': email, 'password': password}
        with r.Session() as session:
            response = session.post(self.db_adress + "/user/login", json=payload)
            if response.status_code == 200:
                # store token for further authentication of requests
                self.token = response.json()["data"]['token']
            else:
                self.token = None
        return response.status_code

    def _auth_headers(self, user_id=_NO_USER):
        """Build the Authorization header for a request.

        Args:
            user_id: id of the user the request is made for; omit it to
                authenticate with the bare token only

        Returns:
            dict: headers containing the bearer token

        Raises:
            TypeError: if no token is stored because the login failed. The type
                matches what the former string concatenation with None raised,
                but the message now says what is wrong.
        """
        if self.token is None:
            raise TypeError("API_Handler has no auth token (login failed); call reauthorize() with valid credentials first")

        credential = self.token
        if user_id is not self._NO_USER:
            credential = credential + ":" + str(user_id)
        return {'Authorization': 'Bearer ' + credential}

    def _get_data(self, path, user_id=_NO_USER, max_retries=10):
        """GET db_adress + path and return the "data" field of the JSON answer.

        The request is repeated until it returns status 200 or max_retries
        attempts were made. Exceptions raised by requests are not caught.

        Args:
            path (string): endpoint path starting with "/"
            user_id: id of the user the request is made for (optional)
            max_retries (int): maximum number of attempts

        Returns:
            the "data" value of the response, or None if no attempt succeeded
            (also when max_retries <= 0, in which case no request is sent)
        """
        if max_retries <= 0:
            return None

        headers = self._auth_headers(user_id)
        attempts_left = max_retries
        with r.Session() as session:
            while attempts_left > 0:
                response = session.get(self.db_adress + path, headers=headers)
                if response.status_code == 200:
                    return response.json()["data"]
                attempts_left -= 1
        return None

    def _send(self, method, path, payload, user_id=_NO_USER):
        """Send a JSON payload with the given HTTP method and return the status code.

        Args:
            method (string): HTTP method, e.g. "POST", "PUT" or "DELETE"
            path (string): endpoint path starting with "/"
            payload (dict): JSON body of the request
            user_id: id of the user the request is made for (optional)

        Returns:
            int: status code
        """
        headers = self._auth_headers(user_id)
        with r.Session() as session:
            response = session.request(method, self.db_adress + path, json=payload, headers=headers)
        return response.status_code

    def reauthorize(self, email, password):  # can be used if token expired
        """set new credentials

        Args:
            email (string): email of the user
            password (string): password of the user

        Returns:
            string: the new token, or None if the login failed (self.token is set to the same value)
        """
        self._login(email, password)
        return self.token

    def get_user(self, user_id, max_retries=10):
        """gets the infos of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            json: json of user infos, None if the request failed max_retries times
        """
        return self._get_data("/user", user_id, max_retries)

    def get_all_users(self, max_retries=10):
        """gets all users

        Args:
            max_retries (int): max retries for the request

        Returns:
            list: list of users, None if the request failed max_retries times
        """
        return self._get_data("/users", max_retries=max_retries)

    def get_user_keywords(self, user_id, max_retries=10):
        """gets the keywords of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of keywords (empty if no keywords are set), None if the request failed max_retries times
        """
        keywords_json = self._get_data("/keywords", user_id, max_retries)
        if keywords_json is None:
            return None
        # keywords_json is a list of dictionaries
        return [entry["keyword"] for entry in keywords_json]

    def set_keyword(self, user_id, keyword):
        """sets the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code
        """
        return self._send("POST", "/keyword", {"keyword": keyword}, user_id)

    def delete_keyword(self, user_id, keyword):
        """deletes the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code
        """
        return self._send("DELETE", "/keyword", {"keyword": keyword}, user_id)

    def get_user_shares(self, user_id, max_retries=10):
        """gets the shares of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of the isins of the shares, None if the request failed max_retries times
        """
        shares_json = self._get_data("/shares", user_id, max_retries)
        if shares_json is None:
            return None
        # we only want the isin of the shares
        return [share["isin"] for share in shares_json]

    def set_share(self, user_id, isin, comment):
        """sets the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
            comment (string): comment of the share, can be the real name of the share e.g. "Apple Inc."

        Returns:
            int: status code
        """
        return self._send("POST", "/share", {"comment": comment, "isin": isin}, user_id)

    def delete_share(self, user_id, isin):
        """deletes the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)

        Returns:
            int: status code
        """
        # to delete a share only the isin is needed because it is unique, shares are not transactions!
        return self._send("DELETE", "/share", {"isin": str(isin)}, user_id)

    def get_user_transactions(self, user_id, max_retries=10):
        """gets the transactions of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            dict: dictionary of transactions, None if the request failed max_retries times
        """
        return self._get_data("/transactions", user_id, max_retries)

    def set_transaction(self, user_id, comment, isin, count, price, time):
        """sets the transaction of the user

        Args:
            user_id (int): id of the user
            comment (string): comment of the transaction
            isin (string): isin of the transaction
            count (float): count of the transaction
            price (float): price of the transaction
            time (string): time of the transaction; the last three characters are
                cut off and "Z" is appended before sending, the result should look
                like e.g. "2011-10-05T14:48:00.000Z"

        Returns:
            int: status code
        """
        # drop the last three characters and add Z to make it a valid date for db
        # NOTE(review): presumably callers pass a timestamp with six fractional digits and no zone suffix - confirm
        time = time[:-3] + "Z"

        # set transaction as JSON with all the attributes needed according to Swagger docs
        transaction = {
            "comment": str(comment),
            "count": float(count),
            "isin": str(isin),
            "price": float(price),
            "time": str(time),
        }
        return self._send("POST", "/transaction", transaction, user_id)

    def get_user_portfolio(self, user_id, max_retries=10):
        """gets the portfolio of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            dict: dictionary of portfolio, None if the request failed max_retries times
        """
        return self._get_data("/portfolio", user_id, max_retries)

    def set_cron_interval(self, user_id, cron_interval):
        """sets the cron interval of the user

        Args:
            user_id (int): id of the user
            cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting

        Returns:
            int: status code, -1 if cron_interval is not in valid cron format (no request is sent then)
        """
        if not croniter.is_valid(cron_interval):  # check if cron_interval is in valid format
            print("Error: Invalid cron format")
            return -1  # return error code -1 if invalid cron format

        # put not post (see swagger docs)
        return self._send("PUT", "/user/setCron", {"cron": str(cron_interval)}, user_id)

    def set_admin(self, email, is_admin):
        """sets the admin status of the user with the given email

        Args:
            email (string): email of the user
            is_admin (bool): "true" if user should be Admin, "false" if not

        Returns:
            int: status code
        """
        # only bot token is needed, user is chosen by email
        return self._send("PUT", "/user/setAdmin", {"admin": is_admin, "email": str(email)})
|
|
|
|
|
|
if __name__ == "__main__":  # editable, just for basic on the go testing of new functions
    # Ad-hoc smoke test: logs in with the bot credentials from the environment
    # and calls a handful of endpoints for one hard-coded user.
    API_URL = "https://gruppe1.testsites.info/api"
    TEST_USER_ID = 1709356058  # user_id here is currently mine (Linus)

    print("This is a module for the telegram bot. It is not intended to be run directly.")

    # get creds from env
    bot_email = str(os.getenv("BOT_EMAIL"))
    bot_password = str(os.getenv("BOT_PASSWORD"))
    handler = API_Handler(API_URL, bot_email, bot_password)
    print(handler.token)

    print(handler.get_user_keywords(user_id=TEST_USER_ID))

    # fetched here, printed at the very end
    portfolio = handler.get_user_portfolio(user_id=TEST_USER_ID)

    cron_status = handler.set_cron_interval(user_id=TEST_USER_ID, cron_interval="0 0 * * *")
    print("set cron with status: " + str(cron_status))

    print(handler.get_user(user_id=TEST_USER_ID))

    all_users = handler.get_all_users()
    print(handler.set_admin("test@test.com", "true"))
    print(all_users)
    print(portfolio)

    # NOTE(review): exits with status 1 even when every call succeeded -
    # presumably to flag that the module was run directly; confirm
    sys.exit(1)