""" script for telegram bot and its functions """ __author__ = "Florian Kellermann, Linus Eickhoff" __date__ = "11.03.2022" __version__ = "0.0.4" __license__ = "None" # side-dependencies: none # Work in Progress # Api-Key: 5228016873:AAGFrh0P6brag7oD3gxXjCh5gnLLE8JMvMs /debugAPI Key: 5108535940:AAF5FpPHNV96WxGCDt8aMrGGKke1VILYib4 (https://t.me/mynewdebugbot) # text bot at t.me/projektaktienbot # API Documentation https://core.telegram.org/bots/api # Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started import email import os import telebot import sys import logging import json import re import news.news_fetcher as news import shares.share_fetcher as share_fetcher import datetime as dt from telebot import types from dotenv import load_dotenv from api_handling.api_handler import API_Handler load_dotenv(dotenv_path='.env') # load environment variables bot_version = "1.0.1" # version of bot #create api handler api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars. print("Webserver Token: " + str(api_handler.token)) class User: # Currently saving users in this class to test functionality -> later database REMOVABLE def __init__(self, p_user_id, p_user_name, p_chat_id): """ Initialize a new user :type self: User :param self: object of the class :type p_user_id: int :param p_user_id: telegram user id :type p_user_name: str :param p_user_name: first name of user :type p_chat_id: int :param p_chat_id: telegram chat id :raises: :rtype: """ self.user_id = int(p_user_id) self.chat_id = int(p_chat_id) self.user_name = str(p_user_name) bot = telebot.TeleBot(os.getenv('BOT_API_KEY')) @bot.message_handler(commands=['start', 'Start']) # /start -> saving as new user and sending welcome def send_start(message): """ Description :type message: message object bot :param message: message that was reacted to, in this case always containing '/start' :raises: none :rtype: none """ bot.reply_to(message, "Welcome to this share bot project. Type /help to get information on what this bot can do") @bot.message_handler(commands=['version', 'Version']) def send_version(message): """ Sending programm version :type message: message object bot :param message: message that was reacted to, in this case always containing '/version' :raises: none :rtype:none """ bot.reply_to(message, "the current bot version is " + bot_version) @bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions def send_help(message): """ Send all functions :type message: message object bot :param message: message that was reacted to, in this case always containing '/help' :raises: none :rtype: none """ bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/setAdmin set admin rights of user (ADMIN)\n/users see all users. (ADMIN)\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/share get price of specific share\n/portfolio see own portfolio\n/newtransaction add new transaction\n/interval get update interval\n/setinterval set update interval\n_For further details see https://gruppe1.testsites.info _", parse_mode='MARKDOWN') @bot.message_handler(commands=['users', 'Users']) # /users -> sending all users def send_all_users(message): """ Send all users, only possible for admins :type message: message object bot :param message: message that was reacted to, in this case always containing '/users' :raises: none :rtype: none """ user_id = int(message.from_user.id) user_data = api_handler.get_user(user_id) if(user_data["admin"] == False): # check if user has admin rights bot.reply_to(message, "You have to be an admin to use this command") return user_list = api_handler.get_all_users() user_count = len(user_list) bot.send_message(chat_id=user_id, text="There are " + str(user_count) + " users in the database:") for user in user_list: username = user['username'] email = user['email'] id = user['telegram_user_id'] cron = user['cron'] admin = user['admin'] bot.send_message(chat_id=user_id, text=f'Username: {username}\nEmail: {email}\nID: {id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text @bot.message_handler(commands=['setAdmin', 'SetAdmin']) # set admin rights to user TBD: not working!! def set_admin(message): """ Set admin rights to user :type message: message object bot :param message: message that was reacted to, in this case always containing '/setAdmin' :raises: none :rtype: none """ user_id = int(message.from_user.id) user_data = api_handler.get_user(user_id) if(user_data["admin"] == False): # check if user has admin rights bot.reply_to(message, "You have to be an admin to use this command") return bot.send_message(chat_id=user_id, text='send email and "true" if this account should have admin rights, else "false"\n in format: ,') # ask for email and admin rights to set bot.register_next_step_handler(message, set_admin_step) def set_admin_step(message): str_message = str(message.text) args_message = str_message.split(',') # split message into email and admin rights if len(args_message) != 2: # make sure 2 args (email,is_admin) are given bot.reply_to(message, "exactly 2 arguments (,) required, try again") return email = args_message[0] is_admin = args_message[1] status = api_handler.set_admin(email, is_admin) # set admin in db if(status == 200): bot.reply_to(message, "Admin rights set") else: bot.reply_to(message, f"Admin rights could not be set ({status})") @bot.message_handler(commands=['me', 'Me']) # /me -> sending user info def send_user(message): """ Send user data :type message: message object bot :param message: message that was reacted to, in this case always containing '/me' :raises: none :rtype: none """ user_id = int(message.from_user.id) user_data = api_handler.get_user(user_id) if not user_data or user_data == None: # true if user is not registered bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info") return username = user_data['username'] email = user_data['email'] user_id = user_data['telegram_user_id'] cron = user_data['cron'] admin = user_data['admin'] bot.reply_to(message, f'Username: {username}\nEmail: {email}\nID: {user_id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text @bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool def send_id(message): """ Send user id for authentication with browser :type message: message object bot :param message: message that was reacted to, in this case always containing '/id' or '/auth' :raises: none :rtype: none """ answer = 'Your ID/Authentication Code is: [' + str(message.from_user.id) + ']. Enter this code in the settings on https://gruppe1.testsites.info to get updates on your shares.' bot.reply_to(message, answer) #function that can be used to ensure that the bot is online and running @bot.message_handler(commands=['status', 'Status']) def send_status(message): """ Sends status to user :type message: message object bot :param message: message that was reacted to, if no other command handler gets called :raises: none :rtype: none """ bot.reply_to(message, "bot is running") @bot.message_handler(commands=['update', 'Update']) # /update -> update shares def update_for_user(message): p_user_id = int(message.from_user.id) p_my_handler = api_handler share_symbols = [] share_amounts = [] share_courses = [] my_portfolio = p_my_handler.get_user_portfolio(p_user_id) for element in my_portfolio: if element["count"] != '' and element["isin"]!= '': print(element["count"], element["isin"]) share_symbols.append(element["isin"]) share_amounts.append(element["count"]) share_courses.append(element["current_price"]) my_user = p_my_handler.get_user(p_user_id) send_to_user("Hello %s this is your share update:"%str(my_user["username"]), pUser_id=p_user_id) if len(share_symbols) != 0: for i in range(len(share_symbols)): my_update_message = f'Symbol: {share_symbols[i]}\nCurrent Price per Share: {share_courses[i]}\nAmount owned: {share_amounts[i]}\nTotal Investment: {float(share_courses[i]) * float(share_amounts[i])}' send_to_user(my_update_message, pUser_id=p_user_id) else: send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id) def send_to_user(pText, pUser_id): """ Send message to user :type pText: string :param pText: Text to send to user :type pUser_id: int :param pUser_id: user to send to. per default me (Florian Kellermann) :raises: none :rtype: none """ bot.send_message(chat_id=pUser_id, text=pText) @bot.message_handler(commands=['share', 'Share']) # /share -> get share price def send_share_update(message): """ Send price of a specific share :type message: message object bot :param message: message that was reacted to, in this case always containing '/share' :raises: none :rtype: none """ user_id = int(message.from_user.id) #Get Information for user with this id bot.send_message(chat_id=user_id, text='Send symbol of share:') #str_share_price = shares.wait_for_share.main_loop(bot) bot.register_next_step_handler(message, send_share_price) def send_share_price(message): str_share_price = share_fetcher.get_share_price(str(message.text)) bot.reply_to(message, str_share_price) @bot.message_handler(commands=['allnews', 'Allnews']) # /allnews -> get all news def send_all_news(message): """ Get news for keywords of user :type message: message object bot :param message: message that was reacted to, in this case always containing '/allnews' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) # get keywords of user if keywords == None: # true if user is not registered bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: # true if user is registered but does not have any keywords bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /news') return keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc. now = dt.datetime.now().date() # get current date from_date = now - dt.timedelta(days=7) # get date 7 days ago -> limit age of news to 7 days old max from_date_formatted = dt.datetime.strftime(from_date, '%Y-%m-%d') news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects if news_list: # true if news_list is not empty for article in news_list: formatted_article = news.format_article(article) bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN") # Markdown allows to write bold text with * etc. else: bot.send_message(chat_id=user_id, text='No news found for your keywords.') @bot.message_handler(commands=['news', 'News']) # /news -> get news for specific keyword def send_news(message): """ Get news for keywords of user :type message: message object bot :param message: message that was reacted to, in this case always containing '/news' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) # get keywords of user if keywords == None: # true if user is not registered bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: # true if user is registered but does not have any keywords bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword') return if keywords: for keyword in keywords: top_news = news.get_top_news_by_keyword(keyword)["articles"] if top_news == None: # true if request to NewsAPI failed bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.') if not top_news: # true if no news found for keyword (empty list) bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN") else: formatted_article = news.format_article(top_news[0]) # only format and send most popular news bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN") @bot.message_handler(commands=['addkeyword', 'Addkeyword']) # /addkeyword -> add keyword to user def add_keyword(message): """ Add keyword to user :type message: message object bot :param message: message that was reacted to, in this case always '/addkeyword' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type keyword to add:') bot.register_next_step_handler(message, store_keyword) # wait for user to send keyword, then call store_keyword function def store_keyword(message): user_id = int(message.from_user.id) keyword = str(message.text).lower() # lower to ensure Bitcoin and bitcoin is not stored as individual keywords status = api_handler.set_keyword(user_id, keyword) # set keyword in database if status == 200: # statuscode 200 means keyword was added successfully without errors bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') # duplicate keywords are denied by Database, so no need to check for that here else: bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" could not be stored. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info (statuscode {status})') @bot.message_handler(commands=['removekeyword', 'Removekeyword']) # /removekeyword -> remove keyword from user def remove_keyword(message): """ Remove keyword from user :type message: message object bot :param message: message that was reacted to, in this case always '/removekeyword' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type keyword to remove:') bot.register_next_step_handler(message, remove_keyword_step) # wait for user to send keyword to remove, then call remove_keyword_step function def remove_keyword_step(message): user_id = int(message.from_user.id) keyword = str(message.text).lower() status = api_handler.delete_keyword(user_id, keyword) if status == 200: # statuscode 200 means keyword was removed successfully without errors bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') # checking if keyword to remove is in database are handled in database, not here else: bot.send_message(chat_id=user_id, text=f'Failed deleting keyword "{keyword}". (statuscode {status})') @bot.message_handler(commands=['keywords', 'Keywords']) # /keywords -> get keywords of user def send_keywords(message): """ Send keywords of user :type message: message object bot :param message: message that was reacted to, in this case always '/keywords' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) # get keywords of user if keywords == None: # true if user is not registered bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: # true if user is registered but does not have any keywords bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword') return else: # send keyword list keywords_str = ', '.join(keywords) bot.send_message(chat_id=user_id, text=f'Your keywords are: _{keywords_str}_', parse_mode="MARKDOWN") @bot.message_handler(commands=['portfolio', 'Portfolio']) def send_portfolio(message): """ Send portfolio of user :type message: message object bot :param message: message that was reacted to, in this case always '/portfolio' :raises: none :rtype: none """ user_id = int(message.from_user.id) portfolio = api_handler.get_user_portfolio(user_id) # get portfolio of user as json if portfolio == None: # true if user is not registered bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info') return if not portfolio: # true if user is registered but does not have any stocks in portfolio bot.send_message(chat_id=user_id, text='You do not have any stocks in your portfolio.') return else: # send portfolio for stock in portfolio: comment = str(stock["comment"]) # comment may be written name of stock, comment is made by user when adding an stock to portfolio count = "{:.2f}".format(float(stock["count"])) # round count to 2 decimal places isin = str(stock["isin"]) worth = "{:.2f}".format(float(stock["current_price"]) * float(stock["count"])) # round current_price to 2 decimal places bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWN") # formatted message in markdown @bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn may be deleted in future def set_new_transaction(message): """ Set new transaction for user :type message: message object bot :param message: message that was reacted to, in this case always '/newtransaction' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type ",,," (time of transaction will be set to now):') bot.register_next_step_handler(message, set_new_transaction_step) def set_new_transaction_step(message): user_id = int(message.from_user.id) if not re.match(r"[A-Za-z0-9]+,[A-Za-z0-9]+,[0-9]+(.[0-9]+)?,[0-9]+(.[0-9]+)?", message.text): bot.send_message(chat_id=user_id, text='Invalid format \n(e.g. Apple,US0378331005,53.2,120.4).\n Try again with /newtransaction.') return transaction_data = str(message.text).split(',') desc = str(transaction_data[0]) isin = str(transaction_data[1]) amount = float(transaction_data[2]) price = float(transaction_data[3]) time = dt.datetime.now().isoformat() #print("\n\n\n\n\n") #print(f"{symbol},{amount},{price},{time}") status = api_handler.set_transaction(user_id, desc, isin, amount, price, time) if status == 200: bot.send_message(chat_id=user_id, text='Transaction succesfully added.') else: bot.send_message(chat_id=user_id, text=f'Failed adding transaction. (statuscode {status})') @bot.message_handler(commands=['interval', 'Interval']) def send_interval(message): """ send interval for user :type message: message object bot :param message: message that was reacted to, in this case always '/interval' :raises: none :rtype: none """ user_id = int(message.from_user.id) user_data = api_handler.get_user(user_id) # get cron interval of user (stored in user data) if user_data == None: # true if user is not registered in DB bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info and set an interval with /setinterval') return else: # send interval interval = str(user_data['cron']) # get cron from user data if interval == 'None': # true if user has no cron set bot.send_message(chat_id=user_id, text='You do not have an interval set. Set one with /setinterval') return formatted_interval = str(interval).replace(' ', '_') # replace spaces with underscores to add to url of crontab.guru bot.send_message(chat_id=user_id, text=f'Your update interval: {interval} (https://crontab.guru/#{formatted_interval})') @bot.message_handler(commands=['setinterval', 'Setinterval']) def set_new_interval(message): """ Set new interval for user :type message: message object bot :param message: message that was reacted to, in this case always '/setinterval' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type interval in cron format:\n(https://crontab.guru/)') bot.register_next_step_handler(message, set_new_interval_step) # executes function when user sends message def set_new_interval_step(message): user_id = int(message.from_user.id) interval = str(message.text) status = api_handler.set_cron_interval(user_id, interval) # send cron to db if status == 200: bot.send_message(chat_id=user_id, text='Interval succesfully set.') return if status == -1: # only -1 when interval is invalid, not a real statuscode, but used from api_handler.set_cron_interval to tell the crontab has the wrong format bot.send_message(chat_id=user_id, text='Invalid interval format. Try again with\n /setinterval.') return else: bot.send_message(chat_id=user_id, text=f'Failed setting interval. (statuscode {status})') @bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement def echo_all(message): """ Tell that command is not known if it is no known command :type message: message object bot :param message: message that was reacted to, if no other command handler gets called :raises: none :rtype: none """ answer = 'Do not know this command or text: ' + message.text bot.reply_to(message, answer) telebot.logger.setLevel(logging.DEBUG) @bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging def query_text(inline_query): """ Output in the console about current user actions and status of bot :type inline_query: :param inline_query: :raises: none :rtype: none """ try: r = types.InlineQueryResultArticle('1', 'Result1', types.InputTextMessageContent('hi')) r2 = types.InlineQueryResultArticle('2', 'Result2', types.InputTextMessageContent('hi')) bot.answer_inline_query(inline_query.id, [r, r2]) except Exception as e: print(e) def main_loop(): """ Start bot :raises: none :rtype: none """ bot.infinity_polling() if __name__ == '__main__': try: main_loop() except KeyboardInterrupt: print('\nExiting by user request.\n') sys.exit(0)