Reformatting

This commit is contained in:
Administrator 2022-05-11 23:33:48 +02:00
parent 24eb954856
commit 397dd23b8d
21 changed files with 424 additions and 439 deletions

View File

@ -3,24 +3,26 @@
Aktienbot API
## Development
1. Create virtual environment `python -m venv venv env/Scripts/activate`
2. Install requirements `pip install -r api/requirements.txt`
3. Set environment variables (see list below)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
4. Run api `python api/app.py`
## Testing
1. Create virtual environment `python -m venv venv env/Scripts/activate`
2. Install requirements `pip install -r api/requirements.txt`
3. Set environment variables (see list below)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
4. Change directory: `cd api/`
5. Run tests: `python -m pytest -v --cov-report term-missing --cov=app`
## Environment variables
```
# Flask secret key
SECRET_KEY=
@ -34,6 +36,7 @@ Aktienbot API
```
## Docker
```
docker run -d \
--name aktienbot_api \
@ -48,4 +51,5 @@ docker run -d \
--restart unless-stopped \
registry.flokaiser.com/aktienbot/api:latest
```
or load environment variables from file by using `--env-file <filename>`

View File

@ -4,21 +4,19 @@ __credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin P
__license__ = "GPL 3.0"
__version__ = "1.0.0"
from flask import current_app
from apiflask import APIFlask
from dotenv import load_dotenv
from flask_cors import CORS
from app.blueprints.keyword import keyword_blueprint
from app.blueprints.portfolio import portfolio_blueprint
from app.blueprints.shares import shares_blueprint
from app.blueprints.share_price import share_price_blueprint
from app.blueprints.transactions import transaction_blueprint
from app.blueprints.shares import shares_blueprint
from app.blueprints.telegram import telegram_blueprint
from app.blueprints.transactions import transaction_blueprint
from app.blueprints.user import users_blueprint
from app.helper_functions import hash_password
from app.models import *
from dotenv import load_dotenv
from flask import current_app
from flask_cors import CORS
def create_app(config_filename=None):

View File

@ -4,10 +4,9 @@ __credits__ = ["Florian Kaiser", "Florian Kellermann", "Linus Eickhof", "Kevin P
__license__ = "GPL 3.0"
__version__ = "1.0.0"
from flask import current_app
import jwt
from apiflask import HTTPTokenAuth
from flask import current_app
auth = HTTPTokenAuth()

View File

@ -7,12 +7,11 @@ __version__ = "1.0.0"
import os
from apiflask import APIBlueprint, abort
from app.auth import auth
from app.db import database as db
from app.helper_functions import make_response, get_email_or_abort_401
from app.auth import auth
from app.schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
from app.models import Keyword
from app.schema import KeywordResponseSchema, KeywordSchema, DeleteSuccessfulSchema
keyword_blueprint = APIBlueprint('keyword', __name__, url_prefix='/api')
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

View File

@ -15,9 +15,9 @@ SECRET_KEY = os.getenv('SECRET_KEY', 'secret string')
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 10,
'pool_recycle': 60,
'pool_pre_ping': True
'pool_size': 10,
'pool_recycle': 60,
'pool_pre_ping': True
}
# openapi/Swagger config

View File

@ -15,8 +15,8 @@ SECRET_KEY = os.getenv('SECRET_KEY', 'secret string')
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://" + os.getenv('MYSQL_USER') + ":" + os.getenv('MYSQL_PASSWORD') + "@" + os.getenv('MYSQL_HOST') + ":" + os.getenv('MYSQL_PORT', '3306') + "/" + os.getenv('MYSQL_NAME', 'aktienbot_test')
SQLALCHEMY_TRACK_MODIFICATIONS = False # Avoids SQLAlchemy warning
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 100,
'pool_recycle': 240 # 4 minutes
'pool_size': 100,
'pool_recycle': 240 # 4 minutes
}
# openapi/Swagger config

View File

@ -6,9 +6,8 @@ __version__ = "1.0.0"
import pytest
from app import create_app, db
from app.models import User, Transaction, Keyword, Share
from app.helper_functions import hash_password
from app.models import User, Transaction, Keyword, Share
@pytest.fixture(scope='module')

View File

@ -8,6 +8,7 @@ __version__ = "1.0.0"
This file (test_keyword.py) contains the functional tests for the `keyword` blueprint.
"""
import json
from tests.functional.helper_functions import get_token

View File

@ -8,6 +8,7 @@ __version__ = "1.0.0"
This file (test_portfolio.py) contains the functional tests for the `portfolio` blueprint.
"""
import json
from tests.functional.helper_functions import get_token

View File

@ -8,6 +8,7 @@ __version__ = "1.0.0"
This file (test_share.py) contains the functional tests for the `share` blueprint.
"""
import json
from tests.functional.helper_functions import get_token

View File

@ -8,6 +8,7 @@ __version__ = "1.0.0"
This file (test_telegram.py) contains the functional tests for the `telegram` blueprint.
"""
import json
from tests.functional.helper_functions import get_token

View File

