Removed telebot directory, added telebot to requirements.txt, use environment variable for api key

This commit is contained in:
Administrator 2022-03-12 19:05:40 +01:00
parent fbb74dd6a9
commit 0776585f75
24 changed files with 10 additions and 16078 deletions

11
bot.py
View File

@ -3,17 +3,20 @@
# text bot at t.me/projektaktienbot # text bot at t.me/projektaktienbot
# API Documentation https://core.telegram.org/bots/api # API Documentation https://core.telegram.org/bots/api
# Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started # Code examples https://github.com/eternnoir/pyTelegramBotAPI#getting-started
import os
import telebot import telebot
bot = telebot.TeleBot("5228016873:AAGFrh0P6brag7oD3gxXjCh5gnLLE8JMvMs") bot = telebot.TeleBot(os.getenv('BOT_API_KEY'))
@bot.message_handler(commands=['start', 'help']) @bot.message_handler(commands=['start', 'help'])
def send_welcome(message): def send_welcome(message):
bot.reply_to(message, "Thank you for using this bot") bot.reply_to(message, "Thank you for using this bot")
@bot.message_handler(func=lambda message: True) @bot.message_handler(func=lambda message: True)
def echo_all(message): def echo_all(message):
bot.reply_to(message, message.text) bot.reply_to(message, message.text)
bot.infinity_polling() bot.infinity_polling()

View File

@ -1,4 +1,5 @@
Flask==2.0.2 Flask~=2.0.3
python-dotenv==0.19.2 python-dotenv==0.19.2
requests==2.27.1 requests==2.27.1
uwsgi==2.0.20 uwsgi==2.0.20
pyTelegramBotAPI~=4.4.0

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,328 +0,0 @@
from abc import ABC
from typing import Optional, Union
from telebot.asyncio_handler_backends import State
from telebot import types
class SimpleCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts only message, returns bool value, that is compared with given in handler.
"""
async def check(self, message):
"""
Perform a check.
"""
pass
class AdvancedCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts two parameters, returns bool: True - filter passed, False - filter failed.
message: Message class
text: Filter value given in handler
"""
async def check(self, message, text):
"""
Perform a check.
"""
pass
class TextFilter:
"""
Advanced text filter to check (types.Message, types.CallbackQuery, types.InlineQuery, types.Poll)
example of usage is in examples/custom_filters/advanced_text_filter.py
"""
def __init__(self,
equals: Optional[str] = None,
contains: Optional[Union[list, tuple]] = None,
starts_with: Optional[Union[str, list, tuple]] = None,
ends_with: Optional[Union[str, list, tuple]] = None,
ignore_case: bool = False):
"""
:param equals: string, True if object's text is equal to passed string
:param contains: list[str] or tuple[str], True if any string element of iterable is in text
:param starts_with: string, True if object's text starts with passed string
:param ends_with: string, True if object's text starts with passed string
:param ignore_case: bool (default False), case insensitive
"""
to_check = sum((pattern is not None for pattern in (equals, contains, starts_with, ends_with)))
if to_check == 0:
raise ValueError('None of the check modes was specified')
self.equals = equals
self.contains = self._check_iterable(contains, filter_name='contains')
self.starts_with = self._check_iterable(starts_with, filter_name='starts_with')
self.ends_with = self._check_iterable(ends_with, filter_name='ends_with')
self.ignore_case = ignore_case
def _check_iterable(self, iterable, filter_name):
if not iterable:
pass
elif not isinstance(iterable, str) and not isinstance(iterable, list) and not isinstance(iterable, tuple):
raise ValueError(f"Incorrect value of {filter_name!r}")
elif isinstance(iterable, str):
iterable = [iterable]
elif isinstance(iterable, list) or isinstance(iterable, tuple):
iterable = [i for i in iterable if isinstance(i, str)]
return iterable
async def check(self, obj: Union[types.Message, types.CallbackQuery, types.InlineQuery, types.Poll]):
if isinstance(obj, types.Poll):
text = obj.question
elif isinstance(obj, types.Message):
text = obj.text or obj.caption
elif isinstance(obj, types.CallbackQuery):
text = obj.data
elif isinstance(obj, types.InlineQuery):
text = obj.query
else:
return False
if self.ignore_case:
text = text.lower()
prepare_func = lambda string: str(string).lower()
else:
prepare_func = str
if self.equals:
result = prepare_func(self.equals) == text
if result:
return True
elif not result and not any((self.contains, self.starts_with, self.ends_with)):
return False
if self.contains:
result = any([prepare_func(i) in text for i in self.contains])
if result:
return True
elif not result and not any((self.starts_with, self.ends_with)):
return False
if self.starts_with:
result = any([text.startswith(prepare_func(i)) for i in self.starts_with])
if result:
return True
elif not result and not self.ends_with:
return False
if self.ends_with:
return any([text.endswith(prepare_func(i)) for i in self.ends_with])
return False
class TextMatchFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
@bot.message_handler(text=['account'])
"""
key = 'text'
async def check(self, message, text):
if isinstance(text, TextFilter):
return await text.check(message)
elif type(text) is list:
return message.text in text
else:
return text == message.text
class TextContainsFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
# Will respond if any message.text contains word 'account'
@bot.message_handler(text_contains=['account'])
"""
key = 'text_contains'
async def check(self, message, text):
if not isinstance(text, str) and not isinstance(text, list) and not isinstance(text, tuple):
raise ValueError("Incorrect text_contains value")
elif isinstance(text, str):
text = [text]
elif isinstance(text, list) or isinstance(text, tuple):
text = [i for i in text if isinstance(i, str)]
return any([i in message.text for i in text])
class TextStartsFilter(AdvancedCustomFilter):
"""
Filter to check whether message starts with some text.
Example:
# Will work if message.text starts with 'Sir'.
@bot.message_handler(text_startswith='Sir')
"""
key = 'text_startswith'
async def check(self, message, text):
return message.text.startswith(text)
class ChatFilter(AdvancedCustomFilter):
"""
Check whether chat_id corresponds to given chat_id.
Example:
@bot.message_handler(chat_id=[99999])
"""
key = 'chat_id'
async def check(self, message, text):
return message.chat.id in text
class ForwardFilter(SimpleCustomFilter):
"""
Check whether message was forwarded from channel or group.
Example:
@bot.message_handler(is_forwarded=True)
"""
key = 'is_forwarded'
async def check(self, message):
return message.forward_from_chat is not None
class IsReplyFilter(SimpleCustomFilter):
"""
Check whether message is a reply.
Example:
@bot.message_handler(is_reply=True)
"""
key = 'is_reply'
async def check(self, message):
return message.reply_to_message is not None
class LanguageFilter(AdvancedCustomFilter):
"""
Check users language_code.
Example:
@bot.message_handler(language_code=['ru'])
"""
key = 'language_code'
async def check(self, message, text):
if type(text) is list:
return message.from_user.language_code in text
else:
return message.from_user.language_code == text
class IsAdminFilter(SimpleCustomFilter):
"""
Check whether the user is administrator / owner of the chat.
Example:
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
"""
key = 'is_chat_admin'
def __init__(self, bot):
self._bot = bot
async def check(self, message):
result = await self._bot.get_chat_member(message.chat.id, message.from_user.id)
return result.status in ['creator', 'administrator']
class StateFilter(AdvancedCustomFilter):
"""
Filter to check state.
Example:
@bot.message_handler(state=1)
"""
def __init__(self, bot):
self.bot = bot
key = 'state'
async def check(self, message, text):
if text == '*': return True
# needs to work with callbackquery
if isinstance(message, types.Message):
chat_id = message.chat.id
user_id = message.from_user.id
if isinstance(message, types.CallbackQuery):
chat_id = message.message.chat.id
user_id = message.from_user.id
message = message.message
if isinstance(text, list):
new_text = []
for i in text:
if isinstance(i, State): i = i.name
new_text.append(i)
text = new_text
elif isinstance(text, State):
text = text.name
if message.chat.type == 'group':
group_state = await self.bot.current_states.get_state(user_id, chat_id)
if group_state == text:
return True
elif group_state in text and type(text) is list:
return True
else:
user_state = await self.bot.current_states.get_state(user_id, chat_id)
if user_state == text:
return True
elif type(text) is list and user_state in text:
return True
class IsDigitFilter(SimpleCustomFilter):
"""
Filter to check whether the string is made up of only digits.
Example:
@bot.message_handler(is_digit=True)
"""
key = 'is_digit'
async def check(self, message):
return message.text.isdigit()

