TelegramAktienBot/telegram_bot/bot.py
2022-04-25 18:25:22 +02:00

580 lines
22 KiB
Python

"""
script for telegram bot and its functions
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "11.03.2022"
__version__ = "0.0.4"
__license__ = "None"
# side-dependencies: none
# Work in Progress
# Api-Key: 5228016873:AAGFrh0P6brag7oD3gxXjCh5gnLLE8JMvMs /debugAPI Key: 5108535940:AAF5FpPHNV96WxGCDt8aMrGGKke1VILYib4 (https://t.me/mynewdebugbot)
# text bot at t.me/projektaktienbot
# API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started
import email
import os
import telebot
import sys
import logging
import json
import re
import news.news_fetcher as news
import shares.share_fetcher as share_fetcher
import datetime as dt
from telebot import types
from dotenv import load_dotenv
from api_handling.api_handler import API_Handler
# Read the local .env file into the process environment. This has to run before
# the os.getenv() calls further down.
load_dotenv(dotenv_path='.env') # load environment variables
# Version string that the /version command replies with.
bot_version = "1.0.1"
# In-memory list of User objects; /start appends to it and /users reads it.
user_list = []
#create api handler
# NOTE(review): str() turns an unset BOT_EMAIL / BOT_PASSWORD into the literal
# string "None" instead of failing early - confirm this is intended.
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars.
# NOTE(review): this writes the API handler's token to stdout at import time -
# confirm that exposing it in console output / logs is acceptable.
print("Webserver Token: " + str(api_handler.token))
class User: # Currently saving users in this class to test functionality -> later database REMOVABLE
    """ Plain record describing one telegram user known to the bot

    The three constructor arguments are coerced on assignment, so the
    attributes always hold ``int`` / ``int`` / ``str`` values.
    """

    def __init__(self, p_user_id, p_user_name, p_chat_id):
        """ Create a user record

        :type p_user_id: int
        :param p_user_id: telegram user id (converted with int())

        :type p_user_name: str
        :param p_user_name: first name of user (converted with str())

        :type p_chat_id: int
        :param p_chat_id: telegram chat id (converted with int())

        :raises: ValueError / TypeError if an id cannot be converted to int

        :rtype: none
        """
        # Conversions run in the order user id, chat id, user name.
        self.user_id, self.chat_id, self.user_name = (
            int(p_user_id),
            int(p_chat_id),
            str(p_user_name),
        )
# Telegram bot client that all handlers in this module are registered on; the
# bot token is taken from the BOT_API_KEY environment variable.
bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))
@bot.message_handler(commands=['start', 'Start']) # /start -> saving as new user and sending welcome
def send_start(message):
    """ Remember the sender in the in-memory user list (once) and send the welcome text

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/start'

    :raises: none

    :rtype: none
    """
    candidate = User(int(message.from_user.id), message.from_user.first_name, int(message.chat.id)) # now Database driven CHANGE

    # Only append when no stored user carries the same telegram user id.
    already_known = any(entry.user_id == candidate.user_id for entry in user_list)
    if not already_known:
        user_list.append(candidate)

    # The welcome text is sent regardless of whether the user was new.
    bot.reply_to(message, "Welcome to this share bot project. Type /help to get information on what this bot can do")
@bot.message_handler(commands=['version', 'Version'])
def send_version(message):
    """ Reply with the module-level bot_version string

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/version'

    :raises: none

    :rtype: none
    """
    version_text = bot_version
    bot.reply_to(message, version_text)
@bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions
def send_welcome(message):
    """ Reply with the list of available commands

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/help'

    :raises: none

    :rtype: none
    """
    # One entry per output line; joined with newlines into a single markdown message.
    help_lines = (
        "/id or /auth get your user id",
        "/update get updates on your shares.",
        "/users see all users.",
        "/me get my user info",
        "/news get top article for each keyword.",
        "/allnews get all news (last 7 days)",
        "/keywords get all your keywords",
        "/addkeyword add a keyword",
        "/removekeyword remove a keyword",
        "/share get price of specific share",
        "/portfolio see own portfolio",
        "/newtransaction add new transaction",
        "/interval get update interval",
        "/setinterval set update interval",
        "_For further details see https://gruppe1.testsites.info _",
    )
    bot.reply_to(message, "\n".join(help_lines), parse_mode='MARKDOWN')
@bot.message_handler(commands=['users', 'Users']) # /users -> sending all users
def send_all_users(message):
    """ Send the number of stored users followed by one message per user

    The admin check mentioned in the original comment is not implemented yet,
    so every sender currently gets the list.

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/users'

    :raises: none

    :rtype: none
    """
    requester_id = int(message.from_user.id)

    # tbd check if user is admin

    bot.send_message(chat_id=requester_id, text=f'Current number of users: {len(user_list)}')

    # One "<id> : <first name>" message for every entry of the in-memory list.
    for entry in user_list:
        bot.send_message(chat_id=requester_id, text=f'{entry.user_id} : {entry.user_name}')
@bot.message_handler(commands=['me', 'Me']) # /me -> sending user info
def send_user(message):
    """ Reply with the account data the API handler has stored for the sender

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/me'

    :raises: none

    :rtype: none
    """
    profile = api_handler.get_user(int(message.from_user.id))

    # A falsy result (None or empty) is treated as "user is not registered".
    if not profile:
        bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info")
        return

    # Format user data into readable message text, one field per line.
    summary = '\n'.join((
        f"Username: {profile['username']}",
        f"Email: {profile['email']}",
        f"ID: {profile['telegram_user_id']}",
        f"Cron: {profile['cron']}",
        f"Admin: {profile['admin']}",
    ))
    bot.reply_to(message, summary)
@bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool
def send_id(message):
    """ Reply with the sender's telegram user id, which is used as authentication code on the website

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/id' or '/auth'

    :raises: none

    :rtype: none
    """
    auth_code = str(message.from_user.id)
    bot.reply_to(
        message,
        f'Your ID/Authentication Code is: [{auth_code}]. Enter this code in the settings on https://gruppe1.testsites.info to get updates on your shares.',
    )
#function that can be used to ensure that the bot is online and running
@bot.message_handler(commands=['status', 'Status'])
def send_status(message):
    """ Reply with a fixed text so a user can verify the bot is online

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/status'

    :raises: none

    :rtype: none
    """
    status_text = "bot is running"
    bot.reply_to(message, status_text)
@bot.message_handler(commands=['update', 'Update']) # /update -> update shares
def update_for_user(message):
    """ Send the sender one message per portfolio position with price, amount and total value

    Positions whose "count" or "symbol" is an empty string are skipped.
    If the API handler returns no portfolio (None) or no user data, the
    sender is told to connect the telegram id instead of the handler crashing.

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/update'

    :raises: none

    :rtype: none
    """
    p_user_id = int(message.from_user.id)
    not_registered_text = 'This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info'

    # The other handlers in this file treat a None portfolio / falsy user data as
    # "user is not registered"; do the same here instead of iterating over None.
    my_portfolio = api_handler.get_user_portfolio(p_user_id)
    if my_portfolio is None:
        send_to_user(not_registered_text, pUser_id=p_user_id)
        return

    my_user = api_handler.get_user(p_user_id)
    if not my_user:
        send_to_user(not_registered_text, pUser_id=p_user_id)
        return

    # Keep only positions that carry both a symbol and a count.
    holdings = [
        (element["symbol"], element["count"], element["current_price"])
        for element in my_portfolio
        if element["count"] != '' and element["symbol"] != ''
    ]

    send_to_user("Hello %s this is your share update:"%str(my_user["username"]), pUser_id=p_user_id)

    if not holdings:
        send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)
        return

    for symbol, amount, price in holdings:
        my_update_message = f'Symbol: {symbol}\nCurrent Price per Share: {price}\nAmount owned: {amount}\nTotal Investment: {float(price) * float(amount)}'
        send_to_user(my_update_message, pUser_id=p_user_id)
def send_to_user(pText, pUser_id = 1770205310):
    """ Thin wrapper around bot.send_message for sending plain text to a chat

    :type pText: string
    :param pText: Text to send to user

    :type pUser_id: int
    :param pUser_id: chat id to send to; the default is a hard-coded id (Florian Kellermann according to the original note)

    :raises: none

    :rtype: none
    """
    bot.send_message(text=pText, chat_id=pUser_id)
@bot.message_handler(commands=['share', 'Share']) # /share -> get share price
def send_share_update(message):
    """ Ask the sender for a share symbol; the next message is handled by send_share_price

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/share'

    :raises: none

    :rtype: none
    """
    asker_id = int(message.from_user.id)

    # Prompt first, then route the sender's next message to the follow-up step.
    bot.send_message(chat_id=asker_id, text='Send symbol of share:')
    bot.register_next_step_handler(message, send_share_price)
def send_share_price(message):
    """ Follow-up step of /share: look up the sent symbol and reply with the result

    :type message: message object bot
    :param message: message whose text is used as share symbol

    :raises: none

    :rtype: none
    """
    symbol = str(message.text)
    # Reply with whatever share_fetcher.get_share_price returns for the symbol.
    bot.reply_to(message, share_fetcher.get_share_price(symbol))
@bot.message_handler(commands=['allnews', 'Allnews']) # /allnews -> get all news
def send_all_news(message):
    """ Send every article of the last 7 days that matches any of the sender's keywords

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/allnews'

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    keywords = api_handler.get_user_keywords(user_id) # get keywords of user

    if keywords is None: # true if user is not registered
        bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
        return

    if not keywords: # true if user is registered but does not have any keywords
        # Keywords are added with /addkeyword (the previous text pointed to /news, which only reads them).
        bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
        return

    keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc.
    from_date = dt.datetime.now().date() - dt.timedelta(days=7) # limit age of news to 7 days old max
    from_date_formatted = from_date.strftime('%Y-%m-%d')
    news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects

    if not news_list: # nothing found for any keyword
        bot.send_message(chat_id=user_id, text='No news found for your keywords.')
        return

    for article in news_list:
        formatted_article = news.format_article(article)
        bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN") # Markdown allows to write bold text with * etc.
