Module telegram_bot.bot_updates
script for regularly sending updates on shares and news based on user interval
Expand source code
"""
script for regularly sending updates on shares and news based on user interval
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"
import os
import sys
import time
from apscheduler.schedulers.background import BackgroundScheduler # scheduler for cron
from dotenv import load_dotenv
import telegram_bot.helper_functions as hf
import telegram_bot.news.news_fetcher as news_fetcher
import telegram_bot.shares.share_fetcher as share_fetcher
from telegram_bot.api_handling.api_handler import API_Handler
from telegram_bot.bot import bot
'''
* * * * * code
┬ ┬ ┬ ┬ ┬
│ │ │ │ │
│ │ │ │ └──── weekday (0->Monday, 7->Sunday)
│ │ │ └────── Month (1-12)
│ │ └──────── Day (1-31)
│ └────────── Hour (0-23)
└──────────── Minute (0-59)
example 0 8 * * * -> daily update at 8am
'''
user_ids = []
user_crontab = []
load_dotenv(dotenv_path='.env')
def start_updater():
""" starting function for regularly sending updates
:raises: none
:rtype: none
"""
print("Bot updates started")
my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))
update_crontab(my_handler)
def update_crontab(p_my_handler):
""" Updating crontab lists every hour
:type pCurrent_Time: time when starting crontab update
:param pCurrent_Time: datetime
:raises: none
:rtype: none
"""
global user_crontab
global user_ids
all_users = p_my_handler.get_all_users() # get all users so crontabs can update for everybody
user_ids = []
user_crontab = []
for element in all_users:
if element["cron"] != '' and element["telegram_user_id"] != '': # check if both values are existing so I have consistent data
try:
user_ids.append(int(element["telegram_user_id"]))
try:
user_crontab.append(str(element["cron"]))
except:
user_ids.pop() # if something goes wrong with cron I have to delete matching user id
except: continue
print(user_ids)
update_based_on_crontab(user_ids, user_crontab, p_my_handler)
update_crontab(p_my_handler) # restart the update after time sleep
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
""" Check all the crontab codes and add jobs to start in time
:type p_user_ids: array
:param p_user_ids: user id array of all users
:type p_user_crontab: array
:param p_user_crontab: crontabs for all users equivalent to the user array
:type p_my_handler: Api_Handler
:param p_my_handler: get database stuff
:raises: none
:rtype: none
"""
my_scheduler = BackgroundScheduler() # schedule sends based on cron
for i in range(len(p_user_ids)):
cron_split = p_user_crontab[i].split(" ") # split it up to use in scheduler
print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2])
my_scheduler.add_job(update_for_user, 'cron', day_of_week=cron_split[4], hour=cron_split[1], minute=cron_split[0], month=cron_split[3], day=cron_split[2], args=(p_user_ids[i], p_my_handler))
my_scheduler.start()
time.sleep( 600 ) # scheduler runs in background and I wait 10mins
my_scheduler.shutdown() # after this the new crontabs will be loaded
def update_for_user(p_user_id, p_my_handler):
""" Pull shares and send updates for specific user id
:type p_user_id: integer
:param p_user_id: user id of user that shall receive update
:type p_my_handler: Api_Handler
:param p_my_handler: handle the api and pull from database
:raises: none
:rtype: none
"""
share_symbols = []
share_amounts = []
my_portfolio = p_my_handler.get_user_portfolio(p_user_id) # get all existing shares for user
for element in my_portfolio:
if element["count"] != '' and element["isin"] != '':
print(element["count"], element["isin"])
share_symbols.append(element["isin"])
share_amounts.append(element["count"])
my_user = p_my_handler.get_user(p_user_id)
send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id)
shares = p_my_handler.get_user_shares(p_user_id) # all interest shares
if len(share_symbols) != 0: # iterate through all shares
for i in range(len(share_symbols)):
my_price = share_fetcher.get_share_price_no_currency(share_symbols[i])
my_update_message = f'{share_fetcher.get_share_information_markdown(share_symbols[i])}\ncount: {hf.make_markdown_proof(share_amounts[i])}\nTotal: {hf.make_markdown_proof(round(float(my_price) * float(share_amounts[i]), 2))} EUR'
bot.send_message(chat_id=p_user_id, text=my_update_message, parse_mode="MARKDOWNV2")
else:
send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)
if len(shares) != 0: # Send updates on watchlist shares if existing
send_to_user("Your watchlist shares:", pUser_id=p_user_id)
for element in shares:
send_to_user(share_fetcher.get_share_information_markdown(element), pUser_id=p_user_id, md_mode=True)
keywords = p_my_handler.get_user_keywords(p_user_id) # get keywords as array
if (keywords): # if keywords exist and array is not empty
send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id)
for keyword in keywords:
news = news_fetcher.get_top_news_by_keyword(keyword)["articles"]
keyword = hf.make_markdown_proof(keyword)
if not news: # if empty news array
send_to_user(f"No news found for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
elif news == None: # if news is none
send_to_user(f"Server error for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True)
else:
news_formatted = news_fetcher.format_article(news[0]) # format for message, only use the most popular article
send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True) # send news with related keyword in Markdown
def send_to_user(pText, pUser_id, md_mode=False):
""" Send message to user
:type pText: string
:param pText: Text to send to user
:type pUser_id: int
:param pUser_id: user to send to. per default me (Florian Kellermann)
:type md_mode: boolean
:param md_mode: if true, parse_mode is markdown
:raises: none
:rtype: none
"""
if md_mode:
bot.send_message(chat_id=pUser_id, text=pText, parse_mode="MARKDOWNV2")
else:
bot.send_message(chat_id=pUser_id, text=pText)
if __name__ == "__main__":
try:
start_updater()
sys.exit(-1)
except KeyboardInterrupt:
print("Ending")
sys.exit(-1)
Functions
def send_to_user(pText, pUser_id, md_mode=False)
-
Send message to user :type pText: string :param pText: Text to send to user
:type pUser_id: int :param pUser_id: user to send to. per default me (Florian Kellermann)
:type md_mode: boolean :param md_mode: if true, parse_mode is markdown
:raises: none
:rtype: none
Expand source code
def send_to_user(pText, pUser_id, md_mode=False): """ Send message to user :type pText: string :param pText: Text to send to user :type pUser_id: int :param pUser_id: user to send to. per default me (Florian Kellermann) :type md_mode: boolean :param md_mode: if true, parse_mode is markdown :raises: none :rtype: none """ if md_mode: bot.send_message(chat_id=pUser_id, text=pText, parse_mode="MARKDOWNV2") else: bot.send_message(chat_id=pUser_id, text=pText)
def start_updater()
-
starting function for regularly sending updates :raises: none
:rtype: none
Expand source code
def start_updater(): """ starting function for regularly sending updates :raises: none :rtype: none """ print("Bot updates started") my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) update_crontab(my_handler)
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler)
-
Check all the crontab codes and add jobs to start in time :type p_user_ids: array :param p_user_ids: user id array of all users
:type p_user_crontab: array :param p_user_crontab: crontabs for all users equivalent to the user array
:type p_my_handler: Api_Handler :param p_my_handler: get database stuff
:raises: none
:rtype: none
Expand source code
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler): """ Check all the crontab codes and add jobs to start in time :type p_user_ids: array :param p_user_ids: user id array of all users :type p_user_crontab: array :param p_user_crontab: crontabs for all users equivalent to the user array :type p_my_handler: Api_Handler :param p_my_handler: get database stuff :raises: none :rtype: none """ my_scheduler = BackgroundScheduler() # schedule sends based on cron for i in range(len(p_user_ids)): cron_split = p_user_crontab[i].split(" ") # split it up to use in scheduler print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2]) my_scheduler.add_job(update_for_user, 'cron', day_of_week=cron_split[4], hour=cron_split[1], minute=cron_split[0], month=cron_split[3], day=cron_split[2], args=(p_user_ids[i], p_my_handler)) my_scheduler.start() time.sleep( 600 ) # scheduler runs in background and I wait 10mins my_scheduler.shutdown() # after this the new crontabs will be loaded
def update_crontab(p_my_handler)
-
Updating crontab lists every hour :type pCurrent_Time: time when starting crontab update :param pCurrent_Time: datetime
:raises: none
:rtype: none
Expand source code
def update_crontab(p_my_handler): """ Updating crontab lists every hour :type pCurrent_Time: time when starting crontab update :param pCurrent_Time: datetime :raises: none :rtype: none """ global user_crontab global user_ids all_users = p_my_handler.get_all_users() # get all users so crontabs can update for everybody user_ids = [] user_crontab = [] for element in all_users: if element["cron"] != '' and element["telegram_user_id"] != '': # check if both values are existing so I have consistent data try: user_ids.append(int(element["telegram_user_id"])) try: user_crontab.append(str(element["cron"])) except: user_ids.pop() # if something goes wrong with cron I have to delete matching user id except: continue print(user_ids) update_based_on_crontab(user_ids, user_crontab, p_my_handler) update_crontab(p_my_handler) # restart the update after time sleep
def update_for_user(p_user_id, p_my_handler)
-
Pull shares and send updates for specific user id :type p_user_id: integer :param p_user_id: user id of user that shall receive update
:type p_my_handler: Api_Handler :param p_my_handler: handle the api and pull from database
:raises: none
:rtype: none
Expand source code
def update_for_user(p_user_id, p_my_handler): """ Pull shares and send updates for specific user id :type p_user_id: integer :param p_user_id: user id of user that shall receive update :type p_my_handler: Api_Handler :param p_my_handler: handle the api and pull from database :raises: none :rtype: none """ share_symbols = [] share_amounts = [] my_portfolio = p_my_handler.get_user_portfolio(p_user_id) # get all existing shares for user for element in my_portfolio: if element["count"] != '' and element["isin"] != '': print(element["count"], element["isin"]) share_symbols.append(element["isin"]) share_amounts.append(element["count"]) my_user = p_my_handler.get_user(p_user_id) send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id) shares = p_my_handler.get_user_shares(p_user_id) # all interest shares if len(share_symbols) != 0: # iterate through all shares for i in range(len(share_symbols)): my_price = share_fetcher.get_share_price_no_currency(share_symbols[i]) my_update_message = f'{share_fetcher.get_share_information_markdown(share_symbols[i])}\ncount: {hf.make_markdown_proof(share_amounts[i])}\nTotal: {hf.make_markdown_proof(round(float(my_price) * float(share_amounts[i]), 2))} EUR' bot.send_message(chat_id=p_user_id, text=my_update_message, parse_mode="MARKDOWNV2") else: send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id) if len(shares) != 0: # Send updates on watchlist shares if existing send_to_user("Your watchlist shares:", pUser_id=p_user_id) for element in shares: send_to_user(share_fetcher.get_share_information_markdown(element), pUser_id=p_user_id, md_mode=True) keywords = p_my_handler.get_user_keywords(p_user_id) # get keywords as array if (keywords): # if keywords exist and array is not empty send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id) for keyword in keywords: news = news_fetcher.get_top_news_by_keyword(keyword)["articles"] keyword = hf.make_markdown_proof(keyword) if not news: # if empty news array send_to_user(f"No news found for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True) elif news == None: # if news is none send_to_user(f"Server error for keyword _{keyword}_\.", pUser_id=p_user_id, md_mode=True) else: news_formatted = news_fetcher.format_article(news[0]) # format for message, only use the most popular article send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True) # send news with related keyword in Markdown