View File

@ -1,56 +0,0 @@
class BaseMiddleware:
"""
Base class for middleware.
Your middlewares should be inherited from this class.
"""
def __init__(self):
pass
async def pre_process(self, message, data):
raise NotImplementedError
async def post_process(self, message, data, exception):
raise NotImplementedError
class State:
def __init__(self) -> None:
self.name = None
def __str__(self) -> str:
return self.name
class StatesGroup:
def __init_subclass__(cls) -> None:
for name, value in cls.__dict__.items():
if not name.startswith('__') and not callable(value) and isinstance(value, State):
# change value of that variable
value.name = ':'.join((cls.__name__, name))
class SkipHandler:
"""
Class for skipping handlers.
Just return instance of this class
in middleware to skip handler.
Update will go to post_process,
but will skip execution of handler.
"""
def __init__(self) -> None:
pass
class CancelUpdate:
"""
Class for canceling updates.
Just return instance of this class
in middleware to skip update.
Update will skip handler and execution
of post_process in middlewares.
"""
def __init__(self) -> None:
pass

File diff suppressed because it is too large Load Diff

View File

@ -1,13 +0,0 @@
from telebot.asyncio_storage.memory_storage import StateMemoryStorage
from telebot.asyncio_storage.redis_storage import StateRedisStorage
from telebot.asyncio_storage.pickle_storage import StatePickleStorage
from telebot.asyncio_storage.base_storage import StateContext,StateStorageBase
__all__ = [
'StateStorageBase', 'StateContext',
'StateMemoryStorage', 'StateRedisStorage', 'StatePickleStorage'
]

View File

@ -1,68 +0,0 @@
import copy
class StateStorageBase:
def __init__(self) -> None:
pass
async def set_data(self, chat_id, user_id, key, value):
"""
Set data for a user in a particular chat.
"""
raise NotImplementedError
async def get_data(self, chat_id, user_id):
"""
Get data for a user in a particular chat.
"""
raise NotImplementedError
async def set_state(self, chat_id, user_id, state):
"""
Set state for a particular user.
! Note that you should create a
record if it does not exist, and
if a record with state already exists,
you need to update a record.
"""
raise NotImplementedError
async def delete_state(self, chat_id, user_id):
"""
Delete state for a particular user.
"""
raise NotImplementedError
async def reset_data(self, chat_id, user_id):
"""
Reset data for a particular user in a chat.
"""
raise NotImplementedError
async def get_state(self, chat_id, user_id):
raise NotImplementedError
async def save(self, chat_id, user_id, data):
raise NotImplementedError
class StateContext:
"""
Class for data.
"""
def __init__(self, obj, chat_id, user_id):
self.obj = obj
self.data = None
self.chat_id = chat_id
self.user_id = user_id
async def __aenter__(self):
self.data = copy.deepcopy(await self.obj.get_data(self.chat_id, self.user_id))
return self.data
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self.obj.save(self.chat_id, self.user_id, self.data)

View File

@ -1,66 +0,0 @@
from telebot.asyncio_storage.base_storage import StateStorageBase, StateContext
class StateMemoryStorage(StateStorageBase):
def __init__(self) -> None:
self.data = {}
#
# {chat_id: {user_id: {'state': None, 'data': {}}, ...}, ...}
async def set_state(self, chat_id, user_id, state):
if isinstance(state, object):
state = state.name
if chat_id in self.data:
if user_id in self.data[chat_id]:
self.data[chat_id][user_id]['state'] = state
return True
else:
self.data[chat_id][user_id] = {'state': state, 'data': {}}
return True
self.data[chat_id] = {user_id: {'state': state, 'data': {}}}
return True
async def delete_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
del self.data[chat_id][user_id]
if chat_id == user_id:
del self.data[chat_id]
return True
return False
async def get_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['state']
return None
async def get_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['data']
return None
async def reset_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'] = {}
return True
return False
async def set_data(self, chat_id, user_id, key, value):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'][key] = value
return True
raise RuntimeError('chat_id {} and user_id {} does not exist'.format(chat_id, user_id))
def get_interactive_data(self, chat_id, user_id):
return StateContext(self, chat_id, user_id)
async def save(self, chat_id, user_id, data):
self.data[chat_id][user_id]['data'] = data

View File