@bot.message_handler(commands=['news', 'News']) # /news -> get news for specific keyword
def send_news(message):
    """ Send the top article for each keyword of the sender

    A keyword without any article produces a "No news found" notice and the
    remaining keywords are still processed. A failed news request (articles is
    None) aborts the whole command.

    :type message: message object bot
    :param message: message that was reacted to, in this case always containing '/news'

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    keywords = api_handler.get_user_keywords(user_id) # get keywords of user

    if keywords is None: # true if user is not registered
        bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
        return

    if not keywords: # true if user is registered but does not have any keywords
        bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
        return

    for keyword in keywords:
        top_news = news.get_top_news_by_keyword(keyword)["articles"]

        if top_news is None: # true if request to NewsAPI failed
            bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.')
            return

        if not top_news: # true if no news found for keyword (empty list)
            bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN")
            # Move on to the next keyword; returning here would silently skip the rest.
            continue

        formatted_article = news.format_article(top_news[0]) # only format and send most popular news
        bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN")
@bot.message_handler(commands=['addkeyword', 'Addkeyword']) # /addkeyword -> add keyword to user
def add_keyword(message):
    """ Ask the sender for a keyword; the next message is handled by store_keyword

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/addkeyword'

    :raises: none

    :rtype: none
    """
    sender_id = int(message.from_user.id)
    bot.send_message(chat_id=sender_id, text='Type keyword to add:')
    # Route the sender's next message to store_keyword.
    bot.register_next_step_handler(message, store_keyword)
def store_keyword(message):
    """ Follow-up step of /addkeyword: store the sent text as keyword and report the outcome

    :type message: message object bot
    :param message: message whose text is the keyword to add

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    # Lower-cased so that "Bitcoin" and "bitcoin" are not stored as separate keywords.
    keyword = str(message.text).lower()
    status = api_handler.set_keyword(user_id, keyword)

    # Status 200 is treated as success; per the original note, duplicates are
    # rejected by the database, so they are not checked here.
    if status == 200:
        reply = f'Keyword "{keyword}" added.'
    else:
        reply = f'Keyword "{keyword}" could not be stored. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info (statuscode {status})'
    bot.send_message(chat_id=user_id, text=reply)
