New try for bot and bot_updates #80

Merged
NormalParameter merged 19 commits from bot into main 2022-04-26 06:40:42 +00:00
5 changed files with 221 additions and 181 deletions

View File

@ -12,9 +12,9 @@ __license__ = "None"
import sys
import os
import requests as r
from croniter import croniter
from croniter import croniter # used for checking cron formatting
# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
class API_Handler:
"""class for interacting with the api webservice
@ -37,6 +37,7 @@ class API_Handler:
set_portfolio(user_id, portfolio): sets the portfolio of the user
delete_portfolio(user_id, portfolio): deletes the portfolio of the user
set_cron_interval(user_id, interval): sets the cron interval of the user
set_admin(email, is_admin): sets the admin status of the user with the given email
"""
@ -50,17 +51,17 @@ class API_Handler:
"""
self.db_adress = db_adress
payload = {'email': email, 'password': password}
with r.Session() as s:
p = s.post(self.db_adress + "/user/login", json=payload)
payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
with r.Session() as s: # open session
p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice
if p.status_code == 200:
self.token = p.json()["data"]['token']
self.token = p.json()["data"]['token'] # store token for further authentication of requests
else:
print("Error: " + str(p.status_code) + " invalid credentials")
self.token = None
def reauthorize(self, email, password):
def reauthorize(self, email, password): # can be used if token expired
"""set new credentials
Args:
@ -78,7 +79,7 @@ class API_Handler:
return None
def get_user(self, user_id, max_retries=10):
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
"""gets the shares of the user
Args:
@ -92,13 +93,13 @@ class API_Handler:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
req = s.get(self.db_adress + "/user", headers=headers)
if(req.status_code == 200):
return req.json()["data"]
else:
return self.get_user(user_id, max_retries-1)
return self.get_user(user_id, max_retries-1) # if request fails try again recursively
def get_all_users(self, max_retries=10):
@ -142,10 +143,10 @@ class API_Handler:
req = s.get(self.db_adress + "/keywords", headers=headers)
if(req.status_code == 200):
keywords_json = req.json()["data"]
for keyword in keywords_json:
for keyword in keywords_json: # keywords_json is a list of dictionaries
keywords.append(keyword["keyword"])
return keywords
return keywords # will be empty if no keywords are set
else:
return self.get_user_keywords(user_id, max_retries-1)
@ -206,7 +207,7 @@ class API_Handler:
shares_json = req.json()["data"]
shares = []
for share in shares_json:
shares.append(share["symbol"])
shares.append(share["isin"]) # we only want the isin of the shares
return shares
@ -214,23 +215,24 @@ class API_Handler:
return self.get_user_shares(user_id, max_retries-1)
def set_share(self, user_id, symbol):
def set_share(self, user_id, isin, comment):
"""sets the share of the user
Args:
user_id (int): id of the user
symbol (string): symbol of the share
isin (string): isin of the share
comment (string): comment of the share
Returns:
int: status code
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.post(self.db_adress + "/share", json={"symbol": symbol}, headers=headers)
req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
return req.status_code
def delete_share(self, user_id, symbol):
def delete_share(self, user_id, isin):
"""deletes the share of the user
Args:
@ -242,7 +244,7 @@ class API_Handler:
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.delete(self.db_adress + "/share", json={"symbol": symbol}, headers=headers)
req = s.delete(self.db_adress + "/share", json={"isin": isin}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
return req.status_code
@ -277,16 +279,16 @@ class API_Handler:
user_id (int): id of the user
comment (string): comment of the transaction
isin (string): isin of the transaction
count (int): count of the transaction
count (float): count of the transaction
price (float): price of the transaction
time (string): time of the transaction
time (string): time of the transaction formatted like e.g. "2011-10-05T14:48:00.000Z"
Returns:
int: status code
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
transaction = {"comment": comment, "count": count, "isin": isin, "price": price, "time": time}
transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
return req.status_code
@ -306,9 +308,9 @@ class API_Handler:
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/portfolio", headers=headers)
req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON
if req.status_code == 200:
portfolio_dict = req.json()["data"]
portfolio_dict = req.json()["data"] # get the data of the JSON
return portfolio_dict
else:
return self.get_user_portfolio(user_id, max_retries-1)
@ -323,28 +325,46 @@ class API_Handler:
Returns:
int: status code
"""
if not croniter.is_valid(cron_interval):
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
print("Error: Invalid cron format")
return -1
return -1 # return error code -1 if invalid cron format
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.put(self.db_adress + "/user/setCron", json={"cron": cron_interval}, headers=headers)
req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs)
return req.status_code
if __name__ == "__main__":
def set_admin(self, email, is_admin):
"""sets the admin of the user
Args:
email (string): email of the user
is_admin (String): "true" if user should be Admin, "false" if not
Returns:
int: status code
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + "/user/setAdmin", json={"admin": str(is_admin),"email": str(email)})
return req.status_code
if __name__ == "__main__": # editable, just for basic on the go testing of new functions
print("This is a module for the telegram bot. It is not intended to be run directly.")
handler = API_Handler("https://gruppe1.testsites.info/api", "bot@example.com", "bot")
print(handler.token)
keywords = handler.get_user_keywords(user_id = 1709356058) #user_id is currently mine (Linus)
keywords = handler.get_user_keywords(user_id = 1709356058) #user_id here is currently mine (Linus)
print(keywords)
shares = handler.get_user_portfolio(user_id = 1709356058)
print("set cron with status: "+ str(handler.set_cron_interval(user_id = 1709356058, cron_interval = "0 0 * * *")))
user = handler.get_user(user_id = 1709356058)
print(user)
all_users = handler.get_all_users()
admin_status = handler.set_admin("test@test.com", "true")
print(admin_status)
print(all_users)
print(shares)
sys.exit(1)

View File

@ -14,6 +14,7 @@ __license__ = "None"
# API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started
import email
import os
import telebot
@ -32,16 +33,15 @@ from dotenv import load_dotenv
from api_handling.api_handler import API_Handler
load_dotenv(dotenv_path='.env')
load_dotenv(dotenv_path='.env') # load environment variables
bot_version = "1.0.1"
user_list = []
bot_version = "1.0.1" # version of bot
#create api handler
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars.
print("Webserver Token: " + str(api_handler.token))
class User: # Currently saving users in this class to test functionality -> later database
class User: # Currently saving users in this class to test functionality -> later database REMOVABLE
def __init__(self, p_user_id, p_user_name, p_chat_id):
""" Initialize a new user
@ -79,14 +79,6 @@ def send_start(message):
:rtype: none
"""
new_user = User(int(message.from_user.id), message.from_user.first_name, int(message.chat.id))
existing_already = False
for known_user in user_list:
if known_user.user_id == new_user.user_id:
existing_already = True
if existing_already == False:
user_list.append(new_user)
bot.reply_to(message, "Welcome to this share bot project. Type /help to get information on what this bot can do")
@ -101,11 +93,11 @@ def send_version(message):
:rtype:none
"""
bot.reply_to(message, bot_version)
bot.reply_to(message, "the current bot version is " + bot_version)
@bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions
def send_welcome(message):
def send_help(message):
""" Send all functions
:type message: message object bot
@ -115,7 +107,7 @@ def send_welcome(message):
:rtype: none
"""
bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/users see all users.\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/share get price of specific share\n/portfolio see own portfolio\n/newtransaction add new transaction\n/interval get update interval\n/setinterval set update interval\n_For further details see https://gruppe1.testsites.info _", parse_mode='MARKDOWN')
bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/setAdmin set admin rights of user (ADMIN)\n/users see all users. (ADMIN)\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/share get price of specific share\n/portfolio see own portfolio\n/newtransaction add new transaction\n/interval get update interval\n/setinterval set update interval\n_For further details see https://gruppe1.testsites.info _", parse_mode='MARKDOWN')
@bot.message_handler(commands=['users', 'Users']) # /users -> sending all users
@ -129,16 +121,67 @@ def send_all_users(message):
:rtype: none
"""
print('Debug: users command')
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
# tbd check if user is admin
user_list = api_handler.get_all_users()
user_count = len(user_list)
bot.send_message(chat_id=user_id, text="There are " + str(user_count) + " users in the database:")
answer = 'Current number of users: ' + str(len(user_list))
bot.send_message(chat_id = user_id, text=answer)
for known_user in user_list:
answer = str(known_user.user_id) + ' : ' + known_user.user_name
bot.send_message(chat_id=user_id, text=answer)
for user in user_list:
username = user['username']
email = user['email']
id = user['telegram_user_id']
cron = user['cron']
admin = user['admin']
bot.send_message(chat_id=user_id, text=f'Username: {username}\nEmail: {email}\nID: {id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['setAdmin', 'SetAdmin']) # set admin rights to user TBD: not working!!
def set_admin(message):
""" Set admin rights to user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/setAdmin'
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
bot.send_message(chat_id=user_id, text='send email and "true" if this account should have admin rights, else "false"\n in format: <email>,<is_admin>') # ask for email and admin rights to set
bot.register_next_step_handler(message, set_admin_step)
def set_admin_step(message):
str_message = str(message.text)
args_message = str_message.split(',') # split message into email and admin rights
if len(args_message) != 2: # make sure 2 args (email,is_admin) are given
bot.reply_to(message, "exactly 2 arguments (<email>,<is_admin>) required, try again")
return
email = args_message[0]
is_admin = args_message[1]
status = api_handler.set_admin(email, is_admin) # set admin in db
if(status == 200):
bot.reply_to(message, "Admin rights set")
else:
bot.reply_to(message, f"Admin rights could not be set ({status})")
@bot.message_handler(commands=['me', 'Me']) # /me -> sending user info
@ -152,12 +195,16 @@ def send_user(message):
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id) # tbd: formatting
if not user_data or user_data == None:
user_data = api_handler.get_user(user_id)
if not user_data or user_data == None: # true if user is not registered
bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info")
return
bot.reply_to(message, 'Your user data:\n' + str(user_data))
username = user_data['username']
email = user_data['email']
user_id = user_data['telegram_user_id']
cron = user_data['cron']
admin = user_data['admin']
bot.reply_to(message, f'Username: {username}\nEmail: {email}\nID: {user_id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool
@ -175,7 +222,7 @@ def send_id(message):
bot.reply_to(message, answer)
#function that sends telegram status(running or offline) as message from telegram bot to user
#function that can be used to ensure that the bot is online and running
@bot.message_handler(commands=['status', 'Status'])
def send_status(message):
@ -272,28 +319,26 @@ def send_all_news(message):
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None:
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords:
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /news')
return
keywords_search = ' OR '.join(keywords)
print(keywords_search)
now = dt.datetime.now().date()
from_date = now - dt.timedelta(days=7)
keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc.
now = dt.datetime.now().date() # get current date
from_date = now - dt.timedelta(days=7) # get date 7 days ago -> limit age of news to 7 days old max
from_date_formatted = dt.datetime.strftime(from_date, '%Y-%m-%d')
print(from_date_formatted)
news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"]
news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects
if news_list:
if news_list: # true if news_list is not empty
for article in news_list:
formatted_article = news.format_article(article)
bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN")
bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWN") # Markdown allows to write bold text with * etc.
else:
bot.send_message(chat_id=user_id, text='No news found for your keywords.')
@ -310,27 +355,27 @@ def send_news(message):
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None:
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords:
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
return
if keywords:
for keyword in keywords:
top_news = news.get_top_news_by_keyword(keyword)["articles"]
if top_news == None:
if top_news == None: # true if request to NewsAPI failed
bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.')
return
if not top_news:
bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN")
return
formatted_article = news.format_article(top_news[0])
if not top_news: # true if no news found for keyword (empty list)
bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWN")
else:
formatted_article = news.format_article(top_news[0]) # only format and send most popular news
bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWN")
@ -346,15 +391,14 @@ def add_keyword(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to add:')
bot.register_next_step_handler(message, store_keyword)
bot.register_next_step_handler(message, store_keyword) # wait for user to send keyword, then call store_keyword function
def store_keyword(message):
user_id = int(message.from_user.id)
print(str(user_id))
keyword = str(message.text).lower()
status = api_handler.set_keyword(user_id, keyword)
if status == 200:
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.')
keyword = str(message.text).lower() # lower to ensure Bitcoin and bitcoin is not stored as individual keywords
status = api_handler.set_keyword(user_id, keyword) # set keyword in database
if status == 200: # statuscode 200 means keyword was added successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') # duplicate keywords are denied by Database, so no need to check for that here
else:
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" could not be stored. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info (statuscode {status})')
@ -371,14 +415,14 @@ def remove_keyword(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to remove:')
bot.register_next_step_handler(message, remove_keyword_step)
bot.register_next_step_handler(message, remove_keyword_step) # wait for user to send keyword to remove, then call remove_keyword_step function
def remove_keyword_step(message):
user_id = int(message.from_user.id)
keyword = str(message.text).lower()
status = api_handler.delete_keyword(user_id, keyword)
if status == 200:
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.')
if status == 200: # statuscode 200 means keyword was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') # checking if keyword to remove is in database are handled in database, not here
else:
bot.send_message(chat_id=user_id, text=f'Failed deleting keyword "{keyword}". (statuscode {status})')
@ -394,19 +438,19 @@ def send_keywords(message):
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id)
if keywords == None:
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords:
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword')
return
else:
else: # send keyword list
keywords_str = ', '.join(keywords)
bot.send_message(chat_id=user_id, text=f'Your keywords are: _{keywords_str}_', parse_mode="MARKDOWN")
@bot.message_handler(commands=['portfolio', 'Portfolio']) #tbd
@bot.message_handler(commands=['portfolio', 'Portfolio'])
def send_portfolio(message):
""" Send portfolio of user
:type message: message object bot
@ -417,23 +461,23 @@ def send_portfolio(message):
:rtype: none
"""
user_id = int(message.from_user.id)
portfolio = api_handler.get_user_portfolio(user_id)
if portfolio == None:
portfolio = api_handler.get_user_portfolio(user_id) # get portfolio of user as json
if portfolio == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not portfolio:
if not portfolio: # true if user is registered but does not have any stocks in portfolio
bot.send_message(chat_id=user_id, text='You do not have any stocks in your portfolio.')
return
else:
else: # send portfolio
for stock in portfolio:
comment = str(stock["comment"])
count = "{:.2f}".format(float(stock["count"]))
comment = str(stock["comment"]) # comment may be written name of stock, comment is made by user when adding an stock to portfolio
count = "{:.2f}".format(float(stock["count"])) # round count to 2 decimal places
isin = str(stock["isin"])
worth = "{:.2f}".format(float(stock["current_price"]) * float(stock["count"]))
bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWN")
worth = "{:.2f}".format(float(stock["current_price"]) * float(stock["count"])) # round current_price to 2 decimal places
bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWN") # formatted message in markdown
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn may be deleted in future
def set_new_transaction(message):
""" Set new transaction for user
:type message: message object bot
@ -470,7 +514,7 @@ def set_new_transaction_step(message):
bot.send_message(chat_id=user_id, text=f'Failed adding transaction. (statuscode {status})')
@bot.message_handler(commands=['interval', 'Interval']) #tbd
@bot.message_handler(commands=['interval', 'Interval'])
def send_interval(message):
""" send interval for user
:type message: message object bot
@ -481,20 +525,20 @@ def send_interval(message):
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if user_data == None:
user_data = api_handler.get_user(user_id) # get cron interval of user (stored in user data)
if user_data == None: # true if user is not registered in DB
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info and set an interval with /setinterval')
return
else:
interval = str(user_data['cron'])
if interval == 'None':
else: # send interval
interval = str(user_data['cron']) # get cron from user data
if interval == 'None': # true if user has no cron set
bot.send_message(chat_id=user_id, text='You do not have an interval set. Set one with /setinterval')
return
formatted_interval = str(interval).replace(' ', '_')
formatted_interval = str(interval).replace(' ', '_') # replace spaces with underscores to add to url of crontab.guru
bot.send_message(chat_id=user_id, text=f'Your update interval: {interval} (https://crontab.guru/#{formatted_interval})')
@bot.message_handler(commands=['setinterval', 'Setinterval']) #tbd
@bot.message_handler(commands=['setinterval', 'Setinterval'])
def set_new_interval(message):
""" Set new interval for user
:type message: message object bot
@ -506,19 +550,19 @@ def set_new_interval(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type interval in cron format:\n(https://crontab.guru/)')
bot.register_next_step_handler(message, set_new_interval_step)
bot.register_next_step_handler(message, set_new_interval_step) # executes function when user sends message
def set_new_interval_step(message):
user_id = int(message.from_user.id)
interval = str(message.text)
status = api_handler.set_cron_interval(user_id, interval)
status = api_handler.set_cron_interval(user_id, interval) # send cron to db
if status == 200:
bot.send_message(chat_id=user_id, text='Interval succesfully set.')
return
if status == -1: # only -1 when interval is invalid
if status == -1: # only -1 when interval is invalid, not a real statuscode, but used from api_handler.set_cron_interval to tell the crontab has the wrong format
bot.send_message(chat_id=user_id, text='Invalid interval format. Try again with\n /setinterval.')
return
else:

View File

@ -1,37 +0,0 @@
"""
script for regularly sending updates on shares and news based on user interval
"""
__author__ = "Florian Kellermann, Linus Eickhoff"
__date__ = "05.04.2022"
__version__ = "0.0.1"
__license__ = "None"
import shares.share_fetcher as share_fetcher
import news.news_fetcher as news_fetcher
import datetime
import sys
from apscheduler.schedulers.blocking import BlockingScheduler
'''
* * * * * code
weekday (0->Monday, 7->Sunday)
Month (1-12)
Day (1-31)
Hour (0-23)
Minute (0-59)
example 0 8 * * * -> daily update at 8am
'''
def user_updates():
"""sends timed updates automatically to user
Args:
Returns:
"""
return

View File

@ -8,14 +8,18 @@ __license__ = "None"
from calendar import month
from symtable import Symbol
from dotenv import load_dotenv
from shares.share_fetcher import get_share_price
import news.news_fetcher as news_fetcher
import time
import datetime
import os
from bot import bot
import sys
from multiprocessing import Process
from apscheduler.schedulers.background import BackgroundScheduler
from api_handling.api_handler import API_Handler
from news.news_fetcher import format_article
'''
@ -33,24 +37,21 @@ example 0 8 * * * -> daily update at 8am
user_ids = []
user_crontab = []
def main_loop():
""" main loop for regularly sending updates
load_dotenv(dotenv_path='.env')
def start_updater():
""" starting function for regularly sending updates
:raises: none
:rtype: none
"""
current_time_datetime = datetime.datetime.now()
my_handler = API_Handler("https://gruppe1.testsites.info/api", "bot@example.com", "bot")
my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))
update_crontab(my_handler)
# update_for_user(5270256395, my_handler) # Debug (running test update for kevins shares)
update_crontab(current_time_datetime, my_handler)
def update_crontab(pCurrent_Time, p_my_handler):
def update_crontab(p_my_handler):
""" Updating crontab lists every hour
:type pCurrent_Time: time when starting crontab update
:param pCurrent_Time: datetime
@ -63,8 +64,6 @@ def update_crontab(pCurrent_Time, p_my_handler):
global user_crontab
global user_ids
#p_my_handler.set_cron_interval(user_id = 1770205310, cron_interval = "23 08 * * *")
all_users = p_my_handler.get_all_users()
user_ids = []
@ -75,11 +74,9 @@ def update_crontab(pCurrent_Time, p_my_handler):
user_ids.append(int(element["telegram_user_id"]))
user_crontab.append(str(element["cron"]))
print(user_ids)
update_based_on_crontab(user_ids, user_crontab, p_my_handler)
update_crontab(datetime.datetime.now(), p_my_handler)
update_crontab(p_my_handler)
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
@ -101,11 +98,8 @@ def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
my_scheduler = BackgroundScheduler()
print(len(user_ids)) #Debug
for i in range(len(p_user_ids)):
cron_split = p_user_crontab[i].split(" ")
print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2])
my_scheduler.add_job(update_for_user, 'cron', day_of_week = cron_split[4] , hour= cron_split[1] , minute = cron_split[0], month= cron_split[3] , day=cron_split[2], args=(p_user_ids[i], p_my_handler ))
my_scheduler.start()
@ -140,7 +134,7 @@ def update_for_user(p_user_id, p_my_handler):
share_courses.append(element["current_price"])
my_user = p_my_handler.get_user(p_user_id)
send_to_user("Hello %s this is your share update:"%str(my_user["username"]), pUser_id=p_user_id)
send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id)
if len(share_symbols) != 0:
for i in range(len(share_symbols)):
@ -149,8 +143,18 @@ def update_for_user(p_user_id, p_my_handler):
else:
send_to_user("No shares found for your account. Check https://gruppe1.testsites.info/api to change your settings and add shares.", pUser_id=p_user_id)
keywords = p_my_handler.get_user_keywords(p_user_id) # get keywords as array
def send_to_user(pText, pUser_id = 1770205310):
if(keywords): # if keywords exist and array is not empty
send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id)
for keyword in keywords:
news = news_fetcher.get_top_news_by_keyword(keyword)["articles"][0] # only use the most popular news
news_formatted = news_fetcher.format_article(news) # format for message
send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True) # send news with related keyword in Markdown
def send_to_user(pText, pUser_id , md_mode = False):
""" Send message to user
:type pText: string
@ -159,18 +163,23 @@ def send_to_user(pText, pUser_id = 1770205310):
:type pUser_id: int
:param pUser_id: user to send to. per default me (Florian Kellermann)
:type md_mode: boolean
:param md_mode: if true, parse_mode is markdown
:raises: none
:rtype: none
"""
if md_mode:
bot.send_message(chat_id=pUser_id, text=pText, parse_mode="MARKDOWN")
else:
bot.send_message(chat_id=pUser_id, text=pText)
if __name__ == "__main__":
print('bot_updates.py starting.')
if __name__ == "__main__":
try:
main_loop()
start_updater()
sys.exit(-1)
except KeyboardInterrupt:
print("Ending")

View File

@ -15,12 +15,13 @@ import datetime as dt
from newsapi import NewsApiClient
from dotenv import load_dotenv
load_dotenv()
load_dotenv() # loads environment vars
# Init
api_key = os.getenv('NEWS_API_KEY')
newsapi = NewsApiClient(api_key=api_key)
api_key = os.getenv('NEWS_API_KEY') # get API Key from .env file
newsapi = NewsApiClient(api_key=api_key) # news api from https://newsapi.org/
try:
# get all available news sources (e.g BBC, New York Times, etc.)
source_json = requests.get(f"https://newsapi.org/v2/top-headlines/sources?apiKey={api_key}&language=en").json()
sources = source_json["sources"]
str_sources = ",".join([source["id"] for source in sources])
@ -38,7 +39,7 @@ def get_all_news_by_keyword(keyword, from_date="2000-01-01"):
Returns:
JSON/dict: dict containing articles
"""
top_headlines = newsapi.get_everything(q=keyword, sources=str_sources, language='en', from_param=from_date)
top_headlines = newsapi.get_everything(q=keyword, sources=str_sources, language='en', from_param=from_date) # keywords can be combined with OR (e.g. keyword = "bitcoin OR ethereum")
if(top_headlines["status"] == "ok"):
return top_headlines
else:
@ -53,7 +54,7 @@ def get_top_news_by_keyword(keyword):
Returns:
JSON/dict: dict containing articles
"""
top_headlines = newsapi.get_top_headlines(q=keyword, sources=str_sources, language='en')
top_headlines = newsapi.get_top_headlines(q=keyword, sources=str_sources, language='en') # get top headlines, measured by popularity from NewsApi
if(top_headlines["status"] == "ok"):
return top_headlines
else:
@ -72,11 +73,11 @@ def format_article(article):
sourcename = article["source"]["name"]
headline = article["title"]
url = article["url"]
formatted_article = f"_{sourcename}_\n*{headline}*\n\n{url}"
formatted_article = f"_{sourcename}_\n*{headline}*\n\n{url}" # formatting in Markdown syntax
return formatted_article
if __name__ == '__main__':
if __name__ == '__main__': # only execute if script is called directly -> for simple testing
print("this is a module and should not be run directly")
print("fetching top news by keyword bitcoin...")
@ -84,4 +85,7 @@ if __name__ == '__main__':
articles = get_all_news_by_keyword("bitcoin")
formatted_article = format_article(articles["articles"][0])
print(formatted_article)
articles = get_top_news_by_keyword("bitcoin")
formatted_article = format_article(articles["articles"][0])
print(formatted_article)
sys.exit(1)