"""
script for telegram bot and its functions
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "11.03.2022"
__version__ = "0.0.3"
__license__ = "None"

# side-dependencies: none
# Work in Progress

# Bot token: read at start-up from the BOT_API_KEY environment variable
# (loaded from a local .env file via python-dotenv). Tokens must never be
# written into this file -- any token that was ever committed here has to be
# treated as leaked and revoked via @BotFather.
# debug bot: https://t.me/mynewdebugbot
# text bot at t.me/projektaktienbot
# API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started

import os
import telebot
import time
import sys
import logging
import news_fetcher

from telebot import types
from dotenv import load_dotenv

load_dotenv()

bot_version = "0.1.1"
user_list = []


class User:
    """In-memory record of a user who sent /start.

    Currently users are kept in this class to test functionality
    -> later database.
    """

    def __init__(self, p_user_id, p_user_name, p_chat_id):
        """Store the ids as ``int`` and the name as ``str``."""
        self.user_id = int(p_user_id)
        self.chat_id = int(p_chat_id)
        self.user_name = str(p_user_name)


bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))


@bot.message_handler(commands=['start'])  # /start -> saving as new user and sending welcome
def send_start(message):
    """Remember the sender in ``user_list`` (once per user id) and send the welcome text."""
    new_user = User(int(message.from_user.id), message.from_user.first_name, int(message.chat.id))

    # Only append when no stored user has the same id, so repeated /start
    # commands do not create duplicates.
    if not any(known_user.user_id == new_user.user_id for known_user in user_list):
        user_list.append(new_user)

    bot.reply_to(message, "Welcome to this share bot project. Type /help to get information on what this bot can do")


@bot.message_handler(commands=['version'])
def send_version(message):
    """Reply with the value of ``bot_version``."""
    bot.reply_to(message, bot_version)


@bot.message_handler(commands=['help'])  # /help -> sending all functions
def send_welcome(message):
    """Reply with a short overview of the available commands."""
    bot.reply_to(message, "/id or /auth for authentication. /update to get updates on your shares. /users to see all users. For further details see aktienbot.flokaiser.com")


@bot.message_handler(commands=['users'])
def send_all_users(message):
    """Send the number of stored users, then one message per user ("<id> : <name>")."""
    print('Debug: users command')
    user_id = int(message.from_user.id)

    answer = 'Current number of users: ' + str(len(user_list))
    bot.send_message(chat_id=user_id, text=answer)

    for known_user in user_list:
        answer = str(known_user.user_id) + ' : ' + known_user.user_name
        bot.send_message(chat_id=user_id, text=answer)


@bot.message_handler(commands=['id', 'auth'])  # /auth or /id -> Authentication with user_id over web tool
def send_id(message):
    """Reply with the sender's Telegram user id, which serves as the authentication code."""
    answer = 'Your ID/Authentication Code is: [' + str(message.from_user.id) + ']. Enter this code in the settings on aktienbot.flokaiser.com to get updates on your shares.'
    bot.reply_to(message, answer)


@bot.message_handler(commands=['update'])  # /update -> send static update via user_id to this user, later fetch from database
def send_update(message):
    """Send a static placeholder update to the sender."""
    user_id = int(message.from_user.id)

    # Get Information for user with this id

    bot.send_message(chat_id=user_id, text='This is your update')


@bot.message_handler(commands=['news'])  # /news -> send news by keyword
def send_news(message):
    """Send the first article returned for the (currently fixed) keyword.

    If the result contains no article, the sender is told that no news is
    available instead.
    """
    keyword = "business"
    user_id = int(message.from_user.id)

    # Get Information for user with this id

    articles = news_fetcher.get_top_news_by_keyword(keyword)  # tbd: get keyword from db
    try:
        formatted_article = news_fetcher.format_article(articles["articles"][0])
    except IndexError:
        # Empty article list -> nothing to show for this keyword.
        bot.send_message(chat_id=user_id, text=f"no news currently available for keyword: {keyword}")
        return

    bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN")


# Registered after all command handlers so that it only acts as the fallback.
@bot.message_handler(func=lambda message: True)  # Returning that command is unknown for any other statement
def echo_all(message):
    """Tell the sender that the received command or text is not known."""
    answer = 'Do not know this command or text: ' + message.text
    bot.reply_to(message, answer)


# Verbose library logging while the bot is work in progress.
telebot.logger.setLevel(logging.DEBUG)


@bot.inline_handler(lambda query: query.query == 'text')  # inline prints for debugging
def query_text(inline_query):
    """Answer the inline query 'text' with two fixed article results (debugging aid)."""
    try:
        r = types.InlineQueryResultArticle('1', 'Result1', types.InputTextMessageContent('hi'))
        r2 = types.InlineQueryResultArticle('2', 'Result2', types.InputTextMessageContent('hi'))
        bot.answer_inline_query(inline_query.id, [r, r2])
    except Exception as e:
        # Debug-only handler: report the problem but keep the bot running.
        print(e)


def main_loop():
    """Run the bot by polling Telegram for updates.

    ``bot.infinity_polling()`` blocks for as long as the bot is polling. The
    former ``while 1: time.sleep(3)`` after it could therefore only run once
    polling had already stopped, leaving an idle process that needed a second
    interrupt to terminate; it has been removed.
    """
    bot.infinity_polling()


if __name__ == '__main__':
    try:
        main_loop()
    except KeyboardInterrupt:
        print('\nExiting by user request.\n')
        sys.exit(0)