@ -1,109 +0,0 @@
from telebot.asyncio_storage.base_storage import StateStorageBase, StateContext
import os
import pickle
class StatePickleStorage(StateStorageBase):
def __init__(self, file_path="./.state-save/states.pkl") -> None:
self.file_path = file_path
self.create_dir()
self.data = self.read()
async def convert_old_to_new(self):
# old looks like:
# {1: {'state': 'start', 'data': {'name': 'John'}}
# we should update old version pickle to new.
# new looks like:
# {1: {2: {'state': 'start', 'data': {'name': 'John'}}}}
new_data = {}
for key, value in self.data.items():
# this returns us id and dict with data and state
new_data[key] = {key: value} # convert this to new
# pass it to global data
self.data = new_data
self.update_data() # update data in file
def create_dir(self):
"""
Create directory .save-handlers.
"""
dirs = self.file_path.rsplit('/', maxsplit=1)[0]
os.makedirs(dirs, exist_ok=True)
if not os.path.isfile(self.file_path):
with open(self.file_path,'wb') as file:
pickle.dump({}, file)
def read(self):
file = open(self.file_path, 'rb')
data = pickle.load(file)
file.close()
return data
def update_data(self):
file = open(self.file_path, 'wb+')
pickle.dump(self.data, file, protocol=pickle.HIGHEST_PROTOCOL)
file.close()
async def set_state(self, chat_id, user_id, state):
if isinstance(state, object):
state = state.name
if chat_id in self.data:
if user_id in self.data[chat_id]:
self.data[chat_id][user_id]['state'] = state
return True
else:
self.data[chat_id][user_id] = {'state': state, 'data': {}}
return True
self.data[chat_id] = {user_id: {'state': state, 'data': {}}}
self.update_data()
return True
async def delete_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
del self.data[chat_id][user_id]
if chat_id == user_id:
del self.data[chat_id]
self.update_data()
return True
return False
async def get_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['state']
return None
async def get_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['data']
return None
async def reset_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'] = {}
self.update_data()
return True
return False
async def set_data(self, chat_id, user_id, key, value):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'][key] = value
self.update_data()
return True
raise RuntimeError('chat_id {} and user_id {} does not exist'.format(chat_id, user_id))
def get_interactive_data(self, chat_id, user_id):
return StateContext(self, chat_id, user_id)
async def save(self, chat_id, user_id, data):
self.data[chat_id][user_id]['data'] = data
self.update_data()

View File

@ -1,171 +0,0 @@
from telebot.asyncio_storage.base_storage import StateStorageBase, StateContext
import json
redis_installed = True
try:
import aioredis
except:
redis_installed = False
class StateRedisStorage(StateStorageBase):
"""
This class is for Redis storage.
This will work only for states.
To use it, just pass this class to:
TeleBot(storage=StateRedisStorage())
"""
def __init__(self, host='localhost', port=6379, db=0, password=None, prefix='telebot_'):
if not redis_installed:
raise ImportError('AioRedis is not installed. Install it via "pip install aioredis"')
aioredis_version = tuple(map(int, aioredis.__version__.split(".")[0]))
if aioredis_version < (2,):
raise ImportError('Invalid aioredis version. Aioredis version should be >= 2.0.0')
self.redis = aioredis.Redis(host=host, port=port, db=db, password=password)
self.prefix = prefix
#self.con = Redis(connection_pool=self.redis) -> use this when necessary
#
# {chat_id: {user_id: {'state': None, 'data': {}}, ...}, ...}
async def get_record(self, key):
"""
Function to get record from database.
It has nothing to do with states.
Made for backend compatibility
"""
result = await self.redis.get(self.prefix+str(key))
if result: return json.loads(result)
return
async def set_record(self, key, value):
"""
Function to set record to database.
It has nothing to do with states.
Made for backend compatibility
"""
await self.redis.set(self.prefix+str(key), json.dumps(value))
return True
async def delete_record(self, key):
"""
Function to delete record from database.
It has nothing to do with states.
Made for backend compatibility
"""
await self.redis.delete(self.prefix+str(key))
return True
async def set_state(self, chat_id, user_id, state):
"""
Set state for a particular user in a chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if isinstance(state, object):
state = state.name
if response:
if user_id in response:
response[user_id]['state'] = state
else:
response[user_id] = {'state': state, 'data': {}}
else:
response = {user_id: {'state': state, 'data': {}}}
await self.set_record(chat_id, response)
return True
async def delete_state(self, chat_id, user_id):
"""
Delete state for a particular user in a chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
del response[user_id]
if user_id == str(chat_id):
await self.delete_record(chat_id)
return True
else: await self.set_record(chat_id, response)
return True
return False
async def get_value(self, chat_id, user_id, key):
"""
Get value for a data of a user in a chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
if key in response[user_id]['data']:
return response[user_id]['data'][key]
return None
async def get_state(self, chat_id, user_id):
"""
Get state of a user in a chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
return response[user_id]['state']
return None
async def get_data(self, chat_id, user_id):
"""
Get data of particular user in a particular chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
return response[user_id]['data']
return None
async def reset_data(self, chat_id, user_id):
"""
Reset data of a user in a chat.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'] = {}
await self.set_record(chat_id, response)
return True
async def set_data(self, chat_id, user_id, key, value):
"""
Set data without interactive data.
"""
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'][key] = value
await self.set_record(chat_id, response)
return True
return False
def get_interactive_data(self, chat_id, user_id):
"""
Get Data in interactive way.
You can use with() with this function.
"""
return StateContext(self, chat_id, user_id)
async def save(self, chat_id, user_id, data):
response = await self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'] = dict(data, **response[user_id]['data'])
await self.set_record(chat_id, response)
return True

View File

@ -1,115 +0,0 @@
import typing
class CallbackDataFilter:
def __init__(self, factory, config: typing.Dict[str, str]):
self.config = config
self.factory = factory
def check(self, query):
"""
Checks if query.data appropriates to specified config
:param query: telebot.types.CallbackQuery
:return: bool
"""
try:
data = self.factory.parse(query.data)
except ValueError:
return False
for key, value in self.config.items():
if isinstance(value, (list, tuple, set, frozenset)):
if data.get(key) not in value:
return False
elif data.get(key) != value:
return False
return True
class CallbackData:
"""
Callback data factory
This class will help you to work with CallbackQuery
"""
def __init__(self, *parts, prefix: str, sep=':'):
if not isinstance(prefix, str):
raise TypeError(f'Prefix must be instance of str not {type(prefix).__name__}')
if not prefix:
raise ValueError("Prefix can't be empty")
if sep in prefix:
raise ValueError(f"Separator {sep!r} can't be used in prefix")
self.prefix = prefix
self.sep = sep
self._part_names = parts
def new(self, *args, **kwargs) -> str:
"""
Generate callback data
:param args: positional parameters of CallbackData instance parts
:param kwargs: named parameters
:return: str
"""
args = list(args)
data = [self.prefix]
for part in self._part_names:
value = kwargs.pop(part, None)
if value is None:
if args:
value = args.pop(0)
else:
raise ValueError(f'Value for {part!r} was not passed!')
if value is not None and not isinstance(value, str):
value = str(value)
if self.sep in value:
raise ValueError(f"Symbol {self.sep!r} is defined as the separator and can't be used in parts' values")
data.append(value)
if args or kwargs:
raise TypeError('Too many arguments were passed!')
callback_data = self.sep.join(data)
if len(callback_data.encode()) > 64:
raise ValueError('Resulted callback data is too long!')
return callback_data
def parse(self, callback_data: str) -> typing.Dict[str, str]:
"""
Parse data from the callback data
:param callback_data: string, use to telebot.types.CallbackQuery to parse it from string to a dict
:return: dict parsed from callback data
"""
prefix, *parts = callback_data.split(self.sep)
if prefix != self.prefix:
raise ValueError("Passed callback data can't be parsed with that prefix.")
elif len(parts) != len(self._part_names):
raise ValueError('Invalid parts count!')
result = {'@': prefix}
result.update(zip(self._part_names, parts))
return result
def filter(self, **config) -> CallbackDataFilter:
"""
Generate filter
:param config: specified named parameters will be checked with CallbackQury.data
:return: CallbackDataFilter class
"""
for key in config.keys():
if key not in self._part_names:
raise ValueError(f'Invalid field name {key!r}')
return CallbackDataFilter(self, config)

