GuessThePrice/source/bot.py
2022-06-03 15:35:16 +02:00

803 lines
26 KiB
Python

"""
script for telegram bot and its functions
"""
__author__ = "Florian Kellermann, Linus Eickhoff, Florian Kaiser"
__date__ = "02.05.2022"
__version__ = "0.4.7"
__license__ = "None"
# main bot at http://t.me/guess_the_price_bot
# debug bot at http://t.me/amazondebug_bot
import logging
import os
import re
import sys
import datetime as dt
import time
from xml.dom.pulldom import START_DOCUMENT
import sqlalchemy
import telebot
from dotenv import load_dotenv
from telebot import types
from db import User, session, Product, Score
from fetcher import *
import helper_functions as hf
import scoring
load_dotenv(dotenv_path='.env') # load environment variables
BOT_VERSION = "1.0.0" # version of bot
START_DAY = dt.time(8, 0, 0)
END_DAY = dt.time(22, 0, 0)
bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))
@bot.message_handler(commands=['version', 'Version'])
def send_version(message):
""" Sending program version
Args:
message (Message): Message to react to in this case /version
"""
bot.reply_to(message, "the current bot version is " + BOT_VERSION)
@bot.message_handler(commands=['start', 'Start'])
def send_start(message):
"""send start message to user
Args:
message (Message): message from telegram user, here /start
Returns:
None: None
Raises:
None: None
Test:
type /start as command in telegram as an unregistered user and check if bot responds with start message and waits for name
type /start as command in telegram as a registered user and check if bot responds with informing user about being already registered
"""
if message.from_user.id in session.query(User.telegram_id).all():
bot.reply_to(message, "You are already registered. Type /changename to change your name or /help for an overview of all commands")
return
bot.send_message(chat_id=int(message.from_user.id), text=("Welcome to the game... \
\nTo start please set a name for yourself or type cancel to set generated name:"))
bot.register_next_step_handler(message, start_name_setter)
def start_name_setter(message):
"""Set name for user and send introduction
Args:
message (Message): Message to react to
Returns:
None: None
Raises:
None: None
Test:
check Regex pattern with incorrect patterns and test if these patterns are denied
check Regex pattern with correct patterns and test if these patterns are accepted
check if username is set in database after typing correct name
"""
user_id = int(message.from_user.id)
user_name = ""
if str(message.text).lower() == "cancel": # Set user name to user
user_name = "User" + str(user_id) # generate user name, user can change it with /changename
if not re.match(r'^[a-zA-Z][a-zA-Z0-9\-]+$', str(message.text)): # regex pattern for username: has to start with a letter, can contain letters, numbers and hyphen
bot.reply_to(message, "Name has to be alphanumeric (including -) and start with a letter")
return
user_name = str(message.text)
try:
user = User(telegram_id=user_id, username=user_name, admin=False)
session.add(user) # add user to database
session.commit() # commit changes to database
bot.reply_to(message, f"Thank you for setting your name {user_name} \
\nType /gameinfo for information about GuessThePrice \
\nType /help for an overview of all commands")
except sqlalchemy.exc.IntegrityError:
session.rollback() # rollback changes
bot.reply_to(message, "You are already registered, change name with /changename")
telebot.logger.setLevel(logging.DEBUG)
@bot.message_handler(commands=['help', 'Help'])
def send_help(message):
"""send all commands to user
Args:
message (Message): Message from telegram user, here /help
Returns:
None: None
Raises:
None: None
Test:
type /help as command in telegram and check if bot responds with help message
type /Help <text> as command in telegram and check if bot responds with help message, ignoring the text in message after command
"""
help_message = ("/me get my user info\n"
"/help get this help message\n"
"/gameinfo get game info\n"
"/scoreboard get scoreboard\n"
"/changename change your name\n"
"/daily get todays challenge\n"
"/setAdmin set admin status of user (ADMIN)\n"
"/addproduct add product for further challenges (ADMIN)\n"
"/users get all users (ADMIN)\n")
bot.reply_to(message, help_message)
@bot.message_handler(commands=['gameinfo', 'Gameinfo'])
def send_gameinfo(message):
"""send game info to user
Args:
message (Message): Message from telegram user, here /gameinfo
Returns:
None: None
Raises:
None: None
Test:
type /gameinfo or /Gameinfo as command in telegram and check if bot responds with game info message
type /gameInfo as command in telegramand check if bot responds with unknown command message
"""
gameinfo_message = ("GuessThePrice is a game where you have to guess\n"
"the price of an amazon product.\n"
"Start by setting your name with /changename\n"
"You can get a new challenge every day.\n"
"You are informed when a new challenge is available.\n"
"To see the challenge type /daily\n"
"At 10pm the correct answer will be shown\n"
"See /scoreboard to see how you perform")
bot.reply_to(message, gameinfo_message)
@bot.message_handler(commands=['me', 'Me'])
def send_user_info(message):
"""send user info to user
Args:
message (Message): Message from telegram user, here /me
Returns:
None: None
Raises:
None: None
Test:
type /me as command in telegram as an registered User and check if bot responds with user info message
type /me as command in telegram as an unregistered User and check if bot responds with User does not exist message
"""
user_id = message.from_user.id
user = session.query(User).filter(User.telegram_id==user_id).first() # get user from database, only one user per telegram id exists
user_scores = session.query(Score).filter(Score.telegram_id==user_id).all() # get today's score object for user
today_score = None
today_guess = None
for score in user_scores:
if score.date.date() == dt.datetime.now().date(): # check if today's score is already in database
today_score = score.score
today_guess = score.guess
if today_guess is None:
today_guess = "No guess today"
else:
today_guess = str(today_guess) + ""
if today_score is None:
today_score = "No score today"
if today_score is not None and today_guess is not None:
all_time_score = sum(score.score for score in user_scores)
else:
all_time_score = "No score yet"
if user is not None: # if user is registered
user_name = user.username
user_info = (f"Your user info:\n"
f"User ID: {user_id}\n"
f"Username: {user_name}\n"
f"Today's guess: {today_guess}\n"
f"Today's Score: {today_score}\n"
f"All time Score: {all_time_score}")
else: # if user is not registered
# User not found
user_info = "User does not exist.\nDo /start to register"
bot.reply_to(message, user_info)
@bot.message_handler(commands=['users', 'Users'])
def send_users(message):
"""send user info to user
Args: (Message): Message from telegram user, here /users
Returns:
None: None
Raises:
None: None
Test:
type /users as command in telegram as an registered Admin and check if bot responds with user info messages
type /users as command in telegram as an an registered User where Admin = False and check if bot responds with "Admin rights are required to see all users" message
type /users as command in telegram as an an unregistered User and check if bot responds with "Admin rights are required to see all users" message
"""
user_id = message.from_user.id
# check if user exists
user = session.query(User).filter(User.telegram_id==user_id).first()
if not user or user is None: # should never happen, /start is required to chat, but just in case
bot.reply_to(message, "Error: You are not registered, please register with /start")
return
# Check if user is admin
if not session.query(User).filter(User.telegram_id==user_id).first().admin: # if user is not admin
bot.reply_to(message, "Error: Admin rights are required to see all users.")
return
users = session.query(User).all()
if len(users) == 0:
bot.reply_to(message, "No users registered.")
for user in users:
user_info = (f"Telegram ID: {user.telegram_id}\n"
f"Username: {user.username}\n"
f"Admin: {user.admin}\n")
bot.reply_to(message, user_info)
@bot.message_handler(commands=['setAdmin', 'SetAdmin', 'Setadmin', 'setadmin'])
def set_admin(message):
"""set admin status of user
Args:
message (Message): Message from telegram user, here /setAdmin
Returns:
None: None
Raises:
None: None
Test:
type /setAdmin as command in telegram as an registered Admin and check if bot requests id and boolean for changing admin status
type /setAdmin as command in telegram as an an registered User where Admin = False in db and check if bot responds with "Admin rights are required to change admin status" message
type /setAdmin as command in telegram as an an unregistered User and check if bot responds with "Admin rights are required to change admin status" message
"""
user_id = message.from_user.id
try:
user = session.query(User).filter(User.telegram_id==user_id).first()
if not user.admin: # admin is a boolean
bot.reply_to(message, "Error: Admin rights are required to change admin rights of users.")
return
if user.admin:
bot.reply_to(message, "Type the telegram_id and boolean of admin attribute like <telegram_id> <value>")
bot.register_next_step_handler(message, set_admin_handler)
except sqlalchemy.exc.IntegrityError:
session.rollback()
bot.reply_to(message, "Something went wrong.")
def set_admin_handler(message):
"""set admin status of user
Args:
message (Message): Message from telegram user, here /setAdmin
Returns:
None: None
Raises:
None: None
Test:
type in correct regex pattern and check if bot sets admin status of user correctly
type in wrong regex pattern and check if bot responds with Invalid format message
test with incorrect user id and check if bot responds with User not registered message
"""
if not re.match(r'[0-9]* (True|False|true|false)', str(message.text)):
bot.reply_to(message, "Error: Invalid format. Try again with /setAdmin")
return
telegram_id, admin = str(message.text).split(sep=" ")
user = session.query(User).filter(User.telegram_id==telegram_id).first() # get user from database, only one user per telegram id exists
if user is None or not user:
bot.reply_to(message, "Error: User with entered telegram id is not registered.")
return
try:
if admin in ("True", "true"):
user.admin = True
elif admin in ("False", "false"):
user.admin = False
session.commit() # commit changes to database
bot.reply_to(message, f"Admin rights of user {user.username} set to {user.admin}")
except sqlalchemy.exc.IntegrityError:
session.rollback()
bot.reply_to(message, "Something went wrong")
@bot.message_handler(commands=['scoreboard', 'Scoreboard'])
def send_scoreboard(message):
"""send scoreboard to user
Args:
message (Message): Message from telegram user, here /scoreboard
Returns:
None: None
Raises:
None: None
Test:
type /scoreboard as command in telegram and check if bot responds with scoreboard with correct info and formatting
test with db with no Users and check if bot responds with "No users registered" message
"""
alltime_board = []
weekly_board = []
users = session.query(User).all()
if users is None:
bot.reply_to(message, "No users registered.")
return
# generate alltime scoreboard
for user in users:
telegram_id = user.telegram_id
user_scores = session.query(Score).filter(Score.telegram_id==telegram_id).all() # get all scores of user
if user_scores is None:
continue
alltime_score = sum(score.score for score in user_scores) # sum all scores of user
alltime_board.append((user.username, alltime_score)) # append to alltime scoreboard
# generate weekly scoreboard
for user in users:
telegram_id = user.telegram_id
print(session.query(Score).all())
all_user_scores = session.query(Score).filter(Score.telegram_id==telegram_id).all() # get all user scores
user_scores = None # initialize variable
if all_user_scores is not None:
user_scores = [score for score in all_user_scores if score.date.date().isocalendar().week==dt.date.today().isocalendar().week] # get user scores for today
if user_scores is None:
continue
weekly_score = sum(score.score for score in user_scores)
weekly_board.append((user.username, weekly_score))
# sort scoreboards
alltime_board.sort(key=lambda x: x[1], reverse=True)
weekly_board.sort(key=lambda x: x[1], reverse=True)
str_alltime_board = "*Scoreboard (AllTime)*:\n"
str_weekly_board = "*Scoreboard (Weekly)*:\n"
for user in alltime_board:
str_alltime_board += f"\n{user[1]} _({user[0]})_"
if len(alltime_board) == 0:
bot.reply_to(message, str_alltime_board + "\nNo users have scored yet.", parse_mode='MARKDOWN')
else:
bot.reply_to(message, str_alltime_board, parse_mode='MARKDOWN')
for user in weekly_board:
str_weekly_board += f"\n{user[1]} _({user[0]})_"
if len(weekly_board) == 0:
bot.reply_to(message, str_weekly_board + "\nNo users have scored yet.", parse_mode='MARKDOWN')
else:
bot.reply_to(message, str_weekly_board, parse_mode='MARKDOWN')
@bot.message_handler(commands=['changename', 'Changename'])
def change_name(message):
"""change user name
Args:
message (Message): Message from telegram user, here /changename
Returns:
None: None
Raises:
None: None
Test:
type /changename as command in telegram and check if bot sends change name request message
type /Changename as command in telegram and check if bot sends change name request message
"""
bot.reply_to(message, "type new name (else type \"cancel\"):")
bot.register_next_step_handler(message, change_name_setter) # register next step handler, send message and take users answer as input for change_name_setter
def change_name_setter(message):
"""change user name
Args:
message (Message): Message to react to
Returns:
None: None
Raises:
None: None
Test:
type in correct regex pattern and check if bot changes user name correctly, also in db
type in wrong regex pattern and check if bot responds with Invalid format message
type cancel and check that name is not changed in db and bot responds with "Name not changed" message
"""
if str(message.text).lower() == "cancel": # Set user name to user
bot.reply_to(message, "Name not changed")
return
if not re.match(r'^[a-zA-Z][a-zA-Z0-9\-]+$', str(message.text)): # same pattern as in /start
bot.reply_to(message, "Name has to be alphanumeric (including -) and start with a letter")
return
else:
user_id = int(message.from_user.id)
user_name = str(message.text)
try:
user = session.query(User).filter(User.telegram_id==user_id).first()
user.username = user_name
session.commit()
bot.reply_to(message, f"Your name has been changed to {user_name}")
except sqlalchemy.exc.IntegrityError:
session.rollback()
bot.reply_to(message, "Something went wrong")
def time_in_range(start, end, current):
""" Check if current time is in range of start and end
Args:
start (datetime): start time
end (datetime): end time
current (datetime): current time
Returns:
bool: True if current time is in range of start and end, False otherwise
"""
return start <= current <= end
def find_todays_product_from_db():
"""Find todays product from db based on todays_product
"""
product = None
for element in session.query(Product).all():
if element.todays_product:
product = element
break
return product
@bot.message_handler(commands=['daily', 'Daily'])
def daily_message(message):
"""send daily challenge
Args:
message (Message): Message from telegram user, here /daily
Returns:
None: None
Raises:
None: None
Test:
type /daily as command in telegram and check if bot sends daily challenge message
type /daily again for message that you already played today and your guess
"""
user_id = int(message.from_user.id)
current = dt.datetime.now().time()
if not time_in_range(START_DAY, END_DAY, current):
bot.send_message(chat_id=user_id, text="Currently there is no challenge.\n\n"
"Times are 8am to 10pm.")
return
# Check if user already guessed today by date, time and user_id
all_scores_user = session.query(Score).filter(
Score.telegram_id==user_id
).all()
for element in all_scores_user:
if element.date.date() == dt.datetime.now().date():
bot.send_message(chat_id=user_id, text="You already guessed today!\n"
"Your guess was: {}".format(element.guess))
return
bot.send_message(chat_id = user_id, text="Welcome to todays challenge!\n"
"As soon as the picture loads\n"
"you will have 20 seconds to send\n"
"your price guess\n")
time.sleep(2)
bot.send_message(chat_id = user_id, text="Lets Go!")
time.sleep(1)
for i in range(3):
iteration = 3-i
bot.send_message(chat_id=user_id, text=str(iteration))
iteration-=1
time.sleep(1)
try:
product_for_today=find_todays_product_from_db()
bot.send_message(chat_id=user_id, text=str(
hf.make_markdown_proof(product_for_today.image_link)
), parse_mode="MARKDOWNV2")
bot.send_message(chat_id=user_id, text=product_for_today.title)
start_time = time.time()
# next step with message and start time
bot.register_next_step_handler(message, get_user_guess, start_time)
except (TypeError, AttributeError) as exception_message:
print(exception_message)
bot.send_message(chat_id=user_id, text="An Error occured. Please try again later.")
def get_time_difference(start_time, end_time):
"""Get time difference
"""
return end_time - start_time
def get_user_guess(message, start_time):
"""Get users guess after typing /daily
Args:
message (Message): Message element to react to. In this case next step after /daily
start_time (time.time): Time the user got the image
Test:
Send a price and see if the bot responds correctly (guess accepted)
Send text withwith wrong format (right format = number fitting for float) and see if bot notices and gives you another try
See if score changes after you guessed (only if you guessed somewhat correctly so your score is not 0)
"""
end_time = time.time()
user_id = int(message.from_user.id)
over_time = False #If user guesses to slow (> 20 seconds)
try:
user_guess = float(message.text.replace( ',', '.'))
except ValueError:
bot.send_message(chat_id=user_id, text="Please type a number or float")
bot.register_next_step_handler(message, get_user_guess, start_time)
return
if get_time_difference(start_time, end_time) > 20:
bot.send_message(chat_id=user_id, text="You took too long to guess.\n"
"No more tries today.")
over_time = True # user guesses to slow
product_for_today=find_todays_product_from_db()
if not over_time: # when answer was in time
message_text=f"You guessed {round(user_guess,2)}"
bot.send_message(chat_id=user_id, text = message_text)
# calculate score for guess
if not over_time:
user_score = scoring.eval_score(product_for_today.price, user_guess)
else:
user_score = 0
user_guess = 0
score = Score (
telegram_id = user_id,
date = dt.date.today(),
product_id = product_for_today.product_id,
guess = user_guess,
score = user_score
)
session.add(score)
session.commit()
@bot.message_handler(commands=['addproduct', 'Addproduct'])
def add_product(message):
"""Add product to database
Args:
message (Message): Message from telegram user, here /addproduct
Returns:
None: None
Raises:
None: None
"""
user_id = message.from_user.id
# Check if user is admin
if not session.query(User).filter(User.telegram_id == user_id).first().admin:
bot.reply_to(message, "Error: Admin rights are required to add products")
return
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Please insert the Amazon product id (i.e. B00XKZYZ2S)')
bot.register_next_step_handler(message, receive_product_data) # executes function when user sends message
def receive_product_data(message):
""" registering new product in the db and fetching it from amazon
Args:
message (Message): Message that is being reacted to. Always from add_product because of next_step_handler
"""
user_id = int(message.from_user.id)
product_id = str(message.text)
product_src = fetch_url('https://www.amazon.de/dp/' + product_id)
title = get_title(product_src)
image_url = get_image(product_src)
price = get_price(product_src)
description = get_description(product_src)
bot.send_message(chat_id=user_id, text=title)
bot.send_message(chat_id=user_id, text=image_url)
bot.send_message(chat_id=user_id, text=price)
bot.send_message(chat_id=user_id, text=description)
# markup = InlineKeyboardMarkup()
# markup.row_width = 2
# markup.add(InlineKeyboardButton("Yes", callback_data="cb_yes"),
# InlineKeyboardButton("No", callback_data="cb_no"))
#
# bot.send_message(chat_id=user_id, text="Is this the product you want to add?", reply_markup=markup)
# Insert into database
try:
product = Product(
product_id=product_id,
title=title,
image_link=image_url,
price=price[0],
currency=price[1],
description=description
)
session.add(product)
session.commit()
bot.send_message(chat_id=user_id, text='Successfully added product to database')
except sqlalchemy.exc.IntegrityError:
session.rollback()
bot.send_message(chat_id=user_id, text='Product is in database already')
except pymysql.err.OperationalError:
session.rollback()
bot.send_message(chat_id=user_id, text='Unknown error')
@bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
if call.data == "cb_yes":
bot.answer_callback_query(call.id, "Answer is Yes")
elif call.data == "cb_no":
bot.answer_callback_query(call.id, "Answer is No")
@bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement
def echo_all(message):
"""Echo all other messages
Args:
message (Message): user message that doesnt match any of the commands
"""
answer = 'Do not know this command or text: ' + message.text
bot.reply_to(message, answer)
# inline prints for debugging
@bot.inline_handler(lambda query: query.query == 'text')
def query_text(inline_query):
"""inline query handler for debugging
Args:
inline_query (InlineQuery): inline query from telegram user
Returns:
None: None
Raises:
None: None
"""
try:
r = types.InlineQueryResultArticle('1', 'Result1', types.InputTextMessageContent(
'hi')) # pylint: disable=invalid-name
r2 = types.InlineQueryResultArticle(
'2', 'Result2', types.InputTextMessageContent('hi')) # pylint: disable=invalid-name
bot.answer_inline_query(inline_query.id, [r, r2])
except Exception as e: # pylint: disable=broad-except, invalid-name
print(e)
def main_loop():
"""main loop for bot
Args:
None: None
Returns:
None: None
Raises:
None: None
"""
bot.infinity_polling()
if __name__ == '__main__':
try:
main_loop()
except KeyboardInterrupt:
print('\nExiting by user request.\n')
sys.exit(0)