Module telegram_bot.api_handling.api_handler
+script for communicating with webservice to get data from database
++Expand source code +
+"""
+script for communicating with webservice to get data from database
+"""
+__author__ = "Florian Kellermann, Linus Eickhoff"
+__date__ = "10.05.2022"
+__version__ = "1.0.2"
+__license__ = "None"
+
+# side-dependencies: none
+# Work in Progress
+
+import os
+import sys
+
+import requests as r
+from croniter import croniter # used for checking cron formatting
+from dotenv import load_dotenv
+
+load_dotenv() # loads environment vars
+
+
+# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
+
+class API_Handler:
+ """class for interacting with the api webservice
+
+ Attributes:
+ db_adress (string): adress of the database
+ token (string): auth token for api
+
+ Methods:
+ reauthorize(email, password): set new credentials
+ get_user_keywords(user_id): gets the keywords of the user
+ set_keyword(user_id, keyword): sets the keyword of the user
+ delete_keyword(user_id, keyword): deletes the keyword of the user
+ get_user_shares(user_id): gets the shares of the user
+ set_share(user_id, symbol): sets the share of the user
+ delete_share(user_id, symbol): deletes the share of the user
+ get_user_transactions(user_id): gets the transactions of the user
+ set_transaction(user_id, transaction): sets the transaction of the user
+ get_user_portfolio(user_id): gets the portfolio of the user
+ set_portfolio(user_id, portfolio): sets the portfolio of the user
+ delete_portfolio(user_id, portfolio): deletes the portfolio of the user
+ set_cron_interval(user_id, interval): sets the cron interval of the user
+ set_admin(email, is_admin): sets the admin status of the user with the given email
+ """
+
+ def __init__(self, db_adress, email, password):
+ """initializes the API_Handler class
+
+ Args:
+ db_adress (string): adress of the database
+ email (string): email of the user
+ password (string): password of the user
+ """
+ self.db_adress = db_adress
+
+ payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
+ with r.Session() as s: # open session
+ p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice
+ if p.status_code == 200:
+ self.token = p.json()["data"]['token'] # store token for further authentication of requests
+ else:
+ print("Error: " + str(p.status_code) + " invalid credentials")
+ self.token = None
+
+ def reauthorize(self, email, password): # can be used if token expired
+ """set new credentials
+
+ Args:
+ email (string): email of the user
+ password (string): password of the user
+
+ Returns:
+ token (string): new token or None if not 200
+
+ Raises:
+ None
+ """
+ payload = {'email': email, 'password': password}
+ with r.Session() as s:
+ p = s.post(self.db_adress + "/user/login", json=payload)
+ if p.status_code == 200:
+ self.token = p.json()["data"]['token']
+ return p.json()["data"]['token']
+ else:
+ self.token = None
+ return None
+
+ def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
+ """gets the shares of the user
+
+ Args:
+ user_id (int): id of the user
+ max_retries (int): max retries for the request
+
+ Returns:
+ json: json of user infos
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
+ req = s.get(self.db_adress + "/user", headers=headers)
+ if (req.status_code == 200):
+ return req.json()["data"]
+
+ else:
+ return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
+
+ def get_all_users(self, max_retries=10):
+ """gets all users
+
+ Args:
+ max_retries (int): max retries for the request
+
+ Returns:
+ list: list of users
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token}
+ req = s.get(self.db_adress + "/users", headers=headers)
+ if (req.status_code == 200):
+ return req.json()["data"]
+
+ else:
+ return self.get_all_users(max_retries - 1)
+
+ def get_user_keywords(self, user_id, max_retries=10):
+ """gets the keywords of the user
+
+ Args:
+ user_id (int): id of the user
+ max_retries (int): max retries for the request
+
+ Returns:
+ list: list of keywords
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ keywords = []
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.get(self.db_adress + "/keywords", headers=headers)
+ if (req.status_code == 200):
+ keywords_json = req.json()["data"]
+ for keyword in keywords_json: # keywords_json is a list of dictionaries
+ keywords.append(keyword["keyword"])
+
+ return keywords # will be empty if no keywords are set
+
+ else:
+ return self.get_user_keywords(user_id, max_retries - 1)
+
+ def set_keyword(self, user_id, keyword):
+ """sets the keyword of the user
+
+ Args:
+ user_id (int): id of the user
+ keyword (int): keyword of the user
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
+
+ return req.status_code
+
+ def delete_keyword(self, user_id, keyword):
+ """deletes the keyword of the user
+
+ Args:
+ user_id (int): id of the user
+ keyword (string): keyword of the user
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
+
+ return req.status_code
+
+ def get_user_shares(self, user_id, max_retries=10):
+ """gets the shares of the user
+
+ Args:
+ user_id (int): id of the user
+ max_retries (int): max retries for the request
+
+ Returns:
+ list: list of shares
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.get(self.db_adress + "/shares", headers=headers)
+ if (req.status_code == 200):
+ shares_json = req.json()["data"]
+ shares = []
+ for share in shares_json:
+ shares.append(share["isin"]) # we only want the isin of the shares
+
+ return shares
+
+ else:
+ return self.get_user_shares(user_id, max_retries - 1)
+
+ def set_share(self, user_id, isin, comment):
+ """sets the share of the user
+
+ Args:
+ user_id (int): id of the user
+ isin (string): identifier of the share (standard is isin)
+ comment (string): comment of the share
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
+ headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
+ return req.status_code
+
+ def delete_share(self, user_id, isin):
+ """deletes the share of the user
+
+ Args:
+ user_id (int): id of the user
+ isin (string): identifier of the share (standard is isin)
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
+ return req.status_code
+
+ def get_user_transactions(self, user_id, max_retries=10):
+ """gets the transactions of the user
+
+ Args:
+ user_id (int): id of the user
+ max_retries (int): max retries for the request
+
+ Returns:
+ dict: dictionary of transactions
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.get(self.db_adress + "/transactions", headers=headers)
+
+ if req.status_code == 200:
+ transactions_dict = req.json()["data"]
+ return transactions_dict
+ else:
+ return self.get_user_transactions(user_id, max_retries - 1)
+
+ def set_transaction(self, user_id, comment, isin, count, price, time):
+ """sets the transaction of the user
+
+ Args:
+ user_id (int): id of the user
+ comment (string): comment of the transaction
+ isin (string): isin of the transaction
+ count (float): count of the transaction
+ price (float): price of the transaction
+ time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
+ "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
+ req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
+ return req.status_code
+
+ def get_user_portfolio(self, user_id, max_retries=10):
+ """gets the portfolio of the user
+
+ Args:
+ user_id (int): id of the user
+ max_retries (int): max retries for the request
+
+ Returns:
+ dict: dictionary of portfolio
+
+ Raises:
+ None
+ """
+ if max_retries <= 0:
+ return None
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON
+ if req.status_code == 200:
+ portfolio_dict = req.json()["data"] # get the data of the JSON
+ return portfolio_dict
+ else:
+ return self.get_user_portfolio(user_id, max_retries - 1)
+
+ def set_cron_interval(self, user_id, cron_interval):
+ """sets the cron interval of the user
+
+ Args:
+ user_id (int): id of the user
+ cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
+ print("Error: Invalid cron format")
+ return -1 # return error code -1 if invalid cron format
+
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
+ req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs)
+ return req.status_code
+
+ def set_admin(self, email, is_admin):
+ """sets the admin of the user
+
+ Args:
+ email (string): email of the user
+ is_admin (bool): "true" if user should be Admin, "false" if not
+
+ Returns:
+ int: status code
+
+ Raises:
+ None
+ """
+ with r.Session() as s:
+ headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email
+ req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
+ return req.status_code
+
+
+if __name__ == "__main__": # editable, just for basic on the go testing of new functions
+
+ print("This is a module for the telegram bot. It is not intended to be run directly.")
+ handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env
+ print(handler.token)
+ keywords = handler.get_user_keywords(user_id=1709356058) # user_id here is currently mine (Linus)
+ print(keywords)
+ shares = handler.get_user_portfolio(user_id=1709356058)
+ print("set cron with status: " + str(handler.set_cron_interval(user_id=1709356058, cron_interval="0 0 * * *")))
+ user = handler.get_user(user_id=1709356058)
+ print(user)
+ all_users = handler.get_all_users()
+ admin_status = handler.set_admin("test@test.com", "true")
+ print(admin_status)
+ print(all_users)
+ print(shares)
+ sys.exit(1)
+Classes
+-
+
+class API_Handler +(db_adress, email, password) +
+-
++
class for interacting with the api webservice
+Attributes
+-
+
db_adress
:string
+- adress of the database +
token
:string
+- auth token for api +
Methods
+reauthorize(email, password): set new credentials +get_user_keywords(user_id): gets the keywords of the user +set_keyword(user_id, keyword): sets the keyword of the user +delete_keyword(user_id, keyword): deletes the keyword of the user +get_user_shares(user_id): gets the shares of the user +set_share(user_id, symbol): sets the share of the user +delete_share(user_id, symbol): deletes the share of the user +get_user_transactions(user_id): gets the transactions of the user +set_transaction(user_id, transaction): sets the transaction of the user +get_user_portfolio(user_id): gets the portfolio of the user +set_portfolio(user_id, portfolio): sets the portfolio of the user +delete_portfolio(user_id, portfolio): deletes the portfolio of the user +set_cron_interval(user_id, interval): sets the cron interval of the user +set_admin(email, is_admin): sets the admin status of the user with the given email
+initializes the API_Handler class
+Args
+-
+
db_adress
:string
+- adress of the database +
email
:string
+- email of the user +
password
:string
+- password of the user +
+++Expand source code +
+
+class API_Handler: + """class for interacting with the api webservice + + Attributes: + db_adress (string): adress of the database + token (string): auth token for api + + Methods: + reauthorize(email, password): set new credentials + get_user_keywords(user_id): gets the keywords of the user + set_keyword(user_id, keyword): sets the keyword of the user + delete_keyword(user_id, keyword): deletes the keyword of the user + get_user_shares(user_id): gets the shares of the user + set_share(user_id, symbol): sets the share of the user + delete_share(user_id, symbol): deletes the share of the user + get_user_transactions(user_id): gets the transactions of the user + set_transaction(user_id, transaction): sets the transaction of the user + get_user_portfolio(user_id): gets the portfolio of the user + set_portfolio(user_id, portfolio): sets the portfolio of the user + delete_portfolio(user_id, portfolio): deletes the portfolio of the user + set_cron_interval(user_id, interval): sets the cron interval of the user + set_admin(email, is_admin): sets the admin status of the user with the given email + """ + + def __init__(self, db_adress, email, password): + """initializes the API_Handler class + + Args: + db_adress (string): adress of the database + email (string): email of the user + password (string): password of the user + """ + self.db_adress = db_adress + + payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account) + with r.Session() as s: # open session + p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice + if p.status_code == 200: + self.token = p.json()["data"]['token'] # store token for further authentication of requests + else: + print("Error: " + str(p.status_code) + " invalid credentials") + self.token = None + + def reauthorize(self, email, password): # can be used if token expired + """set new credentials + + Args: + email (string): email of the user + password (string): password of the user + + Returns: + token (string): new token or None if not 200 + + Raises: + None + """ + payload = {'email': email, 'password': password} + with r.Session() as s: + p = s.post(self.db_adress + "/user/login", json=payload) + if p.status_code == 200: + self.token = p.json()["data"]['token'] + return p.json()["data"]['token'] + else: + self.token = None + return None + + def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails + """gets the shares of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + json: json of user infos + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from) + req = s.get(self.db_adress + "/user", headers=headers) + if (req.status_code == 200): + return req.json()["data"] + + else: + return self.get_user(user_id, max_retries - 1) # if request fails try again recursively + + def get_all_users(self, max_retries=10): + """gets all users + + Args: + max_retries (int): max retries for the request + + Returns: + list: list of users + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token} + req = s.get(self.db_adress + "/users", headers=headers) + if (req.status_code == 200): + return req.json()["data"] + + else: + return self.get_all_users(max_retries - 1) + + def get_user_keywords(self, user_id, max_retries=10): + """gets the keywords of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + list: list of keywords + + Raises: + None + """ + if max_retries <= 0: + return None + + keywords = [] + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/keywords", headers=headers) + if (req.status_code == 200): + keywords_json = req.json()["data"] + for keyword in keywords_json: # keywords_json is a list of dictionaries + keywords.append(keyword["keyword"]) + + return keywords # will be empty if no keywords are set + + else: + return self.get_user_keywords(user_id, max_retries - 1) + + def set_keyword(self, user_id, keyword): + """sets the keyword of the user + + Args: + user_id (int): id of the user + keyword (int): keyword of the user + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) + + return req.status_code + + def delete_keyword(self, user_id, keyword): + """deletes the keyword of the user + + Args: + user_id (int): id of the user + keyword (string): keyword of the user + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) + + return req.status_code + + def get_user_shares(self, user_id, max_retries=10): + """gets the shares of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + list: list of shares + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/shares", headers=headers) + if (req.status_code == 200): + shares_json = req.json()["data"] + shares = [] + for share in shares_json: + shares.append(share["isin"]) # we only want the isin of the shares + + return shares + + else: + return self.get_user_shares(user_id, max_retries - 1) + + def set_share(self, user_id, isin, comment): + """sets the share of the user + + Args: + user_id (int): id of the user + isin (string): identifier of the share (standard is isin) + comment (string): comment of the share + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, + headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc." + return req.status_code + + def delete_share(self, user_id, isin): + """deletes the share of the user + + Args: + user_id (int): id of the user + isin (string): identifier of the share (standard is isin) + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions! + return req.status_code + + def get_user_transactions(self, user_id, max_retries=10): + """gets the transactions of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + dict: dictionary of transactions + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/transactions", headers=headers) + + if req.status_code == 200: + transactions_dict = req.json()["data"] + return transactions_dict + else: + return self.get_user_transactions(user_id, max_retries - 1) + + def set_transaction(self, user_id, comment, isin, count, price, time): + """sets the transaction of the user + + Args: + user_id (int): id of the user + comment (string): comment of the transaction + isin (string): isin of the transaction + count (float): count of the transaction + price (float): price of the transaction + time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z" + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), + "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs + req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers) + return req.status_code + + def get_user_portfolio(self, user_id, max_retries=10): + """gets the portfolio of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + dict: dictionary of portfolio + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON + if req.status_code == 200: + portfolio_dict = req.json()["data"] # get the data of the JSON + return portfolio_dict + else: + return self.get_user_portfolio(user_id, max_retries - 1) + + def set_cron_interval(self, user_id, cron_interval): + """sets the cron interval of the user + + Args: + user_id (int): id of the user + cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting + + Returns: + int: status code + + Raises: + None + """ + if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format + print("Error: Invalid cron format") + return -1 # return error code -1 if invalid cron format + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs) + return req.status_code + + def set_admin(self, email, is_admin): + """sets the admin of the user + + Args: + email (string): email of the user + is_admin (bool): "true" if user should be Admin, "false" if not + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email + req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers) + return req.status_code
Methods
+-
+
+def delete_keyword(self, user_id, keyword) +
+-
++
deletes the keyword of the user
+Args
+-
+
user_id
:int
+- id of the user +
keyword
:string
+- keyword of the user +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def delete_keyword(self, user_id, keyword): + """deletes the keyword of the user + + Args: + user_id (int): id of the user + keyword (string): keyword of the user + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) + + return req.status_code
+
+ -
++
deletes the share of the user
+Args
+-
+
user_id
:int
+- id of the user +
isin
:string
+- identifier of the share (standard is isin) +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def delete_share(self, user_id, isin): + """deletes the share of the user + + Args: + user_id (int): id of the user + isin (string): identifier of the share (standard is isin) + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions! + return req.status_code
+ +def get_all_users(self, max_retries=10) +
+-
++
gets all users
+Args
+-
+
max_retries
:int
+- max retries for the request +
Returns
+-
+
list
+- list of users +
Raises
+None
+++Expand source code +
+
+def get_all_users(self, max_retries=10): + """gets all users + + Args: + max_retries (int): max retries for the request + + Returns: + list: list of users + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token} + req = s.get(self.db_adress + "/users", headers=headers) + if (req.status_code == 200): + return req.json()["data"] + + else: + return self.get_all_users(max_retries - 1)
+ +def get_user(self, user_id, max_retries=10) +
+-
++
gets the shares of the user
+Args
+-
+
user_id
:int
+- id of the user +
max_retries
:int
+- max retries for the request +
Returns
+-
+
json
+- json of user infos +
Raises
+None
+++Expand source code +
+
+def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails + """gets the shares of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + json: json of user infos + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from) + req = s.get(self.db_adress + "/user", headers=headers) + if (req.status_code == 200): + return req.json()["data"] + + else: + return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
+ +def get_user_keywords(self, user_id, max_retries=10) +
+-
++
gets the keywords of the user
+Args
+-
+
user_id
:int
+- id of the user +
max_retries
:int
+- max retries for the request +
Returns
+-
+
list
+- list of keywords +
Raises
+None
+++Expand source code +
+
+def get_user_keywords(self, user_id, max_retries=10): + """gets the keywords of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + list: list of keywords + + Raises: + None + """ + if max_retries <= 0: + return None + + keywords = [] + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/keywords", headers=headers) + if (req.status_code == 200): + keywords_json = req.json()["data"] + for keyword in keywords_json: # keywords_json is a list of dictionaries + keywords.append(keyword["keyword"]) + + return keywords # will be empty if no keywords are set + + else: + return self.get_user_keywords(user_id, max_retries - 1)
+ +def get_user_portfolio(self, user_id, max_retries=10) +
+-
++
gets the portfolio of the user
+Args
+-
+
user_id
:int
+- id of the user +
max_retries
:int
+- max retries for the request +
Returns
+-
+
dict
+- dictionary of portfolio +
Raises
+None
+++Expand source code +
+
+def get_user_portfolio(self, user_id, max_retries=10): + """gets the portfolio of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + dict: dictionary of portfolio + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON + if req.status_code == 200: + portfolio_dict = req.json()["data"] # get the data of the JSON + return portfolio_dict + else: + return self.get_user_portfolio(user_id, max_retries - 1)
+
+ -
++
gets the shares of the user
+Args
+-
+
user_id
:int
+- id of the user +
max_retries
:int
+- max retries for the request +
Returns
+-
+
list
+- list of shares +
Raises
+None
+++Expand source code +
+
+def get_user_shares(self, user_id, max_retries=10): + """gets the shares of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + list: list of shares + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/shares", headers=headers) + if (req.status_code == 200): + shares_json = req.json()["data"] + shares = [] + for share in shares_json: + shares.append(share["isin"]) # we only want the isin of the shares + + return shares + + else: + return self.get_user_shares(user_id, max_retries - 1)
+ +def get_user_transactions(self, user_id, max_retries=10) +
+-
++
gets the transactions of the user
+Args
+-
+
user_id
:int
+- id of the user +
max_retries
:int
+- max retries for the request +
Returns
+-
+
dict
+- dictionary of transactions +
Raises
+None
+++Expand source code +
+
+def get_user_transactions(self, user_id, max_retries=10): + """gets the transactions of the user + + Args: + user_id (int): id of the user + max_retries (int): max retries for the request + + Returns: + dict: dictionary of transactions + + Raises: + None + """ + if max_retries <= 0: + return None + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.get(self.db_adress + "/transactions", headers=headers) + + if req.status_code == 200: + transactions_dict = req.json()["data"] + return transactions_dict + else: + return self.get_user_transactions(user_id, max_retries - 1)
+
+ -
++
set new credentials
+Args
+-
+
email
:string
+- email of the user +
password
:string
+- password of the user +
Returns
+token (string): new token or None if not 200
+Raises
+None
+++Expand source code +
+
+def reauthorize(self, email, password): # can be used if token expired + """set new credentials + + Args: + email (string): email of the user + password (string): password of the user + + Returns: + token (string): new token or None if not 200 + + Raises: + None + """ + payload = {'email': email, 'password': password} + with r.Session() as s: + p = s.post(self.db_adress + "/user/login", json=payload) + if p.status_code == 200: + self.token = p.json()["data"]['token'] + return p.json()["data"]['token'] + else: + self.token = None + return None
+ +def set_admin(self, email, is_admin) +
+-
++
sets the admin of the user
+Args
+-
+
email
:string
+- email of the user +
is_admin
:bool
+- "true" if user should be Admin, "false" if not +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def set_admin(self, email, is_admin): + """sets the admin of the user + + Args: + email (string): email of the user + is_admin (bool): "true" if user should be Admin, "false" if not + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email + req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers) + return req.status_code
+ +def set_cron_interval(self, user_id, cron_interval) +
+-
++
sets the cron interval of the user
+Args
+-
+
user_id
:int
+- id of the user +
cron_interval
:String
+- Update interval in cron format => see https://crontab.guru/ for formatting +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def set_cron_interval(self, user_id, cron_interval): + """sets the cron interval of the user + + Args: + user_id (int): id of the user + cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting + + Returns: + int: status code + + Raises: + None + """ + if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format + print("Error: Invalid cron format") + return -1 # return error code -1 if invalid cron format + + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs) + return req.status_code
+ +def set_keyword(self, user_id, keyword) +
+-
++
sets the keyword of the user
+Args
+-
+
user_id
:int
+- id of the user +
keyword
:int
+- keyword of the user +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def set_keyword(self, user_id, keyword): + """sets the keyword of the user + + Args: + user_id (int): id of the user + keyword (int): keyword of the user + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers) + + return req.status_code
+
+ -
++
sets the share of the user
+Args
+-
+
user_id
:int
+- id of the user +
isin
:string
+- identifier of the share (standard is isin) +
comment
:string
+- comment of the share +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def set_share(self, user_id, isin, comment): + """sets the share of the user + + Args: + user_id (int): id of the user + isin (string): identifier of the share (standard is isin) + comment (string): comment of the share + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, + headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc." + return req.status_code
+ +def set_transaction(self, user_id, comment, isin, count, price, time) +
+-
++
sets the transaction of the user
+Args
+-
+
user_id
:int
+- id of the user +
comment
:string
+- comment of the transaction +
isin
:string
+- isin of the transaction +
count
:float
+- count of the transaction +
price
:float
+- price of the transaction +
time
:string
+- time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z" +
Returns
+-
+
int
+- status code +
Raises
+None
+++Expand source code +
+
+def set_transaction(self, user_id, comment, isin, count, price, time): + """sets the transaction of the user + + Args: + user_id (int): id of the user + comment (string): comment of the transaction + isin (string): isin of the transaction + count (float): count of the transaction + price (float): price of the transaction + time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z" + + Returns: + int: status code + + Raises: + None + """ + with r.Session() as s: + time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db + headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} + transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), + "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs + req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers) + return req.status_code
+
+