TelegramAktienBot/telegram_bot/bot.py
2022-05-03 09:22:51 +02:00

603 lines
24 KiB
Python

"""
script for telegram bot and its functions
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "26.04.2022"
__version__ = "1.2.2"
__license__ = "None"
# side-dependencies: none
# Work in Progress
# Api-Key: 5228016873:AAGFrh0P6brag7oD3gxXjCh5gnLLE8JMvMs /debugAPI Key: 5108535940:AAF5FpPHNV96WxGCDt8aMrGGKke1VILYib4 (https://t.me/mynewdebugbot)
# text bot at t.me/projektaktienbot
# API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started
import os
import telebot
import sys
import logging
import re
import news.news_fetcher as news
import shares.share_fetcher as share_fetcher
import datetime as dt
from telebot import types
from dotenv import load_dotenv
from api_handling.api_handler import API_Handler
load_dotenv(dotenv_path='.env') # load environment variables
bot_version = "1.0.1" # version of bot
#create api handler
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars.
print("Webserver Token: " + str(api_handler.token))
bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))
@bot.message_handler(commands=['start', 'Start'])
def send_start(message):
""" Sending welcome message to new user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/start'
:raises: none
:rtype: none
"""
bot.reply_to(message, "Welcome to this share bot project. \
\nType /help to get information on what this bot can do. \
\nAlso see https://gruppe1.testsites.info \
to start configuring your bot")
@bot.message_handler(commands=['version', 'Version'])
def send_version(message):
""" Sending programm version
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/version'
:raises: none
:rtype:none
"""
bot.reply_to(message, "the current bot version is " + bot_version)
@bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions
def send_help(message):
""" Send all functions
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/help'
:raises: none
:rtype: none
"""
bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/setAdmin set admin rights of user (ADMIN)\n/users see all users. (ADMIN)\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/share get price of specific share\n/portfolio see own portfolio\n/newtransaction add new transaction\n/interval get update interval\n/setinterval set update interval\n_For further details see https://gruppe1.testsites.info _", parse_mode='MARKDOWN')
@bot.message_handler(commands=['users', 'Users']) # /users -> sending all users
def send_all_users(message):
""" Send all users, only possible for admins
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/users'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
user_list = api_handler.get_all_users()
user_count = len(user_list)
bot.send_message(chat_id=user_id, text="There are " + str(user_count) + " users in the database:")
for user in user_list:
username = user['username']
email = user['email']
id = user['telegram_user_id']
cron = user['cron']
admin = user['admin']
bot.send_message(chat_id=user_id, text=f'Username: {username}\nEmail: {email}\nID: {id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['setAdmin', 'SetAdmin', 'setadmin', 'Setadmin']) # set admin rights to user TBD: not working!!
def set_admin(message):
""" Set admin rights to user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/setAdmin'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
bot.send_message(chat_id=user_id, text='send email and true if this account should have admin rights, else false\n in format: <email>,<is_admin>') # request email and admin rights to change to
bot.register_next_step_handler(message, set_admin_step)
def set_admin_step(message):
str_message = str(message.text)
args_message = str_message.split(',') # split message into email and admin rights
if len(args_message) != 2: # make sure 2 args (email,is_admin) are given
bot.reply_to(message, "exactly 2 arguments (<email>,<is_admin>) required, try again")
return
email = args_message[0]
is_admin = False # default: False
if args_message[1].lower() == "true": # if user types true, set is_admin to true
is_admin = True
status = api_handler.set_admin(email, is_admin) # set admin in db
if(status == 200):
bot.reply_to(message, "Admin rights set")
else:
bot.reply_to(message, f"Admin rights could not be set ({status})")
@bot.message_handler(commands=['me', 'Me']) # /me -> sending user info
def send_user(message):
""" Send user data
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/me'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if not user_data or user_data == None: # true if user is not registered
bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info")
return
username = user_data['username']
email = user_data['email']
user_id = user_data['telegram_user_id']
cron = user_data['cron']
admin = user_data['admin']
bot.reply_to(message, f'Username: {username}\nEmail: {email}\nID: {user_id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool
def send_id(message):
""" Send user id for authentication with browser
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/id' or '/auth'
:raises: none
:rtype: none
"""
answer = 'Your ID/Authentication Code is: [' + str(message.from_user.id) + ']. Enter this code in the settings on https://gruppe1.testsites.info to get updates on your shares.'
bot.reply_to(message, answer)
#function that can be used to ensure that the bot is online and running
@bot.message_handler(commands=['status', 'Status'])
def send_status(message):
""" Sends status to user
:type message: message object bot
:param message: message that was reacted to, if no other command handler gets called
:raises: none
:rtype: none
"""
bot.reply_to(message, "bot is running")
@bot.message_handler(commands=['update', 'Update']) # /update -> update shares
def update_for_user(message):
p_user_id = int(message.from_user.id)
p_my_handler = api_handler
share_symbols = []
share_amounts = []
share_courses = []
my_portfolio = p_my_handler.get_user_portfolio(p_user_id)
for element in my_portfolio:
if element["count"] != '' and element["isin"]!= '':
print(element["count"], element["isin"])
share_symbols.append(element["isin"])
share_amounts.append(element["count"])
share_courses.append(element["current_price"])
my_user = p_my_handler.get_user(p_user_id)
send_to_user("Hello %s this is your share update:"%str(my_user["username"]), pUser_id=p_user_id)
if len(share_symbols) != 0:
for i in range(len(share_symbols)):
my_update_message = f'Symbol: {share_symbols[i]}\nCurrent Price per Share: {share_courses[i]}\nAmount owned: {share_amounts[i]}\nTotal Investment: {float(share_courses[i]) * float(share_amounts[i])}'
send_to_user(my_update_message, pUser_id=p_user_id)
else:
send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)
def send_to_user(pText, pUser_id):
""" Send message to user
:type pText: string
:param pText: Text to send to user
:type pUser_id: int
:param pUser_id: user to send to. per default me (Florian Kellermann)
:raises: none
:rtype: none
"""
bot.send_message(chat_id=pUser_id, text=pText)
@bot.message_handler(commands=['share', 'Share']) # /share -> get share price
def send_share_update(message):
""" Send price of a specific share
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/share'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
#Get Information for user with this id
bot.send_message(chat_id=user_id, text='Send symbol of share:')
#str_share_price = shares.wait_for_share.main_loop(bot)
bot.register_next_step_handler(message, send_share_price)
def send_share_price(message):
str_share_price = share_fetcher.get_share_price(str(message.text))
bot.reply_to(message, str_share_price) # add dollar symbol etc.
@bot.message_handler(commands=['allnews', 'Allnews']) # /allnews -> get all news
def send_all_news(message):
""" Get news for keywords of user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/allnews'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /news')
return
keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc.
now = dt.datetime.now().date() # get current date
from_date = now - dt.timedelta(days=7) # get date 7 days ago -> limit age of news to 7 days old max
from_date_formatted = dt.datetime.strftime(from_date, '%Y-%m-%d')
news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects
if news_list: # true if news_list is not empty
for article in news_list:
formatted_article = news.format_article(article)
bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN") # Markdown allows to write bold text with * etc.
else:
bot.send_message(chat_id=user_id, text='No news found for your keywords.')
@bot.message_handler(commands=['news', 'News']) # /news -> get news for specific keyword
def send_news(message):
""" Get news for keywords of user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/news'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
return
if keywords:
for keyword in keywords:
top_news = news.get_top_news_by_keyword(keyword)["articles"]
if top_news == None: # true if request to NewsAPI failed
bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.')
if not top_news: # true if no news found for keyword (empty list)
bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN")
else:
formatted_article = news.format_article(top_news[0]) # only format and send most popular news
bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN")
@bot.message_handler(commands=['addkeyword', 'Addkeyword']) # /addkeyword -> add keyword to user
def add_keyword(message):
""" Add keyword to user
:type message: message object bot
:param message: message that was reacted to, in this case always '/addkeyword'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to add:')
bot.register_next_step_handler(message, store_keyword) # wait for user to send keyword, then call store_keyword function
def store_keyword(message):
user_id = int(message.from_user.id)
keyword = str(message.text).lower() # lower to ensure Bitcoin and bitcoin is not stored as individual keywords
status = api_handler.set_keyword(user_id, keyword) # set keyword in database
if status == 200: # statuscode 200 means keyword was added successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') # duplicate keywords are denied by Database, so no need to check for that here
else:
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" could not be stored. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info (statuscode {status})')
@bot.message_handler(commands=['removekeyword', 'Removekeyword']) # /removekeyword -> remove keyword from user
def remove_keyword(message):
""" Remove keyword from user
:type message: message object bot
:param message: message that was reacted to, in this case always '/removekeyword'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to remove:')
bot.register_next_step_handler(message, remove_keyword_step) # wait for user to send keyword to remove, then call remove_keyword_step function
def remove_keyword_step(message):
user_id = int(message.from_user.id)
keyword = str(message.text).lower()
status = api_handler.delete_keyword(user_id, keyword)
if status == 200: # statuscode 200 means keyword was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') # checking if keyword to remove is in database are handled in database, not here
else:
bot.send_message(chat_id=user_id, text=f'Failed deleting keyword "{keyword}". (statuscode {status})')
@bot.message_handler(commands=['keywords', 'Keywords']) # /keywords -> get keywords of user
def send_keywords(message):
""" Send keywords of user
:type message: message object bot
:param message: message that was reacted to, in this case always '/keywords'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword')
return
else: # send keyword list
keywords_str = ', '.join(keywords)
bot.send_message(chat_id=user_id, text=f'Your keywords are: _{keywords_str}_', parse_mode="MARKDOWN")
@bot.message_handler(commands=['portfolio', 'Portfolio'])
def send_portfolio(message):
""" Send portfolio of user
:type message: message object bot
:param message: message that was reacted to, in this case always '/portfolio'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
portfolio = api_handler.get_user_portfolio(user_id) # get portfolio of user as json
if portfolio == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not portfolio: # true if user is registered but does not have any stocks in portfolio
bot.send_message(chat_id=user_id, text='You do not have any stocks in your portfolio.')
return
else: # send portfolio
for stock in portfolio:
comment = str(stock["comment"]) # comment may be written name of stock, comment is made by user when adding an stock to portfolio
count = "{:.2f}".format(float(stock["count"])) # round count to 2 decimal places
isin = str(stock["isin"])
worth = "{:.2f}".format(float(stock["current_price"]) * float(stock["count"])) # round current_price to 2 decimal places
bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWN") # formatted message in markdown
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn may be deleted in future
def set_new_transaction(message):
""" Set new transaction for user
:type message: message object bot
:param message: message that was reacted to, in this case always '/newtransaction'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type "<name of stock>,<isin>,<amount>,<price_per_stock_usd>" (time of transaction will be set to now, negative amount is selling, positive is buying):')
bot.register_next_step_handler(message, set_new_transaction_step)
def set_new_transaction_step(message):
user_id = int(message.from_user.id)
if not re.match(r"[A-Za-z0-9]+,[A-Za-z0-9]+,(-)?[0-9]+(.[0-9]+)?,[0-9]+(.[0-9]+)?", message.text):
bot.send_message(chat_id=user_id, text='Invalid format \n(e.g. Apple,US0378331005,53.2,120.4).\n Try again with /newtransaction.')
return
transaction_data = str(message.text).split(',')
desc = str(transaction_data[0])
isin = str(transaction_data[1])
amount = float(transaction_data[2])
price = float(transaction_data[3])
time = dt.datetime.now().isoformat()
print("\n\n\n\n\n")
print(f"{isin},{amount},{price},{time}")
status = api_handler.set_transaction(user_id, desc, isin, amount, price, time)
if status == 200:
bot.send_message(chat_id=user_id, text='Transaction succesfully added.')
else:
bot.send_message(chat_id=user_id, text=f'Failed adding transaction. (statuscode {status})')
@bot.message_handler(commands=['interval', 'Interval'])
def send_interval(message):
""" send interval for user
:type message: message object bot
:param message: message that was reacted to, in this case always '/interval'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id) # get cron interval of user (stored in user data)
if user_data == None: # true if user is not registered in DB
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info and set an interval with /setinterval')
return
else: # send interval
interval = str(user_data['cron']) # get cron from user data
if interval == 'None': # true if user has no cron set
bot.send_message(chat_id=user_id, text='You do not have an interval set. Set one with /setinterval')
return
formatted_interval = str(interval).replace(' ', '_') # replace spaces with underscores to add to url of crontab.guru
bot.send_message(chat_id=user_id, text=f'Your update interval: {interval} (https://crontab.guru/#{formatted_interval})')
@bot.message_handler(commands=['setinterval', 'Setinterval'])
def set_new_interval(message):
""" Set new interval for user
:type message: message object bot
:param message: message that was reacted to, in this case always '/setinterval'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type interval in cron format:\n(https://crontab.guru/)')
bot.register_next_step_handler(message, set_new_interval_step) # executes function when user sends message
def set_new_interval_step(message):
user_id = int(message.from_user.id)
interval = str(message.text)
status = api_handler.set_cron_interval(user_id, interval) # send cron to db
if status == 200:
bot.send_message(chat_id=user_id, text='Interval succesfully set.')
return
if status == -1: # only -1 when interval is invalid, not a real statuscode, but used from api_handler.set_cron_interval to tell the crontab has the wrong format
bot.send_message(chat_id=user_id, text='Invalid interval format. Try again with\n /setinterval.')
return
else:
bot.send_message(chat_id=user_id, text=f'Failed setting interval. (statuscode {status})')
@bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement
def echo_all(message):
""" Tell that command is not known if it is no known command
:type message: message object bot
:param message: message that was reacted to, if no other command handler gets called
:raises: none
:rtype: none
"""
answer = 'Do not know this command or text: ' + message.text
bot.reply_to(message, answer)
telebot.logger.setLevel(logging.DEBUG)
@bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging
def query_text(inline_query):
""" Output in the console about current user actions and status of bot
:type inline_query:
:param inline_query:
:raises: none
:rtype: none
"""
try:
r = types.InlineQueryResultArticle('1', 'Result1', types.InputTextMessageContent('hi'))
r2 = types.InlineQueryResultArticle('2', 'Result2', types.InputTextMessageContent('hi'))
bot.answer_inline_query(inline_query.id, [r, r2])
except Exception as e:
print(e)
def main_loop():
""" Start bot
:raises: none
:rtype: none
"""
bot.infinity_polling()
if __name__ == '__main__':
try:
main_loop()
except KeyboardInterrupt:
print('\nExiting by user request.\n')
sys.exit(0)