2022-05-12 13:30:56 +02:00
< section id = "section-intro" >
< p > script for communicating with webservice to get data from database< / p >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > " " "
script for communicating with webservice to get data from database
" " "
__author__ = " Florian Kellermann, Linus Eickhoff"
__date__ = " 10.05.2022"
__version__ = " 1.0.2"
__license__ = " None"
# side-dependencies: none
# Work in Progress
import os
import sys
import requests as r
from croniter import croniter # used for checking cron formatting
from dotenv import load_dotenv
load_dotenv() # loads environment vars
# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
class API_Handler:
    """Client for interacting with the api webservice.

    Requests made on behalf of a user authenticate with
    ``Bearer <token>:<user_id>``; requests that are not tied to a user send
    the token only.

    Attributes:
        db_adress (string): address of the webservice
        token (string): auth token for the api, None if the login failed

    Methods:
        reauthorize(email, password): set new credentials
        get_user(user_id): gets the info of the user
        get_all_users(): gets all users
        get_user_keywords(user_id): gets the keywords of the user
        set_keyword(user_id, keyword): sets the keyword of the user
        delete_keyword(user_id, keyword): deletes the keyword of the user
        get_user_shares(user_id): gets the isins of the shares of the user
        set_share(user_id, isin, comment): sets the share of the user
        delete_share(user_id, isin): deletes the share of the user
        get_user_transactions(user_id): gets the transactions of the user
        set_transaction(user_id, comment, isin, count, price, time): sets the transaction of the user
        get_user_portfolio(user_id): gets the portfolio of the user
        set_cron_interval(user_id, cron_interval): sets the cron interval of the user
        set_admin(email, is_admin): sets the admin status of the user with the given email
    """

    def __init__(self, db_adress, email, password):
        """Initializes the API_Handler class by logging in to the webservice.

        Args:
            db_adress (string): address of the webservice
            email (string): email of the user
            password (string): password of the user
        """
        self.db_adress = db_adress
        # credentials for admin account that has all permissions to get and set data (in this case bot account)
        payload = {"email": email, "password": password}

        with r.Session() as s:
            p = s.post(self.db_adress + "/user/login", json=payload)  # login to webservice
            if p.status_code == 200:
                # store token for further authentication of requests
                self.token = p.json()["data"]["token"]
            else:
                # clear the token only on a failed login so that a successful
                # login is never overwritten with None
                print("Error: " + str(p.status_code) + " invalid credentials")
                self.token = None

    def reauthorize(self, email, password):  # can be used if token expired
        """Logs in again and replaces the stored token.

        Args:
            email (string): email of the user
            password (string): password of the user

        Returns:
            string: new token or None if the status code is not 200
        """
        payload = {"email": email, "password": password}
        with r.Session() as s:
            p = s.post(self.db_adress + "/user/login", json=payload)
            # the stored and the returned token are the same value, so parse the body once
            self.token = p.json()["data"]["token"] if p.status_code == 200 else None
        return self.token

    def get_user(self, user_id, max_retries=10):
        """Gets the info of the user.

        Args:
            user_id (int): id of the user
            max_retries (int): max number of attempts for the request

        Returns:
            json: data of the user, None if no attempt returned 200
        """
        # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/user", headers=headers)
                if req.status_code == 200:
                    return req.json()["data"]
        return None

    def get_all_users(self, max_retries=10):
        """Gets all users.

        Args:
            max_retries (int): max number of attempts for the request

        Returns:
            list: list of users, None if no attempt returned 200
        """
        headers = {"Authorization": "Bearer " + self.token}  # bot token only, no user id
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/users", headers=headers)
                if req.status_code == 200:
                    return req.json()["data"]
        return None

    def get_user_keywords(self, user_id, max_retries=10):
        """Gets the keywords of the user.

        Args:
            user_id (int): id of the user
            max_retries (int): max number of attempts for the request

        Returns:
            list: list of keywords (empty if no keywords are set), None if no attempt returned 200
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/keywords", headers=headers)
                if req.status_code == 200:
                    # the data is a list of dictionaries, only the keyword itself is returned
                    return [entry["keyword"] for entry in req.json()["data"]]
        return None

    def set_keyword(self, user_id, keyword):
        """Sets the keyword of the user.

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
        return req.status_code

    def delete_keyword(self, user_id, keyword):
        """Deletes the keyword of the user.

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)
        return req.status_code

    def get_user_shares(self, user_id, max_retries=10):
        """Gets the shares of the user.

        Args:
            user_id (int): id of the user
            max_retries (int): max number of attempts for the request

        Returns:
            list: list of the isins of the shares, None if no attempt returned 200
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/shares", headers=headers)
                if req.status_code == 200:
                    # we only want the isin of the shares
                    return [share["isin"] for share in req.json()["data"]]
        return None

    def set_share(self, user_id, isin, comment):
        """Sets the share of the user.

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
            comment (string): comment of the share

        Returns:
            int: status code
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
        share = {"comment": comment, "isin": isin}
        with r.Session() as s:
            req = s.post(self.db_adress + "/share", json=share, headers=headers)
        return req.status_code

    def delete_share(self, user_id, isin):
        """Deletes the share of the user.

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)

        Returns:
            int: status code
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            # to delete a share only the isin is needed because it is unique, shares are not transactions!
            req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers)
        return req.status_code

    def get_user_transactions(self, user_id, max_retries=10):
        """Gets the transactions of the user.

        Args:
            user_id (int): id of the user
            max_retries (int): max number of attempts for the request

        Returns:
            dict: dictionary of transactions, None if no attempt returned 200
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/transactions", headers=headers)
                if req.status_code == 200:
                    return req.json()["data"]
        return None

    def set_transaction(self, user_id, comment, isin, count, price, time):
        """Sets the transaction of the user.

        Args:
            user_id (int): id of the user
            comment (string): comment of the transaction
            isin (string): isin of the transaction
            count (float): count of the transaction
            price (float): price of the transaction
            time (string): time of the transaction; its last three characters
                are replaced by "Z" before sending (the db presumably expects
                something like "2011-10-05T14:48:00.000Z" - TODO confirm the
                format callers pass in)

        Returns:
            int: status code
        """
        # drop the last three characters and add Z to make it a valid date for db
        time = time[:-3] + "Z"
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        # transaction as JSON with all the attributes needed according to Swagger docs
        transaction = {
            "comment": str(comment),
            "count": float(count),
            "isin": str(isin),
            "price": float(price),
            "time": str(time),
        }
        with r.Session() as s:
            req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
        return req.status_code

    def get_user_portfolio(self, user_id, max_retries=10):
        """Gets the portfolio of the user.

        Args:
            user_id (int): id of the user
            max_retries (int): max number of attempts for the request

        Returns:
            dict: dictionary of portfolio, None if no attempt returned 200
        """
        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            for _ in range(max_retries):  # if the request fails try again
                req = s.get(self.db_adress + "/portfolio", headers=headers)  # get portfolio as JSON
                if req.status_code == 200:
                    return req.json()["data"]  # get the data of the JSON
        return None

    def set_cron_interval(self, user_id, cron_interval):
        """Sets the cron interval of the user.

        Args:
            user_id (int): id of the user
            cron_interval (string): update interval in cron format => see https://crontab.guru/ for formatting

        Returns:
            int: status code, -1 if cron_interval is not in valid cron format
        """
        if not croniter.is_valid(cron_interval):  # check if cron_interval is in valid format
            print("Error: Invalid cron format")
            return -1  # error code, no request is sent

        headers = {"Authorization": "Bearer " + self.token + ":" + str(user_id)}
        with r.Session() as s:
            # put not post (see swagger docs)
            req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers)
        return req.status_code

    def set_admin(self, email, is_admin):
        """Sets the admin status of the user with the given email.

        Args:
            email (string): email of the user
            is_admin (bool): True if user should be admin, False if not

        Returns:
            int: status code
        """
        headers = {"Authorization": "Bearer " + self.token}  # only bot token is needed, user is chosen by email
        with r.Session() as s:
            req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
        return req.status_code
if __name__ == " __main__" : # editable smoke test for trying out new functions by hand; the calls below go to the webservice address passed to API_Handler
print(" This is a module for the telegram bot. It is not intended to be run directly." )
handler = API_Handler(" https://gruppe1.testsites.info/api" , str(os.getenv(" BOT_EMAIL" )), str(os.getenv(" BOT_PASSWORD" ))) # credentials are read from the environment variables BOT_EMAIL and BOT_PASSWORD
keywords = handler.get_user_keywords(user_id=1709356058) # user_id here is currently mine (Linus)
shares = handler.get_user_portfolio(user_id=1709356058) # NOTE(review): named shares but holds the result of get_user_portfolio
print(" set cron with status: " + str(handler.set_cron_interval(user_id=1709356058, cron_interval=" 0 0 * * *" )))
user = handler.get_user(user_id=1709356058)
all_users = handler.get_all_users()
admin_status = handler.set_admin(" test@test.com" , " true" ) # NOTE(review): passes a string although set_admin documents is_admin as bool
# NOTE(review): the exit status below is 1 even when every call above succeeded
sys.exit(1)< / code > < / pre >
< / details >
< / section >
< section >
< / section >
< section >
< / section >
< section >
< / section >
< section >
< h2 class = "section-title" id = "header-classes" > Classes< / h2 >
< dl >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler" > < code class = "flex name class" >
< span > class < span class = "ident" > API_Handler< / span > < / span >
< span > (< / span > < span > db_adress, email, password)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > class for interacting with the api webservice< / p >
< h2 id = "attributes" > Attributes< / h2 >
< dl >
< dt > < strong > < code > db_adress< / code > < / strong > :  < code > string< / code > < / dt >
< dd > address of the database< / dd >
< dt > < strong > < code > token< / code > < / strong > :  < code > string< / code > < / dt >
< dd > auth token for api< / dd >
< / dl >
< h2 id = "methods" > Methods< / h2 >
< p > reauthorize(email, password): set new credentials
get_user(user_id): gets the info of the user
get_all_users(): gets all users
get_user_keywords(user_id): gets the keywords of the user
set_keyword(user_id, keyword): sets the keyword of the user
delete_keyword(user_id, keyword): deletes the keyword of the user
get_user_shares(user_id): gets the shares of the user
set_share(user_id, isin, comment): sets the share of the user
delete_share(user_id, isin): deletes the share of the user
get_user_transactions(user_id): gets the transactions of the user
set_transaction(user_id, comment, isin, count, price, time): sets the transaction of the user
get_user_portfolio(user_id): gets the portfolio of the user
set_portfolio(user_id, portfolio): sets the portfolio of the user (not implemented in the source shown here)
delete_portfolio(user_id, portfolio): deletes the portfolio of the user (not implemented in the source shown here)
set_cron_interval(user_id, cron_interval): sets the cron interval of the user
set_admin(email, is_admin): sets the admin status of the user with the given email< / p >
< p > initializes the API_Handler class< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > db_adress< / code > < / strong > :  < code > string< / code > < / dt >
< dd > address of the database< / dd >
< dt > < strong > < code > email< / code > < / strong > :  < code > string< / code > < / dt >
< dd > email of the user< / dd >
< dt > < strong > < code > password< / code > < / strong > :  < code > string< / code > < / dt >
< dd > password of the user< / dd >
< / dl > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > class API_Handler:
" " " class for interacting with the api webservice
db_adress (string): adress of the database
token (string): auth token for api
reauthorize(email, password): set new credentials
get_user_keywords(user_id): gets the keywords of the user
set_keyword(user_id, keyword): sets the keyword of the user
delete_keyword(user_id, keyword): deletes the keyword of the user
get_user_shares(user_id): gets the shares of the user
set_share(user_id, symbol): sets the share of the user
delete_share(user_id, symbol): deletes the share of the user
get_user_transactions(user_id): gets the transactions of the user
set_transaction(user_id, transaction): sets the transaction of the user
get_user_portfolio(user_id): gets the portfolio of the user
set_portfolio(user_id, portfolio): sets the portfolio of the user
delete_portfolio(user_id, portfolio): deletes the portfolio of the user
set_cron_interval(user_id, interval): sets the cron interval of the user
set_admin(email, is_admin): sets the admin status of the user with the given email
" " "
def __init__(self, db_adress, email, password):
" " " initializes the API_Handler class
db_adress (string): adress of the database
email (string): email of the user
password (string): password of the user
" " "
self.db_adress = db_adress
payload = {' email' : email, ' password' : password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
with r.Session() as s: # open session
p = s.post(self.db_adress + " /user/login" , json=payload) # login to webservice
if p.status_code == 200:
self.token = p.json()[" data" ][' token' ] # store token for further authentication of requests
print(" Error: " + str(p.status_code) + " invalid credentials" )
self.token = None
def reauthorize(self, email, password): # can be used if token expired
" " " set new credentials
email (string): email of the user
password (string): password of the user
token (string): new token or None if not 200
" " "
payload = {' email' : email, ' password' : password}
with r.Session() as s:
p = s.post(self.db_adress + " /user/login" , json=payload)
if p.status_code == 200:
self.token = p.json()[" data" ][' token' ]
return p.json()[" data" ][' token' ]
self.token = None
return None
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
" " " gets the shares of the user
user_id (int): id of the user
max_retries (int): max retries for the request
json: json of user infos
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
req = s.get(self.db_adress + " /user" , headers=headers)
if (req.status_code == 200):
return req.json()[" data" ]
return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
def get_all_users(self, max_retries=10):
" " " gets all users
max_retries (int): max retries for the request
list: list of users
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token}
req = s.get(self.db_adress + " /users" , headers=headers)
if (req.status_code == 200):
return req.json()[" data" ]
return self.get_all_users(max_retries - 1)
def get_user_keywords(self, user_id, max_retries=10):
" " " gets the keywords of the user
user_id (int): id of the user
max_retries (int): max retries for the request
list: list of keywords
" " "
if max_retries < = 0:
return None
keywords = []
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /keywords" , headers=headers)
if (req.status_code == 200):
keywords_json = req.json()[" data" ]
for keyword in keywords_json: # keywords_json is a list of dictionaries
keywords.append(keyword[" keyword" ])
return keywords # will be empty if no keywords are set
return self.get_user_keywords(user_id, max_retries - 1)
def set_keyword(self, user_id, keyword):
" " " sets the keyword of the user
user_id (int): id of the user
keyword (int): keyword of the user
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.post(self.db_adress + " /keyword" , json={" keyword" : keyword}, headers=headers)
return req.status_code
def delete_keyword(self, user_id, keyword):
" " " deletes the keyword of the user
user_id (int): id of the user
keyword (string): keyword of the user
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.delete(self.db_adress + " /keyword" , json={" keyword" : keyword}, headers=headers)
return req.status_code
def get_user_shares(self, user_id, max_retries=10):
" " " gets the shares of the user
user_id (int): id of the user
max_retries (int): max retries for the request
list: list of shares
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /shares" , headers=headers)
if (req.status_code == 200):
shares_json = req.json()[" data" ]
shares = []
for share in shares_json:
shares.append(share[" isin" ]) # we only want the isin of the shares
return shares
return self.get_user_shares(user_id, max_retries - 1)
def set_share(self, user_id, isin, comment):
" " " sets the share of the user
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
comment (string): comment of the share
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.post(self.db_adress + " /share" , json={" comment" : comment, " isin" : isin},
headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. " Apple Inc."
return req.status_code
def delete_share(self, user_id, isin):
" " " deletes the share of the user
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.delete(self.db_adress + " /share" , json={" isin" : str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
return req.status_code
def get_user_transactions(self, user_id, max_retries=10):
" " " gets the transactions of the user
user_id (int): id of the user
max_retries (int): max retries for the request
dict: dictionary of transactions
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /transactions" , headers=headers)
if req.status_code == 200:
transactions_dict = req.json()[" data" ]
return transactions_dict
return self.get_user_transactions(user_id, max_retries - 1)
def set_transaction(self, user_id, comment, isin, count, price, time):
" " " sets the transaction of the user
user_id (int): id of the user
comment (string): comment of the transaction
isin (string): isin of the transaction
count (float): count of the transaction
price (float): price of the transaction
time (string): time of the transaction formatted like e.g. " 2011-10-05T14:48:00.000Z"
int: status code
" " "
with r.Session() as s:
time = time[:-3] + " Z" # remove last character and add Z to make it a valid date for db
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
transaction = {" comment" : str(comment), " count" : float(count), " isin" : str(isin), " price" : float(price),
" time" : str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
req = s.post(self.db_adress + " /transaction" , json=transaction, headers=headers)
return req.status_code
def get_user_portfolio(self, user_id, max_retries=10):
" " " gets the portfolio of the user
user_id (int): id of the user
max_retries (int): max retries for the request
dict: dictionary of portfolio
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /portfolio" , headers=headers) # get portfolio as JSON
if req.status_code == 200:
portfolio_dict = req.json()[" data" ] # get the data of the JSON
return portfolio_dict
return self.get_user_portfolio(user_id, max_retries - 1)
def set_cron_interval(self, user_id, cron_interval):
" " " sets the cron interval of the user
user_id (int): id of the user
cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
int: status code
" " "
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
print(" Error: Invalid cron format" )
return -1 # return error code -1 if invalid cron format
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.put(self.db_adress + " /user/setCron" , json={" cron" : str(cron_interval)}, headers=headers) # put not post (see swagger docs)
return req.status_code
def set_admin(self, email, is_admin):
" " " sets the admin of the user
email (string): email of the user
is_admin (bool): " true" if user should be Admin, " false" if not
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + " /user/setAdmin" , json={" admin" : is_admin, " email" : str(email)}, headers=headers)
return req.status_code< / code > < / pre >
< / details >
< h3 > Methods< / h3 >
< dl >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.delete_keyword" > < code class = "name flex" >
< span > def < span class = "ident" > delete_keyword< / span > < / span > (< span > self, user_id, keyword)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > deletes the keyword of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > keyword< / code > < / strong > :  < code > string< / code > < / dt >
< dd > keyword of the user< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def delete_keyword(self, user_id, keyword):
" " " deletes the keyword of the user
user_id (int): id of the user
keyword (string): keyword of the user
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.delete(self.db_adress + " /keyword" , json={" keyword" : keyword}, headers=headers)
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.delete_share" > < code class = "name flex" >
< span > def < span class = "ident" > delete_share< / span > < / span > (< span > self, user_id, isin)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > deletes the share of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > isin< / code > < / strong > :  < code > string< / code > < / dt >
< dd > identifier of the share (standard is isin)< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def delete_share(self, user_id, isin):
" " " deletes the share of the user
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.delete(self.db_adress + " /share" , json={" isin" : str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_all_users" > < code class = "name flex" >
< span > def < span class = "ident" > get_all_users< / span > < / span > (< span > self, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets all users< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > list< / code > < / dt >
< dd > list of users< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_all_users(self, max_retries=10):
" " " gets all users
max_retries (int): max retries for the request
list: list of users
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token}
req = s.get(self.db_adress + " /users" , headers=headers)
if (req.status_code == 200):
return req.json()[" data" ]
return self.get_all_users(max_retries - 1)< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_user" > < code class = "name flex" >
< span > def < span class = "ident" > get_user< / span > < / span > (< span > self, user_id, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets the info of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > json< / code > < / dt >
< dd > json of user infos< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
" " " gets the shares of the user
user_id (int): id of the user
max_retries (int): max retries for the request
json: json of user infos
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
req = s.get(self.db_adress + " /user" , headers=headers)
if (req.status_code == 200):
return req.json()[" data" ]
return self.get_user(user_id, max_retries - 1) # if request fails try again recursively< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_user_keywords" > < code class = "name flex" >
< span > def < span class = "ident" > get_user_keywords< / span > < / span > (< span > self, user_id, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets the keywords of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > list< / code > < / dt >
< dd > list of keywords< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_user_keywords(self, user_id, max_retries=10):
" " " gets the keywords of the user
user_id (int): id of the user
max_retries (int): max retries for the request
list: list of keywords
" " "
if max_retries < = 0:
return None
keywords = []
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /keywords" , headers=headers)
if (req.status_code == 200):
keywords_json = req.json()[" data" ]
for keyword in keywords_json: # keywords_json is a list of dictionaries
keywords.append(keyword[" keyword" ])
return keywords # will be empty if no keywords are set
return self.get_user_keywords(user_id, max_retries - 1)< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_user_portfolio" > < code class = "name flex" >
< span > def < span class = "ident" > get_user_portfolio< / span > < / span > (< span > self, user_id, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets the portfolio of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > dict< / code > < / dt >
< dd > dictionary of portfolio< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_user_portfolio(self, user_id, max_retries=10):
" " " gets the portfolio of the user
user_id (int): id of the user
max_retries (int): max retries for the request
dict: dictionary of portfolio
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /portfolio" , headers=headers) # get portfolio as JSON
if req.status_code == 200:
portfolio_dict = req.json()[" data" ] # get the data of the JSON
return portfolio_dict
return self.get_user_portfolio(user_id, max_retries - 1)< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_user_shares" > < code class = "name flex" >
< span > def < span class = "ident" > get_user_shares< / span > < / span > (< span > self, user_id, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets the shares of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > list< / code > < / dt >
< dd > list of the ISINs of the user's shares, or None if no request returned status 200 within max_retries attempts< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_user_shares(self, user_id, max_retries=10):
" " " gets the shares of the user
user_id (int): id of the user
max_retries (int): max retries for the request
list: list of shares
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /shares" , headers=headers)
if (req.status_code == 200):
shares_json = req.json()[" data" ]
shares = []
for share in shares_json:
shares.append(share[" isin" ]) # we only want the isin of the shares
return shares
return self.get_user_shares(user_id, max_retries - 1)< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.get_user_transactions" > < code class = "name flex" >
< span > def < span class = "ident" > get_user_transactions< / span > < / span > (< span > self, user_id, max_retries=10)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > gets the transactions of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > max_retries< / code > < / strong > :  < code > int< / code > < / dt >
< dd > max retries for the request< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > dict< / code > < / dt >
< dd > dictionary of transactions, or None if no request returned status 200 within max_retries attempts< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def get_user_transactions(self, user_id, max_retries=10):
" " " gets the transactions of the user
user_id (int): id of the user
max_retries (int): max retries for the request
dict: dictionary of transactions
" " "
if max_retries < = 0:
return None
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.get(self.db_adress + " /transactions" , headers=headers)
if req.status_code == 200:
transactions_dict = req.json()[" data" ]
return transactions_dict
return self.get_user_transactions(user_id, max_retries - 1)< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.reauthorize" > < code class = "name flex" >
< span > def < span class = "ident" > reauthorize< / span > < / span > (< span > self, email, password)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > set new credentials< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > email< / code > < / strong > :  < code > string< / code > < / dt >
< dd > email of the user< / dd >
< dt > < strong > < code > password< / code > < / strong > :  < code > string< / code > < / dt >
< dd > password of the user< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< p > token (string): the new token, or None if the login request did not return status 200< / p >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def reauthorize(self, email, password): # can be used if token expired
" " " set new credentials
email (string): email of the user
password (string): password of the user
token (string): new token or None if not 200
" " "
payload = {' email' : email, ' password' : password}
with r.Session() as s:
p = s.post(self.db_adress + " /user/login" , json=payload)
if p.status_code == 200:
self.token = p.json()[" data" ][' token' ]
return p.json()[" data" ][' token' ]
self.token = None
return None< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.set_admin" > < code class = "name flex" >
< span > def < span class = "ident" > set_admin< / span > < / span > (< span > self, email, is_admin)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > sets the admin status of the user with the given email< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > email< / code > < / strong > :  < code > string< / code > < / dt >
< dd > email of the user< / dd >
< dt > < strong > < code > is_admin< / code > < / strong > :  < code > bool< / code > < / dt >
< dd > True if the user should be an admin, False if not< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def set_admin(self, email, is_admin):
" " " sets the admin of the user
email (string): email of the user
is_admin (bool): " true" if user should be Admin, " false" if not
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + " /user/setAdmin" , json={" admin" : is_admin, " email" : str(email)}, headers=headers)
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.set_cron_interval" > < code class = "name flex" >
< span > def < span class = "ident" > set_cron_interval< / span > < / span > (< span > self, user_id, cron_interval)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > sets the cron interval of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > cron_interval< / code > < / strong > :  < code > String< / code > < / dt >
< dd > Update interval in cron format => see < a href = "https://crontab.guru/" > https://crontab.guru/< / a > for formatting< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code of the request, or -1 if the cron format is invalid< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def set_cron_interval(self, user_id, cron_interval):
" " " sets the cron interval of the user
user_id (int): id of the user
cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
int: status code
" " "
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
print(" Error: Invalid cron format" )
return -1 # return error code -1 if invalid cron format
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.put(self.db_adress + " /user/setCron" , json={" cron" : str(cron_interval)}, headers=headers) # put not post (see swagger docs)
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.set_keyword" > < code class = "name flex" >
< span > def < span class = "ident" > set_keyword< / span > < / span > (< span > self, user_id, keyword)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > sets the keyword of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > keyword< / code > < / strong > :  < code > string< / code > < / dt >
< dd > keyword of the user< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def set_keyword(self, user_id, keyword):
" " " sets the keyword of the user
user_id (int): id of the user
keyword (int): keyword of the user
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.post(self.db_adress + " /keyword" , json={" keyword" : keyword}, headers=headers)
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.set_share" > < code class = "name flex" >
< span > def < span class = "ident" > set_share< / span > < / span > (< span > self, user_id, isin, comment)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > sets the share of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > isin< / code > < / strong > :  < code > string< / code > < / dt >
< dd > identifier of the share (standard is isin)< / dd >
< dt > < strong > < code > comment< / code > < / strong > :  < code > string< / code > < / dt >
< dd > comment of the share< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def set_share(self, user_id, isin, comment):
" " " sets the share of the user
user_id (int): id of the user
isin (string): identifier of the share (standard is isin)
comment (string): comment of the share
int: status code
" " "
with r.Session() as s:
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
req = s.post(self.db_adress + " /share" , json={" comment" : comment, " isin" : isin},
headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. " Apple Inc."
return req.status_code< / code > < / pre >
< / details >
< / dd >
< dt id = "telegram_bot.api_handling.api_handler.API_Handler.set_transaction" > < code class = "name flex" >
< span > def < span class = "ident" > set_transaction< / span > < / span > (< span > self, user_id, comment, isin, count, price, time)< / span >
< / code > < / dt >
< dd >
< div class = "desc" > < p > sets the transaction of the user< / p >
< h2 id = "args" > Args< / h2 >
< dl >
< dt > < strong > < code > user_id< / code > < / strong > :  < code > int< / code > < / dt >
< dd > id of the user< / dd >
< dt > < strong > < code > comment< / code > < / strong > :  < code > string< / code > < / dt >
< dd > comment of the transaction< / dd >
< dt > < strong > < code > isin< / code > < / strong > :  < code > string< / code > < / dt >
< dd > isin of the transaction< / dd >
< dt > < strong > < code > count< / code > < / strong > :  < code > float< / code > < / dt >
< dd > count of the transaction< / dd >
< dt > < strong > < code > price< / code > < / strong > :  < code > float< / code > < / dt >
< dd > price of the transaction< / dd >
< dt > < strong > < code > time< / code > < / strong > :  < code > string< / code > < / dt >
< dd > time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"< / dd >
< / dl >
< h2 id = "returns" > Returns< / h2 >
< dl >
< dt > < code > int< / code > < / dt >
< dd > status code< / dd >
< / dl >
< h2 id = "raises" > Raises< / h2 >
< p > None< / p > < / div >
< details class = "source" >
< summary >
< span > Expand source code< / span >
< / summary >
< pre > < code class = "python" > def set_transaction(self, user_id, comment, isin, count, price, time):
" " " sets the transaction of the user
user_id (int): id of the user
comment (string): comment of the transaction
isin (string): isin of the transaction
count (float): count of the transaction
price (float): price of the transaction
time (string): time of the transaction formatted like e.g. " 2011-10-05T14:48:00.000Z"
int: status code
" " "
with r.Session() as s:
time = time[:-3] + " Z" # remove the last three characters and append Z to make it a valid date for db
headers = {' Authorization' : ' Bearer ' + self.token + " :" + str(user_id)}
transaction = {" comment" : str(comment), " count" : float(count), " isin" : str(isin), " price" : float(price),
" time" : str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
req = s.post(self.db_adress + " /transaction" , json=transaction, headers=headers)
return req.status_code< / code > < / pre >
< / details >
< / dd >
< / dl >
< / dd >
< / dl >
< / section >
< / article >
