Module telegram_bot.api_handling.api_handler

script for communicating with webservice to get data from database

Expand source code
"""
script for communicating with webservice to get data from database
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"

# side-dependencies: none
# Work in Progress

import os
import sys

import requests as r
from croniter import croniter  # used for checking cron formatting
from dotenv import load_dotenv

load_dotenv()  # loads environment vars


# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/

class API_Handler:
    """class for interacting with the api webservice
    
    Attributes:
        db_adress (string): adress of the database
        token (string): auth token for api

    Methods:
        reauthorize(email, password): set new credentials
        get_user_keywords(user_id): gets the keywords of the user
        set_keyword(user_id, keyword): sets the keyword of the user
        delete_keyword(user_id, keyword): deletes the keyword of the user
        get_user_shares(user_id): gets the shares of the user
        set_share(user_id, symbol): sets the share of the user
        delete_share(user_id, symbol): deletes the share of the user
        get_user_transactions(user_id): gets the transactions of the user
        set_transaction(user_id, transaction): sets the transaction of the user
        get_user_portfolio(user_id): gets the portfolio of the user
        set_portfolio(user_id, portfolio): sets the portfolio of the user
        delete_portfolio(user_id, portfolio): deletes the portfolio of the user
        set_cron_interval(user_id, interval): sets the cron interval of the user
        set_admin(email, is_admin): sets the admin status of the user with the given email
    """

    def __init__(self, db_adress, email, password):
        """initializes the API_Handler class

        Args:
            db_adress (string): adress of the database
            email (string): email of the user
            password (string): password of the user
        """
        self.db_adress = db_adress

        payload = {'email': email, 'password': password}  # credentials for admin account that has all permissions to get and set data (in this case bot account)
        with r.Session() as s:  # open session
            p = s.post(self.db_adress + "/user/login", json=payload)  # login to webservice
            if p.status_code == 200:
                self.token = p.json()["data"]['token']  # store token for further authentication of requests
            else:
                print("Error: " + str(p.status_code) + " invalid credentials")
                self.token = None

    def reauthorize(self, email, password):  # can be used if token expired
        """set new credentials

        Args:
            email (string): email of the user
            password (string): password of the user

        Returns:
            token (string): new token or None if not 200

        Raises:
            None
        """
        payload = {'email': email, 'password': password}
        with r.Session() as s:
            p = s.post(self.db_adress + "/user/login", json=payload)
            if p.status_code == 200:
                self.token = p.json()["data"]['token']
                return p.json()["data"]['token']
            else:
                self.token = None
                return None

    def get_user(self, user_id, max_retries=10):  # max retries are used recursively if the request fails
        """gets the shares of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            json: json of user infos
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}  # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
            req = s.get(self.db_adress + "/user", headers=headers)
            if (req.status_code == 200):
                return req.json()["data"]

            else:
                return self.get_user(user_id, max_retries - 1)  # if request fails try again recursively

    def get_all_users(self, max_retries=10):
        """gets all users

        Args:
            max_retries (int): max retries for the request

        Returns:
            list: list of users
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token}
            req = s.get(self.db_adress + "/users", headers=headers)
            if (req.status_code == 200):
                return req.json()["data"]

            else:
                return self.get_all_users(max_retries - 1)

    def get_user_keywords(self, user_id, max_retries=10):
        """gets the keywords of the user
        
        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of keywords
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        keywords = []
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/keywords", headers=headers)
            if (req.status_code == 200):
                keywords_json = req.json()["data"]
                for keyword in keywords_json:  # keywords_json is a list of dictionaries
                    keywords.append(keyword["keyword"])

                return keywords  # will be empty if no keywords are set

            else:
                return self.get_user_keywords(user_id, max_retries - 1)

    def set_keyword(self, user_id, keyword):
        """sets the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (int): keyword of the user

        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

            return req.status_code

    def delete_keyword(self, user_id, keyword):
        """deletes the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

            return req.status_code

    def get_user_shares(self, user_id, max_retries=10):
        """gets the shares of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of shares

        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/shares", headers=headers)
            if (req.status_code == 200):
                shares_json = req.json()["data"]
                shares = []
                for share in shares_json:
                    shares.append(share["isin"])  # we only want the isin of the shares

                return shares

            else:
                return self.get_user_shares(user_id, max_retries - 1)

    def set_share(self, user_id, isin, comment):
        """sets the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
            comment (string): comment of the share
        
        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
                         headers=headers)  # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
            return req.status_code

    def delete_share(self, user_id, isin):
        """deletes the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
        
        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers)  # to delete a share only the isin is needed because it is unique, shares are not transactions!
            return req.status_code

    def get_user_transactions(self, user_id, max_retries=10):
        """gets the transactions of the user
        
        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request
        
        Returns:
            dict: dictionary of transactions
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/transactions", headers=headers)

            if req.status_code == 200:
                transactions_dict = req.json()["data"]
                return transactions_dict
            else:
                return self.get_user_transactions(user_id, max_retries - 1)

    def set_transaction(self, user_id, comment, isin, count, price, time):
        """sets the transaction of the user

        Args:
            user_id (int): id of the user
            comment (string): comment of the transaction
            isin (string): isin of the transaction
            count (float): count of the transaction
            price (float): price of the transaction
            time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        with r.Session() as s:
            time = time[:-3] + "Z"  # remove last character and add Z to make it a valid date for db
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
                           "time": str(time)}  # set transaction as JSON with all the attributes needed according to Swagger docs
            req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
            return req.status_code

    def get_user_portfolio(self, user_id, max_retries=10):
        """gets the portfolio of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request
        
        Returns:
            dict: dictionary of portfolio
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/portfolio", headers=headers)  # get portfolio as JSON
            if req.status_code == 200:
                portfolio_dict = req.json()["data"]  # get the data of the JSON
                return portfolio_dict
            else:
                return self.get_user_portfolio(user_id, max_retries - 1)

    def set_cron_interval(self, user_id, cron_interval):
        """sets the cron interval of the user

        Args:
            user_id (int): id of the user
            cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        if not croniter.is_valid(cron_interval):  # check if cron_interval is in valid format
            print("Error: Invalid cron format")
            return -1  # return error code -1 if invalid cron format

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers)  # put not post (see swagger docs)
            return req.status_code

    def set_admin(self, email, is_admin):
        """sets the admin of the user

        Args:
            email (string): email of the user
            is_admin (bool): "true" if user should be Admin, "false" if not
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token}  # only bot token is needed, user is chosen by email
            req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
            return req.status_code