View File

@ -1,336 +0,0 @@
from abc import ABC
from typing import Optional, Union
from telebot.handler_backends import State
from telebot import types
class SimpleCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts only message, returns bool value, that is compared with given in handler.
"""
def check(self, message):
"""
Perform a check.
"""
pass
class AdvancedCustomFilter(ABC):
"""
Simple Custom Filter base class.
Create child class with check() method.
Accepts two parameters, returns bool: True - filter passed, False - filter failed.
message: Message class
text: Filter value given in handler
"""
def check(self, message, text):
"""
Perform a check.
"""
pass
class TextFilter:
"""
Advanced text filter to check (types.Message, types.CallbackQuery, types.InlineQuery, types.Poll)
example of usage is in examples/custom_filters/advanced_text_filter.py
"""
def __init__(self,
equals: Optional[str] = None,
contains: Optional[Union[list, tuple]] = None,
starts_with: Optional[Union[str, list, tuple]] = None,
ends_with: Optional[Union[str, list, tuple]] = None,
ignore_case: bool = False):
"""
:param equals: string, True if object's text is equal to passed string
:param contains: list[str] or tuple[str], True if any string element of iterable is in text
:param starts_with: string, True if object's text starts with passed string
:param ends_with: string, True if object's text starts with passed string
:param ignore_case: bool (default False), case insensitive
"""
to_check = sum((pattern is not None for pattern in (equals, contains, starts_with, ends_with)))
if to_check == 0:
raise ValueError('None of the check modes was specified')
self.equals = equals
self.contains = self._check_iterable(contains, filter_name='contains')
self.starts_with = self._check_iterable(starts_with, filter_name='starts_with')
self.ends_with = self._check_iterable(ends_with, filter_name='ends_with')
self.ignore_case = ignore_case
def _check_iterable(self, iterable, filter_name: str):
if not iterable:
pass
elif not isinstance(iterable, str) and not isinstance(iterable, list) and not isinstance(iterable, tuple):
raise ValueError(f"Incorrect value of {filter_name!r}")
elif isinstance(iterable, str):
iterable = [iterable]
elif isinstance(iterable, list) or isinstance(iterable, tuple):
iterable = [i for i in iterable if isinstance(i, str)]
return iterable
def check(self, obj: Union[types.Message, types.CallbackQuery, types.InlineQuery, types.Poll]):
if isinstance(obj, types.Poll):
text = obj.question
elif isinstance(obj, types.Message):
text = obj.text or obj.caption
elif isinstance(obj, types.CallbackQuery):
text = obj.data
elif isinstance(obj, types.InlineQuery):
text = obj.query
else:
return False
if self.ignore_case:
text = text.lower()
if self.equals:
self.equals = self.equals.lower()
elif self.contains:
self.contains = tuple(map(str.lower, self.contains))
elif self.starts_with:
self.starts_with = tuple(map(str.lower, self.starts_with))
elif self.ends_with:
self.ends_with = tuple(map(str.lower, self.ends_with))
if self.equals:
result = self.equals == text
if result:
return True
elif not result and not any((self.contains, self.starts_with, self.ends_with)):
return False
if self.contains:
result = any([i in text for i in self.contains])
if result:
return True
elif not result and not any((self.starts_with, self.ends_with)):
return False
if self.starts_with:
result = any([text.startswith(i) for i in self.starts_with])
if result:
return True
elif not result and not self.ends_with:
return False
if self.ends_with:
return any([text.endswith(i) for i in self.ends_with])
return False
class TextMatchFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
@bot.message_handler(text=['account'])
"""
key = 'text'
def check(self, message, text):
if isinstance(text, TextFilter):
return text.check(message)
elif type(text) is list:
return message.text in text
else:
return text == message.text
class TextContainsFilter(AdvancedCustomFilter):
"""
Filter to check Text message.
key: text
Example:
# Will respond if any message.text contains word 'account'
@bot.message_handler(text_contains=['account'])
"""
key = 'text_contains'
def check(self, message, text):
if not isinstance(text, str) and not isinstance(text, list) and not isinstance(text, tuple):
raise ValueError("Incorrect text_contains value")
elif isinstance(text, str):
text = [text]
elif isinstance(text, list) or isinstance(text, tuple):
text = [i for i in text if isinstance(i, str)]
return any([i in message.text for i in text])
class TextStartsFilter(AdvancedCustomFilter):
"""
Filter to check whether message starts with some text.
Example:
# Will work if message.text starts with 'Sir'.
@bot.message_handler(text_startswith='Sir')
"""
key = 'text_startswith'
def check(self, message, text):
return message.text.startswith(text)
class ChatFilter(AdvancedCustomFilter):
"""
Check whether chat_id corresponds to given chat_id.
Example:
@bot.message_handler(chat_id=[99999])
"""
key = 'chat_id'
def check(self, message, text):
return message.chat.id in text
class ForwardFilter(SimpleCustomFilter):
"""
Check whether message was forwarded from channel or group.
Example:
@bot.message_handler(is_forwarded=True)
"""
key = 'is_forwarded'
def check(self, message):
return message.forward_from_chat is not None
class IsReplyFilter(SimpleCustomFilter):
"""
Check whether message is a reply.
Example:
@bot.message_handler(is_reply=True)
"""
key = 'is_reply'
def check(self, message):
return message.reply_to_message is not None
class LanguageFilter(AdvancedCustomFilter):
"""
Check users language_code.
Example:
@bot.message_handler(language_code=['ru'])
"""
key = 'language_code'
def check(self, message, text):
if type(text) is list:
return message.from_user.language_code in text
else:
return message.from_user.language_code == text
class IsAdminFilter(SimpleCustomFilter):
"""
Check whether the user is administrator / owner of the chat.
Example:
@bot.message_handler(chat_types=['supergroup'], is_chat_admin=True)
"""
key = 'is_chat_admin'
def __init__(self, bot):
self._bot = bot
def check(self, message):
return self._bot.get_chat_member(message.chat.id, message.from_user.id).status in ['creator', 'administrator']
class StateFilter(AdvancedCustomFilter):
"""
Filter to check state.
Example:
@bot.message_handler(state=1)
"""
def __init__(self, bot):
self.bot = bot
key = 'state'
def check(self, message, text):
if text == '*': return True
# needs to work with callbackquery
if isinstance(message, types.Message):
chat_id = message.chat.id
user_id = message.from_user.id
if isinstance(message, types.CallbackQuery):
chat_id = message.message.chat.id
user_id = message.from_user.id
message = message.message
if isinstance(text, list):
new_text = []
for i in text:
if isinstance(i, State): i = i.name
new_text.append(i)
text = new_text
elif isinstance(text, State):
text = text.name
if message.chat.type == 'group':
group_state = self.bot.current_states.get_state(user_id, chat_id)
if group_state == text:
return True
elif group_state in text and type(text) is list:
return True
else:
user_state = self.bot.current_states.get_state(user_id, chat_id)
if user_state == text:
return True
elif type(text) is list and user_state in text:
return True
class IsDigitFilter(SimpleCustomFilter):
"""
Filter to check whether the string is made up of only digits.
Example:
@bot.message_handler(is_digit=True)
"""
key = 'is_digit'
def check(self, message):
return message.text.isdigit()

