Module telegram_bot.bot_updates

script for regularly sending updates on shares and news based on user interval

Expand source code
"""
script for regularly sending updates on shares and news based on user interval
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"

import os
import sys
import time

from apscheduler.schedulers.background import BackgroundScheduler # scheduler for cron
from dotenv import load_dotenv

import telegram_bot.helper_functions as hf
import telegram_bot.news.news_fetcher as news_fetcher
import telegram_bot.shares.share_fetcher as share_fetcher
from telegram_bot.api_handling.api_handler import API_Handler
from telegram_bot.bot import bot

'''
* * * * * code
┬ ┬ ┬ ┬ ┬
│ │ │ │ │
│ │ │ │ └──── weekday (0->Monday, 7->Sunday)
│ │ │ └────── Month (1-12)
│ │ └──────── Day (1-31)
│ └────────── Hour (0-23)
└──────────── Minute (0-59)

example 0 8 * * * -> daily update at 8am
'''
user_ids = []
user_crontab = []

load_dotenv(dotenv_path='.env')


def start_updater():
    """ starting function for regularly sending updates
    :raises: none

    :rtype: none
    """

    print("Bot updates started")

    my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))

    update_crontab(my_handler)


def update_crontab(p_my_handler):
    """ Updating crontab lists every hour
    :type pCurrent_Time: time when starting crontab update
    :param pCurrent_Time: datetime

    :raises: none

    :rtype: none
    """

    global user_crontab
    global user_ids
    
    all_users = p_my_handler.get_all_users() # get all users so crontabs can update for everybody
    
    user_ids = []
    user_crontab = []

    for element in all_users:
        if element["cron"] != '' and element["telegram_user_id"] != '': # check if both values are existing so I have consistent data
            try:
                user_ids.append(int(element["telegram_user_id"]))
                try:
                    user_crontab.append(str(element["cron"]))
                except: 
                    user_ids.pop() # if something goes wrong with cron I have to delete matching user id 
            except: continue
                
    
    print(user_ids)

    update_based_on_crontab(user_ids, user_crontab, p_my_handler)
    
    update_crontab(p_my_handler) # restart the update after time sleep
    
    
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
    """ Check all the crontab codes and add jobs to start in time
    :type p_user_ids: array
    :param p_user_ids: user id array of all users

    :type p_user_crontab: array
    :param p_user_crontab: crontabs for all users equivalent to the user array

    :type p_my_handler: Api_Handler
    :param p_my_handler: get database stuff

    :raises: none

    :rtype: none
    """
    
    my_scheduler = BackgroundScheduler() # schedule sends based on cron
    
    for i in range(len(p_user_ids)):
        cron_split = p_user_crontab[i].split(" ") # split it up to use in scheduler
        print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2])
        my_scheduler.add_job(update_for_user, 'cron', day_of_week=cron_split[4], hour=cron_split[1], minute=cron_split[0], month=cron_split[3], day=cron_split[2], args=(p_user_ids[i], p_my_handler))

    my_scheduler.start()
    
    time.sleep( 600 ) # scheduler runs in background and I wait 10mins
    my_scheduler.shutdown() # after this the new crontabs will be loaded
                    
def update_for_user(p_user_id, p_my_handler):
    """ Pull shares and send updates for specific user id
    :type p_user_id: integer
    :param p_user_id: user id of user that shall receive update

    :type p_my_handler: Api_Handler
    :param p_my_handler: handle the api and pull from database

    :raises: none

    :rtype: none
    """
    share_symbols = []
    share_amounts = []
    
    my_portfolio = p_my_handler.get_user_portfolio(p_user_id) # get all existing shares for user
    
    for element in my_portfolio:
        if element["count"] != '' and element["isin"] != '':
            print(element["count"], element["isin"])
            share_symbols.append(element["isin"])
            share_amounts.append(element["count"])

    my_user = p_my_handler.get_user(p_user_id)
    send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id)
    
    shares = p_my_handler.get_user_shares(p_user_id) # all interest shares
    
    if len(share_symbols) != 0: # iterate through all shares
        for i in range(len(share_symbols)):
            my_price = share_fetcher.get_share_price_no_currency(share_symbols[i])
            my_update_message = f'{share_fetcher.get_share_information_markdown(share_symbols[i])}\ncount: {hf.make_markdown_proof(share_amounts[i])}\nTotal: {hf.make_markdown_proof(round(float(my_price) * float(share_amounts[i]), 2))} EUR'
            bot.send_message(chat_id=p_user_id, text=my_update_message, parse_mode="MARKDOWNV2")
    else:
        send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)

    if len(shares) != 0:  # Send updates on watchlist shares if existing
        send_to_user("Your watchlist shares:", pUser_id=p_user_id)
        for element in shares:
            send_to_user(share_fetcher.get_share_information_markdown(element), pUser_id=p_user_id, md_mode=True)

    keywords = p_my_handler.get_user_keywords(p_user_id)  # get keywords as array

    if (keywords):  # if keywords exist and array is not empty
        send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id)
        for keyword in keywords:
            news = news_fetcher.get_top_news_by_keyword(keyword)["articles"]
            keyword = hf.make_markdown_proof(keyword)

            if not news: # if empty news array
                send_to_user(f"No news found for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
            
            elif news == None: # if news is none
                send_to_user(f"Server error for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
            else:
                news_formatted = news_fetcher.format_article(news[0])  # format for message, only use the most popular article
                send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True)  # send news with related keyword in Markdown


def send_to_user(pText, pUser_id, md_mode=False):
    """ Send message to user
    :type pText: string
    :param pText: Text to send to user

    :type pUser_id: int
    :param pUser_id: user to send to. per default me (Florian Kellermann)

    :type md_mode: boolean
    :param md_mode: if true, parse_mode is markdown

    :raises: none

    :rtype: none
    """
    if md_mode:
        bot.send_message(chat_id=pUser_id, text=pText, parse_mode="MARKDOWNV2")
    else:
        bot.send_message(chat_id=pUser_id, text=pText)


if __name__ == "__main__":
    try:
        start_updater()
        sys.exit(-1)
    except KeyboardInterrupt:
        print("Ending")
        sys.exit(-1)

Functions

def send_to_user(pText, pUser_id, md_mode=False)

Send message to user :type pText: string :param pText: Text to send to user

:type pUser_id: int :param pUser_id: user to send to. per default me (Florian Kellermann)

:type md_mode: boolean :param md_mode: if true, parse_mode is markdown

:raises: none

:rtype: none

Expand source code
def send_to_user(pText, pUser_id, md_mode=False):
    """ Send message to user
    :type pText: string
    :param pText: Text to send to user

    :type pUser_id: int
    :param pUser_id: user to send to. per default me (Florian Kellermann)

    :type md_mode: boolean
    :param md_mode: if true, parse_mode is markdown

    :raises: none

    :rtype: none
    """
    if md_mode:
        bot.send_message(chat_id=pUser_id, text=pText, parse_mode="MARKDOWNV2")
    else:
        bot.send_message(chat_id=pUser_id, text=pText)
def start_updater()

starting function for regularly sending updates :raises: none

:rtype: none

Expand source code
def start_updater():
    """ starting function for regularly sending updates
    :raises: none

    :rtype: none
    """

    print("Bot updates started")

    my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))

    update_crontab(my_handler)
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler)

Check all the crontab codes and add jobs to start in time :type p_user_ids: array :param p_user_ids: user id array of all users

:type p_user_crontab: array :param p_user_crontab: crontabs for all users equivalent to the user array

:type p_my_handler: Api_Handler :param p_my_handler: get database stuff

:raises: none

:rtype: none

Expand source code
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
    """ Check all the crontab codes and add jobs to start in time
    :type p_user_ids: array
    :param p_user_ids: user id array of all users

    :type p_user_crontab: array
    :param p_user_crontab: crontabs for all users equivalent to the user array

    :type p_my_handler: Api_Handler
    :param p_my_handler: get database stuff

    :raises: none

    :rtype: none
    """
    
    my_scheduler = BackgroundScheduler() # schedule sends based on cron
    
    for i in range(len(p_user_ids)):
        cron_split = p_user_crontab[i].split(" ") # split it up to use in scheduler
        print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2])
        my_scheduler.add_job(update_for_user, 'cron', day_of_week=cron_split[4], hour=cron_split[1], minute=cron_split[0], month=cron_split[3], day=cron_split[2], args=(p_user_ids[i], p_my_handler))

    my_scheduler.start()
    
    time.sleep( 600 ) # scheduler runs in background and I wait 10mins
    my_scheduler.shutdown() # after this the new crontabs will be loaded
def update_crontab(p_my_handler)

Updating crontab lists every hour :type pCurrent_Time: time when starting crontab update :param pCurrent_Time: datetime

:raises: none

:rtype: none

Expand source code
def update_crontab(p_my_handler):
    """ Updating crontab lists every hour
    :type pCurrent_Time: time when starting crontab update
    :param pCurrent_Time: datetime

    :raises: none

    :rtype: none
    """

    global user_crontab
    global user_ids
    
    all_users = p_my_handler.get_all_users() # get all users so crontabs can update for everybody
    
    user_ids = []
    user_crontab = []

    for element in all_users:
        if element["cron"] != '' and element["telegram_user_id"] != '': # check if both values are existing so I have consistent data
            try:
                user_ids.append(int(element["telegram_user_id"]))
                try:
                    user_crontab.append(str(element["cron"]))
                except: 
                    user_ids.pop() # if something goes wrong with cron I have to delete matching user id 
            except: continue
                
    
    print(user_ids)

    update_based_on_crontab(user_ids, user_crontab, p_my_handler)
    
    update_crontab(p_my_handler) # restart the update after time sleep
def update_for_user(p_user_id, p_my_handler)

Pull shares and send updates for specific user id :type p_user_id: integer :param p_user_id: user id of user that shall receive update

:type p_my_handler: Api_Handler :param p_my_handler: handle the api and pull from database

:raises: none

:rtype: none

Expand source code
def update_for_user(p_user_id, p_my_handler):
    """ Pull shares and send updates for specific user id
    :type p_user_id: integer
    :param p_user_id: user id of user that shall receive update

    :type p_my_handler: Api_Handler
    :param p_my_handler: handle the api and pull from database

    :raises: none

    :rtype: none
    """
    share_symbols = []
    share_amounts = []
    
    my_portfolio = p_my_handler.get_user_portfolio(p_user_id) # get all existing shares for user
    
    for element in my_portfolio:
        if element["count"] != '' and element["isin"] != '':
            print(element["count"], element["isin"])
            share_symbols.append(element["isin"])
            share_amounts.append(element["count"])

    my_user = p_my_handler.get_user(p_user_id)
    send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id)
    
    shares = p_my_handler.get_user_shares(p_user_id) # all interest shares
    
    if len(share_symbols) != 0: # iterate through all shares
        for i in range(len(share_symbols)):
            my_price = share_fetcher.get_share_price_no_currency(share_symbols[i])
            my_update_message = f'{share_fetcher.get_share_information_markdown(share_symbols[i])}\ncount: {hf.make_markdown_proof(share_amounts[i])}\nTotal: {hf.make_markdown_proof(round(float(my_price) * float(share_amounts[i]), 2))} EUR'
            bot.send_message(chat_id=p_user_id, text=my_update_message, parse_mode="MARKDOWNV2")
    else:
        send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)

    if len(shares) != 0:  # Send updates on watchlist shares if existing
        send_to_user("Your watchlist shares:", pUser_id=p_user_id)
        for element in shares:
            send_to_user(share_fetcher.get_share_information_markdown(element), pUser_id=p_user_id, md_mode=True)

    keywords = p_my_handler.get_user_keywords(p_user_id)  # get keywords as array

    if (keywords):  # if keywords exist and array is not empty
        send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id)
        for keyword in keywords:
            news = news_fetcher.get_top_news_by_keyword(keyword)["articles"]
            keyword = hf.make_markdown_proof(keyword)

            if not news: # if empty news array
                send_to_user(f"No news found for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
            
            elif news == None: # if news is none
                send_to_user(f"Server error for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
            else:
                news_formatted = news_fetcher.format_article(news[0])  # format for message, only use the most popular article
                send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True)  # send news with related keyword in Markdown