@bot.message_handler(commands=['removekeyword', 'Removekeyword']) # /removekeyword -> remove keyword from user
def remove_keyword(message):
    """ Ask the sender which keyword to remove; the next message is handled by remove_keyword_step

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/removekeyword'

    :raises: none

    :rtype: none
    """
    sender_id = int(message.from_user.id)
    bot.send_message(chat_id=sender_id, text='Type keyword to remove:')
    # Route the sender's next message to remove_keyword_step.
    bot.register_next_step_handler(message, remove_keyword_step)
def remove_keyword_step(message):
    """ Follow-up step of /removekeyword: delete the sent keyword and report the outcome

    :type message: message object bot
    :param message: message whose text is the keyword to remove

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    keyword = str(message.text).lower()
    status = api_handler.delete_keyword(user_id, keyword)

    # Status 200 is treated as success; per the original note, whether the
    # keyword exists is checked by the database, not here.
    if status == 200:
        reply = f'Keyword "{keyword}" removed.'
    else:
        reply = f'Failed deleting keyword "{keyword}". (statuscode {status})'
    bot.send_message(chat_id=user_id, text=reply)
@bot.message_handler(commands=['keywords', 'Keywords']) # /keywords -> get keywords of user
def send_keywords(message):
    """ Send the sender's keywords as one comma separated list

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/keywords'

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    stored_keywords = api_handler.get_user_keywords(user_id)

    # None means the user is not registered.
    if stored_keywords is None:
        bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
        return

    # Registered, but nothing stored yet.
    if not stored_keywords:
        bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword')
        return

    joined = ', '.join(stored_keywords)
    bot.send_message(chat_id=user_id, text=f'Your keywords are: _{joined}_', parse_mode="MARKDOWN")