View File

@ -1,206 +0,0 @@
import os
import pickle
import threading
from telebot import apihelper
try:
from redis import Redis
redis_installed = True
except:
redis_installed = False
class HandlerBackend(object):
"""
Class for saving (next step|reply) handlers
"""
def __init__(self, handlers=None):
if handlers is None:
handlers = {}
self.handlers = handlers
def register_handler(self, handler_group_id, handler):
raise NotImplementedError()
def clear_handlers(self, handler_group_id):
raise NotImplementedError()
def get_handlers(self, handler_group_id):
raise NotImplementedError()
class MemoryHandlerBackend(HandlerBackend):
def register_handler(self, handler_group_id, handler):
if handler_group_id in self.handlers:
self.handlers[handler_group_id].append(handler)
else:
self.handlers[handler_group_id] = [handler]
def clear_handlers(self, handler_group_id):
self.handlers.pop(handler_group_id, None)
def get_handlers(self, handler_group_id):
return self.handlers.pop(handler_group_id, None)
def load_handlers(self, filename, del_file_after_loading):
raise NotImplementedError()
class FileHandlerBackend(HandlerBackend):
def __init__(self, handlers=None, filename='./.handler-saves/handlers.save', delay=120):
super(FileHandlerBackend, self).__init__(handlers)
self.filename = filename
self.delay = delay
self.timer = threading.Timer(delay, self.save_handlers)
def register_handler(self, handler_group_id, handler):
if handler_group_id in self.handlers:
self.handlers[handler_group_id].append(handler)
else:
self.handlers[handler_group_id] = [handler]
self.start_save_timer()
def clear_handlers(self, handler_group_id):
self.handlers.pop(handler_group_id, None)
self.start_save_timer()
def get_handlers(self, handler_group_id):
handlers = self.handlers.pop(handler_group_id, None)
self.start_save_timer()
return handlers
def start_save_timer(self):
if not self.timer.is_alive():
if self.delay <= 0:
self.save_handlers()
else:
self.timer = threading.Timer(self.delay, self.save_handlers)
self.timer.start()
def save_handlers(self):
self.dump_handlers(self.handlers, self.filename)
def load_handlers(self, filename=None, del_file_after_loading=True):
if not filename:
filename = self.filename
tmp = self.return_load_handlers(filename, del_file_after_loading=del_file_after_loading)
if tmp is not None:
self.handlers.update(tmp)
@staticmethod
def dump_handlers(handlers, filename, file_mode="wb"):
dirs = filename.rsplit('/', maxsplit=1)[0]
os.makedirs(dirs, exist_ok=True)
with open(filename + ".tmp", file_mode) as file:
if (apihelper.CUSTOM_SERIALIZER is None):
pickle.dump(handlers, file)
else:
apihelper.CUSTOM_SERIALIZER.dump(handlers, file)
if os.path.isfile(filename):
os.remove(filename)
os.rename(filename + ".tmp", filename)
@staticmethod
def return_load_handlers(filename, del_file_after_loading=True):
if os.path.isfile(filename) and os.path.getsize(filename) > 0:
with open(filename, "rb") as file:
if (apihelper.CUSTOM_SERIALIZER is None):
handlers = pickle.load(file)
else:
handlers = apihelper.CUSTOM_SERIALIZER.load(file)
if del_file_after_loading:
os.remove(filename)
return handlers
class RedisHandlerBackend(HandlerBackend):
def __init__(self, handlers=None, host='localhost', port=6379, db=0, prefix='telebot', password=None):
super(RedisHandlerBackend, self).__init__(handlers)
if not redis_installed:
raise Exception("Redis is not installed. Install it via 'pip install redis'")
self.prefix = prefix
self.redis = Redis(host, port, db, password)
def _key(self, handle_group_id):
return ':'.join((self.prefix, str(handle_group_id)))
def register_handler(self, handler_group_id, handler):
handlers = []
value = self.redis.get(self._key(handler_group_id))
if value:
handlers = pickle.loads(value)
handlers.append(handler)
self.redis.set(self._key(handler_group_id), pickle.dumps(handlers))
def clear_handlers(self, handler_group_id):
self.redis.delete(self._key(handler_group_id))
def get_handlers(self, handler_group_id):
handlers = None
value = self.redis.get(self._key(handler_group_id))
if value:
handlers = pickle.loads(value)
self.clear_handlers(handler_group_id)
return handlers
class State:
def __init__(self) -> None:
self.name = None
def __str__(self) -> str:
return self.name
class StatesGroup:
def __init_subclass__(cls) -> None:
for name, value in cls.__dict__.items():
if not name.startswith('__') and not callable(value) and isinstance(value, State):
# change value of that variable
value.name = ':'.join((cls.__name__, name))
class BaseMiddleware:
"""
Base class for middleware.
Your middlewares should be inherited from this class.
"""
def __init__(self):
pass
def pre_process(self, message, data):
raise NotImplementedError
def post_process(self, message, data, exception):
raise NotImplementedError
class SkipHandler:
"""
Class for skipping handlers.
Just return instance of this class
in middleware to skip handler.
Update will go to post_process,
but will skip execution of handler.
"""
def __init__(self) -> None:
pass
class CancelUpdate:
"""
Class for canceling updates.
Just return instance of this class
in middleware to skip update.
Update will skip handler and execution
of post_process in middlewares.
"""
def __init__(self) -> None:
pass

View File

@ -1,13 +0,0 @@
from telebot.storage.memory_storage import StateMemoryStorage
from telebot.storage.redis_storage import StateRedisStorage
from telebot.storage.pickle_storage import StatePickleStorage
from telebot.storage.base_storage import StateContext,StateStorageBase
__all__ = [
'StateStorageBase', 'StateContext',
'StateMemoryStorage', 'StateRedisStorage', 'StatePickleStorage'
]

View File