if __name__ == "__main__":  # editable, just for basic on the go testing of new functions

    print("This is a module for the telegram bot. It is not intended to be run directly.")
    handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))  # get creds from env
    print(handler.token)
    keywords = handler.get_user_keywords(user_id=1709356058)  # user_id here is currently mine (Linus)
    print(keywords)
    shares = handler.get_user_portfolio(user_id=1709356058)
    print("set cron with status: " + str(handler.set_cron_interval(user_id=1709356058, cron_interval="0 0 * * *")))
    user = handler.get_user(user_id=1709356058)
    print(user)
    all_users = handler.get_all_users()
    admin_status = handler.set_admin("test@test.com", "true")
    print(admin_status)
    print(all_users)
    print(shares)
    sys.exit(1)

Classes

class API_Handler (db_adress, email, password)

class for interacting with the api webservice

Attributes

db_adress : string
adress of the database
token : string
auth token for api

Methods

reauthorize(email, password): set new credentials get_user_keywords(user_id): gets the keywords of the user set_keyword(user_id, keyword): sets the keyword of the user delete_keyword(user_id, keyword): deletes the keyword of the user get_user_shares(user_id): gets the shares of the user set_share(user_id, symbol): sets the share of the user delete_share(user_id, symbol): deletes the share of the user get_user_transactions(user_id): gets the transactions of the user set_transaction(user_id, transaction): sets the transaction of the user get_user_portfolio(user_id): gets the portfolio of the user set_portfolio(user_id, portfolio): sets the portfolio of the user delete_portfolio(user_id, portfolio): deletes the portfolio of the user set_cron_interval(user_id, interval): sets the cron interval of the user set_admin(email, is_admin): sets the admin status of the user with the given email