@bot.message_handler(commands=['portfolio', 'Portfolio'])
def send_portfolio(message):
    """ Send one markdown message per portfolio position (comment, isin, amount, worth)

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/portfolio'

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    positions = api_handler.get_user_portfolio(user_id)

    # None means the user is not registered.
    if positions is None:
        bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
        return

    # Registered, but the portfolio is empty.
    if not positions:
        bot.send_message(chat_id=user_id, text='You do not have any stocks in your portfolio.')
        return

    for position in positions:
        label = str(position["comment"]) # per the original note: free text the user entered when adding the stock
        amount = float(position["count"])
        isin = str(position["isin"])
        value = float(position["current_price"]) * amount # current price times amount owned
        # Amount and worth are both shown with two decimal places.
        bot.send_message(chat_id=user_id, text=f'*{label}*\n_{isin}_\namount: {amount:.2f}\nworth: ${value:.2f}', parse_mode="MARKDOWN")
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn may be deleted in future
def set_new_transaction(message):
    """ Ask the sender for the transaction data; the next message is handled by set_new_transaction_step

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/newtransaction'

    :raises: none

    :rtype: none
    """
    sender_id = int(message.from_user.id)
    prompt = 'Type "<name of stock>,<isin>,<amount>,<price_per_stock_usd>" (time of transaction will be set to now):'
    bot.send_message(chat_id=sender_id, text=prompt)
    # Route the sender's next message to set_new_transaction_step.
    bot.register_next_step_handler(message, set_new_transaction_step)
def set_new_transaction_step(message):
    """ Follow-up step of /newtransaction: validate the sent line and store the transaction

    Expected text: "<name>,<isin>,<amount>,<price>" where name and isin are
    alphanumeric and amount / price are non-negative decimals written with a
    dot (e.g. "Apple,US0378331005,53.2,120.4"). Anything else, including a
    message without text, is answered with the "Invalid format" notice.

    :type message: message object bot
    :param message: message whose text holds the comma separated transaction data

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)

    # message.text is None for non-text messages; treat that as invalid input.
    raw_text = (message.text or '').strip()

    # fullmatch + escaped dot: the whole input has to be exactly four fields, so
    # values such as "5x3" or extra trailing fields can no longer slip through
    # and crash float() or be mis-assigned below.
    pattern = r"[A-Za-z0-9]+,[A-Za-z0-9]+,[0-9]+(\.[0-9]+)?,[0-9]+(\.[0-9]+)?"
    if not re.fullmatch(pattern, raw_text):
        bot.send_message(chat_id=user_id, text='Invalid format \n(e.g. Apple,US0378331005,53.2,120.4).\n Try again with /newtransaction.')
        return

    desc, isin, amount_text, price_text = raw_text.split(',')
    amount = float(amount_text)
    price = float(price_text)
    timestamp = dt.datetime.now().isoformat() # time of transaction is always "now"

    status = api_handler.set_transaction(user_id, desc, isin, amount, price, timestamp)

    if status == 200: # 200 is treated as success
        bot.send_message(chat_id=user_id, text='Transaction succesfully added.')
    else:
        bot.send_message(chat_id=user_id, text=f'Failed adding transaction. (statuscode {status})')
