""" script for telegram bot and its functions """ __author__ = "Florian Kellermann, Linus Eickhoff" __date__ = "11.03.2022" __version__ = "0.0.4" __license__ = "None" # side-dependencies: none # Work in Progress # Api-Key: 5228016873:AAGFrh0P6brag7oD3gxXjCh5gnLLE8JMvMs /debugAPI Key: 5108535940:AAF5FpPHNV96WxGCDt8aMrGGKke1VILYib4 (https://t.me/mynewdebugbot) # text bot at t.me/projektaktienbot # API Documentation https://core.telegram.org/bots/api # Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started from ast import keyword import os import telebot import time import sys import logging import json import news.news_fetcher as news import shares.share_fetcher as share_fetcher import datetime as dt from telebot import types from dotenv import load_dotenv from api_handling.api_handler import API_Handler load_dotenv(dotenv_path='.env') bot_version = "0.2.1" user_list = [] #create api handler api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) print("Webserver Token: " + str(api_handler.token)) class User: # Currently saving users in this class to test functionality -> later database def __init__(self, p_user_id, p_user_name, p_chat_id): """ Initialize a new user :type self: User :param self: object of the class :type p_user_id: int :param p_user_id: telegram user id :type p_user_name: str :param p_user_name: first name of user :type p_chat_id: int :param p_chat_id: telegram chat id :raises: :rtype: """ self.user_id = int(p_user_id) self.chat_id = int(p_chat_id) self.user_name = str(p_user_name) bot = telebot.TeleBot(os.getenv('BOT_API_KEY')) @bot.message_handler(commands=['start']) # /start -> saving as new user and sending welcome def send_start(message): """ Description :type message: message object bot :param message: message that was reacted to, in this case always containing '/start' :raises: none :rtype: none """ new_user = User(int(message.from_user.id), message.from_user.first_name, int(message.chat.id)) existing_already = False for known_user in user_list: if known_user.user_id == new_user.user_id: existing_already = True if existing_already == False: user_list.append(new_user) bot.reply_to(message, "Welcome to this share bot project. Type /help to get information on what this bot can do") @bot.message_handler(commands=['version']) def send_version(message): """ Sending programm version :type message: message object bot :param message: message that was reacted to, in this case always containing '/version' :raises: none :rtype:none """ bot.reply_to(message, bot_version) @bot.message_handler(commands=['help']) # /help -> sending all functions def send_welcome(message): """ Send all functions :type message: message object bot :param message: message that was reacted to, in this case always containing '/help' :raises: none :rtype: none """ bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/users see all users.\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/share get price of specific share\n\n_For further details see https://gruppe1.testsites.info _", parse_mode='MARKDOWN') @bot.message_handler(commands=['users']) def send_all_users(message): """ Send all users, only possible for admins :type message: message object bot :param message: message that was reacted to, in this case always containing '/users' :raises: none :rtype: none """ print('Debug: users command') user_id = int(message.from_user.id) # tbd check if user is admin answer = 'Current number of users: ' + str(len(user_list)) bot.send_message(chat_id = user_id, text=answer) for known_user in user_list: answer = str(known_user.user_id) + ' : ' + known_user.user_name bot.send_message(chat_id=user_id, text=answer) @bot.message_handler(commands=['id', 'auth']) # /auth or /id -> Authentication with user_id over web tool def send_id(message): """ Send user id for authentication with browser :type message: message object bot :param message: message that was reacted to, in this case always containing '/id' or '/auth' :raises: none :rtype: none """ answer = 'Your ID/Authentication Code is: [' + str(message.from_user.id) + ']. Enter this code in the settings on https://gruppe1.testsites.info to get updates on your shares.' bot.reply_to(message, answer) #function that sends telegram status(running or offline) as message from telegram bot to user @bot.message_handler(commands=['status']) def send_status(message): """ Sends status to user :type message: message object bot :param message: message that was reacted to, if no other command handler gets called :raises: none :rtype: none """ bot.reply_to(message, "bot is running") @bot.message_handler(commands=['update']) def send_update(message): """ Send update on shares :type message: message object bot :param message: message that was reacted to, in this case always containing '/help' :raises: none :rtype: none """ user_id = int(message.from_user.id) #Can be deleted when getting from database dirname = os.path.dirname(__file__) json_path = os.path.join(dirname, 'shares/shares_example.json') with open(json_path) as json_file: json_share_data = json.load(json_file) int_share_count = int(json_share_data['share_count']) str_username = str(json_share_data['user']) bot.send_message(chat_id=user_id, text=f'Hello {str_username}. Here is the update on your currently owned shares:') for i in range(int_share_count): my_share = json_share_data['shares'][i] my_share_symbol = str(my_share['symbol']) my_share_amount = float(my_share['amount_bought']) my_share_buy_price = float(my_share['price_bought']) my_share_course = float(share_fetcher.get_share_price(my_share_symbol)) my_update_message = f'Symbol: {my_share_symbol}\nPrice: {my_share_course}\nBought for: {my_share_buy_price}\n\ Amount owned: {my_share_amount}\nWin/Lose: {(my_share_amount*my_share_course) - (my_share_amount*my_share_buy_price)}' bot.send_message(chat_id=user_id, text=my_update_message) @bot.message_handler(commands=['share']) def send_share_update(message): """ Send price of a specific share :type message: message object bot :param message: message that was reacted to, in this case always containing '/share' :raises: none :rtype: none """ user_id = int(message.from_user.id) #Get Information for user with this id bot.send_message(chat_id=user_id, text='Send symbol of share:') #str_share_price = shares.wait_for_share.main_loop(bot) bot.register_next_step_handler(message, send_share_price) def send_share_price(message): str_share_price = share_fetcher.get_share_price(str(message.text)) bot.reply_to(message, str_share_price) @bot.message_handler(commands=['allnews']) def send_all_news(message): """ Get news for keywords of user :type message: message object bot :param message: message that was reacted to, in this case always containing '/allnews' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) if keywords == None: bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure too connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /news') return keywords_search = ' OR '.join(keywords) print(keywords_search) now = dt.datetime.now().date() from_date = now - dt.timedelta(days=7) from_date_formatted = dt.datetime.strftime(from_date, '%Y-%m-%d') print(from_date_formatted) news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] if news_list: for article in news_list: formatted_article = news.format_article(article) bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN") else: bot.send_message(chat_id=user_id, text='No news found for your keywords.') @bot.message_handler(commands=['news']) def send_news(message): """ Get news for keywords of user :type message: message object bot :param message: message that was reacted to, in this case always containing '/news' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) if keywords == None: bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure too connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword') return if keywords: for keyword in keywords: top_news = news.get_top_news_by_keyword(keyword)["articles"] if top_news == None: bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.') return if not top_news: bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN") return formatted_article = news.format_article(top_news[0]) bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN") @bot.message_handler(commands=['addkeyword']) def add_keyword(message): """ Add keyword to user :type message: message object bot :param message: message that was reacted to, in this case always '/addkeyword' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type keyword to add:') bot.register_next_step_handler(message, store_keyword) def store_keyword(message): user_id = int(message.from_user.id) print(str(user_id)) keyword = str(message.text).lower() status = api_handler.set_keyword(user_id, keyword) if status == 200: bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') else: bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" could not be stored. (statuscode {status})') @bot.message_handler(commands=['removekeyword']) def remove_keyword(message): """ Remove keyword from user :type message: message object bot :param message: message that was reacted to, in this case always '/removekeyword' :raises: none :rtype: none """ user_id = int(message.from_user.id) bot.send_message(chat_id=user_id, text='Type keyword to remove:') bot.register_next_step_handler(message, remove_keyword_step) def remove_keyword_step(message): user_id = int(message.from_user.id) keyword = str(message.text).lower() status = api_handler.delete_keyword(user_id, keyword) if status == 200: bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') else: bot.send_message(chat_id=user_id, text=f'Failed deleting keyword "{keyword}". (statuscode {status})') @bot.message_handler(commands=['keywords']) def send_keywords(message): """ Send keywords of user :type message: message object bot :param message: message that was reacted to, in this case always '/keywords' :raises: none :rtype: none """ user_id = int(message.from_user.id) keywords = api_handler.get_user_keywords(user_id) if keywords == None: bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure too connect your telegram id (/id) on https://gruppe1.testsites.info') return if not keywords: bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword') return else: keywords_str = ', '.join(keywords) bot.send_message(chat_id=user_id, text=f'Your keywords are: _{keywords_str}_', parse_mode="MARKDOWN") @bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement def echo_all(message): """ Tell that command is not known if it is no known command :type message: message object bot :param message: message that was reacted to, if no other command handler gets called :raises: none :rtype: none """ answer = 'Do not know this command or text: ' + message.text bot.reply_to(message, answer) telebot.logger.setLevel(logging.DEBUG) @bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging def query_text(inline_query): """ Output in the console about current user actions and status of bot :type inline_query: :param inline_query: :raises: none :rtype: none """ try: r = types.InlineQueryResultArticle('1', 'Result1', types.InputTextMessageContent('hi')) r2 = types.InlineQueryResultArticle('2', 'Result2', types.InputTextMessageContent('hi')) bot.answer_inline_query(inline_query.id, [r, r2]) except Exception as e: print(e) def main_loop(): """ Start bot :raises: none :rtype: none """ bot.infinity_polling() if __name__ == '__main__': try: main_loop() except KeyboardInterrupt: print('\nExiting by user request.\n') sys.exit(0)