summaryrefslogtreecommitdiffhomepage
path: root/libs/tqdm/contrib/telegram.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tqdm/contrib/telegram.py')
-rw-r--r--libs/tqdm/contrib/telegram.py153
1 files changed, 88 insertions, 65 deletions
diff --git a/libs/tqdm/contrib/telegram.py b/libs/tqdm/contrib/telegram.py
index 5654dc99c..99cbe8c88 100644
--- a/libs/tqdm/contrib/telegram.py
+++ b/libs/tqdm/contrib/telegram.py
@@ -1,126 +1,149 @@
"""
Sends updates to a Telegram bot.
+
+Usage:
+>>> from tqdm.contrib.telegram import tqdm, trange
+>>> for i in trange(10, token='{token}', chat_id='{chat_id}'):
+... ...
+
+![screenshot](https://img.tqdm.ml/screenshot-telegram.gif)
"""
from __future__ import absolute_import
-from concurrent.futures import ThreadPoolExecutor
+from os import getenv
+from warnings import warn
+
from requests import Session
-from tqdm.auto import tqdm as tqdm_auto
-from tqdm.utils import _range
+from ..auto import tqdm as tqdm_auto
+from ..std import TqdmWarning
+from ..utils import _range
+from .utils_worker import MonoWorker
+
__author__ = {"github.com/": ["casperdcl"]}
__all__ = ['TelegramIO', 'tqdm_telegram', 'ttgrange', 'tqdm', 'trange']
-class TelegramIO():
- """Non-blocking file-like IO to a Telegram Bot."""
+class TelegramIO(MonoWorker):
+ """Non-blocking file-like IO using a Telegram Bot."""
API = 'https://api.telegram.org/bot'
def __init__(self, token, chat_id):
"""Creates a new message in the given `chat_id`."""
+ super(TelegramIO, self).__init__()
self.token = token
self.chat_id = chat_id
- self.session = session = Session()
+ self.session = Session()
self.text = self.__class__.__name__
- self.pool = ThreadPoolExecutor()
- self.futures = []
+ self.message_id
+
+ @property
+ def message_id(self):
+ if hasattr(self, '_message_id'):
+ return self._message_id
try:
- res = session.post(
+ res = self.session.post(
self.API + '%s/sendMessage' % self.token,
- data=dict(text='`' + self.text + '`', chat_id=self.chat_id,
- parse_mode='MarkdownV2'))
+ data={'text': '`' + self.text + '`', 'chat_id': self.chat_id,
+ 'parse_mode': 'MarkdownV2'}).json()
except Exception as e:
tqdm_auto.write(str(e))
else:
- self.message_id = res.json()['result']['message_id']
+ if res.get('error_code') == 429:
+ warn("Creation rate limit: try increasing `mininterval`.",
+ TqdmWarning, stacklevel=2)
+ else:
+ self._message_id = res['result']['message_id']
+ return self._message_id
def write(self, s):
"""Replaces internal `message_id`'s text with `s`."""
if not s:
- return
- s = s.strip().replace('\r', '')
+ s = "..."
+ s = s.replace('\r', '').strip()
if s == self.text:
return # avoid duplicate message Bot error
+ message_id = self.message_id
+ if message_id is None:
+ return
self.text = s
try:
- f = self.pool.submit(
- self.session.post,
- self.API + '%s/editMessageText' % self.token,
- data=dict(
- text='`' + s + '`', chat_id=self.chat_id,
- message_id=self.message_id, parse_mode='MarkdownV2'))
+ future = self.submit(
+ self.session.post, self.API + '%s/editMessageText' % self.token,
+ data={'text': '`' + s + '`', 'chat_id': self.chat_id,
+ 'message_id': message_id, 'parse_mode': 'MarkdownV2'})
except Exception as e:
tqdm_auto.write(str(e))
else:
- self.futures.append(f)
- return f
+ return future
- def flush(self):
- """Ensure the last `write` has been processed."""
- [f.cancel() for f in self.futures[-2::-1]]
+ def delete(self):
+ """Deletes internal `message_id`."""
try:
- return self.futures[-1].result()
- except IndexError:
- pass
- finally:
- self.futures = []
-
- def __del__(self):
- self.flush()
+ future = self.submit(
+ self.session.post, self.API + '%s/deleteMessage' % self.token,
+ data={'chat_id': self.chat_id, 'message_id': self.message_id})
+ except Exception as e:
+ tqdm_auto.write(str(e))
+ else:
+ return future
class tqdm_telegram(tqdm_auto):
"""
- Standard `tqdm.auto.tqdm` but also sends updates to a Telegram bot.
- May take a few seconds to create (`__init__`) and clear (`__del__`).
+ Standard `tqdm.auto.tqdm` but also sends updates to a Telegram Bot.
+ May take a few seconds to create (`__init__`).
+
+ - create a bot <https://core.telegram.org/bots#6-botfather>
+ - copy its `{token}`
+ - add the bot to a chat and send it a message such as `/start`
+ - go to <https://api.telegram.org/bot`{token}`/getUpdates> to find out
+ the `{chat_id}`
+ - paste the `{token}` & `{chat_id}` below
>>> from tqdm.contrib.telegram import tqdm, trange
- >>> for i in tqdm(
- ... iterable,
- ... token='1234567890:THIS1SSOMETOKEN0BTAINeDfrOmTELEGrAM',
- ... chat_id='0246813579'):
+ >>> for i in tqdm(iterable, token='{token}', chat_id='{chat_id}'):
+ ... ...
"""
def __init__(self, *args, **kwargs):
"""
Parameters
----------
- token : str, required. Telegram token.
- chat_id : str, required. Telegram chat ID.
+ token : str, required. Telegram token
+ [default: ${TQDM_TELEGRAM_TOKEN}].
+ chat_id : str, required. Telegram chat ID
+ [default: ${TQDM_TELEGRAM_CHAT_ID}].
See `tqdm.auto.tqdm.__init__` for other parameters.
"""
- self.tgio = TelegramIO(kwargs.pop('token'), kwargs.pop('chat_id'))
+ if not kwargs.get('disable'):
+ kwargs = kwargs.copy()
+ self.tgio = TelegramIO(
+ kwargs.pop('token', getenv('TQDM_TELEGRAM_TOKEN')),
+ kwargs.pop('chat_id', getenv('TQDM_TELEGRAM_CHAT_ID')))
super(tqdm_telegram, self).__init__(*args, **kwargs)
def display(self, **kwargs):
super(tqdm_telegram, self).display(**kwargs)
fmt = self.format_dict
- if 'bar_format' in fmt and fmt['bar_format']:
- fmt['bar_format'] = fmt['bar_format'].replace('<bar/>', '{bar}')
+ if fmt.get('bar_format', None):
+ fmt['bar_format'] = fmt['bar_format'].replace(
+ '<bar/>', '{bar:10u}').replace('{bar}', '{bar:10u}')
else:
- fmt['bar_format'] = '{l_bar}{bar}{r_bar}'
- fmt['bar_format'] = fmt['bar_format'].replace('{bar}', '{bar:10u}')
+ fmt['bar_format'] = '{l_bar}{bar:10u}{r_bar}'
self.tgio.write(self.format_meter(**fmt))
- def __new__(cls, *args, **kwargs):
- """
- Workaround for mixed-class same-stream nested progressbars.
- See [#509](https://github.com/tqdm/tqdm/issues/509)
- """
- with cls.get_lock():
- try:
- cls._instances = tqdm_auto._instances
- except AttributeError:
- pass
- instance = super(tqdm_telegram, cls).__new__(cls, *args, **kwargs)
- with cls.get_lock():
- try:
- # `tqdm_auto` may have been changed so update
- cls._instances.update(tqdm_auto._instances)
- except AttributeError:
- pass
- tqdm_auto._instances = cls._instances
- return instance
+ def clear(self, *args, **kwargs):
+ super(tqdm_telegram, self).clear(*args, **kwargs)
+ if not self.disable:
+ self.tgio.write("")
+
+ def close(self):
+ if self.disable:
+ return
+ super(tqdm_telegram, self).close()
+ if not (self.leave or (self.leave is None and self.pos == 0)):
+ self.tgio.delete()
def ttgrange(*args, **kwargs):