@bot.message_handler(commands=['interval', 'Interval'])
def send_interval(message):
    """ Send the sender's stored cron interval together with a crontab.guru link

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/interval'

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    account = api_handler.get_user(user_id) # the cron interval is stored in the user data

    # None means the user is not registered in the DB.
    if account is None:
        bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info and set an interval with /setinterval')
        return

    interval = str(account['cron'])

    # A missing cron value shows up as the string 'None' after str().
    if interval == 'None':
        bot.send_message(chat_id=user_id, text='You do not have an interval set. Set one with /setinterval')
        return

    # crontab.guru expects underscores instead of spaces in the url fragment.
    url_fragment = interval.replace(' ', '_')
    bot.send_message(chat_id=user_id, text=f'Your update interval: {interval} (https://crontab.guru/#{url_fragment})')
@bot.message_handler(commands=['setinterval', 'Setinterval'])
def set_new_interval(message):
    """ Ask the sender for a cron expression; the next message is handled by set_new_interval_step

    :type message: message object bot
    :param message: message that was reacted to, in this case always '/setinterval'

    :raises: none

    :rtype: none
    """
    sender_id = int(message.from_user.id)
    bot.send_message(chat_id=sender_id, text='Type interval in cron format:\n(https://crontab.guru/)')
    # Route the sender's next message to set_new_interval_step.
    bot.register_next_step_handler(message, set_new_interval_step)
def set_new_interval_step(message):
    """ Follow-up step of /setinterval: store the sent cron expression and report the outcome

    :type message: message object bot
    :param message: message whose text is the cron expression to store

    :raises: none

    :rtype: none
    """
    user_id = int(message.from_user.id)
    cron_text = str(message.text)
    status = api_handler.set_cron_interval(user_id, cron_text) # send cron to db

    if status == 200:
        reply = 'Interval succesfully set.'
    elif status == -1:
        # -1 is not a real status code; per the original note, set_cron_interval
        # uses it to signal a wrongly formatted crontab.
        reply = 'Invalid interval format. Try again with\n /setinterval.'
    else:
        reply = f'Failed setting interval. (statuscode {status})'

    bot.send_message(chat_id=user_id, text=reply)
@bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement
def echo_all(message):
    """ Catch-all handler: tell the sender that the text is not a known command

    :type message: message object bot
    :param message: message that was reacted to, if no other command handler gets called

    :raises: none

    :rtype: none
    """
    bot.reply_to(message, 'Do not know this command or text: ' + message.text)
# Set the telebot library logger to DEBUG level.
telebot.logger.setLevel(logging.DEBUG)
@bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging
def query_text(inline_query):
    """ Answer the inline query 'text' with two fixed article results (debugging aid)

    Any exception raised while building or sending the answer is printed to
    the console instead of being propagated.

    :type inline_query: inline query object bot
    :param inline_query: inline query whose text equals 'text'

    :raises: none

    :rtype: none
    """
    try:
        # Two articles ('1'/'Result1' and '2'/'Result2'), both with the content 'hi'.
        results = [
            types.InlineQueryResultArticle(number, 'Result' + number, types.InputTextMessageContent('hi'))
            for number in ('1', '2')
        ]
        bot.answer_inline_query(inline_query.id, results)
    except Exception as e:
        print(e)
def main_loop():
    """ Run the bot by calling bot.infinity_polling()

    :raises: none

    :rtype: none
    """
    bot.infinity_polling()
# Script entry point: run the polling loop and leave with exit code 0 on Ctrl+C.
if __name__ == '__main__':
    try:
        main_loop()
    except KeyboardInterrupt:
        print('\nExiting by user request.\n')
        sys.exit(0)