@ -8,6 +8,7 @@ __version__ = "1.0.0"
This file (test_transaction.py) contains the functional tests for the `transaction` blueprint.
"""
import json
from tests.functional.helper_functions import get_token

View File

@ -3,15 +3,17 @@
Aktienbot telegram bot
## Development
1. Create virtual environment `python -m venv venv`
2. Launch venv: `.\venv\Scripts\activate`
3. Install requirements `pip install -r telegram_bot/requirements.txt`
4. Set environment variables (see list below)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
1. Use `.env`-file in `api` directory like `.env.example`
2. Or set variables using `export` or `set` commands. (Windows `set`, Linux `export`)
5. Run api `python telegram_bot/bot.py`
## Environment variables
```
# Telegram bot api key
BOT_API_KEY=
@ -21,6 +23,7 @@ Aktienbot telegram bot
```
## Docker
```
docker run -d \
--name aktienbot_bot \
@ -31,4 +34,5 @@ docker run -d \
--restart unless-stopped \
registry.flokaiser.com/aktienbot/bot:latest
```
or load environment variables from file by using `--env-file <filename>`

View File

@ -6,16 +6,18 @@ __date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"
#side-dependencies: none
#Work in Progress
# side-dependencies: none
# Work in Progress
import sys
import os
import sys
import requests as r
from croniter import croniter # used for checking cron formatting
from croniter import croniter # used for checking cron formatting
from dotenv import load_dotenv
load_dotenv() # loads environment vars
load_dotenv() # loads environment vars
# note: for more information about the api visit swagger documentation on https://gruppe1.testsites.info/api/docs#/
@ -43,7 +45,6 @@ class API_Handler:
set_admin(email, is_admin): sets the admin status of the user with the given email
"""
def __init__(self, db_adress, email, password):
"""initializes the API_Handler class
@ -54,17 +55,16 @@ class API_Handler:
"""
self.db_adress = db_adress
payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
with r.Session() as s: # open session
p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice
payload = {'email': email, 'password': password} # credentials for admin account that has all permissions to get and set data (in this case bot account)
with r.Session() as s: # open session
p = s.post(self.db_adress + "/user/login", json=payload) # login to webservice
if p.status_code == 200:
self.token = p.json()["data"]['token'] # store token for further authentication of requests
self.token = p.json()["data"]['token'] # store token for further authentication of requests
else:
print("Error: " + str(p.status_code) + " invalid credentials")
self.token = None
def reauthorize(self, email, password): # can be used if token expired
def reauthorize(self, email, password): # can be used if token expired
"""set new credentials
Args:
@ -87,8 +87,7 @@ class API_Handler:
self.token = None
return None
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
def get_user(self, user_id, max_retries=10): # max retries are used recursively if the request fails
"""gets the shares of the user
Args:
@ -105,14 +104,13 @@ class API_Handler:
return None
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)} # authorization is bot_token:user_id (user_id is the id of the user you want to get data from)
req = s.get(self.db_adress + "/user", headers=headers)
if(req.status_code == 200):
if (req.status_code == 200):
return req.json()["data"]
else:
return self.get_user(user_id, max_retries-1) # if request fails try again recursively
return self.get_user(user_id, max_retries - 1) # if request fails try again recursively
def get_all_users(self, max_retries=10):
"""gets all users
@ -132,12 +130,11 @@ class API_Handler:
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token}
req = s.get(self.db_adress + "/users", headers=headers)
if(req.status_code == 200):
if (req.status_code == 200):
return req.json()["data"]
else:
return self.get_all_users(max_retries-1)
return self.get_all_users(max_retries - 1)
def get_user_keywords(self, user_id, max_retries=10):
"""gets the keywords of the user
@ -159,17 +156,15 @@ class API_Handler:
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/keywords", headers=headers)
if(req.status_code == 200):
if (req.status_code == 200):
keywords_json = req.json()["data"]
for keyword in keywords_json: # keywords_json is a list of dictionaries
for keyword in keywords_json: # keywords_json is a list of dictionaries
keywords.append(keyword["keyword"])
return keywords # will be empty if no keywords are set
return keywords # will be empty if no keywords are set
else:
return self.get_user_keywords(user_id, max_retries-1)
return self.get_user_keywords(user_id, max_retries - 1)
def set_keyword(self, user_id, keyword):
"""sets the keyword of the user
@ -190,7 +185,6 @@ class API_Handler:
return req.status_code
def delete_keyword(self, user_id, keyword):
"""deletes the keyword of the user
@ -210,7 +204,6 @@ class API_Handler:
return req.status_code
def get_user_shares(self, user_id, max_retries=10):
"""gets the shares of the user
@ -230,17 +223,16 @@ class API_Handler:
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/shares", headers=headers)
if(req.status_code == 200):
if (req.status_code == 200):
shares_json = req.json()["data"]
shares = []
for share in shares_json:
shares.append(share["isin"]) # we only want the isin of the shares
shares.append(share["isin"]) # we only want the isin of the shares
return shares
else:
return self.get_user_shares(user_id, max_retries-1)
return self.get_user_shares(user_id, max_retries - 1)
def set_share(self, user_id, isin, comment):
"""sets the share of the user
@ -258,10 +250,10 @@ class API_Handler:
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin}, headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
req = s.post(self.db_adress + "/share", json={"comment": comment, "isin": isin},
headers=headers) # set share by setting comment and isin, comment can be the real name of the share e.g. "Apple Inc."
return req.status_code
def delete_share(self, user_id, isin):
"""deletes the share of the user
@ -277,10 +269,9 @@ class API_Handler:
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
req = s.delete(self.db_adress + "/share", json={"isin": str(isin)}, headers=headers) # to delete a share only the isin is needed because it is unique, shares are not transactions!
return req.status_code
def get_user_transactions(self, user_id, max_retries=10):
"""gets the transactions of the user
@ -305,8 +296,7 @@ class API_Handler:
transactions_dict = req.json()["data"]
return transactions_dict
else:
return self.get_user_transactions(user_id, max_retries-1)
return self.get_user_transactions(user_id, max_retries - 1)
def set_transaction(self, user_id, comment, isin, count, price, time):
"""sets the transaction of the user
@ -326,13 +316,13 @@ class API_Handler:
None
"""
with r.Session() as s:
time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db
time = time[:-3] + "Z" # remove last character and add Z to make it a valid date for db
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price), "time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
transaction = {"comment": str(comment), "count": float(count), "isin": str(isin), "price": float(price),
"time": str(time)} # set transaction as JSON with all the attributes needed according to Swagger docs
req = s.post(self.db_adress + "/transaction", json=transaction, headers=headers)
return req.status_code
def get_user_portfolio(self, user_id, max_retries=10):
"""gets the portfolio of the user
@ -351,12 +341,12 @@ class API_Handler:
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON
req = s.get(self.db_adress + "/portfolio", headers=headers) # get portfolio as JSON
if req.status_code == 200:
portfolio_dict = req.json()["data"] # get the data of the JSON
portfolio_dict = req.json()["data"] # get the data of the JSON
return portfolio_dict
else:
return self.get_user_portfolio(user_id, max_retries-1)
return self.get_user_portfolio(user_id, max_retries - 1)
def set_cron_interval(self, user_id, cron_interval):
"""sets the cron interval of the user
@ -371,16 +361,15 @@ class API_Handler:
Raises:
None
"""
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
if not croniter.is_valid(cron_interval): # check if cron_interval is in valid format
print("Error: Invalid cron format")
return -1 # return error code -1 if invalid cron format
return -1 # return error code -1 if invalid cron format
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token + ":" + str(user_id)}
req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs)
req = s.put(self.db_adress + "/user/setCron", json={"cron": str(cron_interval)}, headers=headers) # put not post (see swagger docs)
return req.status_code
def set_admin(self, email, is_admin):
"""sets the admin of the user
@ -395,21 +384,21 @@ class API_Handler:
None
"""
with r.Session() as s:
headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin,"email": str(email)}, headers=headers)
headers = {'Authorization': 'Bearer ' + self.token} # only bot token is needed, user is chosen by email
req = s.put(self.db_adress + "/user/setAdmin", json={"admin": is_admin, "email": str(email)}, headers=headers)
return req.status_code
if __name__ == "__main__": # editable, just for basic on the go testing of new functions
if __name__ == "__main__": # editable, just for basic on the go testing of new functions
print("This is a module for the telegram bot. It is not intended to be run directly.")
handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env
handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env
print(handler.token)
keywords = handler.get_user_keywords(user_id = 1709356058) #user_id here is currently mine (Linus)
keywords = handler.get_user_keywords(user_id=1709356058) # user_id here is currently mine (Linus)
print(keywords)
shares = handler.get_user_portfolio(user_id = 1709356058)
print("set cron with status: "+ str(handler.set_cron_interval(user_id = 1709356058, cron_interval = "0 0 * * *")))
user = handler.get_user(user_id = 1709356058)
shares = handler.get_user_portfolio(user_id=1709356058)
print("set cron with status: " + str(handler.set_cron_interval(user_id=1709356058, cron_interval="0 0 * * *")))
user = handler.get_user(user_id=1709356058)
print(user)
all_users = handler.get_all_users()
admin_status = handler.set_admin("test@test.com", "true")

View File

@ -1,4 +1,3 @@
"""
script for telegram bot and its functions
"""
@ -14,37 +13,34 @@ __license__ = "None"
# API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started
import datetime as dt
import logging
import os
import re
import sys
import telebot
import sys
import logging
import re
from dotenv import load_dotenv
from telebot import types
import helper_functions as hf
import news.news_fetcher as news
import shares.share_fetcher as share_fetcher
import helper_functions as hf
import datetime as dt
from telebot import types
from dotenv import load_dotenv
from api_handling.api_handler import API_Handler
load_dotenv(dotenv_path='.env') # load environment variables
load_dotenv(dotenv_path='.env') # load environment variables
bot_version = "2.0.1" # version of bot
bot_version = "2.0.1" # version of bot
#create api handler
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars.
# create api handler
api_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD"))) # get creds from env vars.
print("Webserver Token: " + str(api_handler.token))
bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))
@bot.message_handler(commands=['start', 'Start'])
def send_start(message):
""" Sending welcome message to new user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/start'
@ -61,7 +57,6 @@ def send_start(message):
@bot.message_handler(commands=['version', 'Version'])
def send_version(message):
""" Sending programm version
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/version'
@ -73,9 +68,8 @@ def send_version(message):
bot.reply_to(message, "the current bot version is " + bot_version)
@bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions
@bot.message_handler(commands=['help', 'Help']) # /help -> sending all functions
def send_help(message):
""" Send all functions
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/help'
@ -84,12 +78,12 @@ def send_help(message):
:rtype: none
"""
bot.reply_to(message, "/id or /auth get your user id\n/update get updates on your shares.\n/shares get update on interesting shares\n/setAdmin set admin rights of user (ADMIN)\n/users see all users. (ADMIN)\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/transactions get all transactions\n/newtransaction create new transaction\n/share get price of specific share\n/portfolio see own portfolio\n/removeshare removes share from portfolio\n/interval get update interval\n/setinterval set update interval\n For further details see https://gruppe1.testsites.info")
bot.reply_to(message,
"/id or /auth get your user id\n/update get updates on your shares.\n/shares get update on interesting shares\n/setAdmin set admin rights of user (ADMIN)\n/users see all users. (ADMIN)\n/me get my user info\n/news get top article for each keyword.\n/allnews get all news (last 7 days)\n/keywords get all your keywords\n/addkeyword add a keyword\n/removekeyword remove a keyword\n/transactions get all transactions\n/newtransaction create new transaction\n/share get price of specific share\n/portfolio see own portfolio\n/removeshare removes share from portfolio\n/interval get update interval\n/setinterval set update interval\n For further details see https://gruppe1.testsites.info")
@bot.message_handler(commands=['users', 'Users']) # /users -> sending all users
@bot.message_handler(commands=['users', 'Users']) # /users -> sending all users
def send_all_users(message):
""" Send all users, only possible for admins
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/users'
@ -101,7 +95,7 @@ def send_all_users(message):
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
if (user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
@ -110,19 +104,17 @@ def send_all_users(message):
bot.send_message(chat_id=user_id, text="There are " + str(user_count) + " users in the database:")
for user in user_list:
username = user['username']
email = user['email']
id = user['telegram_user_id']
cron = user['cron']
admin = user['admin']
bot.send_message(chat_id=user_id, text=f'Username: {username}\nEmail: {email}\nID: {id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
bot.send_message(chat_id=user_id, text=f'Username: {username}\nEmail: {email}\nID: {id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['setAdmin', 'SetAdmin', 'setadmin', 'Setadmin']) # set admin rights to user TBD: not working!!
@bot.message_handler(commands=['setAdmin', 'SetAdmin', 'setadmin', 'Setadmin']) # set admin rights to user TBD: not working!!
def set_admin(message):
""" Set admin rights to user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/setAdmin'
@ -134,63 +126,63 @@ def set_admin(message):
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if(user_data["admin"] == False): # check if user has admin rights
if (user_data["admin"] == False): # check if user has admin rights
bot.reply_to(message, "You have to be an admin to use this command")
return
bot.send_message(chat_id=user_id, text='send email and true if this account should have admin rights, else false\n in format: <email>,<is_admin>') # request email and admin rights to change to
bot.send_message(chat_id=user_id, text='send email and true if this account should have admin rights, else false\n in format: <email>,<is_admin>') # request email and admin rights to change to
bot.register_next_step_handler(message, set_admin_step)
def set_admin_step(message):
str_message = str(message.text)
args_message = str_message.split(',') # split message into email and admin rights
args_message = str_message.split(',') # split message into email and admin rights
if len(args_message) != 2: # make sure 2 args (email,is_admin) are given
if len(args_message) != 2: # make sure 2 args (email,is_admin) are given
bot.reply_to(message, "exactly 2 arguments (<email>,<is_admin>) required, try again")
return
email = args_message[0]
is_admin = False # default: False
is_admin = False # default: False
if args_message[1].lower() == "true": # if user types true, set is_admin to true
if args_message[1].lower() == "true": # if user types true, set is_admin to true
is_admin = True
status = api_handler.set_admin(email, is_admin) # set admin in db
status = api_handler.set_admin(email, is_admin) # set admin in db
if(status == 200):
if (status == 200):
bot.reply_to(message, "Admin rights set")
else:
bot.reply_to(message, f"Admin rights could not be set ({status})")
@bot.message_handler(commands=['me', 'Me']) # /me -> sending user info
@bot.message_handler(commands=['me', 'Me']) # /me -> sending user info
def send_user(message):
""" Send user data
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/me'
""" Send user data
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/me'
:raises: none
:raises: none
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if not user_data or user_data == None: # true if user is not registered
bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info")
return
username = user_data['username']
email = user_data['email']
user_id = user_data['telegram_user_id']
cron = user_data['cron']
admin = user_data['admin']
bot.reply_to(message, f'Username: {username}\nEmail: {email}\nID: {user_id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id)
if not user_data or user_data == None: # true if user is not registered
bot.reply_to(message, "This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info")
return
username = user_data['username']
email = user_data['email']
user_id = user_data['telegram_user_id']
cron = user_data['cron']
admin = user_data['admin']
bot.reply_to(message, f'Username: {username}\nEmail: {email}\nID: {user_id}\nCron: {cron}\nAdmin: {admin}') # format user data into readable message text
@bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool
@bot.message_handler(commands=['id', 'auth', 'Id', 'Auth']) # /auth or /id -> Authentication with user_id over web tool
def send_id(message):
""" Send user id for authentication with browser
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/id' or '/auth'
@ -203,10 +195,9 @@ def send_id(message):
bot.reply_to(message, answer)
#function that can be used to ensure that the bot is online and running
# function that can be used to ensure that the bot is online and running
@bot.message_handler(commands=['status', 'Status'])
def send_status(message):
""" Sends status to user
:type message: message object bot
:param message: message that was reacted to, if no other command handler gets called
@ -218,9 +209,8 @@ def send_status(message):
bot.reply_to(message, "bot is running")
@bot.message_handler(commands=['update', 'Update']) # /update -> update shares
@bot.message_handler(commands=['update', 'Update']) # /update -> update shares
def update_for_user(message):
p_user_id = int(message.from_user.id)
p_my_handler = api_handler
@ -230,13 +220,13 @@ def update_for_user(message):
my_portfolio = p_my_handler.get_user_portfolio(p_user_id)
for element in my_portfolio:
if element["count"] != '' and element["isin"]!= '':
if element["count"] != '' and element["isin"] != '':
print(element["count"], element["isin"])
share_symbols.append(element["isin"])
share_amounts.append(element["count"])
my_user = p_my_handler.get_user(p_user_id)
send_to_user("Hello %s this is your share update:"%str(my_user["username"]), pUser_id=p_user_id)
send_to_user("Hello %s this is your share update:" % str(my_user["username"]), pUser_id=p_user_id)
if len(share_symbols) != 0:
for i in range(len(share_symbols)):
@ -248,7 +238,6 @@ def update_for_user(message):
def send_to_user(pText, pUser_id):
""" Send message to user
:type pText: string
:param pText: Text to send to user
@ -263,9 +252,8 @@ def send_to_user(pText, pUser_id):
bot.send_message(chat_id=pUser_id, text=pText)
@bot.message_handler(commands=['share', 'Share']) # /share -> get share price
@bot.message_handler(commands=['share', 'Share']) # /share -> get share price
def send_share_update(message):
""" Send price of a specific share
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/share'
@ -279,14 +267,14 @@ def send_share_update(message):
bot.send_message(chat_id=user_id, text='Send Symbol/ISIN of share or name of company:')
bot.register_next_step_handler(message, send_share_price)
def send_share_price(message):
str_share_price = share_fetcher.get_share_information_markdown(str(message.text))
bot.reply_to(message, str_share_price, parse_mode="MARKDOWNV2")
@bot.message_handler(commands=['allnews', 'Allnews']) # /allnews -> get all news
@bot.message_handler(commands=['allnews', 'Allnews']) # /allnews -> get all news
def send_all_news(message):
""" Get news for keywords of user
:type message: message object bot
:param message: message that was reacted to, in this case always containing '/allnews'
@ -297,31 +285,31 @@ def send_all_news(message):
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
return
keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc.
now = dt.datetime.now().date() # get current date
from_date = now - dt.timedelta(days=7) # get date 7 days ago -> limit age of news to 7 days old max
keywords_search = ' OR '.join(keywords) # concat all keywords with OR -> NewsAPI can understand OR, AND, NOT etc.
now = dt.datetime.now().date() # get current date
from_date = now - dt.timedelta(days=7) # get date 7 days ago -> limit age of news to 7 days old max
from_date_formatted = dt.datetime.strftime(from_date, '%Y-%m-%d')
news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects
news_list = news.get_all_news_by_keyword(keywords_search, from_date_formatted)["articles"] # array of JSON article objects
if news_list: # true if news_list is not empty
if news_list: # true if news_list is not empty
for article in news_list:
formatted_article = news.format_article(article)
bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWNV2") # Markdown allows to write bold text with * etc.
bot.send_message(chat_id=user_id, text=formatted_article, parse_mode="MARKDOWNV2") # Markdown allows to write bold text with * etc.
else:
bot.send_message(chat_id=user_id, text='No news found for your keywords.')
@bot.message_handler(commands=['news', 'News']) # /news -> get news for specific keyword
@bot.message_handler(commands=['news', 'News']) # /news -> get news for specific keyword
def send_news(message):
""" Get news for keywords of user
@ -333,33 +321,33 @@ def send_news(message):
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='You have no keywords. Please add some keywords with /addkeyword')
return
if keywords:
for keyword in keywords:
top_news = news.get_top_news_by_keyword(keyword)["articles"]
if top_news == None: # true if request to NewsAPI failed
if top_news == None: # true if request to NewsAPI failed
bot.send_message(chat_id=user_id, text='News Server did not respond correctly. Try again later.')
if not top_news: # true if no news found for keyword (empty list)
if not top_news: # true if no news found for keyword (empty list)
keyword = hf.make_markdown_proof(keyword)
bot.send_message(chat_id=user_id, text=f'No news found for keyword: *{keyword}*', parse_mode="MARKDOWNV2")
else:
keyword = hf.make_markdown_proof(keyword)
formatted_article = news.format_article(top_news[0]) # only format and send most popular news
bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWNV2") # do not use v2 because of bugs related t "." in links
formatted_article = news.format_article(top_news[0]) # only format and send most popular news
bot.send_message(chat_id=user_id, text=f"_keyword: {keyword}_\n\n" + formatted_article, parse_mode="MARKDOWNV2") # do not use v2 because of bugs related t "." in links
@bot.message_handler(commands=['addkeyword', 'Addkeyword']) # /addkeyword -> add keyword to user
@bot.message_handler(commands=['addkeyword', 'Addkeyword']) # /addkeyword -> add keyword to user
def add_keyword(message):
""" Add keyword to user
:type message: message object bot
@ -371,19 +359,20 @@ def add_keyword(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to add:')
bot.register_next_step_handler(message, store_keyword) # wait for user to send keyword, then call store_keyword function
bot.register_next_step_handler(message, store_keyword) # wait for user to send keyword, then call store_keyword function
def store_keyword(message):
user_id = int(message.from_user.id)
keyword = str(message.text).lower() # lower to ensure Bitcoin and bitcoin is not stored as individual keywords
status = api_handler.set_keyword(user_id, keyword) # set keyword in database
if status == 200: # statuscode 200 means keyword was added successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') # duplicate keywords are denied by Database, so no need to check for that here
keyword = str(message.text).lower() # lower to ensure Bitcoin and bitcoin is not stored as individual keywords
status = api_handler.set_keyword(user_id, keyword) # set keyword in database
if status == 200: # statuscode 200 means keyword was added successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" added.') # duplicate keywords are denied by Database, so no need to check for that here
else:
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" could not be stored. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info (statuscode {status})')
@bot.message_handler(commands=['removekeyword', 'Removekeyword']) # /removekeyword -> remove keyword from user
@bot.message_handler(commands=['removekeyword', 'Removekeyword']) # /removekeyword -> remove keyword from user
def remove_keyword(message):
""" Remove keyword from user
:type message: message object bot
@ -395,19 +384,20 @@ def remove_keyword(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type keyword to remove:')
bot.register_next_step_handler(message, remove_keyword_step) # wait for user to send keyword to remove, then call remove_keyword_step function
bot.register_next_step_handler(message, remove_keyword_step) # wait for user to send keyword to remove, then call remove_keyword_step function
def remove_keyword_step(message):
user_id = int(message.from_user.id)
keyword = str(message.text).lower()
status = api_handler.delete_keyword(user_id, keyword)
if status == 200: # statuscode 200 means keyword was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') # checking if keyword to remove is in database are handled in database, not here
if status == 200: # statuscode 200 means keyword was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Keyword "{keyword}" removed.') # checking if keyword to remove is in database are handled in database, not here
else:
bot.send_message(chat_id=user_id, text=f'Failed deleting keyword "{keyword}". (statuscode {status})')
@bot.message_handler(commands=['keywords', 'Keywords']) # /keywords -> get keywords of user
@bot.message_handler(commands=['keywords', 'Keywords']) # /keywords -> get keywords of user
def send_keywords(message):
""" Send keywords of user
:type message: message object bot
@ -418,17 +408,17 @@ def send_keywords(message):
:rtype: none
"""
user_id = int(message.from_user.id)
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
keywords = api_handler.get_user_keywords(user_id) # get keywords of user
if keywords == None: # true if user is not registered
if keywords == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not keywords: # true if user is registered but does not have any keywords
if not keywords: # true if user is registered but does not have any keywords
bot.send_message(chat_id=user_id, text='No keywords set for this account. Add keywords by using /addkeyword')
return
else: # send keyword list
else: # send keyword list
keywords_str = ', '.join(keywords)
keywords_str = hf.make_markdown_proof(keywords_str)
@ -447,20 +437,20 @@ def send_portfolio(message):
:rtype: none
"""
user_id = int(message.from_user.id)
portfolio = api_handler.get_user_portfolio(user_id) # get portfolio of user as json
if portfolio == None: # true if user is not registered
portfolio = api_handler.get_user_portfolio(user_id) # get portfolio of user as json
if portfolio == None: # true if user is not registered
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not portfolio: # true if user is registered but does not have any stocks in portfolio
if not portfolio: # true if user is registered but does not have any stocks in portfolio
bot.send_message(chat_id=user_id, text='You do not have any stocks in your portfolio.')
return
else: # send portfolio
else: # send portfolio
for stock in portfolio:
comment = hf.make_markdown_proof(str(stock["comment"])) # comment may be written name of stock, comment is made by user when adding an stock to portfolio
count = hf.make_markdown_proof("{:.2f}".format(float(stock["count"]))) # round count to 2 decimal places
comment = hf.make_markdown_proof(str(stock["comment"])) # comment may be written name of stock, comment is made by user when adding an stock to portfolio
count = hf.make_markdown_proof("{:.2f}".format(float(stock["count"]))) # round count to 2 decimal places
isin = hf.make_markdown_proof(str(stock["isin"]))
worth = hf.make_markdown_proof("{:.2f}".format(float(stock["current_price"]) * float(stock["count"]))) # round current_price to 2 decimal places
bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWNV2") # formatted message in markdown
worth = hf.make_markdown_proof("{:.2f}".format(float(stock["current_price"]) * float(stock["count"]))) # round current_price to 2 decimal places
bot.send_message(chat_id=user_id, text=f'*{comment}*\n_{isin}_\namount: {count}\nworth: ${worth}', parse_mode="MARKDOWNV2") # formatted message in markdown
@bot.message_handler(commands=['removeshare', 'Removeshare'])
@ -476,23 +466,23 @@ def remove_share(message):
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type ISIN/Symbol/CompanyName of share to remove (if you are unsure do /portfolio, find your share and insert the value above amount):')
bot.register_next_step_handler(message, remove_share_step) # wait for user to send ISIN, then call remove_share_step function
bot.register_next_step_handler(message, remove_share_step) # wait for user to send ISIN, then call remove_share_step function
def remove_share_step(message):
user_id = int(message.from_user.id)
isin = str(message.text)
status = api_handler.delete_share(int(user_id), str(isin)) # remove share from portfolio
status = api_handler.delete_share(int(user_id), str(isin)) # remove share from portfolio
if status == 200: # statuscode 200 means share was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Share "{isin}" removed.') # checking if share to remove is in database are handled in database, not here
if status == 200: # statuscode 200 means share was removed successfully without errors
bot.send_message(chat_id=user_id, text=f'Share "{isin}" removed.') # checking if share to remove is in database are handled in database, not here
else:
bot.send_message(chat_id=user_id, text=f'Failed deleting share "{isin}". (statuscode {status})\nMake sure that the share is in your portfolio and written exactly like there.')
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) #tbd not working rn may be deleted in future
@bot.message_handler(commands=['newtransaction', 'Newtransaction']) # tbd not working rn may be deleted in future
def set_new_transaction(message):
""" Set new transaction for user
:type message: message object bot
@ -503,7 +493,8 @@ def set_new_transaction(message):
:rtype: none
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type "<name of stock>,<isin/name/symbol>,<amount>,<price_per_stock_usd>" (time of transaction will be set to now, negative amount is selling, positive is buying):')
bot.send_message(chat_id=user_id,
text='Type "<name of stock>,<isin/name/symbol>,<amount>,<price_per_stock_usd>" (time of transaction will be set to now, negative amount is selling, positive is buying):')
bot.register_next_step_handler(message, set_new_transaction_step)
@ -541,16 +532,16 @@ def send_interval(message):
:rtype: none
"""
user_id = int(message.from_user.id)
user_data = api_handler.get_user(user_id) # get cron interval of user (stored in user data)
if user_data == None: # true if user is not registered in DB
user_data = api_handler.get_user(user_id) # get cron interval of user (stored in user data)
if user_data == None: # true if user is not registered in DB
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info and set an interval with /setinterval')
return
else: # send interval
interval = str(user_data['cron']) # get cron from user data
if interval == 'None': # true if user has no cron set
else: # send interval
interval = str(user_data['cron']) # get cron from user data
if interval == 'None': # true if user has no cron set
bot.send_message(chat_id=user_id, text='You do not have an interval set. Set one with /setinterval')
return
formatted_interval = str(interval).replace(' ', '_') # replace spaces with underscores to add to url of crontab.guru
formatted_interval = str(interval).replace(' ', '_') # replace spaces with underscores to add to url of crontab.guru
bot.send_message(chat_id=user_id, text=f'Your update interval: {interval} (https://crontab.guru/#{formatted_interval})')
@ -565,20 +556,20 @@ def send_transactions(message):
:rtype: none
"""
user_id = int(message.from_user.id)
transactions = api_handler.get_user_transactions(user_id) # get transactions of user
transactions = api_handler.get_user_transactions(user_id) # get transactions of user
if transactions == None: # true if user does not exist
if transactions == None: # true if user does not exist
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
return
if not transactions: # true if user has no transactions
if not transactions: # true if user has no transactions
bot.send_message(chat_id=user_id, text='You do not have any transactions.')
return
else:
for transaction in transactions:
comment = hf.make_markdown_proof(transaction['comment']) or "\(no desc\)" # if comment is empty, make it "no desc"
comment = hf.make_markdown_proof(transaction['comment']) or "\(no desc\)" # if comment is empty, make it "no desc"
isin = hf.make_markdown_proof(transaction['isin'])
amount = hf.make_markdown_proof(transaction['count'])
price = hf.make_markdown_proof(transaction['price'])
@ -598,18 +589,17 @@ def send_shares(message):
:rtype: none
"""
user_id = int(message.from_user.id)
shares = api_handler.get_user_shares(user_id) # get shares of user
shares = api_handler.get_user_shares(user_id) # get shares of user
if shares == None: # true if user does not exist
if shares == None: # true if user does not exist
bot.send_message(chat_id=user_id, text='This didn\'t work. Make sure to connect your telegram id (/id) on https://gruppe1.testsites.info')
elif not shares: # true if user has no shares
elif not shares: # true if user has no shares
bot.send_message(chat_id=user_id, text='You do not have any shares. Add shares on https://gruppe1.testsites.info')
else:
for element in shares:
bot.send_message(chat_id=user_id, text=share_fetcher.get_share_information_markdown(element), parse_mode="MARKDOWNV2")
@bot.message_handler(commands=['setinterval', 'Setinterval'])
def set_new_interval(message):
""" Set new interval for user
@ -622,28 +612,27 @@ def set_new_interval(message):
"""
user_id = int(message.from_user.id)
bot.send_message(chat_id=user_id, text='Type interval in cron format:\n(https://crontab.guru/)')
bot.register_next_step_handler(message, set_new_interval_step) # executes function when user sends message
bot.register_next_step_handler(message, set_new_interval_step) # executes function when user sends message
def set_new_interval_step(message):
user_id = int(message.from_user.id)
interval = str(message.text)
status = api_handler.set_cron_interval(user_id, interval) # send cron to db
status = api_handler.set_cron_interval(user_id, interval) # send cron to db
if status == 200:
bot.send_message(chat_id=user_id, text='Interval succesfully set.')
return
if status == -1: # only -1 when interval is invalid, not a real statuscode, but used from api_handler.set_cron_interval to tell the crontab has the wrong format
if status == -1: # only -1 when interval is invalid, not a real statuscode, but used from api_handler.set_cron_interval to tell the crontab has the wrong format
bot.send_message(chat_id=user_id, text='Invalid interval format. Try again with\n /setinterval.')
return
else:
bot.send_message(chat_id=user_id, text=f'Failed setting interval. (statuscode {status})')
@bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement
@bot.message_handler(func=lambda message: True) # Returning that command is unknown for any other statement
def echo_all(message):
""" Tell that command is not known if it is no known command
:type message: message object bot
:param message: message that was reacted to, if no other command handler gets called
@ -659,9 +648,8 @@ def echo_all(message):
telebot.logger.setLevel(logging.DEBUG)
@bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging
@bot.inline_handler(lambda query: query.query == 'text') # inline prints for debugging
def query_text(inline_query):
""" Output in the console about current user actions and status of bot
:type inline_query:
:param inline_query:
@ -679,7 +667,6 @@ def query_text(inline_query):
def main_loop():
""" Start bot
:raises: none
@ -687,6 +674,7 @@ def main_loop():
"""
bot.infinity_polling()
if __name__ == '__main__':
try:
main_loop()

View File

@ -6,17 +6,18 @@ __date__ = "10.05.2022"
__version__ = "1.0.2"
__license__ = "None"
from dotenv import load_dotenv
import news.news_fetcher as news_fetcher
import time
import os
from bot import bot
import sys
from apscheduler.schedulers.background import BackgroundScheduler
from api_handling.api_handler import API_Handler
import shares.share_fetcher as share_fetcher
import helper_functions as hf
import time
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
import helper_functions as hf
import news.news_fetcher as news_fetcher
import shares.share_fetcher as share_fetcher
from api_handling.api_handler import API_Handler
from bot import bot
'''
* * * * * code
@ -35,6 +36,7 @@ user_crontab = []
load_dotenv(dotenv_path='.env')
def start_updater():
""" starting function for regularly sending updates
:raises: none
@ -46,7 +48,6 @@ def start_updater():
my_handler = API_Handler("https://gruppe1.testsites.info/api", str(os.getenv("BOT_EMAIL")), str(os.getenv("BOT_PASSWORD")))
update_crontab(my_handler)
@ -76,8 +77,8 @@ def update_crontab(p_my_handler):
user_crontab.append(str(element["cron"]))
except:
user_ids.pop()
except: continue
except:
continue
print(user_ids)
@ -87,7 +88,6 @@ def update_crontab(p_my_handler):
def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
""" Check all the crontab codes and add jobs to start in time
:type p_user_ids: array
:param p_user_ids: user id array of all users
@ -108,15 +108,15 @@ def update_based_on_crontab(p_user_ids, p_user_crontab, p_my_handler):
for i in range(len(p_user_ids)):
cron_split = p_user_crontab[i].split(" ")
print(cron_split[4], cron_split[1], cron_split[0], cron_split[3], cron_split[2])
my_scheduler.add_job(update_for_user, 'cron', day_of_week = cron_split[4] , hour= cron_split[1] , minute = cron_split[0], month= cron_split[3] , day=cron_split[2], args=(p_user_ids[i], p_my_handler ))
my_scheduler.add_job(update_for_user, 'cron', day_of_week=cron_split[4], hour=cron_split[1], minute=cron_split[0], month=cron_split[3], day=cron_split[2], args=(p_user_ids[i], p_my_handler))
my_scheduler.start()
time.sleep( 600 )
time.sleep(600)
my_scheduler.shutdown()
def update_for_user(p_user_id, p_my_handler):
def update_for_user(p_user_id, p_my_handler):
""" Pull shares and send updates for specific user id
:type p_user_id: integer
:param p_user_id: user id of user that shall receive update
@ -135,13 +135,13 @@ def update_for_user(p_user_id, p_my_handler):
my_portfolio = p_my_handler.get_user_portfolio(p_user_id)
for element in my_portfolio:
if element["count"] != '' and element["isin"]!= '':
if element["count"] != '' and element["isin"] != '':
print(element["count"], element["isin"])
share_symbols.append(element["isin"])
share_amounts.append(element["count"])
my_user = p_my_handler.get_user(p_user_id)
send_to_user("Hello %s this is your share update for today:"%str(my_user["username"]), pUser_id=p_user_id)
send_to_user("Hello %s this is your share update for today:" % str(my_user["username"]), pUser_id=p_user_id)
shares = p_my_handler.get_user_shares(p_user_id)
@ -153,34 +153,30 @@ def update_for_user(p_user_id, p_my_handler):
else:
send_to_user("No shares found for your account. Check https://gruppe1.testsites.info to change your settings and add shares.", pUser_id=p_user_id)
if len(shares)!=0: # Send updates on watchlist shares if existing
if len(shares) != 0: # Send updates on watchlist shares if existing
send_to_user("Your watchlist shares:", pUser_id=p_user_id)
for element in shares:
send_to_user(share_fetcher.get_share_information_markdown(element), pUser_id=p_user_id, md_mode=True)
keywords = p_my_handler.get_user_keywords(p_user_id) # get keywords as array
keywords = p_my_handler.get_user_keywords(p_user_id) # get keywords as array
if(keywords): # if keywords exist and array is not empty
if (keywords): # if keywords exist and array is not empty
send_to_user("If you haven't read yet: \nHere are some interesting news according to your keywords:", pUser_id=p_user_id)
for keyword in keywords:
news = news_fetcher.get_top_news_by_keyword(keyword)["articles"]
keyword = hf.make_markdown_proof(keyword)
if not news: # if empty news array
if not news: # if empty news array
send_to_user(f"No news found for keyword _{keyword}_.", pUser_id=p_user_id, md_mode=True)
elif news == None: # if news is none
elif news == None: # if news is none
send_to_user(f"Server error for keyword _{keyword}_.", pUser_id=p_user_id, md_mode=True)
else:
news_formatted = news_fetcher.format_article(news[0]) # format for message, only use the most popular article
send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True) # send news with related keyword in Markdown
news_formatted = news_fetcher.format_article(news[0]) # format for message, only use the most popular article
send_to_user(f"_keyword: {keyword}_\n\n{news_formatted}", pUser_id=p_user_id, md_mode=True) # send news with related keyword in Markdown
def send_to_user(pText, pUser_id , md_mode = False):
def send_to_user(pText, pUser_id, md_mode=False):
""" Send message to user
:type pText: string
:param pText: Text to send to user
@ -201,7 +197,6 @@ def send_to_user(pText, pUser_id , md_mode = False):
bot.send_message(chat_id=pUser_id, text=pText)
if __name__ == "__main__":
try:
start_updater()

View File

@ -6,6 +6,7 @@ __date__ = "10.05.2022"
__version__ = "1.0.0"
__license__ = "None"
def contains_markdownv1_symbols(text):
""" checks if text contains markdown symbols
:type text: string
@ -16,13 +17,13 @@ def contains_markdownv1_symbols(text):
:rtype: bool
"""
if text.find("_") != -1 or text.find("*") != -1 or text.find("`") != -1: # check if text contains relevant markdown symbols
if text.find("_") != -1 or text.find("*") != -1 or text.find("`") != -1: # check if text contains relevant markdown symbols
return True
return False
def make_markdown_proof(text): # used to avoid errors related to markdown parsemode for telegram messaging
def make_markdown_proof(text): # used to avoid errors related to markdown parsemode for telegram messaging
""" makes text markdown proof
:type text: string
@ -34,7 +35,7 @@ def make_markdown_proof(text): # used to avoid errors related to markdown parsem
"""
text = str(text)
text = text.replace("_", "\\_") # replace _ with \_ because \ is used as escape character in markdown, double escape is needed because \ is also a escape character in strings
text = text.replace("_", "\\_") # replace _ with \_ because \ is used as escape character in markdown, double escape is needed because \ is also a escape character in strings
text = text.replace("*", "\\*")
text = text.replace("`", "\\`")
text = text.replace("[", "\\[")
@ -57,7 +58,6 @@ def make_markdown_proof(text): # used to avoid errors related to markdown parsem
text = text.replace("$", "\\$")
text = text.replace("%", "\\%")
return text

View File

@ -1,32 +1,32 @@
{
"status": "ok",
"totalResults": 1,
"articles": [
{
"source": {
"id": "the-verge",
"name": "The Verge"
},
"author": "Justine Calma",
"title": "EU Parliament backs off plans to phase out energy-hungry cryptocurrencies",
"description": "EU Parliament abandoned a measure in its proposed legislative framework for regulating cryptocurrencies that would have amounted to a de facto ban on energy-hungry networks like Bitcoin.",
"url": "https://www.theverge.com/2022/3/14/22977132/bitcoin-european-union-parliament-ban-proof-of-work-cryptocurrencies",
"urlToImage": "https://cdn.vox-cdn.com/thumbor/8bE-uBwwu-eXg-CcB6cOqcAGVDw=/0x286:4000x2380/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23315944/834392892.jpg",
"publishedAt": "2022-03-14T23:40:25Z",
"content": "But Bitcoin is still under scrutiny \r\nPower cords for bitcoin mining machines are plugged into electrical outlets at a mining facility operated by Bitmain Technologies Ltd. in Ordos, Inner Mongolia, \u2026 [+5797 chars]"
},
{
"source": {
"id": "the-verge",
"name": "The Verge"
},
"author": "Justine Calma",
"title": "EU Parliament backs off plans to phase out energy-hungry cryptocurrencies",
"description": "EU Parliament abandoned a measure in its proposed legislative framework for regulating cryptocurrencies that would have amounted to a de facto ban on energy-hungry networks like Bitcoin.",
"url": "https://www.theverge.com/2022/3/14/22977132/bitcoin-european-union-parliament-ban-proof-of-work-cryptocurrencies",
"urlToImage": "https://cdn.vox-cdn.com/thumbor/8bE-uBwwu-eXg-CcB6cOqcAGVDw=/0x286:4000x2380/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23315944/834392892.jpg",
"publishedAt": "2022-03-14T23:40:25Z",
"content": "But Bitcoin is still under scrutiny \r\nPower cords for bitcoin mining machines are plugged into electrical outlets at a mining facility operated by Bitmain Technologies Ltd. in Ordos, Inner Mongolia, \u2026 [+5797 chars]"
}
]
"status": "ok",
"totalResults": 1,
"articles": [
{
"source": {
"id": "the-verge",
"name": "The Verge"
},
"author": "Justine Calma",
"title": "EU Parliament backs off plans to phase out energy-hungry cryptocurrencies",
"description": "EU Parliament abandoned a measure in its proposed legislative framework for regulating cryptocurrencies that would have amounted to a de facto ban on energy-hungry networks like Bitcoin.",
"url": "https://www.theverge.com/2022/3/14/22977132/bitcoin-european-union-parliament-ban-proof-of-work-cryptocurrencies",
"urlToImage": "https://cdn.vox-cdn.com/thumbor/8bE-uBwwu-eXg-CcB6cOqcAGVDw=/0x286:4000x2380/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23315944/834392892.jpg",
"publishedAt": "2022-03-14T23:40:25Z",
"content": "But Bitcoin is still under scrutiny \r\nPower cords for bitcoin mining machines are plugged into electrical outlets at a mining facility operated by Bitmain Technologies Ltd. in Ordos, Inner Mongolia, \u2026 [+5797 chars]"
},
{
"source": {
"id": "the-verge",
"name": "The Verge"
},
"author": "Justine Calma",
"title": "EU Parliament backs off plans to phase out energy-hungry cryptocurrencies",
"description": "EU Parliament abandoned a measure in its proposed legislative framework for regulating cryptocurrencies that would have amounted to a de facto ban on energy-hungry networks like Bitcoin.",
"url": "https://www.theverge.com/2022/3/14/22977132/bitcoin-european-union-parliament-ban-proof-of-work-cryptocurrencies",
"urlToImage": "https://cdn.vox-cdn.com/thumbor/8bE-uBwwu-eXg-CcB6cOqcAGVDw=/0x286:4000x2380/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23315944/834392892.jpg",
"publishedAt": "2022-03-14T23:40:25Z",
"content": "But Bitcoin is still under scrutiny \r\nPower cords for bitcoin mining machines are plugged into electrical outlets at a mining facility operated by Bitmain Technologies Ltd. in Ordos, Inner Mongolia, \u2026 [+5797 chars]"
}
]
}

View File

@ -6,20 +6,19 @@ __date__ = "26.04.2022"
__version__ = "1.0.0"
__license__ = "None"
import sys
import os
import requests
import sys
import helper_functions as hf
from newsapi import NewsApiClient
import requests
from dotenv import load_dotenv
from newsapi import NewsApiClient
load_dotenv() # loads environment vars
load_dotenv() # loads environment vars
# Init
api_key = os.getenv('NEWS_API_KEY') # get API Key from .env file
newsapi = NewsApiClient(api_key=api_key) # news api from https://newsapi.org/
api_key = os.getenv('NEWS_API_KEY') # get API Key from .env file
newsapi = NewsApiClient(api_key=api_key) # news api from https://newsapi.org/
try:
# get all available news sources (e.g BBC, New York Times, etc.)
@ -29,7 +28,8 @@ try:
except KeyError:
print("Error: Could not get sources, may be blocked because of too many requests (free newsapi is limited to 100 reqs per day)")
str_sources = str("Reuters, bbc, cnn, fox-news, google-news, hacker-news, nytimes, the-huffington-post, the-new-york-times, business-insider, bbc-news, cbc-news, ESPN, fox-sports, google-news-uk, independent, the-wall-street-journal, the-washington-times, time, usa-today")
str_sources = str(
"Reuters, bbc, cnn, fox-news, google-news, hacker-news, nytimes, the-huffington-post, the-new-york-times, business-insider, bbc-news, cbc-news, ESPN, fox-sports, google-news-uk, independent, the-wall-street-journal, the-washington-times, time, usa-today")
def get_all_news_by_keyword(keyword, from_date="2000-01-01"):
@ -41,8 +41,8 @@ def get_all_news_by_keyword(keyword, from_date="2000-01-01"):
Returns:
JSON/dict: dict containing articles
"""
top_headlines = newsapi.get_everything(q=keyword, sources=str_sources, language='en', from_param=from_date) # keywords can be combined with OR (e.g. keyword = "bitcoin OR ethereum")
if(top_headlines["status"] == "ok"):
top_headlines = newsapi.get_everything(q=keyword, sources=str_sources, language='en', from_param=from_date) # keywords can be combined with OR (e.g. keyword = "bitcoin OR ethereum")
if (top_headlines["status"] == "ok"):
return top_headlines
else:
return None
@ -56,8 +56,8 @@ def get_top_news_by_keyword(keyword):
Returns:
JSON/dict: dict containing articles
"""
top_headlines = newsapi.get_top_headlines(q=keyword, sources=str_sources, language='en') # get top headlines, measured by popularity from NewsApi
if(top_headlines["status"] == "ok"):
top_headlines = newsapi.get_top_headlines(q=keyword, sources=str_sources, language='en') # get top headlines, measured by popularity from NewsApi
if (top_headlines["status"] == "ok"):
return top_headlines
else:
return None
@ -72,15 +72,15 @@ def format_article(article):
Returns:
String: formatted article
"""
sourcename = hf.make_markdown_proof(article["source"]["name"]) # make attributes markdownv2 proof
sourcename = hf.make_markdown_proof(article["source"]["name"]) # make attributes markdownv2 proof
headline = hf.make_markdown_proof(article["title"])
url = hf.make_markdown_proof(article["url"])
formatted_article = f"_{sourcename}_\n*{headline}*\n\n{url}" # formatting in Markdown syntax
formatted_article = f"_{sourcename}_\n*{headline}*\n\n{url}" # formatting in Markdown syntax
return formatted_article
if __name__ == '__main__': # only execute if script is called directly -> for simple testing
if __name__ == '__main__': # only execute if script is called directly -> for simple testing
print("this is a module and should not be run directly")
print("fetching top news by keyword bitcoin...")

View File

@ -6,10 +6,11 @@ __date__ = "10.05.2022"
__version__ = "1.0.1"
__license__ = "None"
import helper_functions as hf
import investpy
import pandas
from currency_converter import CurrencyConverter
import helper_functions as hf
def get_share_price(str_search_for):
"""get stock price per share for company name or isin or symbol
@ -30,7 +31,7 @@ def get_share_price(str_search_for):
stock_price = round(float(stock_price), 2)
str_return =str(stock_price) + " " + str(currency)
str_return = str(stock_price) + " " + str(currency)
return str_return
@ -50,13 +51,14 @@ def get_share_price(str_search_for):
stock_price = round(float(stock_price), 2)
str_return =str(stock_price) + " EUR"
str_return = str(stock_price) + " EUR"
return str_return
except RuntimeError:
return "None"
def get_share_price_no_currency(str_search_for):
"""get stock price per share for company name or isin or symbol no currency
Args:
@ -65,8 +67,7 @@ def get_share_price_no_currency(str_search_for):
"""
try:
search_result = investpy.search_quotes(text=str_search_for, products=['stocks'],
countries=['germany'], n_results=1)
countries=['germany'], n_results=1)
recent_data = pandas.DataFrame(search_result.retrieve_recent_data())
@ -91,31 +92,35 @@ def get_share_price_no_currency(str_search_for):
stock_price = round(float(stock_price), 2)
str_return =str(stock_price)
str_return = str(stock_price)
return str_return
def get_share_information(str_search_for):
search_result = investpy.search_quotes(text=str_search_for, products=['stocks'],
countries=['germany'], n_results=1)
countries=['germany'], n_results=1)
str_return = "Company: " + search_result.name + "\nSymbol: " + search_result.symbol + "\nCurrent Price/Share: " + get_share_price(str_search_for)
return str_return
def get_share_information_markdown(str_search_for):
search_result = investpy.search_quotes(text=str_search_for, products=['stocks'],
countries=['germany'], n_results=1)
countries=['germany'], n_results=1)
str_return = f'*{hf.make_markdown_proof(search_result.name)}*\n_{hf.make_markdown_proof(search_result.symbol)}_\nworth: {hf.make_markdown_proof(get_share_price(str_search_for))}'
return str_return
def get_share_information_simple(str_search_for):
search_result = investpy.search_quotes(text=str_search_for, products=['stocks'],
countries=['germany'], n_results=1)
countries=['germany'], n_results=1)
str_return = search_result.name + "\n" +search_result.symbol + "\nworth: " + get_share_price(str_search_for)
str_return = search_result.name + "\n" + search_result.symbol + "\nworth: " + get_share_price(str_search_for)
return str_return
if __name__ == "__main__":
print("None")

View File

@ -1,16 +1,16 @@
{
"user": "FloKell",
"share_count": 2,
"shares": [
{
"symbol": "APC.DE",
"price_bought": "50.06",
"amount_bought": "5.1"
},
{
"symbol": "TL0.DE",
"price_bought": "450.06",
"amount_bought": "5.13"
}
]
"user": "FloKell",
"share_count": 2,
"shares": [
{
"symbol": "APC.DE",
"price_bought": "50.06",
"amount_bought": "5.1"
},
{
"symbol": "TL0.DE",
"price_bought": "450.06",
"amount_bought": "5.13"
}
]
}