@ -1,65 +0,0 @@
import copy
class StateStorageBase:
def __init__(self) -> None:
pass
def set_data(self, chat_id, user_id, key, value):
"""
Set data for a user in a particular chat.
"""
raise NotImplementedError
def get_data(self, chat_id, user_id):
"""
Get data for a user in a particular chat.
"""
raise NotImplementedError
def set_state(self, chat_id, user_id, state):
"""
Set state for a particular user.
! Note that you should create a
record if it does not exist, and
if a record with state already exists,
you need to update a record.
"""
raise NotImplementedError
def delete_state(self, chat_id, user_id):
"""
Delete state for a particular user.
"""
raise NotImplementedError
def reset_data(self, chat_id, user_id):
"""
Reset data for a particular user in a chat.
"""
raise NotImplementedError
def get_state(self, chat_id, user_id):
raise NotImplementedError
def save(self, chat_id, user_id, data):
raise NotImplementedError
class StateContext:
"""
Class for data.
"""
def __init__(self , obj, chat_id, user_id) -> None:
self.obj = obj
self.data = copy.deepcopy(obj.get_data(chat_id, user_id))
self.chat_id = chat_id
self.user_id = user_id
def __enter__(self):
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
return self.obj.save(self.chat_id, self.user_id, self.data)

View File

@ -1,67 +0,0 @@
from telebot.storage.base_storage import StateStorageBase, StateContext
class StateMemoryStorage(StateStorageBase):
def __init__(self) -> None:
self.data = {}
#
# {chat_id: {user_id: {'state': None, 'data': {}}, ...}, ...}
def set_state(self, chat_id, user_id, state):
if isinstance(state, object):
state = state.name
if chat_id in self.data:
if user_id in self.data[chat_id]:
self.data[chat_id][user_id]['state'] = state
return True
else:
self.data[chat_id][user_id] = {'state': state, 'data': {}}
return True
self.data[chat_id] = {user_id: {'state': state, 'data': {}}}
return True
def delete_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
del self.data[chat_id][user_id]
if chat_id == user_id:
del self.data[chat_id]
return True
return False
def get_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['state']
return None
def get_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['data']
return None
def reset_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'] = {}
return True
return False
def set_data(self, chat_id, user_id, key, value):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'][key] = value
return True
raise RuntimeError('chat_id {} and user_id {} does not exist'.format(chat_id, user_id))
def get_interactive_data(self, chat_id, user_id):
return StateContext(self, chat_id, user_id)
def save(self, chat_id, user_id, data):
self.data[chat_id][user_id]['data'] = data

View File

