Module telegram_bot.api_handling.api_handler
script for communicating with webservice to get data from database
Expand source code
"""
script for communicating with webservice to get data from database
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"
# side-dependencies: none
# Work in Progress
import os
import sys
import requests as r
from croniter import croniter # used for checking cron formatting
from dotenv import load_dotenv
load_dotenv() # loads environment vars
# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
class API_Handler:
"""class for interacting with the api webservice
Attributes:
db_adress (string): adress of the database
token (string): auth token for api
Methods:
reauthorize(email, password): set new credentials
get_user_keywords(user_id): gets the keywords of the user
set_keyword(user_id, keyword): sets the keyword of the user
delete_keyword(user_id, keyword): deletes the keyword of the user
get_user_shares(user_id): gets the shares of the user
set_share(user_id, symbol): sets the share of the user
delete_share(user_id, symbol): deletes the share of the user
get_user_transactions(user_id): gets the transactions of the user
set_transaction(user_id, transaction): sets the transaction of the user
get_user_portfolio(user_id): gets the portfolio of the user
set_portfolio(user_id, portfolio): sets the portfolio of the user
delete_portfolio(user_id, portfolio): deletes the portfolio of the user
set_cron_interval(user_id, interval): sets the cron interval of the user
set_admin(email, is_admin): sets the admin status of the user with the given email
"""
def __init__(self, db_adress, email, password):
"""initializes the API_Handler class
Args:
db_adress (string): adress of the database
email (string): email of the user
password (string): password of the user
"""
self.db_adress = db_adress
payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
with r.Session() as s: # open session
p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice
if p.status_code == 200:
self.token = p.json()["data"]['token'] # store token for further authentication of requests
else:
print("Error: " + str(p.status_code) + " invalid credentials")
self.token = None
def reauthorize(self, email, password): # can be used if token expired
"""set new credentials
Args:
email (string): email of the user
password (string): password of the user
Returns:
token (string): new token or None if not 200
Raises:
None
"""
payload = {'email': email, 'password': password}
with r.Session() as s:
p = s.post(self.db_adress + "/user/login", json=payload)
if p.status_code == 200:
self.token = p.json()["data"]['token']
return p.json()["data"]['token']
else:
self.token = None
return None
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
"""gets the shares of the user
Args:
user_id (int): id of the user
max_retries (int): max retries for the request
Returns:
json: json of user infos
Raises:
None
"""
if max_retries <= 0:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
req = s.get(self.db_adress + "/user", headers=headers)
if (req.status_code == 200):
return req.json()["data"]
else:
return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
def get_all_users(self, max_retries=10):
"""gets all users
Args:
max_retries (int): max retries for the request
Returns:
list: list of users
Raises:
None
"""
if max_retries <= 0:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token}
req = s.get(self.db_adress + "/users", headers=headers)
if (req.status_code == 200):
return req.json()["data"]
else:
return self.get_all_users(max_retries - 1)
def get_user_keywords(self, user_id, max_retries=10):
"""gets the keywords of the user
Args:
user_id (int): id of the user
max_retries (int): max retries for the request
Returns:
list: list of keywords
Raises:
None
"""
if max_retries <= 0:
return None
keywords = []
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/keywords", headers=headers)
if (req.status_code == 200):
keywords_json = req.json()["data"]
for keyword in keywords_json: # keywords_json is a list of dictionaries
keywords.append(keyword["keyword"])
return keywords # will be empty if no keywords are set
else:
return self.get_user_keywords(user_id, max_retries - 1)
def set_keyword(self, user_id, keyword):
"""sets the keyword of the user
Args:
user_id (int): id of the user
keyword (int): keyword of the user
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
return req.status_code
def delete_keyword(self, user_id, keyword):
"""deletes the keyword of the user
Args:
user_id (int): id of the user
keyword (string): keyword of the user
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
return req.status_code
def get_user_shares(self, user_id, max_retries=10):
"""gets the shares of the user
Args:
user_id (int): id of the user
max_retries (int): max retries for the request
Returns:
list: list of shares
Raises:
None
"""
if max_retries <= 0:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/shares", headers=headers)
if (req.status_code == 200):
shares_json = req.json()["data"]
shares = []
for share in shares_json:
shares.append(share["isin"]) # we only want the isin of the shares
return shares
else:
return self.get_user_shares(user_id, max_retries - 1)
def set_share(self, user_id, isin, comment):
"""sets the share of the user
Args:
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
comment (string): comment of the share
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
return req.status_code
def delete_share(self, user_id, isin):
"""deletes the share of the user
Args:
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
return req.status_code
def get_user_transactions(self, user_id, max_retries=10):
"""gets the transactions of the user
Args:
user_id (int): id of the user
max_retries (int): max retries for the request
Returns:
dict: dictionary of transactions
Raises:
None
"""
if max_retries <= 0:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/transactions", headers=headers)
if req.status_code == 200:
transactions_dict = req.json()["data"]
return transactions_dict
else:
return self.get_user_transactions(user_id, max_retries - 1)
def set_transaction(self, user_id, comment, isin, count, price, time):
"""sets the transaction of the user
Args:
user_id (int): id of the user
comment (string): comment of the transaction
isin (string): isin of the transaction
count (float): count of the transaction
price (float): price of the transaction
time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
"time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
return req.status_code
def get_user_portfolio(self, user_id, max_retries=10):
"""gets the portfolio of the user
Args:
user_id (int): id of the user
max_retries (int): max retries for the request
Returns:
dict: dictionary of portfolio
Raises:
None
"""
if max_retries <= 0:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON
if req.status_code == 200:
portfolio_dict = req.json()["data"] # get the data of the JSON
return portfolio_dict
else:
return self.get_user_portfolio(user_id, max_retries - 1)
def set_cron_interval(self, user_id, cron_interval):
"""sets the cron interval of the user
Args:
user_id (int): id of the user
cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
Returns:
int: status code
Raises:
None
"""
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
print("Error: Invalid cron format")
return -1 # return error code -1 if invalid cron format
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs)
return req.status_code
def set_admin(self, email, is_admin):
"""sets the admin of the user
Args:
email (string): email of the user
is_admin (bool): "true" if user should be Admin, "false" if not
Returns:
int: status code
Raises:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
return req.status_code
if __name__ == "__main__": # editable, just for basic on the go testing of new functions
print("This is a module for the telegram bot. It is not intended to be run directly.")
handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env
print(handler.token)
keywords = handler.get_user_keywords(user_id=1709356058) # user_id here is currently mine (Linus)
print(keywords)
shares = handler.get_user_portfolio(user_id=1709356058)
print("set cron with status: " + str(handler.set_cron_interval(user_id=1709356058, cron_interval="0 0 * * *")))
user = handler.get_user(user_id=1709356058)
print(user)
all_users = handler.get_all_users()
admin_status = handler.set_admin("test@test.com", "true")
print(admin_status)
print(all_users)
print(shares)
sys.exit(1)
Classes
class API_Handler (db_adress, email, password)
-
class for interacting with the api webservice
Attributes
db_adress
:string
- adress of the database
token
:string
- auth token for api
Methods
reauthorize(email, password): set new credentials get_user_keywords(user_id): gets the keywords of the user set_keyword(user_id, keyword): sets the keyword of the user delete_keyword(user_id, keyword): deletes the keyword of the user get_user_shares(user_id): gets the shares of the user set_share(user_id, symbol): sets the share of the user delete_share(user_id, symbol): deletes the share of the user get_user_transactions(user_id): gets the transactions of the user set_transaction(user_id, transaction): sets the transaction of the user get_user_portfolio(user_id): gets the portfolio of the user set_portfolio(user_id, portfolio): sets the portfolio of the user delete_portfolio(user_id, portfolio): deletes the portfolio of the user set_cron_interval(user_id, interval): sets the cron interval of the user set_admin(email, is_admin): sets the admin status of the user with the given email
initializes the API_Handler class
Args
db_adress
:string
- adress of the database
email
:string
- email of the user
password
:string
- password of the user
Expand source code
class API_Handler: """class for interacting with the api webservice Attributes: db_adress (string): adress of the database token (string): auth token for api Methods: reauthorize(email, password): set new credentials get_user_keywords(user_id): gets the keywords of the user set_keyword(user_id, keyword): sets the keyword of the user delete_keyword(user_id, keyword): deletes the keyword of the user get_user_shares(user_id): gets the shares of the user set_share(user_id, symbol): sets the share of the user delete_share(user_id, symbol): deletes the share of the user get_user_transactions(user_id): gets the transactions of the user set_transaction(user_id, transaction): sets the transaction of the user get_user_portfolio(user_id): gets the portfolio of the user set_portfolio(user_id, portfolio): sets the portfolio of the user delete_portfolio(user_id, portfolio): deletes the portfolio of the user set_cron_interval(user_id, interval): sets the cron interval of the user set_admin(email, is_admin): sets the admin status of the user with the given email """ def __init__(self, db_adress, email, password): """initializes the API_Handler class Args: db_adress (string): adress of the database email (string): email of the user password (string): password of the user """ self.db_adress = db_adress payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account) with r.Session() as s: # open session p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice if p.status_code == 200: self.token = p.json()["data"]['token'] # store token for further authentication of requests else: print("Error: " + str(p.status_code) + " invalid credentials") self.token = None def reauthorize(self, email, password): # can be used if token expired """set new credentials Args: email (string): email of the user password (string): password of the user Returns: token (string): new token or None if not 200 Raises: None """ payload = {'email': email, 'password': password} with r.Session() as s: p = s.post(self.db_adress + "/user/login", json=payload) if p.status_code == 200: self.token = p.json()["data"]['token'] return p.json()["data"]['token'] else: self.token = None return None def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails """gets the shares of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: json: json of user infos Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from) req = s.get(self.db_adress + "/user", headers=headers) if (req.status_code == 200): return req.json()["data"] else: return self.get_user(user_id, max_retries - 1) # if request fails try again recursively def get_all_users(self, max_retries=10): """gets all users Args: max_retries (int): max retries for the request Returns: list: list of users Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token} req = s.get(self.db_adress + "/users", headers=headers) if (req.status_code == 200): return req.json()["data"] else: return self.get_all_users(max_retries - 1) def get_user_keywords(self, user_id, max_retries=10): """gets the keywords of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: list: list of keywords Raises: None """ if max_retries <= 0: return None keywords = [] with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/keywords", headers=headers) if (req.status_code == 200): keywords_json = req.json()["data"] for keyword in keywords_json: # keywords_json is a list of dictionaries keywords.append(keyword["keyword"]) return keywords # will be empty if no keywords are set else: return self.get_user_keywords(user_id, max_retries - 1) def set_keyword(self, user_id, keyword): """sets the keyword of the user Args: user_id (int): id of the user keyword (int): keyword of the user Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) return req.status_code def delete_keyword(self, user_id, keyword): """deletes the keyword of the user Args: user_id (int): id of the user keyword (string): keyword of the user Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) return req.status_code def get_user_shares(self, user_id, max_retries=10): """gets the shares of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: list: list of shares Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/shares", headers=headers) if (req.status_code == 200): shares_json = req.json()["data"] shares = [] for share in shares_json: shares.append(share["isin"]) # we only want the isin of the shares return shares else: return self.get_user_shares(user_id, max_retries - 1) def set_share(self, user_id, isin, comment): """sets the share of the user Args: user_id (int): id of the user isin (string): identifier of the share (standard is isin) comment (string): comment of the share Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc." return req.status_code def delete_share(self, user_id, isin): """deletes the share of the user Args: user_id (int): id of the user isin (string): identifier of the share (standard is isin) Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions! return req.status_code def get_user_transactions(self, user_id, max_retries=10): """gets the transactions of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: dict: dictionary of transactions Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/transactions", headers=headers) if req.status_code == 200: transactions_dict = req.json()["data"] return transactions_dict else: return self.get_user_transactions(user_id, max_retries - 1) def set_transaction(self, user_id, comment, isin, count, price, time): """sets the transaction of the user Args: user_id (int): id of the user comment (string): comment of the transaction isin (string): isin of the transaction count (float): count of the transaction price (float): price of the transaction time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z" Returns: int: status code Raises: None """ with r.Session() as s: time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers) return req.status_code def get_user_portfolio(self, user_id, max_retries=10): """gets the portfolio of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: dict: dictionary of portfolio Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON if req.status_code == 200: portfolio_dict = req.json()["data"] # get the data of the JSON return portfolio_dict else: return self.get_user_portfolio(user_id, max_retries - 1) def set_cron_interval(self, user_id, cron_interval): """sets the cron interval of the user Args: user_id (int): id of the user cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting Returns: int: status code Raises: None """ if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format print("Error: Invalid cron format") return -1 # return error code -1 if invalid cron format with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs) return req.status_code def set_admin(self, email, is_admin): """sets the admin of the user Args: email (string): email of the user is_admin (bool): "true" if user should be Admin, "false" if not Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers) return req.status_code
Methods
def delete_keyword(self, user_id, keyword)
-
deletes the keyword of the user
Args
user_id
:int
- id of the user
keyword
:string
- keyword of the user
Returns
int
- status code
Raises
None
Expand source code
def delete_keyword(self, user_id, keyword): """deletes the keyword of the user Args: user_id (int): id of the user keyword (string): keyword of the user Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) return req.status_code
-
deletes the share of the user
Args
user_id
:int
- id of the user
isin
:string
- identifier of the share (standard is isin)
Returns
int
- status code
Raises
None
Expand source code
def delete_share(self, user_id, isin): """deletes the share of the user Args: user_id (int): id of the user isin (string): identifier of the share (standard is isin) Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions! return req.status_code
def get_all_users(self, max_retries=10)
-
gets all users
Args
max_retries
:int
- max retries for the request
Returns
list
- list of users
Raises
None
Expand source code
def get_all_users(self, max_retries=10): """gets all users Args: max_retries (int): max retries for the request Returns: list: list of users Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token} req = s.get(self.db_adress + "/users", headers=headers) if (req.status_code == 200): return req.json()["data"] else: return self.get_all_users(max_retries - 1)
def get_user(self, user_id, max_retries=10)
-
gets the shares of the user
Args
user_id
:int
- id of the user
max_retries
:int
- max retries for the request
Returns
json
- json of user infos
Raises
None
Expand source code
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails """gets the shares of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: json: json of user infos Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from) req = s.get(self.db_adress + "/user", headers=headers) if (req.status_code == 200): return req.json()["data"] else: return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
def get_user_keywords(self, user_id, max_retries=10)
-
gets the keywords of the user
Args
user_id
:int
- id of the user
max_retries
:int
- max retries for the request
Returns
list
- list of keywords
Raises
None
Expand source code
def get_user_keywords(self, user_id, max_retries=10): """gets the keywords of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: list: list of keywords Raises: None """ if max_retries <= 0: return None keywords = [] with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/keywords", headers=headers) if (req.status_code == 200): keywords_json = req.json()["data"] for keyword in keywords_json: # keywords_json is a list of dictionaries keywords.append(keyword["keyword"]) return keywords # will be empty if no keywords are set else: return self.get_user_keywords(user_id, max_retries - 1)
def get_user_portfolio(self, user_id, max_retries=10)
-
gets the portfolio of the user
Args
user_id
:int
- id of the user
max_retries
:int
- max retries for the request
Returns
dict
- dictionary of portfolio
Raises
None
Expand source code
def get_user_portfolio(self, user_id, max_retries=10): """gets the portfolio of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: dict: dictionary of portfolio Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON if req.status_code == 200: portfolio_dict = req.json()["data"] # get the data of the JSON return portfolio_dict else: return self.get_user_portfolio(user_id, max_retries - 1)
-
gets the shares of the user
Args
user_id
:int
- id of the user
max_retries
:int
- max retries for the request
Returns
list
- list of shares
Raises
None
Expand source code
def get_user_shares(self, user_id, max_retries=10): """gets the shares of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: list: list of shares Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/shares", headers=headers) if (req.status_code == 200): shares_json = req.json()["data"] shares = [] for share in shares_json: shares.append(share["isin"]) # we only want the isin of the shares return shares else: return self.get_user_shares(user_id, max_retries - 1)
def get_user_transactions(self, user_id, max_retries=10)
-
gets the transactions of the user
Args
user_id
:int
- id of the user
max_retries
:int
- max retries for the request
Returns
dict
- dictionary of transactions
Raises
None
Expand source code
def get_user_transactions(self, user_id, max_retries=10): """gets the transactions of the user Args: user_id (int): id of the user max_retries (int): max retries for the request Returns: dict: dictionary of transactions Raises: None """ if max_retries <= 0: return None with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.get(self.db_adress + "/transactions", headers=headers) if req.status_code == 200: transactions_dict = req.json()["data"] return transactions_dict else: return self.get_user_transactions(user_id, max_retries - 1)
-
set new credentials
Args
email
:string
- email of the user
password
:string
- password of the user
Returns
token (string): new token or None if not 200
Raises
None
Expand source code
def reauthorize(self, email, password): # can be used if token expired """set new credentials Args: email (string): email of the user password (string): password of the user Returns: token (string): new token or None if not 200 Raises: None """ payload = {'email': email, 'password': password} with r.Session() as s: p = s.post(self.db_adress + "/user/login", json=payload) if p.status_code == 200: self.token = p.json()["data"]['token'] return p.json()["data"]['token'] else: self.token = None return None
def set_admin(self, email, is_admin)
-
sets the admin of the user
Args
email
:string
- email of the user
is_admin
:bool
- "true" if user should be Admin, "false" if not
Returns
int
- status code
Raises
None
Expand source code
def set_admin(self, email, is_admin): """sets the admin of the user Args: email (string): email of the user is_admin (bool): "true" if user should be Admin, "false" if not Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers) return req.status_code
def set_cron_interval(self, user_id, cron_interval)
-
sets the cron interval of the user
Args
user_id
:int
- id of the user
cron_interval
:String
- Update interval in cron format => see https://crontab.guru/ for formatting
Returns
int
- status code
Raises
None
Expand source code
def set_cron_interval(self, user_id, cron_interval): """sets the cron interval of the user Args: user_id (int): id of the user cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting Returns: int: status code Raises: None """ if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format print("Error: Invalid cron format") return -1 # return error code -1 if invalid cron format with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs) return req.status_code
def set_keyword(self, user_id, keyword)
-
sets the keyword of the user
Args
user_id
:int
- id of the user
keyword
:int
- keyword of the user
Returns
int
- status code
Raises
None
Expand source code
def set_keyword(self, user_id, keyword): """sets the keyword of the user Args: user_id (int): id of the user keyword (int): keyword of the user Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) return req.status_code
-
sets the share of the user
Args
user_id
:int
- id of the user
isin
:string
- identifier of the share (standard is isin)
comment
:string
- comment of the share
Returns
int
- status code
Raises
None
Expand source code
def set_share(self, user_id, isin, comment): """sets the share of the user Args: user_id (int): id of the user isin (string): identifier of the share (standard is isin) comment (string): comment of the share Returns: int: status code Raises: None """ with r.Session() as s: headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc." return req.status_code
def set_transaction(self, user_id, comment, isin, count, price, time)
-
sets the transaction of the user
Args
user_id
:int
- id of the user
comment
:string
- comment of the transaction
isin
:string
- isin of the transaction
count
:float
- count of the transaction
price
:float
- price of the transaction
time
:string
- time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
Returns
int
- status code
Raises
None
Expand source code
def set_transaction(self, user_id, comment, isin, count, price, time): """sets the transaction of the user Args: user_id (int): id of the user comment (string): comment of the transaction isin (string): isin of the transaction count (float): count of the transaction price (float): price of the transaction time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z" Returns: int: status code Raises: None """ with r.Session() as s: time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers) return req.status_code