initializes the API_Handler class

Args

db_adress : string
adress of the database
email : string
email of the user
password : string
password of the user
Expand source code
class API_Handler:
    """class for interacting with the api webservice
    
    Attributes:
        db_adress (string): adress of the database
        token (string): auth token for api

    Methods:
        reauthorize(email, password): set new credentials
        get_user_keywords(user_id): gets the keywords of the user
        set_keyword(user_id, keyword): sets the keyword of the user
        delete_keyword(user_id, keyword): deletes the keyword of the user
        get_user_shares(user_id): gets the shares of the user
        set_share(user_id, symbol): sets the share of the user
        delete_share(user_id, symbol): deletes the share of the user
        get_user_transactions(user_id): gets the transactions of the user
        set_transaction(user_id, transaction): sets the transaction of the user
        get_user_portfolio(user_id): gets the portfolio of the user
        set_portfolio(user_id, portfolio): sets the portfolio of the user
        delete_portfolio(user_id, portfolio): deletes the portfolio of the user
        set_cron_interval(user_id, interval): sets the cron interval of the user
        set_admin(email, is_admin): sets the admin status of the user with the given email
    """

    def __init__(self, db_adress, email, password):
        """initializes the API_Handler class

        Args:
            db_adress (string): adress of the database
            email (string): email of the user
            password (string): password of the user
        """
        self.db_adress = db_adress

        payload = {'email': email, 'password': password}  # credentials for admin account that has all permissions to get and set data (in this case bot account)
        with r.Session() as s:  # open session
            p = s.post(self.db_adress + "/user/login", json=payload)  # login to webservice
            if p.status_code == 200:
                self.token = p.json()["data"]['token']  # store token for further authentication of requests
            else:
                print("Error: " + str(p.status_code) + " invalid credentials")
                self.token = None

    def reauthorize(self, email, password):  # can be used if token expired
        """set new credentials

        Args:
            email (string): email of the user
            password (string): password of the user

        Returns:
            token (string): new token or None if not 200

        Raises:
            None
        """
        payload = {'email': email, 'password': password}
        with r.Session() as s:
            p = s.post(self.db_adress + "/user/login", json=payload)
            if p.status_code == 200:
                self.token = p.json()["data"]['token']
                return p.json()["data"]['token']
            else:
                self.token = None
                return None

    def get_user(self, user_id, max_retries=10):  # max retries are used recursively if the request fails
        """gets the shares of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            json: json of user infos
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}  # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
            req = s.get(self.db_adress + "/user", headers=headers)
            if (req.status_code == 200):
                return req.json()["data"]

            else:
                return self.get_user(user_id, max_retries - 1)  # if request fails try again recursively

    def get_all_users(self, max_retries=10):
        """gets all users

        Args:
            max_retries (int): max retries for the request

        Returns:
            list: list of users
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token}
            req = s.get(self.db_adress + "/users", headers=headers)
            if (req.status_code == 200):
                return req.json()["data"]

            else:
                return self.get_all_users(max_retries - 1)

    def get_user_keywords(self, user_id, max_retries=10):
        """gets the keywords of the user
        
        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of keywords
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        keywords = []
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/keywords", headers=headers)
            if (req.status_code == 200):
                keywords_json = req.json()["data"]
                for keyword in keywords_json:  # keywords_json is a list of dictionaries
                    keywords.append(keyword["keyword"])

                return keywords  # will be empty if no keywords are set

            else:
                return self.get_user_keywords(user_id, max_retries - 1)

    def set_keyword(self, user_id, keyword):
        """sets the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (int): keyword of the user

        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

            return req.status_code

    def delete_keyword(self, user_id, keyword):
        """deletes the keyword of the user

        Args:
            user_id (int): id of the user
            keyword (string): keyword of the user

        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

            return req.status_code

    def get_user_shares(self, user_id, max_retries=10):
        """gets the shares of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request

        Returns:
            list: list of shares

        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/shares", headers=headers)
            if (req.status_code == 200):
                shares_json = req.json()["data"]
                shares = []
                for share in shares_json:
                    shares.append(share["isin"])  # we only want the isin of the shares

                return shares

            else:
                return self.get_user_shares(user_id, max_retries - 1)

    def set_share(self, user_id, isin, comment):
        """sets the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
            comment (string): comment of the share
        
        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
                         headers=headers)  # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
            return req.status_code

    def delete_share(self, user_id, isin):
        """deletes the share of the user

        Args:
            user_id (int): id of the user
            isin (string): identifier of the share (standard is isin)
        
        Returns:
            int: status code

        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers)  # to delete a share only the isin is needed because it is unique, shares are not transactions!
            return req.status_code

    def get_user_transactions(self, user_id, max_retries=10):
        """gets the transactions of the user
        
        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request
        
        Returns:
            dict: dictionary of transactions
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/transactions", headers=headers)

            if req.status_code == 200:
                transactions_dict = req.json()["data"]
                return transactions_dict
            else:
                return self.get_user_transactions(user_id, max_retries - 1)

    def set_transaction(self, user_id, comment, isin, count, price, time):
        """sets the transaction of the user

        Args:
            user_id (int): id of the user
            comment (string): comment of the transaction
            isin (string): isin of the transaction
            count (float): count of the transaction
            price (float): price of the transaction
            time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        with r.Session() as s:
            time = time[:-3] + "Z"  # remove last character and add Z to make it a valid date for db
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
                           "time": str(time)}  # set transaction as JSON with all the attributes needed according to Swagger docs
            req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
            return req.status_code

    def get_user_portfolio(self, user_id, max_retries=10):
        """gets the portfolio of the user

        Args:
            user_id (int): id of the user
            max_retries (int): max retries for the request
        
        Returns:
            dict: dictionary of portfolio
        
        Raises:
            None
        """
        if max_retries <= 0:
            return None

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.get(self.db_adress + "/portfolio", headers=headers)  # get portfolio as JSON
            if req.status_code == 200:
                portfolio_dict = req.json()["data"]  # get the data of the JSON
                return portfolio_dict
            else:
                return self.get_user_portfolio(user_id, max_retries - 1)

    def set_cron_interval(self, user_id, cron_interval):
        """sets the cron interval of the user

        Args:
            user_id (int): id of the user
            cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        if not croniter.is_valid(cron_interval):  # check if cron_interval is in valid format
            print("Error: Invalid cron format")
            return -1  # return error code -1 if invalid cron format

        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
            req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers)  # put not post (see swagger docs)
            return req.status_code

    def set_admin(self, email, is_admin):
        """sets the admin of the user

        Args:
            email (string): email of the user
            is_admin (bool): "true" if user should be Admin, "false" if not
        
        Returns:
            int: status code
        
        Raises:
            None
        """
        with r.Session() as s:
            headers = {'Authorization': 'Bearer ' + self.token}  # only bot token is needed, user is chosen by email
            req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
            return req.status_code

Methods

def delete_keyword(self, user_id, keyword)

deletes the keyword of the user

Args

user_id : int
id of the user
keyword : string
keyword of the user

Returns

int
status code

Raises

None

Expand source code
def delete_keyword(self, user_id, keyword):
    """deletes the keyword of the user

    Args:
        user_id (int): id of the user
        keyword (string): keyword of the user

    Returns:
        int: status code

    Raises:
        None
    """
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.delete(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

        return req.status_code
def delete_share(self, user_id, isin)

deletes the share of the user

Args

user_id : int
id of the user
isin : string
identifier of the share (standard is isin)

Returns

int
status code

Raises

None

Expand source code
def delete_share(self, user_id, isin):
    """deletes the share of the user

    Args:
        user_id (int): id of the user
        isin (string): identifier of the share (standard is isin)
    
    Returns:
        int: status code

    Raises:
        None
    """
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers)  # to delete a share only the isin is needed because it is unique, shares are not transactions!
        return req.status_code
def get_all_users(self, max_retries=10)

gets all users

Args

max_retries : int
max retries for the request

Returns

list
list of users

Raises

None

Expand source code
def get_all_users(self, max_retries=10):
    """gets all users

    Args:
        max_retries (int): max retries for the request

    Returns:
        list: list of users
    
    Raises:
        None
    """
    if max_retries <= 0:
        return None

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token}
        req = s.get(self.db_adress + "/users", headers=headers)
        if (req.status_code == 200):
            return req.json()["data"]

        else:
            return self.get_all_users(max_retries - 1)
def get_user(self, user_id, max_retries=10)

gets the shares of the user

Args

user_id : int
id of the user
max_retries : int
max retries for the request

Returns

json
json of user infos

Raises

None

Expand source code
def get_user(self, user_id, max_retries=10):  # max retries are used recursively if the request fails
    """gets the shares of the user

    Args:
        user_id (int): id of the user
        max_retries (int): max retries for the request

    Returns:
        json: json of user infos
    
    Raises:
        None
    """
    if max_retries <= 0:
        return None

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}  # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
        req = s.get(self.db_adress + "/user", headers=headers)
        if (req.status_code == 200):
            return req.json()["data"]

        else:
            return self.get_user(user_id, max_retries - 1)  # if request fails try again recursively
def get_user_keywords(self, user_id, max_retries=10)

gets the keywords of the user

Args

user_id : int
id of the user
max_retries : int
max retries for the request

Returns

list
list of keywords

Raises

None

Expand source code
def get_user_keywords(self, user_id, max_retries=10):
    """gets the keywords of the user
    
    Args:
        user_id (int): id of the user
        max_retries (int): max retries for the request

    Returns:
        list: list of keywords
    
    Raises:
        None
    """
    if max_retries <= 0:
        return None

    keywords = []
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.get(self.db_adress + "/keywords", headers=headers)
        if (req.status_code == 200):
            keywords_json = req.json()["data"]
            for keyword in keywords_json:  # keywords_json is a list of dictionaries
                keywords.append(keyword["keyword"])

            return keywords  # will be empty if no keywords are set

        else:
            return self.get_user_keywords(user_id, max_retries - 1)
def get_user_portfolio(self, user_id, max_retries=10)

gets the portfolio of the user

Args

user_id : int
id of the user
max_retries : int
max retries for the request

Returns

dict
dictionary of portfolio

Raises

None

Expand source code
def get_user_portfolio(self, user_id, max_retries=10):
    """gets the portfolio of the user

    Args:
        user_id (int): id of the user
        max_retries (int): max retries for the request
    
    Returns:
        dict: dictionary of portfolio
    
    Raises:
        None
    """
    if max_retries <= 0:
        return None

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.get(self.db_adress + "/portfolio", headers=headers)  # get portfolio as JSON
        if req.status_code == 200:
            portfolio_dict = req.json()["data"]  # get the data of the JSON
            return portfolio_dict
        else:
            return self.get_user_portfolio(user_id, max_retries - 1)
def get_user_shares(self, user_id, max_retries=10)

gets the shares of the user

Args

user_id : int
id of the user
max_retries : int
max retries for the request

Returns

list
list of shares

Raises

None

Expand source code
def get_user_shares(self, user_id, max_retries=10):
    """gets the shares of the user

    Args:
        user_id (int): id of the user
        max_retries (int): max retries for the request

    Returns:
        list: list of shares

    Raises:
        None
    """
    if max_retries <= 0:
        return None

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.get(self.db_adress + "/shares", headers=headers)
        if (req.status_code == 200):
            shares_json = req.json()["data"]
            shares = []
            for share in shares_json:
                shares.append(share["isin"])  # we only want the isin of the shares

            return shares

        else:
            return self.get_user_shares(user_id, max_retries - 1)
def get_user_transactions(self, user_id, max_retries=10)

gets the transactions of the user

Args

user_id : int
id of the user
max_retries : int
max retries for the request

Returns

dict
dictionary of transactions

Raises

None

Expand source code
def get_user_transactions(self, user_id, max_retries=10):
    """gets the transactions of the user
    
    Args:
        user_id (int): id of the user
        max_retries (int): max retries for the request
    
    Returns:
        dict: dictionary of transactions
    
    Raises:
        None
    """
    if max_retries <= 0:
        return None

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.get(self.db_adress + "/transactions", headers=headers)

        if req.status_code == 200:
            transactions_dict = req.json()["data"]
            return transactions_dict
        else:
            return self.get_user_transactions(user_id, max_retries - 1)
def reauthorize(self, email, password)

set new credentials

Args

email : string
email of the user
password : string
password of the user

Returns

token (string): new token or None if not 200

Raises

None

Expand source code
def reauthorize(self, email, password):  # can be used if token expired
    """set new credentials

    Args:
        email (string): email of the user
        password (string): password of the user

    Returns:
        token (string): new token or None if not 200

    Raises:
        None
    """
    payload = {'email': email, 'password': password}
    with r.Session() as s:
        p = s.post(self.db_adress + "/user/login", json=payload)
        if p.status_code == 200:
            self.token = p.json()["data"]['token']
            return p.json()["data"]['token']
        else:
            self.token = None
            return None
def set_admin(self, email, is_admin)

sets the admin of the user

Args

email : string
email of the user
is_admin : bool
"true" if user should be Admin, "false" if not

Returns

int
status code

Raises

None

Expand source code
def set_admin(self, email, is_admin):
    """sets the admin of the user

    Args:
        email (string): email of the user
        is_admin (bool): "true" if user should be Admin, "false" if not
    
    Returns:
        int: status code
    
    Raises:
        None
    """
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token}  # only bot token is needed, user is chosen by email
        req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
        return req.status_code
def set_cron_interval(self, user_id, cron_interval)

sets the cron interval of the user

Args

user_id : int
id of the user
cron_interval : String
Update interval in cron format => see https://crontab.guru/ for formatting

Returns

int
status code

Raises

None

Expand source code
def set_cron_interval(self, user_id, cron_interval):
    """sets the cron interval of the user

    Args:
        user_id (int): id of the user
        cron_interval (String): Update interval in cron format => see https://crontab.guru/ for formatting
    
    Returns:
        int: status code
    
    Raises:
        None
    """
    if not croniter.is_valid(cron_interval):  # check if cron_interval is in valid format
        print("Error: Invalid cron format")
        return -1  # return error code -1 if invalid cron format

    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers)  # put not post (see swagger docs)
        return req.status_code
def set_keyword(self, user_id, keyword)

sets the keyword of the user

Args

user_id : int
id of the user
keyword : int
keyword of the user

Returns

int
status code

Raises

None

Expand source code
def set_keyword(self, user_id, keyword):
    """sets the keyword of the user

    Args:
        user_id (int): id of the user
        keyword (int): keyword of the user

    Returns:
        int: status code

    Raises:
        None
    """
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.post(self.db_adress + "/keyword", json={"keyword": keyword}, headers=headers)

        return req.status_code
def set_share(self, user_id, isin, comment)

sets the share of the user

Args

user_id : int
id of the user
isin : string
identifier of the share (standard is isin)
comment : string
comment of the share

Returns

int
status code

Raises

None

Expand source code
def set_share(self, user_id, isin, comment):
    """sets the share of the user

    Args:
        user_id (int): id of the user
        isin (string): identifier of the share (standard is isin)
        comment (string): comment of the share
    
    Returns:
        int: status code

    Raises:
        None
    """
    with r.Session() as s:
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
                     headers=headers)  # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
        return req.status_code
def set_transaction(self, user_id, comment, isin, count, price, time)

sets the transaction of the user

Args

user_id : int
id of the user
comment : string
comment of the transaction
isin : string
isin of the transaction
count : float
count of the transaction
price : float
price of the transaction
time : string
time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"

Returns

int
status code

Raises

None

Expand source code
def set_transaction(self, user_id, comment, isin, count, price, time):
    """sets the transaction of the user

    Args:
        user_id (int): id of the user
        comment (string): comment of the transaction
        isin (string): isin of the transaction
        count (float): count of the transaction
        price (float): price of the transaction
        time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
    
    Returns:
        int: status code
    
    Raises:
        None
    """
    with r.Session() as s:
        time = time[:-3] + "Z"  # remove last character and add Z to make it a valid date for db
        headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
        transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
                       "time": str(time)}  # set transaction as JSON with all the attributes needed according to Swagger docs
        req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
        return req.status_code