@ -1,115 +0,0 @@
from telebot.storage.base_storage import StateStorageBase, StateContext
import os
import pickle
class StatePickleStorage(StateStorageBase):
# noinspection PyMissingConstructor
def __init__(self, file_path="./.state-save/states.pkl") -> None:
self.file_path = file_path
self.create_dir()
self.data = self.read()
def convert_old_to_new(self):
"""
Use this function to convert old storage to new storage.
This function is for people who was using pickle storage
that was in version <=4.3.1.
"""
# old looks like:
# {1: {'state': 'start', 'data': {'name': 'John'}}
# we should update old version pickle to new.
# new looks like:
# {1: {2: {'state': 'start', 'data': {'name': 'John'}}}}
new_data = {}
for key, value in self.data.items():
# this returns us id and dict with data and state
new_data[key] = {key: value} # convert this to new
# pass it to global data
self.data = new_data
self.update_data() # update data in file
def create_dir(self):
"""
Create directory .save-handlers.
"""
dirs = self.file_path.rsplit('/', maxsplit=1)[0]
os.makedirs(dirs, exist_ok=True)
if not os.path.isfile(self.file_path):
with open(self.file_path,'wb') as file:
pickle.dump({}, file)
def read(self):
file = open(self.file_path, 'rb')
data = pickle.load(file)
file.close()
return data
def update_data(self):
file = open(self.file_path, 'wb+')
pickle.dump(self.data, file, protocol=pickle.HIGHEST_PROTOCOL)
file.close()
def set_state(self, chat_id, user_id, state):
if isinstance(state, object):
state = state.name
if chat_id in self.data:
if user_id in self.data[chat_id]:
self.data[chat_id][user_id]['state'] = state
return True
else:
self.data[chat_id][user_id] = {'state': state, 'data': {}}
return True
self.data[chat_id] = {user_id: {'state': state, 'data': {}}}
self.update_data()
return True
def delete_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
del self.data[chat_id][user_id]
if chat_id == user_id:
del self.data[chat_id]
self.update_data()
return True
return False
def get_state(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['state']
return None
def get_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
return self.data[chat_id][user_id]['data']
return None
def reset_data(self, chat_id, user_id):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'] = {}
self.update_data()
return True
return False
def set_data(self, chat_id, user_id, key, value):
if self.data.get(chat_id):
if self.data[chat_id].get(user_id):
self.data[chat_id][user_id]['data'][key] = value
self.update_data()
return True
raise RuntimeError('chat_id {} and user_id {} does not exist'.format(chat_id, user_id))
def get_interactive_data(self, chat_id, user_id):
return StateContext(self, chat_id, user_id)
def save(self, chat_id, user_id, data):
self.data[chat_id][user_id]['data'] = data
self.update_data()

View File

@ -1,180 +0,0 @@
from pyclbr import Class
from telebot.storage.base_storage import StateStorageBase, StateContext
import json
redis_installed = True
try:
from redis import Redis, ConnectionPool
except:
redis_installed = False
class StateRedisStorage(StateStorageBase):
"""
This class is for Redis storage.
This will work only for states.
To use it, just pass this class to:
TeleBot(storage=StateRedisStorage())
"""
def __init__(self, host='localhost', port=6379, db=0, password=None, prefix='telebot_'):
self.redis = ConnectionPool(host=host, port=port, db=db, password=password)
#self.con = Redis(connection_pool=self.redis) -> use this when necessary
#
# {chat_id: {user_id: {'state': None, 'data': {}}, ...}, ...}
self.prefix = prefix
if not redis_installed:
raise Exception("Redis is not installed. Install it via 'pip install redis'")
def get_record(self, key):
"""
Function to get record from database.
It has nothing to do with states.
Made for backend compatibility
"""
connection = Redis(connection_pool=self.redis)
result = connection.get(self.prefix+str(key))
connection.close()
if result: return json.loads(result)
return
def set_record(self, key, value):
"""
Function to set record to database.
It has nothing to do with states.
Made for backend compatibility
"""
connection = Redis(connection_pool=self.redis)
connection.set(self.prefix+str(key), json.dumps(value))
connection.close()
return True
def delete_record(self, key):
"""
Function to delete record from database.
It has nothing to do with states.
Made for backend compatibility
"""
connection = Redis(connection_pool=self.redis)
connection.delete(self.prefix+str(key))
connection.close()
return True
def set_state(self, chat_id, user_id, state):
"""
Set state for a particular user in a chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if isinstance(state, object):
state = state.name
if response:
if user_id in response:
response[user_id]['state'] = state
else:
response[user_id] = {'state': state, 'data': {}}
else:
response = {user_id: {'state': state, 'data': {}}}
self.set_record(chat_id, response)
return True
def delete_state(self, chat_id, user_id):
"""
Delete state for a particular user in a chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
del response[user_id]
if user_id == str(chat_id):
self.delete_record(chat_id)
return True
else: self.set_record(chat_id, response)
return True
return False
def get_value(self, chat_id, user_id, key):
"""
Get value for a data of a user in a chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
if key in response[user_id]['data']:
return response[user_id]['data'][key]
return None
def get_state(self, chat_id, user_id):
"""
Get state of a user in a chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
return response[user_id]['state']
return None
def get_data(self, chat_id, user_id):
"""
Get data of particular user in a particular chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
return response[user_id]['data']
return None
def reset_data(self, chat_id, user_id):
"""
Reset data of a user in a chat.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'] = {}
self.set_record(chat_id, response)
return True
def set_data(self, chat_id, user_id, key, value):
"""
Set data without interactive data.
"""
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'][key] = value
self.set_record(chat_id, response)
return True
return False
def get_interactive_data(self, chat_id, user_id):
"""
Get Data in interactive way.
You can use with() with this function.
"""
return StateContext(self, chat_id, user_id)
def save(self, chat_id, user_id, data):
response = self.get_record(chat_id)
user_id = str(user_id)
if response:
if user_id in response:
response[user_id]['data'] = dict(data, **response[user_id]['data'])
self.set_record(chat_id, response)
return True

File diff suppressed because it is too large Load Diff

View File

@ -1,512 +0,0 @@
# -*- coding: utf-8 -*-
import random
import re
import string
import threading
import traceback
from typing import Any, Callable, List, Dict, Optional, Union
# noinspection PyPep8Naming
import queue as Queue
import logging
from telebot import types
try:
import ujson as json
except ImportError:
import json
try:
# noinspection PyPackageRequirements
from PIL import Image
from io import BytesIO
pil_imported = True
except:
pil_imported = False
MAX_MESSAGE_LENGTH = 4096
logger = logging.getLogger('TeleBot')
thread_local = threading.local()
content_type_media = [
'text', 'audio', 'animation', 'document', 'photo', 'sticker', 'video', 'video_note', 'voice', 'contact', 'dice', 'poll',
'venue', 'location'
]
content_type_service = [
'new_chat_members', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created',
'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message',
'proximity_alert_triggered', 'voice_chat_scheduled', 'voice_chat_started', 'voice_chat_ended',
'voice_chat_participants_invited', 'message_auto_delete_timer_changed'
]
update_types = [
"update_id", "message", "edited_message", "channel_post", "edited_channel_post", "inline_query",
"chosen_inline_result", "callback_query", "shipping_query", "pre_checkout_query", "poll", "poll_answer",
"my_chat_member", "chat_member", "chat_join_request"
]
class WorkerThread(threading.Thread):
count = 0
def __init__(self, exception_callback=None, queue=None, name=None):
if not name:
name = "WorkerThread{0}".format(self.__class__.count + 1)
self.__class__.count += 1
if not queue:
queue = Queue.Queue()
threading.Thread.__init__(self, name=name)
self.queue = queue
self.daemon = True
self.received_task_event = threading.Event()
self.done_event = threading.Event()
self.exception_event = threading.Event()
self.continue_event = threading.Event()
self.exception_callback = exception_callback
self.exception_info = None
self._running = True
self.start()
def run(self):
while self._running:
try:
task, args, kwargs = self.queue.get(block=True, timeout=.5)
self.continue_event.clear()
self.received_task_event.clear()
self.done_event.clear()
self.exception_event.clear()
logger.debug("Received task")
self.received_task_event.set()
task(*args, **kwargs)
logger.debug("Task complete")
self.done_event.set()
except Queue.Empty:
pass
except Exception as e:
logger.debug(type(e).__name__ + " occurred, args=" + str(e.args) + "\n" + traceback.format_exc())
self.exception_info = e
self.exception_event.set()
if self.exception_callback:
self.exception_callback(self, self.exception_info)
self.continue_event.wait()
def put(self, task, *args, **kwargs):
self.queue.put((task, args, kwargs))
def raise_exceptions(self):
if self.exception_event.is_set():
raise self.exception_info
def clear_exceptions(self):
self.exception_event.clear()
self.continue_event.set()
def stop(self):
self._running = False
class ThreadPool:
def __init__(self, telebot, num_threads=2):
self.telebot = telebot
self.tasks = Queue.Queue()
self.workers = [WorkerThread(self.on_exception, self.tasks) for _ in range(num_threads)]
self.num_threads = num_threads
self.exception_event = threading.Event()
self.exception_info = None
def put(self, func, *args, **kwargs):
self.tasks.put((func, args, kwargs))
def on_exception(self, worker_thread, exc_info):
if self.telebot.exception_handler is not None:
handled = self.telebot.exception_handler.handle(exc_info)
else:
handled = False
if not handled:
self.exception_info = exc_info
self.exception_event.set()
worker_thread.continue_event.set()
def raise_exceptions(self):
if self.exception_event.is_set():
raise self.exception_info
def clear_exceptions(self):
self.exception_event.clear()
def close(self):
for worker in self.workers:
worker.stop()
for worker in self.workers:
worker.join()
class AsyncTask:
def __init__(self, target, *args, **kwargs):
self.target = target
self.args = args
self.kwargs = kwargs
self.done = False
self.thread = threading.Thread(target=self._run)
self.thread.start()
def _run(self):
try:
self.result = self.target(*self.args, **self.kwargs)
except Exception as e:
self.result = e
self.done = True
def wait(self):
if not self.done:
self.thread.join()
if isinstance(self.result, BaseException):
raise self.result
else:
return self.result
class CustomRequestResponse():
def __init__(self, json_text, status_code = 200, reason = ""):
self.status_code = status_code
self.text = json_text
self.reason = reason
def json(self):
return json.loads(self.text)
def async_dec():
def decorator(fn):
def wrapper(*args, **kwargs):
return AsyncTask(fn, *args, **kwargs)
return wrapper
return decorator
def is_string(var):
return isinstance(var, str)
def is_dict(var):
return isinstance(var, dict)
def is_bytes(var):
return isinstance(var, bytes)
def is_pil_image(var):
return pil_imported and isinstance(var, Image.Image)
def pil_image_to_file(image, extension='JPEG', quality='web_low'):
if pil_imported:
photoBuffer = BytesIO()
image.convert('RGB').save(photoBuffer, extension, quality=quality)
photoBuffer.seek(0)
return photoBuffer
else:
raise RuntimeError('PIL module is not imported')
def is_command(text: str) -> bool:
"""
Checks if `text` is a command. Telegram chat commands start with the '/' character.
:param text: Text to check.
:return: True if `text` is a command, else False.
"""
if text is None: return False
return text.startswith('/')
def extract_command(text: str) -> Union[str, None]:
"""
Extracts the command from `text` (minus the '/') if `text` is a command (see is_command).
If `text` is not a command, this function returns None.
Examples:
extract_command('/help'): 'help'
extract_command('/help@BotName'): 'help'
extract_command('/search black eyed peas'): 'search'
extract_command('Good day to you'): None
:param text: String to extract the command from
:return: the command if `text` is a command (according to is_command), else None.
"""
if text is None: return None
return text.split()[0].split('@')[0][1:] if is_command(text) else None
def extract_arguments(text: str) -> str:
"""
Returns the argument after the command.
Examples:
extract_arguments("/get name"): 'name'
extract_arguments("/get"): ''
extract_arguments("/get@botName name"): 'name'
:param text: String to extract the arguments from a command
:return: the arguments if `text` is a command (according to is_command), else None.
"""
regexp = re.compile(r"/\w*(@\w*)*\s*([\s\S]*)", re.IGNORECASE)
result = regexp.match(text)
return result.group(2) if is_command(text) else None
def split_string(text: str, chars_per_string: int) -> List[str]:
"""
Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string.
This is very useful for splitting one giant message into multiples.
:param text: The text to split
:param chars_per_string: The number of characters per line the text is split into.
:return: The splitted text as a list of strings.
"""
return [text[i:i + chars_per_string] for i in range(0, len(text), chars_per_string)]
def smart_split(text: str, chars_per_string: int=MAX_MESSAGE_LENGTH) -> List[str]:
r"""
Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string.
This is very useful for splitting one giant message into multiples.
If `chars_per_string` > 4096: `chars_per_string` = 4096.
Splits by '\n', '. ' or ' ' in exactly this priority.
:param text: The text to split
:param chars_per_string: The number of maximum characters per part the text is split to.
:return: The splitted text as a list of strings.
"""
def _text_before_last(substr: str) -> str:
return substr.join(part.split(substr)[:-1]) + substr
if chars_per_string > MAX_MESSAGE_LENGTH: chars_per_string = MAX_MESSAGE_LENGTH
parts = []
while True:
if len(text) < chars_per_string:
parts.append(text)
return parts
part = text[:chars_per_string]
if "\n" in part: part = _text_before_last("\n")
elif ". " in part: part = _text_before_last(". ")
elif " " in part: part = _text_before_last(" ")
parts.append(part)
text = text[len(part):]
def escape(text: str) -> str:
"""
Replaces the following chars in `text` ('&' with '&amp;', '<' with '&lt;' and '>' with '&gt;').
:param text: the text to escape
:return: the escaped text
"""
chars = {"&": "&amp;", "<": "&lt;", ">": "&gt;"}
for old, new in chars.items(): text = text.replace(old, new)
return text
def user_link(user: types.User, include_id: bool=False) -> str:
"""
Returns an HTML user link. This is useful for reports.
Attention: Don't forget to set parse_mode to 'HTML'!
Example:
bot.send_message(your_user_id, user_link(message.from_user) + ' started the bot!', parse_mode='HTML')
:param user: the user (not the user_id)
:param include_id: include the user_id
:return: HTML user link
"""
name = escape(user.first_name)
return (f"<a href='tg://user?id={user.id}'>{name}</a>"
+ (f" (<pre>{user.id}</pre>)" if include_id else ""))
def quick_markup(values: Dict[str, Dict[str, Any]], row_width: int=2) -> types.InlineKeyboardMarkup:
"""
Returns a reply markup from a dict in this format: {'text': kwargs}
This is useful to avoid always typing 'btn1 = InlineKeyboardButton(...)' 'btn2 = InlineKeyboardButton(...)'
Example:
.. code-block:: python
quick_markup({
'Twitter': {'url': 'https://twitter.com'},
'Facebook': {'url': 'https://facebook.com'},
'Back': {'callback_data': 'whatever'}
}, row_width=2):
# returns an InlineKeyboardMarkup with two buttons in a row, one leading to Twitter, the other to facebook
# and a back button below
# kwargs can be:
{
'url': None,
'callback_data': None,
'switch_inline_query': None,
'switch_inline_query_current_chat': None,
'callback_game': None,
'pay': None,
'login_url': None
}
:param values: a dict containing all buttons to create in this format: {text: kwargs} {str:}
:param row_width: int row width
:return: InlineKeyboardMarkup
"""
markup = types.InlineKeyboardMarkup(row_width=row_width)
buttons = [
types.InlineKeyboardButton(text=text, **kwargs)
for text, kwargs in values.items()
]
markup.add(*buttons)
return markup
# CREDITS TO http://stackoverflow.com/questions/12317940#answer-12320352
def or_set(self):
self._set()
self.changed()
def or_clear(self):
self._clear()
self.changed()
def orify(e, changed_callback):
if not hasattr(e, "_set"):
e._set = e.set
if not hasattr(e, "_clear"):
e._clear = e.clear
e.changed = changed_callback
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)
def OrEvent(*events):
or_event = threading.Event()
def changed():
bools = [ev.is_set() for ev in events]
if any(bools):
or_event.set()
else:
or_event.clear()
def busy_wait():
while not or_event.is_set():
# noinspection PyProtectedMember
or_event._wait(3)
for e in events:
orify(e, changed)
or_event._wait = or_event.wait
or_event.wait = busy_wait
changed()
return or_event
def per_thread(key, construct_value, reset=False):
if reset or not hasattr(thread_local, key):
value = construct_value()
setattr(thread_local, key, value)
return getattr(thread_local, key)
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
# https://stackoverflow.com/a/312464/9935473
for i in range(0, len(lst), n):
yield lst[i:i + n]
def generate_random_token():
return ''.join(random.sample(string.ascii_letters, 16))
def deprecated(warn: bool=True, alternative: Optional[Callable]=None):
"""
Use this decorator to mark functions as deprecated.
When the function is used, an info (or warning if `warn` is True) is logged.
:param warn: If True a warning is logged else an info
:param alternative: The new function to use instead
"""
def decorator(function):
def wrapper(*args, **kwargs):
info = f"`{function.__name__}` is deprecated." + (f" Use `{alternative.__name__}` instead" if alternative else "")
if not warn:
logger.info(info)
else:
logger.warning(info)
return function(*args, **kwargs)
return wrapper
return decorator
# Cloud helpers
def webhook_google_functions(bot, request):
"""A webhook endpoint for Google Cloud Functions FaaS."""
if request.is_json:
try:
request_json = request.get_json()
update = types.Update.de_json(request_json)
bot.process_new_updates([update])
return ''
except Exception as e:
print(e)
return 'Bot FAIL', 400
else:
return 'Bot ON'
def antiflood(function, *args, **kwargs):
"""
Use this function inside loops in order to avoid getting TooManyRequests error.
Example:
.. code-block:: python3
from telebot.util import antiflood
for chat_id in chat_id_list:
msg = antiflood(bot.send_message, chat_id, text)
:param function:
:param args:
:param kwargs:
:return: None
"""
from telebot.apihelper import ApiTelegramException
from time import sleep
msg = None
try:
msg = function(*args, **kwargs)
except ApiTelegramException as ex:
if ex.error_code == 429:
sleep(ex.result_json['parameters']['retry_after'])
msg = function(*args, **kwargs)
finally:
return msg

View File

@ -1,3 +0,0 @@
# Versions should comply with PEP440.
# This line is parsed in setup.py:
__version__ = '4.4.0'